ORM Basics with SQLAlchemy

Object-Relational Mapping Using SQLAlchemy with PostgreSQL

Introduction to ORM and SQLAlchemy

Object-Relational Mapping (ORM) is a programming technique that converts data between incompatible type systems in object-oriented programming languages and relational databases. SQLAlchemy is a widely used SQL toolkit and ORM library for Python, known for its flexibility.

Analogy: ORM as a Universal Translator

Think of an ORM like a universal translator in science fiction:

  • Your application speaks "Object-Oriented" - it thinks in terms of classes, attributes, methods, and inheritance
  • Your database speaks "Relational" - it understands tables, columns, rows, and foreign keys
  • The ORM serves as the translator between these two different "languages"
  • Without the translator, you'd have to manually convert your objects to SQL, then convert the SQL results back to objects
  • With the translator, you can work exclusively in your native "language" (Python objects), and the ORM handles all the translation behind the scenes

Just as a universal translator allows species from different planets to communicate seamlessly, an ORM lets your object-oriented code communicate with your relational database without having to understand the details of SQL.
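The difference between translating by hand and letting the ORM translate can be sketched as follows. This is a minimal illustration, not a recommended project layout: it assumes an in-memory SQLite database so that it runs without a PostgreSQL server, and the names PlainUser, load_user_manually, and load_user_with_orm are hypothetical helpers introduced only for this comparison.

```python
import sqlite3
from dataclasses import dataclass

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base


# --- Without an ORM: rows are converted to objects by hand ---
@dataclass
class PlainUser:  # hypothetical plain class, not part of SQLAlchemy
    id: int
    username: str


def load_user_manually(conn, username):
    """Run the SQL ourselves and map the result tuple onto an object."""
    row = conn.execute(
        "SELECT id, username FROM users WHERE username = ?", (username,)
    ).fetchone()
    return PlainUser(id=row[0], username=row[1]) if row else None


# --- With an ORM: the mapped class carries the translation rules ---
Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)


def load_user_with_orm(session, username):
    """The ORM generates the SELECT and builds the User object."""
    return session.query(User).filter_by(username=username).first()


# Manual route, using the standard-library sqlite3 driver directly
raw = sqlite3.connect(":memory:")
raw.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
raw.execute("INSERT INTO users (username) VALUES (?)", ("johndoe",))
manual_user = load_user_manually(raw, "johndoe")

# ORM route (assumption: SQLite stands in for PostgreSQL here)
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(User(username="johndoe"))
    session.commit()
    orm_username = load_user_with_orm(session, "johndoe").username
```

Both routes end with an object that has a username attribute; the ORM version simply moves the SQL text and the tuple-to-object mapping out of application code.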

Benefits of Using ORMs

Understanding SQLAlchemy's Architecture

graph TD
    SA[SQLAlchemy] --- Core[Core]
    SA --- ORM[ORM]
    Core --- E[Engine]
    Core --- SC[SQL Expressions]
    Core --- CON[Connection Pooling]
    Core --- SC2[Schema/Types]
    ORM --- M[Declarative Models]
    ORM --- S[Sessions]
    ORM --- Q[Query API]
    ORM --- R[Relationships]
    style SA fill:#336791,stroke:#333,stroke-width:2px,color:#fff
    style Core fill:#80b1d3,stroke:#333
    style ORM fill:#8dd3c7,stroke:#333

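SQLAlchemy consists of two main components:

  • Core: the Engine, connection pooling, the SQL expression language, and schema/type definitions
  • ORM: declarative models, Sessions, the Query API, and relationships, all built on top of the Core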

This architecture gives SQLAlchemy its flexibility - you can use just the Core for direct SQL manipulation, just the ORM for object-relational mapping, or combine both as needed.
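As a rough sketch of that flexibility, the snippet below touches the same table once through the Core and once through the ORM. It assumes an in-memory SQLite database so it can run without a PostgreSQL server, and the Tag model is a hypothetical example defined only for this comparison.

```python
from sqlalchemy import Column, Integer, String, create_engine, insert, select
from sqlalchemy.orm import Session, declarative_base

# Assumption: SQLite stands in for PostgreSQL so the sketch is self-contained
engine = create_engine("sqlite:///:memory:")
Base = declarative_base()


class Tag(Base):  # hypothetical model used only in this sketch
    __tablename__ = "tags"
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)


Base.metadata.create_all(engine)

# Core: build SQL expressions against the Table object and run them on a connection
tags_table = Tag.__table__
with engine.begin() as conn:  # begin() commits when the block exits cleanly
    conn.execute(insert(tags_table), [{"name": "python"}, {"name": "sql"}])
    result = conn.execute(select(tags_table.c.name).order_by(tags_table.c.name))
    core_names = [row.name for row in result]

# ORM: work with Tag objects through a Session
with Session(engine) as session:
    session.add(Tag(name="orm"))
    session.commit()
    orm_names = sorted(tag.name for tag in session.query(Tag).all())

print(core_names)
print(orm_names)
```

The Core half deals in tables, columns, and rows; the ORM half deals in Tag instances. Both run against the same engine and the same table.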

Setting Up SQLAlchemy with PostgreSQL

Installation

To get started with SQLAlchemy and PostgreSQL, you need to install SQLAlchemy itself and a PostgreSQL database driver (psycopg2 in the examples here).

Installing Required Packages:


# Install SQLAlchemy and PostgreSQL adapter
pip install sqlalchemy psycopg2-binary

# For development environments, using a virtual environment is recommended
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install sqlalchemy psycopg2-binary
          

Creating an Engine

The SQLAlchemy Engine is the starting point for any SQLAlchemy application. It represents the core interface to the database, managing a connection or pool of connections.

Creating a SQLAlchemy Engine:


from sqlalchemy import create_engine

# Connection string format: postgresql://username:password@host:port/database
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')

# Testing the connection (the context manager closes it even if an error occurs)
try:
    with engine.connect() as connection:
        print("Connection successful!")
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
          

Engine with Configuration Options:


from sqlalchemy import create_engine
import os

# Get database credentials from environment variables for better security
db_user = os.environ.get('DB_USER', 'pythonapp')
db_password = os.environ.get('DB_PASSWORD', 'secure_password')
db_host = os.environ.get('DB_HOST', 'localhost')
db_port = os.environ.get('DB_PORT', '5432')
db_name = os.environ.get('DB_NAME', 'python_app_db')

# Create engine with configuration options
engine = create_engine(
    f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}',
    pool_size=5,               # Number of connections kept open in the pool
    max_overflow=10,           # Extra connections allowed beyond pool_size under load
    pool_timeout=30,           # Seconds to wait before giving up on getting a connection
    pool_recycle=1800,         # Recycle connections after 30 minutes (1800 seconds)
    echo=True                  # Print all executed SQL (useful for debugging)
)
          

Engine Configuration Best Practices

  • Use Environment Variables: Never hardcode database credentials in your code
  • Set pool_size: Match your database connection pool to your application's concurrency needs
  • Enable pool_recycle: Prevents issues with stale connections being terminated by database servers
  • Use echo=True in Development: Helpful for debugging, but turn off in production
  • Consider echo_pool=True: For debugging connection pool issues
  • Set connect_args for SSL: In production, use SSL for secure connections
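Several of these practices can be combined in one place. The sketch below is one possible arrangement, not a definitive configuration: build_database_url and build_engine are hypothetical helper names, the environment variable names follow the earlier example, and sslmode=require is a libpq option passed through psycopg2 that assumes the server accepts SSL connections. URL.create is used instead of an f-string so that special characters in the password are escaped correctly.

```python
import os

from sqlalchemy import create_engine
from sqlalchemy.engine import URL


def build_database_url(env=os.environ):
    """Assemble the PostgreSQL URL from environment variables (hypothetical helper)."""
    return URL.create(
        drivername="postgresql+psycopg2",
        username=env.get("DB_USER", "pythonapp"),
        password=env["DB_PASSWORD"],  # no fallback: fail fast if it is missing
        host=env.get("DB_HOST", "localhost"),
        port=int(env.get("DB_PORT", "5432")),
        database=env.get("DB_NAME", "python_app_db"),
    )


def build_engine(url, debug=False):
    """Create the engine; needs psycopg2 installed, so it is not called here."""
    return create_engine(
        url,
        pool_size=5,
        max_overflow=10,
        pool_recycle=1800,                    # recycle connections after 30 minutes
        echo=debug,                           # SQL logging only while debugging
        echo_pool=debug,                      # pool checkout/checkin logging
        connect_args={"sslmode": "require"},  # assumption: server supports SSL
    )


# A password containing URL-special characters is handled by URL.create
example_url = build_database_url({"DB_PASSWORD": "p@ss/word"})
safe_to_log = example_url.render_as_string(hide_password=True)
print(safe_to_log)
```

Logging the URL through render_as_string(hide_password=True) keeps the credential out of log files while still showing host, port, and database.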

Defining Models with SQLAlchemy ORM

SQLAlchemy models define the mapping between Python classes and database tables. The Declarative system is the most common way to define models.

Basic Model Definition

Creating a Base Class:


from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Boolean
# declarative_base is importable from sqlalchemy.orm since SQLAlchemy 1.4
# (the sqlalchemy.ext.declarative location is deprecated)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
import datetime

# Create an engine
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')

# Create a base class for declarative models
Base = declarative_base()

# Define a User model
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(128), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    
    # Define a relationship to the Post model (defined later)
    posts = relationship("Post", back_populates="author")
    
    def __repr__(self):
        return f"<User(username='{self.username}', email='{self.email}')>"

# Define a Post model with a relationship to User
class Post(Base):
    __tablename__ = 'posts'
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    is_published = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    
    # Define the relationship back to the User model
    author = relationship("User", back_populates="posts")
    
    def __repr__(self):
        return f"<Post(title='{self.title}', author_id={self.user_id})>"

# Create the tables in the database
Base.metadata.create_all(engine)
          

Column Types in SQLAlchemy

SQLAlchemy provides a wide range of column types that map to native database types:

SQLAlchemy Type   Python Type          PostgreSQL Type            Description
Integer           int                  INTEGER                    Whole numbers
BigInteger        int                  BIGINT                     Large whole numbers
Float             float                FLOAT (double precision)   Floating-point numbers
Numeric           decimal.Decimal      NUMERIC                    Fixed-precision numbers
String            str                  VARCHAR                    Variable-length strings
Text              str                  TEXT                       Unlimited-length text
Boolean           bool                 BOOLEAN                    True/False values
Date              datetime.date        DATE                       Calendar date
Time              datetime.time        TIME                       Time of day
DateTime          datetime.datetime    TIMESTAMP                  Date and time
Interval          datetime.timedelta   INTERVAL                   Time periods
Enum              enum.Enum            ENUM or VARCHAR            Enumerated values
ARRAY             list                 ARRAY                      Array of values
JSON              dict/list            JSON                       JSON data
JSONB             dict/list            JSONB                      Binary JSON data
UUID              uuid.UUID            UUID                       Universally Unique Identifier
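One way to check how a given type maps to PostgreSQL is to compile the CREATE TABLE statement against the PostgreSQL dialect without connecting to a server. The sketch below assumes a throwaway table named type_demo, invented for illustration; compiling does not require a database or the psycopg2 driver.

```python
from sqlalchemy import (
    Boolean, Column, DateTime, Integer, MetaData, Numeric, String, Table, Text,
)
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Hypothetical table that exists only to show the generated column types
type_demo = Table(
    "type_demo",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("notes", Text),
    Column("price", Numeric(10, 2)),
    Column("is_active", Boolean),
    Column("created_at", DateTime),
    Column("payload", postgresql.JSONB),
    Column("labels", postgresql.ARRAY(String)),
)

# Render the DDL as PostgreSQL would receive it; nothing is executed
ddl = str(CreateTable(type_demo).compile(dialect=postgresql.dialect()))
print(ddl)
```

Printing the DDL this way is a quick check when a model uses a type whose PostgreSQL equivalent is not obvious.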

Model Relationships

SQLAlchemy provides powerful tools for defining relationships between models:

One-to-Many Relationship:


# One-to-Many: One User has many Posts
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    # Other fields...
    
    # One User has many Posts
    posts = relationship("Post", back_populates="author")

class Post(Base):
    __tablename__ = 'posts'
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    # Other fields...
    
    # Foreign key to User
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    
    # Each Post belongs to one User
    author = relationship("User", back_populates="posts")
          

Many-to-Many Relationship:


from sqlalchemy import Table  # Table is needed in addition to the earlier imports

# Association table for Many-to-Many relationship
post_tag = Table('post_tag', Base.metadata,
    Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)

class Post(Base):
    __tablename__ = 'posts'
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    # Other fields...
    
    # Many-to-Many: A Post can have many Tags
    tags = relationship("Tag", secondary=post_tag, back_populates="posts")

class Tag(Base):
    __tablename__ = 'tags'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    
    # Many-to-Many: A Tag can belong to many Posts
    posts = relationship("Post", secondary=post_tag, back_populates="tags")
          

One-to-One Relationship:


from sqlalchemy import Text  # Text is needed for Profile.bio

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    # Other fields...
    
    # One-to-One: One User has one Profile
    profile = relationship("Profile", uselist=False, back_populates="user")

class Profile(Base):
    __tablename__ = 'profiles'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), unique=True, nullable=False)
    bio = Column(Text)
    # Other fields...
    
    # One-to-One: One Profile belongs to one User
    user = relationship("User", back_populates="profile")
          

Adding Constraints and Indexes

Using Constraints and Indexes:


import datetime  # used for the created_at default below

from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey, Boolean, Text
from sqlalchemy import UniqueConstraint, CheckConstraint, Index
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Product(Base):
    __tablename__ = 'products'
    
    id = Column(Integer, primary_key=True)
    sku = Column(String(20), nullable=False)
    name = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    stock = Column(Integer, default=0)
    category_id = Column(Integer, ForeignKey('categories.id'))
    description = Column(Text)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    
    # Define a unique constraint on SKU
    __table_args__ = (
        UniqueConstraint('sku', name='uix_product_sku'),
        
        # Define a check constraint for price and stock
        CheckConstraint('price >= 0', name='chk_product_price_positive'),
        CheckConstraint('stock >= 0', name='chk_product_stock_non_negative'),
        
        # Define indexes for common queries
        Index('idx_product_category', 'category_id'),
        Index('idx_product_name', 'name'),
        Index('idx_product_active_category', 'is_active', 'category_id'),
    )
    
    def __repr__(self):
        return f"<Product(name='{self.name}', price={self.price})>"
          

Working with SQLAlchemy Sessions

The Session is the central element of SQLAlchemy's ORM mechanism. It serves as a "workspace" for your database operations, maintaining a collection of objects to be inserted, updated, or deleted.
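The "workspace" idea can be observed directly through the session's new, dirty, and deleted collections. The following sketch assumes an in-memory SQLite database and a hypothetical Note model so that it runs on its own; the same collections behave the same way against PostgreSQL.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Note(Base):  # hypothetical model used only in this sketch
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)
    body = Column(String(200), nullable=False)


# Assumption: SQLite stands in for PostgreSQL so the sketch is self-contained
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    note = Note(body="draft")
    session.add(note)
    pending_before_commit = note in session.new   # queued for INSERT
    session.commit()                              # flushes the INSERT, then commits

    note.body = "final"
    dirty_after_edit = note in session.dirty      # queued for UPDATE
    session.commit()

    session.delete(note)
    marked_for_delete = note in session.deleted   # queued for DELETE
    session.commit()

    remaining = session.query(Note).count()

print(pending_before_commit, dirty_after_edit, marked_for_delete, remaining)
```

Nothing is sent to the database when an object is added, changed, or marked for deletion; the session records the pending work and emits the SQL at flush time, which commit() triggers.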

Creating a Session

Setting Up a Session Factory:


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Create an engine
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')

# Create a session factory
Session = sessionmaker(bind=engine)

# Create a session
session = Session()

# Use the session for database operations

# When done, close the session
session.close()
          

Using the Session with a Context Manager:


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Create an engine and session factory
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')
Session = sessionmaker(bind=engine)

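# Use the session in a context manager (automatically handles closing)
with Session() as session:
    # Use the session for database operations
    # ...
    
    # Leaving the block closes the session but does NOT commit,
    # so commit explicitly once the work succeeds
    session.commit()

# To commit on success and roll back on error automatically, use Session.begin()
with Session.begin() as session:
    # Use the session for database operations
    pass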
          

Basic CRUD Operations

Creating (Inserting) Objects:


from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base, User, Post  # Import your models

engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')
Session = sessionmaker(bind=engine)

# Create a new user
with Session() as session:
    new_user = User(
        username='johndoe',
        email='john@example.com',
        password_hash='hashed_password_value'
    )
    
    # Add the user to the session
    session.add(new_user)
    
    # Commit the transaction
    session.commit()
    
    # After commit, the ID is available
    print(f"Created user with ID: {new_user.id}")

# Create multiple objects
with Session() as session:
    users = [
        User(username='janedoe', email='jane@example.com', password_hash='hashed_password_1'),
        User(username='bobsmith', email='bob@example.com', password_hash='hashed_password_2'),
        User(username='alicejones', email='alice@example.com', password_hash='hashed_password_3')
    ]
    
    # Add all users to the session
    session.add_all(users)
    
    # Commit the transaction
    session.commit()
    
    # Print the IDs of all users
    for user in users:
        print(f"Created user: {user.username} with ID: {user.id}")
          

Reading (Querying) Objects:


# Get a single user by primary key
with Session() as session:
    user = session.get(User, 1)  # Session.get() replaces the legacy Query.get()
    if user:
        print(f"Found user: {user.username}, {user.email}")
    else:
        print("User not found")

# Filter query - find a user by username
with Session() as session:
    user = session.query(User).filter_by(username='johndoe').first()
    if user:
        print(f"Found user: {user.username}, {user.email}")
    else:
        print("User not found")

# More complex filtering with filter() 
with Session() as session:
    # Find active users with email domain example.com
    users = session.query(User).filter(
        User.is_active == True,
        User.email.like('%@example.com')
    ).all()
    
    print(f"Found {len(users)} active users with example.com emails:")
    for user in users:
        print(f"- {user.username} ({user.email})")

# Ordering results
with Session() as session:
    # Get 5 most recently created users
    recent_users = session.query(User).order_by(User.created_at.desc()).limit(5).all()
    
    print("Recent users:")
    for user in recent_users:
        print(f"- {user.username} (created: {user.created_at})")
          

Updating Objects:


# Update a single object by modifying its attributes
with Session() as session:
    user = session.query(User).filter_by(username='johndoe').first()
    if user:
        # Modify the user object
        user.email = 'john.doe@example.com'
        user.is_active = True
        
        # Commit the changes
        session.commit()
        print(f"Updated user: {user.username} with new email: {user.email}")
    else:
        print("User not found")

# Bulk update multiple objects at once
with Session() as session:
    # Deactivate all users with a specific email domain
    result = session.query(User).filter(
        User.email.like('%@oldcompany.com')
    ).update(
        {User.is_active: False},
        synchronize_session=False
    )
    
    # Commit the changes
    session.commit()
    print(f"Deactivated {result} users with @oldcompany.com email addresses")
          

Deleting Objects:


# Delete a single object
with Session() as session:
    user = session.query(User).filter_by(username='unwanted_user').first()
    if user:
        # Mark the object for deletion
        session.delete(user)
        
        # Commit the deletion
        session.commit()
        print(f"Deleted user: {user.username}")
    else:
        print("User not found")

# Bulk delete multiple objects at once
with Session() as session:
    # Delete all inactive users
    result = session.query(User).filter(
        User.is_active == False
    ).delete(synchronize_session=False)
    
    # Commit the changes
    session.commit()
    print(f"Deleted {result} inactive users")
          

Working with Relationships

Creating Related Objects:


# Create a user with posts (one-to-many relationship)
with Session() as session:
    # Create a new user
    new_user = User(
        username='blogger',
        email='blogger@example.com',
        password_hash='hashed_password_value'
    )
    
    # Create posts for this user
    post1 = Post(
        title="My First Post",
        content="This is the content of my first post.",
        author=new_user  # Use the relationship
    )
    
    post2 = Post(
        title="My Second Post",
        content="This is the content of my second post.",
        author=new_user  # Use the relationship
    )
    
    # Add user to the session (posts will be cascade-added)
    session.add(new_user)
    
    # Commit the transaction
    session.commit()
    
    print(f"Created user with ID: {new_user.id}")
    print(f"Created posts with IDs: {post1.id}, {post2.id}")

# Alternative approach using the relationship from the other side
with Session() as session:
    # Create a new user
    new_user = User(
        username='another_blogger',
        email='another@example.com',
        password_hash='hashed_password_value'
    )
    
    # Create posts and add them to the user's posts collection
    new_user.posts = [
        Post(title="First Post", content="Content of first post."),
        Post(title="Second Post", content="Content of second post.")
    ]
    
    # Add user to the session
    session.add(new_user)
    
    # Commit the transaction
    session.commit()
          

Querying with Relationships:


# Get a user and related posts (eager loading with joinedload)
from sqlalchemy.orm import joinedload

with Session() as session:
    user = session.query(User).options(joinedload(User.posts)).filter_by(username='blogger').first()
    
    if user:
        print(f"User: {user.username}")
        print(f"Post count: {len(user.posts)}")
        
        for post in user.posts:
            print(f"- {post.title}")
    else:
        print("User not found")

# Find posts with their authors
with Session() as session:
    posts = session.query(Post).join(Post.author).filter(User.username == 'blogger').all()
    
    print(f"Found {len(posts)} posts by blogger:")
    for post in posts:
        print(f"- {post.title} (by {post.author.username})")

# Find users with specific post criteria
with Session() as session:
    # Find users who have published posts in the last week
    from datetime import datetime, timedelta
    last_week = datetime.utcnow() - timedelta(days=7)
    
    users = session.query(User).join(User.posts).filter(
        Post.is_published == True,
        Post.created_at >= last_week
    ).distinct().all()
    
    print(f"Found {len(users)} users who published in the last week:")
    for user in users:
        print(f"- {user.username}")
          

Transactions and Error Handling

Transaction Management:


# Automatic transaction management with Session.begin()
# (a plain "with Session() as session:" only closes the session; it does not commit)
try:
    with Session.begin() as session:
        # Create a user
        new_user = User(
            username='transaction_test',
            email='transaction@example.com',
            password_hash='hashed_password_value'  # required: the column is NOT NULL
        )
        session.add(new_user)
        
        # Create a post for this user
        new_post = Post(title="Test Post", content="Test content", author=new_user)
        session.add(new_post)
        
        # Commit happens automatically when the context exits without errors
except Exception as e:
    print(f"An error occurred: {e}")
    # The rollback has already happened automatically when the exception was raised

# Manual transaction management
session = Session()
try:
    # Create a user
    new_user = User(
        username='manual_transaction',
        email='manual@example.com',
        password_hash='hashed_password_value'  # required: the column is NOT NULL
    )
    session.add(new_user)
    
    # Create a post for this user
    new_post = Post(title="Manual Post", content="Test content", author=new_user)
    session.add(new_post)
    
    # Commit the transaction
    session.commit()
except Exception as e:
    # Rollback on error
    session.rollback()
    print(f"An error occurred, transaction rolled back: {e}")
finally:
    # Always close the session
    session.close()

# Nested transactions with savepoints
# (uses the Session factory defined earlier; importing sqlalchemy.orm.Session
# under the same name would shadow that factory)
with Session() as session:
    # Start an outer transaction
    with session.begin():
        # Create a user
        new_user = User(
            username='savepoint_test',
            email='savepoint@example.com',
            password_hash='hashed_password_value'
        )
        session.add(new_user)
        
        # Create a nested transaction (savepoint)
        try:
            with session.begin_nested():
                # This operation might fail
                duplicate_user = User(
                    username='savepoint_test',
                    email='duplicate@example.com',
                    password_hash='hashed_password_value'
                )
                session.add(duplicate_user)
                # Flush so the INSERT runs inside the savepoint and raises
                # an IntegrityError due to the duplicate username
                session.flush()
        except Exception as e:
            print(f"Inner transaction failed: {e}")
            # The outer transaction is still active
        
        # Continue with the outer transaction
        new_post = Post(title="Savepoint Post", content="Test content", author=new_user)
        session.add(new_post)
        
        # Outer transaction commits automatically at the end of the context
        # If the inner transaction failed, only its operations are rolled back
          

Advanced Querying with SQLAlchemy

Complex Filtering

Advanced Query Filters:


from sqlalchemy import and_, or_, not_

with Session() as session:
    # Using and_, or_, not_ operators
    query = session.query(User).filter(
        and_(
            User.is_active == True,
            or_(
                User.username.like('j%'),
                User.email.like('%@gmail.com')
            ),
            not_(User.username == 'admin')
        )
    )
    
    users = query.all()
    print(f"Found {len(users)} users matching complex criteria:")
    for user in users:
        print(f"- {user.username} ({user.email})")

    # Alternative syntax using &, |, ~ operators
    query = session.query(User).filter(
        (User.is_active == True) &
        ((User.username.like('j%')) | (User.email.like('%@gmail.com'))) &
        ~(User.username == 'admin')
    )
    
    # Both queries produce the same SQL
          

Querying with Functions

Using SQL Functions in Queries:


from sqlalchemy import func, desc, case

with Session() as session:
    # Count users by status
    user_counts = session.query(
        User.is_active,
        func.count(User.id).label('user_count')
    ).group_by(User.is_active).all()
    
    for status, count in user_counts:
        print(f"{'Active' if status else 'Inactive'} users: {count}")
    
    # Find users with the most posts
    top_posters = session.query(
        User,
        func.count(Post.id).label('post_count')
    ).join(User.posts).group_by(User).order_by(desc('post_count')).limit(5).all()
    
    print("\nTop posters:")
    for user, post_count in top_posters:
        print(f"- {user.username}: {post_count} posts")
    
    # Using CASE statements
    user_categories = session.query(
        User.username,
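        case(
            # Pass each (condition, value) pair positionally; the list form
            # was deprecated in SQLAlchemy 1.4 and removed in 2.0
            (func.count(Post.id) > 10, "Prolific"),
            (func.count(Post.id) > 5, "Active"),
            (func.count(Post.id) > 0, "Beginner"),
            else_="Inactive"
        ).label('category')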
    ).outerjoin(User.posts).group_by(User.username).all()
    
    print("\nUser categories:")
    for username, category in user_categories:
        print(f"- {username}: {category}")
          

Subqueries and Joins

Using Subqueries and Complex Joins:


from sqlalchemy import func  # func.max() is used in the second query below

with Session() as session:
    # Subquery: Find users who have made a post in the last month
    from datetime import datetime, timedelta
    last_month = datetime.utcnow() - timedelta(days=30)
    
    recent_posters_subq = session.query(
        Post.user_id
    ).filter(
        Post.created_at >= last_month
    ).subquery()
    
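    # Main query: Get all details of these users
    # (wrap the subquery in select() so that IN receives an explicit SELECT)
    from sqlalchemy import select
    
    recent_posters = session.query(User).filter(
        User.id.in_(select(recent_posters_subq.c.user_id))
    ).all()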
    
    print(f"Users who posted in the last month: {len(recent_posters)}")
    
    # Complex join: Find users and their most recent post
    most_recent_post_subq = session.query(
        Post.user_id,
        func.max(Post.created_at).label('max_date')
    ).group_by(Post.user_id).subquery('most_recent_post')
    
    user_with_recent_post = session.query(
        User,
        Post
    ).join(
        most_recent_post_subq,
        User.id == most_recent_post_subq.c.user_id
    ).join(
        Post,
        (Post.user_id == most_recent_post_subq.c.user_id) &
        (Post.created_at == most_recent_post_subq.c.max_date)
    ).all()
    
    print("\nUsers with their most recent posts:")
    for user, post in user_with_recent_post:
        print(f"- {user.username}: '{post.title}' ({post.created_at})")
          

Pagination

Implementing Pagination:


def get_paginated_posts(session, page=1, per_page=10):
    """
    Retrieve a paginated list of posts.
    
    Args:
        session: SQLAlchemy session
        page: Page number (1-indexed)
        per_page: Number of items per page
        
    Returns:
        Dictionary containing items, pagination metadata
    """
    # Calculate offset
    offset = (page - 1) * per_page
    
    # Get items for current page
    posts = session.query(Post).order_by(Post.created_at.desc()).offset(offset).limit(per_page).all()
    
    # Get total count for pagination metadata
    total = session.query(func.count(Post.id)).scalar()
    
    # Calculate pagination metadata
    total_pages = (total + per_page - 1) // per_page  # Ceiling division
    has_next = page < total_pages
    has_prev = page > 1
    
    return {
        'items': posts,
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'total_pages': total_pages,
            'has_next': has_next,
            'has_prev': has_prev
        }
    }

# Example usage
with Session() as session:
    # Get first page of posts
    result = get_paginated_posts(session, page=1, per_page=5)
    
    print(f"Page {result['pagination']['page']} of {result['pagination']['total_pages']}")
    print(f"Showing {len(result['items'])} of {result['pagination']['total']} posts")
    
    for post in result['items']:
        print(f"- {post.title} ({post.created_at})")
    
    # Navigation links
    if result['pagination']['has_prev']:
        print("< Previous Page")
    
    if result['pagination']['has_next']:
        print("Next Page >")
          

PostgreSQL-Specific Features

Using PostgreSQL Array Types

Working with Array Columns:


from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects.postgresql import ARRAY  # PostgreSQL ARRAY type: supports contains() (@>)
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Product(Base):
    __tablename__ = 'products'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    tags = Column(ARRAY(String))  # Array of strings
    dimensions = Column(ARRAY(Integer))  # Array of integers
    
    def __repr__(self):
        return f"<Product(name='{self.name}', tags={self.tags})>"

# Create the table
Base.metadata.create_all(engine)

# Insert data with arrays
with Session() as session:
    new_product = Product(
        name="Ergonomic Keyboard",
        tags=["electronics", "office", "ergonomic"],
        dimensions=[45, 15, 2]
    )
    
    session.add(new_product)
    session.commit()
    
    print(f"Created product: {new_product.name}, ID: {new_product.id}")

# Query with array operators
with Session() as session:
    # Find products with a specific tag (using ANY)
    from sqlalchemy.sql.expression import any_
    
    ergonomic_products = session.query(Product).filter(
        any_(Product.tags) == "ergonomic"
    ).all()
    
    print(f"Found {len(ergonomic_products)} ergonomic products")
    
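    # Check if array contains all values (using @>)
    # contains() is only implemented by the PostgreSQL-specific ARRAY type,
    # so the column itself must be declared with sqlalchemy.dialects.postgresql.ARRAY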
    
    office_products = session.query(Product).filter(
        Product.tags.contains(["office", "ergonomic"])
    ).all()
    
    print(f"Found {len(office_products)} products tagged with both office AND ergonomic")
          

Using PostgreSQL JSON Types

Working with JSON/JSONB Columns:


from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base
import datetime

Base = declarative_base()

class Event(Base):
    __tablename__ = 'events'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    data = Column(JSONB, nullable=False)  # JSONB for better performance and indexing
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    
    def __repr__(self):
        return f"<Event(name='{self.name}')>"

# Create the table
Base.metadata.create_all(engine)

# Insert data with JSON
with Session() as session:
    new_event = Event(
        name="User Login",
        data={
            "user_id": 123,
            "device": {
                "type": "mobile",
                "os": "iOS",
                "version": "15.2"
            },
            "location": {
                "country": "US",
                "city": "San Francisco"
            }
        }
    )
    
    session.add(new_event)
    session.commit()
    
    print(f"Created event: {new_event.name}, ID: {new_event.id}")

# Query with JSON operators
with Session() as session:
    # JSON path match operator: ->
    mobile_logins = session.query(Event).filter(
        Event.data['device']['type'].astext == 'mobile'
    ).all()
    
    print(f"Found {len(mobile_logins)} mobile login events")
    
    # JSON containment operator: @>
    from sqlalchemy.dialects.postgresql import JSONB
    
    ios_logins = session.query(Event).filter(
        Event.data.contains({"device": {"os": "iOS"}})
    ).all()
    
    print(f"Found {len(ios_logins)} iOS login events")
    
    # Update JSON data: merge a new key using the JSONB || (concatenation) operator
    from sqlalchemy import cast
    
    session.query(Event).filter(
        Event.id == new_event.id
    ).update(
        {Event.data: Event.data.op('||')(cast({"processed": True}, JSONB))},
        synchronize_session=False
    )
    
    session.commit()
    
    # Verify the update
    updated_event = session.get(Event, new_event.id)
    print(f"Updated event data: {updated_event.data}")
          

Full-Text Search

Implementing Full-Text Search:


from sqlalchemy import Column, Integer, String, Text, Index
from sqlalchemy.orm import declarative_base
from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy import func, text

Base = declarative_base()

class Article(Base):
    __tablename__ = 'articles'
    
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    search_vector = Column(TSVECTOR)  # Column to store the search vector
    
    __table_args__ = (
        # Create GIN index on the search vector for fast searches
        Index('idx_article_search_vector', 'search_vector', postgresql_using='gin'),
    )
    
    def __repr__(self):
        return f"<Article(title='{self.title}')>"

# Create the table
Base.metadata.create_all(engine)

# Add trigger to automatically update search_vector (using raw SQL)
# engine.begin() commits the DDL on exit; EXECUTE FUNCTION requires PostgreSQL 11+
with engine.begin() as conn:
    conn.execute(text("""
        CREATE OR REPLACE FUNCTION article_search_vector_update() RETURNS trigger AS $$
        BEGIN
            NEW.search_vector = 
                setweight(to_tsvector('english', COALESCE(NEW.title, '')), 'A') ||
                setweight(to_tsvector('english', COALESCE(NEW.content, '')), 'B');
            RETURN NEW;
        END
        $$ LANGUAGE plpgsql;
        
        DROP TRIGGER IF EXISTS article_search_vector_update ON articles;
        
        CREATE TRIGGER article_search_vector_update
        BEFORE INSERT OR UPDATE ON articles
        FOR EACH ROW EXECUTE FUNCTION article_search_vector_update();
    """))

# Insert sample data
with Session() as session:
    articles = [
        Article(
            title="PostgreSQL Full-Text Search",
            content="PostgreSQL offers powerful full-text search capabilities that are often overlooked. "
                  "This feature allows natural language queries against text content."
        ),
        Article(
            title="SQLAlchemy ORM Tutorial",
            content="Learn how to use SQLAlchemy ORM to interact with your database. "
                  "This tutorial covers relationships, querying, and PostgreSQL features."
        ),
        Article(
            title="Web Development with Python",
            content="Python is an excellent language for web development. "
                  "Frameworks like Flask and Django make it easy to build powerful web applications."
        )
    ]
    
    session.add_all(articles)
    session.commit()
    
    print(f"Added {len(articles)} sample articles")

# Perform full-text search
with Session() as session:
    # Simple text search
    search_term = "postgresql"
    
    # Build the search query
    search_query = session.query(
        Article,
        func.ts_rank(Article.search_vector, func.plainto_tsquery('english', search_term)).label('rank')
    ).filter(
        Article.search_vector.op('@@')(func.plainto_tsquery('english', search_term))
    ).order_by(text('rank DESC'))
    
    results = search_query.all()
    
    print(f"\nSearch results for '{search_term}':")
    for article, rank in results:
        print(f"- {article.title} (Rank: {rank:.4f})")
        
    # More complex search
    search_term = "python web applications"
    
    # Build a prefix-matching query that requires every word (AND), e.g.
    # "python:* & web:* & applications:*". This is not a phrase search, and
    # to_tsquery raises a syntax error on unsanitized punctuation in user input.
    ts_query = func.to_tsquery(
        'english', ' & '.join(f"{word}:*" for word in search_term.split())
    )
    search_query = session.query(
        Article,
        func.ts_rank(Article.search_vector, ts_query).label('rank')
    ).filter(
        Article.search_vector.op('@@')(ts_query)
    ).order_by(text('rank DESC'))
    
    results = search_query.all()
    
    print(f"\nSearch results for '{search_term}':")
    for article, rank in results:
        print(f"- {article.title} (Rank: {rank:.4f})")
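
Because to_tsquery parses its argument as query syntax, raw user input containing characters such as & or ( can cause a syntax error. One possible way to guard against that is sketched below. The helper name build_prefix_tsquery is hypothetical, and keeping only word characters is an assumption about what a search box should accept, not a PostgreSQL requirement.

```python
import re


def build_prefix_tsquery(user_input: str) -> str:
    """Turn free text into an AND-ed prefix query string for to_tsquery()."""
    # Keep only runs of letters, digits and underscores; this drops characters
    # with special meaning in tsquery syntax such as & | ! : * ( ) ' < >
    words = re.findall(r"\w+", user_input.lower())
    return " & ".join(f"{word}:*" for word in words)


print(build_prefix_tsquery("python web applications"))
# python:* & web:* & applications:*
print(build_prefix_tsquery("web-apps & (python)!"))
# web:* & apps:* & python:*
```

The returned string can be passed to func.to_tsquery('english', ...) in place of the inline join. If the result is empty, it is probably best to skip the search entirely rather than send an empty query.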
          

SQLAlchemy Best Practices

Performance Optimization

Performance Optimization Examples:


from sqlalchemy.orm import joinedload, load_only

# N+1 Query Problem (BAD)
with Session() as session:
    users = session.query(User).all()
    
    # This will make a separate query for each user's posts
    for user in users:
        print(f"{user.username} has {len(user.posts)} posts")

# Solution: Eager Loading (GOOD)
with Session() as session:
    # Load users and their posts in a single query
    users = session.query(User).options(joinedload(User.posts)).all()
    
    # No additional queries needed
    for user in users:
        print(f"{user.username} has {len(user.posts)} posts")

# Select only needed columns
with Session() as session:
    # Instead of session.query(User).all()
    users = session.query(User).options(load_only(User.id, User.username, User.email)).all()

# Bulk Insert
with Session() as session:
    # Create 1000 sample users
    users = [
        User(username=f"user{i}", email=f"user{i}@example.com")
        for i in range(1, 1001)
    ]
    
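    # add_all() vs bulk_save_objects()
    # session.add_all(users)  # Full unit of work: tracks every object; INSERT batching depends on version and driver
    
    # Bulk insert: skips most ORM bookkeeping, so it is usually faster.
    # Note: bulk_save_objects() is a legacy API in SQLAlchemy 2.0 and does not
    # populate primary keys on the objects unless return_defaults=True.
    session.bulk_save_objects(users)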
    session.commit()
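
To make the N+1 pattern visible at the SQL level, the following library-free sketch counts the SELECT statements issued by each approach. It uses Python's built-in sqlite3 module instead of PostgreSQL so that it runs anywhere. The CountingConnection helper, table layout, and sample rows are all assumptions made for the illustration.

```python
import sqlite3


class CountingConnection:
    """Hypothetical helper: wraps a sqlite3 connection and counts SELECTs."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.select_count = 0

    def select(self, sql, params=()):
        self.select_count += 1
        return self.conn.execute(sql, params).fetchall()


db = CountingConnection()
db.conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, user_id INTEGER, title TEXT);
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob'), (3, 'carol');
    INSERT INTO posts VALUES (1, 1, 'a1'), (2, 1, 'a2'), (3, 2, 'b1');
""")

# N+1: one query for the users, then one more query per user
users = db.select("SELECT id, username FROM users")
lazy_counts = {
    username: len(db.select("SELECT id FROM posts WHERE user_id = ?", (user_id,)))
    for user_id, username in users
}
n_plus_one_queries = db.select_count

# Eager: a single LEFT JOIN returns users and their posts together
db.select_count = 0
rows = db.select("""
    SELECT u.username, p.id
    FROM users u LEFT JOIN posts p ON p.user_id = u.id
""")
eager_counts = {}
for username, post_id in rows:
    eager_counts[username] = eager_counts.get(username, 0) + (post_id is not None)
eager_queries = db.select_count

print(n_plus_one_queries, eager_queries)  # 4 1
print(lazy_counts == eager_counts)        # True
```

With three users the difference is four queries against one. With thousands of users the lazy version issues thousands of round trips, which is the cost that joinedload() avoids.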
          

Connection and Session Management

Implementing a Session Factory Pattern:


# db.py - Database module
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base  # sqlalchemy.ext.declarative location is deprecated since 1.4
from sqlalchemy.orm import sessionmaker, scoped_session
import os

# Get database URL from environment variable
DATABASE_URL = os.environ.get(
    'DATABASE_URL', 
    'postgresql://pythonapp:secure_password@localhost:5432/python_app_db'
)

# Create engine
engine = create_engine(DATABASE_URL, pool_size=5, max_overflow=10)

# Create base class for models
Base = declarative_base()

# Create session factory
session_factory = sessionmaker(bind=engine)

# Create scoped session for web applications
# By default this gives each thread its own session; pass scopefunc to scope it differently (e.g. per request)
SessionLocal = scoped_session(session_factory)

# Helper function to get a database session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Create all tables
def init_db():
    Base.metadata.create_all(bind=engine)

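# Example usage in a Flask application
"""
from flask import Flask
from db import SessionLocal, init_db
from models import User

app = Flask(__name__)

# before_first_request was removed in Flask 2.3, so create the tables at startup
init_db()

@app.teardown_appcontext
def remove_session(exception=None):
    # Discard the thread-local session after each request so its connection returns to the pool
    SessionLocal.remove()

@app.route('/users')
def list_users():
    # scoped_session proxies query() to the current thread's session
    users = SessionLocal.query(User).all()
    return {'users': [user.username for user in users]}
"""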

# Example usage in a FastAPI application
"""
from fastapi import FastAPI, Depends
from db import get_db, init_db
from models import User

app = FastAPI()

@app.on_event("startup")
def startup_event():
    init_db()

@app.get('/users')
def list_users(db = Depends(get_db)):
    users = db.query(User).all()
    return {'users': [user.username for user in users]}
"""
          
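
The get_db() helper above only closes the session; committing and rolling back are left to the caller. A common complement is a context manager that handles all three, sketched below. The names session_scope and RecordingSession are hypothetical: RecordingSession is a stand-in that records calls so the sketch runs without a database, and in real code the session_factory from db.py would be passed in instead.

```python
from contextlib import contextmanager


@contextmanager
def session_scope(session_factory):
    """Commit on success, roll back on error, and always close the session."""
    session = session_factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


class RecordingSession:
    """Hypothetical stand-in for a SQLAlchemy Session that records calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


with session_scope(RecordingSession) as ok_session:
    pass  # work with the session here
print(ok_session.calls)  # ['commit', 'close']

failed_session = None
try:
    with session_scope(RecordingSession) as failed_session:
        raise ValueError("simulated failure")
except ValueError:
    pass
print(failed_session.calls)  # ['rollback', 'close']
```

SQLAlchemy 1.4 and later also offer sessionmaker.begin(), which provides similar commit-or-rollback behavior without a custom helper.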

Structuring SQLAlchemy Code

Repository Pattern Example:


# models.py - Define SQLAlchemy models
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from db import Base
import datetime

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(128), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    
    posts = relationship("Post", back_populates="author")
    
    def __repr__(self):
        return f"<User(username='{self.username}', email='{self.email}')>"

# repository.py - Repository classes for database operations
from sqlalchemy.orm import Session
from models import User
from typing import List, Optional, Dict, Any

class UserRepository:
    def __init__(self, session: Session):
        self.session = session
    
    def get_by_id(self, user_id: int) -> Optional[User]:
        return self.session.query(User).filter(User.id == user_id).first()
    
    def get_by_username(self, username: str) -> Optional[User]:
        return self.session.query(User).filter(User.username == username).first()
    
    def get_all(self, skip: int = 0, limit: int = 100) -> List[User]:
        return self.session.query(User).offset(skip).limit(limit).all()
    
    def create(self, user_data: Dict[str, Any]) -> User:
        user = User(**user_data)
        self.session.add(user)
        self.session.commit()
        self.session.refresh(user)
        return user
    
    def update(self, user_id: int, user_data: Dict[str, Any]) -> Optional[User]:
        user = self.get_by_id(user_id)
        if user:
            for key, value in user_data.items():
                setattr(user, key, value)
            self.session.commit()
            self.session.refresh(user)
        return user
    
    def delete(self, user_id: int) -> bool:
        user = self.get_by_id(user_id)
        if user:
            self.session.delete(user)
            self.session.commit()
            return True
        return False

# schema.py - Pydantic models for validation and serialization
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime

class UserBase(BaseModel):
    username: str
    email: EmailStr
    is_active: Optional[bool] = True

class UserCreate(UserBase):
    password: str

class UserUpdate(BaseModel):
    username: Optional[str] = None
    email: Optional[EmailStr] = None
    is_active: Optional[bool] = None

class UserResponse(UserBase):
    id: int
    created_at: datetime
    
    class Config:
        # Pydantic v1 setting; in Pydantic v2 use model_config = ConfigDict(from_attributes=True)
        orm_mode = True

# service.py - Business logic layer
from repository import UserRepository
from schema import UserCreate, UserUpdate, UserResponse
from sqlalchemy.orm import Session
from typing import List, Optional
import hashlib

class UserService:
    def __init__(self, db: Session):
        self.repository = UserRepository(db)
    
    def get_user(self, user_id: int) -> Optional[UserResponse]:
        user = self.repository.get_by_id(user_id)
        if user:
            return UserResponse.from_orm(user)
        return None
    
    def get_users(self, skip: int = 0, limit: int = 100) -> List[UserResponse]:
        users = self.repository.get_all(skip, limit)
        return [UserResponse.from_orm(user) for user in users]
    
    def create_user(self, user_data: UserCreate) -> UserResponse:
        # Check if username already exists
        existing_user = self.repository.get_by_username(user_data.username)
        if existing_user:
            raise ValueError("Username already exists")
        
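        # Hash password (demo only: unsalted SHA-256 is too fast for real passwords;
        # use a salted KDF such as bcrypt, scrypt, or Argon2 in production)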
        hashed_password = hashlib.sha256(user_data.password.encode()).hexdigest()
        
        # Create user
        user_dict = user_data.dict(exclude={"password"})
        user_dict["password_hash"] = hashed_password
        
        user = self.repository.create(user_dict)
        return UserResponse.from_orm(user)
    
    def update_user(self, user_id: int, user_data: UserUpdate) -> Optional[UserResponse]:
        user = self.repository.update(user_id, user_data.dict(exclude_unset=True))
        if user:
            return UserResponse.from_orm(user)
        return None
    
    def delete_user(self, user_id: int) -> bool:
        return self.repository.delete(user_id)

# Example usage
from db import SessionLocal

def main():
    db = SessionLocal()
    try:
        # Create user service
        user_service = UserService(db)
        
        # Create a user
        new_user = UserCreate(
            username="johndoe",
            email="john@example.com",
            password="secure_password"
        )
        user = user_service.create_user(new_user)
        print(f"Created user: {user.username}, ID: {user.id}")
        
        # Get all users
        users = user_service.get_users()
        print(f"Total users: {len(users)}")
        
        # Update user
        updated_user = user_service.update_user(
            user.id,
            UserUpdate(email="john.doe@example.com")
        )
        print(f"Updated user email: {updated_user.email}")
        
        # Delete user
        deleted = user_service.delete_user(user.id)
        print(f"User deleted: {deleted}")
    finally:
        db.close()

if __name__ == "__main__":
    main()
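
The service above hashes passwords with a single unsalted SHA-256 pass, which is acceptable for a demo but not for real credentials. As one possible replacement that needs only the standard library, the sketch below uses PBKDF2 with a random salt per password. The function names, the salt$digest storage format, and the iteration count are assumptions chosen for illustration. A dedicated library such as bcrypt or argon2 may be a better fit in production.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # assumed work factor; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'salt_hex$digest_hex' using PBKDF2-HMAC-SHA256."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("secure_password")
print(len(stored))                                 # 97
print(verify_password("secure_password", stored))  # True
print(verify_password("wrong", stored))            # False
```

The stored value is 97 characters long, so it fits the String(128) password_hash column defined in models.py.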
          

Practical Activities

Activity 1: Building a Blog API with SQLAlchemy

Create a simple blog API using SQLAlchemy with the following features:

  1. Define models for User, Post, and Comment with appropriate relationships
  2. Implement repository classes for each model
  3. Create API endpoints for:
    • User registration and authentication
    • Creating, reading, updating, and deleting posts
    • Adding and retrieving comments on posts
  4. Add validation for all input data
  5. Implement proper error handling and response formatting

Activity 2: Data Migration Script

Create a script that demonstrates data migration between different database schemas:

  1. Define an "old" schema with a simple users table
  2. Define a "new" schema with an enhanced users table (additional fields)
  3. Write a migration script that:
    • Reads data from the old schema
    • Transforms it as needed
    • Writes it to the new schema
    • Handles potential data conversion issues

Activity 3: PostgreSQL-Specific Features

Build an application that showcases PostgreSQL-specific features with SQLAlchemy:

  1. Implement a product catalog with:
    • JSON/JSONB columns for storing complex product attributes
    • Array columns for storing tags and categories
    • Full-text search capabilities for product descriptions
  2. Create a user interface or API endpoints to interact with the catalog
  3. Implement advanced filtering and searching functionality
  4. Add performance metrics to measure query execution time

Key Takeaways

Further Learning Resources