Introduction to ORM and SQLAlchemy
Object-Relational Mapping (ORM) is a programming technique that converts data between incompatible type systems in object-oriented programming languages and relational databases. SQLAlchemy is the most powerful and flexible ORM library for Python.
Analogy: ORM as a Universal Translator
Think of an ORM like a universal translator in science fiction:
- Your application speaks "Object-Oriented" - it thinks in terms of classes, attributes, methods, and inheritance
- Your database speaks "Relational" - it understands tables, columns, rows, and foreign keys
- The ORM serves as the translator between these two different "languages"
- Without the translator, you'd have to manually convert your objects to SQL, then convert the SQL results back to objects
- With the translator, you can work exclusively in your native "language" (Python objects), and the ORM handles all the translation behind the scenes
Just as a universal translator allows species from different planets to communicate seamlessly, an ORM lets your object-oriented code communicate with your relational database without having to understand the details of SQL.
Benefits of Using ORMs
- Productivity: Write less code to interact with databases
- Abstraction: Focus on your application's data models rather than SQL syntax
- Database Independence: Switch database systems with minimal code changes
- Type Safety: Work with native Python types instead of string manipulation
- Maintainability: Organize database access code in a structured, object-oriented way
- Security: Automatic protection against SQL injection vulnerabilities
- Query Building: Create complex queries programmatically
Understanding SQLAlchemy's Architecture
SQLAlchemy consists of two main components:
- SQLAlchemy Core: A SQL toolkit providing a SQL expression language, connection pooling, and schema manipulation tools
- SQLAlchemy ORM: The Object-Relational Mapper built on top of Core that provides object-relational mapping capabilities
This architecture gives SQLAlchemy its flexibility - you can use just the Core for direct SQL manipulation, just the ORM for object-relational mapping, or combine both as needed.
Setting Up SQLAlchemy with PostgreSQL
Installation
To get started with SQLAlchemy and PostgreSQL, you need to install:
- SQLAlchemy: The ORM library itself
- psycopg2: The PostgreSQL adapter that SQLAlchemy uses behind the scenes
Installing Required Packages:
# Install SQLAlchemy and PostgreSQL adapter
pip install sqlalchemy psycopg2-binary
# For development environments, using a virtual environment is recommended
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install sqlalchemy psycopg2-binary
Creating an Engine
The SQLAlchemy Engine is the starting point for any SQLAlchemy application. It represents the core interface to the database, managing a connection or pool of connections.
Creating a SQLAlchemy Engine:
from sqlalchemy import create_engine
# Connection string format: postgresql://username:password@host:port/database
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')
# Testing the connection
try:
connection = engine.connect()
print("Connection successful!")
connection.close()
except Exception as e:
print(f"Error connecting to PostgreSQL: {e}")
Engine with Configuration Options:
from sqlalchemy import create_engine
import os
# Get database credentials from environment variables for better security
db_user = os.environ.get('DB_USER', 'pythonapp')
db_password = os.environ.get('DB_PASSWORD', 'secure_password')
db_host = os.environ.get('DB_HOST', 'localhost')
db_port = os.environ.get('DB_PORT', '5432')
db_name = os.environ.get('DB_NAME', 'python_app_db')
# Create engine with configuration options
engine = create_engine(
f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}',
pool_size=5, # Maximum number of connections in the pool
max_overflow=10, # Maximum number of connections to create beyond pool_size
pool_timeout=30, # Seconds to wait before giving up on getting a connection
pool_recycle=1800, # Recycle connections after 30 minutes (1800 seconds)
echo=True # Print all executed SQL (useful for debugging)
)
Engine Configuration Best Practices
- Use Environment Variables: Never hardcode database credentials in your code
- Set pool_size: Match your database connection pool to your application's concurrency needs
- Enable pool_recycle: Prevents issues with stale connections being terminated by database servers
- Use echo=True in Development: Helpful for debugging, but turn off in production
- Consider echo_pool=True: For debugging connection pool issues
- Set connect_args for SSL: In production, use SSL for secure connections
Defining Models with SQLAlchemy ORM
SQLAlchemy models define the mapping between Python classes and database tables. The Declarative system is the most common way to define models.
Basic Model Definition
Creating a Base Class:
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
import datetime
# Create an engine
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')
# Create a base class for declarative models
Base = declarative_base()
# Define a User model
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
password_hash = Column(String(128), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
# Define a relationship to the Post model (defined later)
posts = relationship("Post", back_populates="author")
def __repr__(self):
return f"<User(username='{self.username}', email='{self.email}')>"
# Define a Post model with a relationship to User
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
is_published = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
# Define the relationship back to the User model
author = relationship("User", back_populates="posts")
def __repr__(self):
return f"<Post(title='{self.title}', author_id={self.user_id})>"
# Create the tables in the database
Base.metadata.create_all(engine)
Column Types in SQLAlchemy
SQLAlchemy provides a wide range of column types that map to native database types:
| SQLAlchemy Type | Python Type | PostgreSQL Type | Description |
|---|---|---|---|
| Integer | int | INTEGER | Whole numbers |
| BigInteger | int | BIGINT | Large whole numbers |
| Float | float | REAL | Floating-point numbers |
| Numeric | decimal.Decimal | NUMERIC | Fixed-precision numbers |
| String | str | VARCHAR | Variable-length strings |
| Text | str | TEXT | Unlimited-length text |
| Boolean | bool | BOOLEAN | True/False values |
| Date | datetime.date | DATE | Calendar date |
| Time | datetime.time | TIME | Time of day |
| DateTime | datetime.datetime | TIMESTAMP | Date and time |
| Interval | datetime.timedelta | INTERVAL | Time periods |
| Enum | enum.Enum | ENUM or VARCHAR | Enumerated values |
| ARRAY | list | ARRAY | Array of values |
| JSON | dict/list | JSON | JSON data |
| JSONB | dict/list | JSONB | Binary JSON data |
| UUID | uuid.UUID | UUID | Universally Unique Identifier |
Model Relationships
SQLAlchemy provides powerful tools for defining relationships between models:
One-to-Many Relationship:
# One-to-Many: One User has many Posts
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
# Other fields...
# One User has many Posts
posts = relationship("Post", back_populates="author")
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
# Other fields...
# Foreign key to User
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
# Each Post belongs to one User
author = relationship("User", back_populates="posts")
Many-to-Many Relationship:
# Association table for Many-to-Many relationship
post_tag = Table('post_tag', Base.metadata,
Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
# Other fields...
# Many-to-Many: A Post can have many Tags
tags = relationship("Tag", secondary=post_tag, back_populates="posts")
class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
# Many-to-Many: A Tag can belong to many Posts
posts = relationship("Post", secondary=post_tag, back_populates="tags")
One-to-One Relationship:
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
# Other fields...
# One-to-One: One User has one Profile
profile = relationship("Profile", uselist=False, back_populates="user")
class Profile(Base):
__tablename__ = 'profiles'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'), unique=True, nullable=False)
bio = Column(Text)
# Other fields...
# One-to-One: One Profile belongs to one User
user = relationship("User", back_populates="profile")
Adding Constraints and Indexes
Using Constraints and Indexes:
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey, Boolean, Text
from sqlalchemy import UniqueConstraint, CheckConstraint, Index
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
sku = Column(String(20), nullable=False)
name = Column(String(100), nullable=False)
price = Column(Float, nullable=False)
stock = Column(Integer, default=0)
category_id = Column(Integer, ForeignKey('categories.id'))
description = Column(Text)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
# Define a unique constraint on SKU
__table_args__ = (
UniqueConstraint('sku', name='uix_product_sku'),
# Define a check constraint for price and stock
CheckConstraint('price >= 0', name='chk_product_price_positive'),
CheckConstraint('stock >= 0', name='chk_product_stock_non_negative'),
# Define indexes for common queries
Index('idx_product_category', 'category_id'),
Index('idx_product_name', 'name'),
Index('idx_product_active_category', 'is_active', 'category_id'),
)
def __repr__(self):
return f"<Product(name='{self.name}', price={self.price})>"
Working with SQLAlchemy Sessions
The Session is the central element of SQLAlchemy's ORM mechanism. It serves as a "workspace" for your database operations, maintaining a collection of objects to be inserted, updated, or deleted.
Creating a Session
Setting Up a Session Factory:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Create an engine
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')
# Create a session factory
Session = sessionmaker(bind=engine)
# Create a session
session = Session()
# Use the session for database operations
# When done, close the session
session.close()
Using the Session with a Context Manager:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Create an engine and session factory
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')
Session = sessionmaker(bind=engine)
# Use the session in a context manager (automatically handles closing)
with Session() as session:
# Use the session for database operations
# ...
# Changes are committed when the context exits without errors
# or rolled back if an exception occurs
Basic CRUD Operations
Creating (Inserting) Objects:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base, User, Post # Import your models
engine = create_engine('postgresql://pythonapp:secure_password@localhost:5432/python_app_db')
Session = sessionmaker(bind=engine)
# Create a new user
with Session() as session:
new_user = User(
username='johndoe',
email='john@example.com',
password_hash='hashed_password_value'
)
# Add the user to the session
session.add(new_user)
# Commit the transaction
session.commit()
# After commit, the ID is available
print(f"Created user with ID: {new_user.id}")
# Create multiple objects
with Session() as session:
users = [
User(username='janedoe', email='jane@example.com', password_hash='hashed_password_1'),
User(username='bobsmith', email='bob@example.com', password_hash='hashed_password_2'),
User(username='alicejones', email='alice@example.com', password_hash='hashed_password_3')
]
# Add all users to the session
session.add_all(users)
# Commit the transaction
session.commit()
# Print the IDs of all users
for user in users:
print(f"Created user: {user.username} with ID: {user.id}")
Reading (Querying) Objects:
# Get a single user by primary key
with Session() as session:
user = session.query(User).get(1)
if user:
print(f"Found user: {user.username}, {user.email}")
else:
print("User not found")
# Filter query - find a user by username
with Session() as session:
user = session.query(User).filter_by(username='johndoe').first()
if user:
print(f"Found user: {user.username}, {user.email}")
else:
print("User not found")
# More complex filtering with filter()
with Session() as session:
# Find active users with email domain example.com
users = session.query(User).filter(
User.is_active == True,
User.email.like('%@example.com')
).all()
print(f"Found {len(users)} active users with example.com emails:")
for user in users:
print(f"- {user.username} ({user.email})")
# Ordering results
with Session() as session:
# Get 5 most recently created users
recent_users = session.query(User).order_by(User.created_at.desc()).limit(5).all()
print("Recent users:")
for user in recent_users:
print(f"- {user.username} (created: {user.created_at})")
Updating Objects:
# Update a single object by modifying its attributes
with Session() as session:
user = session.query(User).filter_by(username='johndoe').first()
if user:
# Modify the user object
user.email = 'john.doe@example.com'
user.is_active = True
# Commit the changes
session.commit()
print(f"Updated user: {user.username} with new email: {user.email}")
else:
print("User not found")
# Bulk update multiple objects at once
with Session() as session:
# Deactivate all users with a specific email domain
result = session.query(User).filter(
User.email.like('%@oldcompany.com')
).update(
{User.is_active: False},
synchronize_session=False
)
# Commit the changes
session.commit()
print(f"Deactivated {result} users with @oldcompany.com email addresses")
Deleting Objects:
# Delete a single object
with Session() as session:
user = session.query(User).filter_by(username='unwanted_user').first()
if user:
# Mark the object for deletion
session.delete(user)
# Commit the deletion
session.commit()
print(f"Deleted user: {user.username}")
else:
print("User not found")
# Bulk delete multiple objects at once
with Session() as session:
# Delete all inactive users
result = session.query(User).filter(
User.is_active == False
).delete(synchronize_session=False)
# Commit the changes
session.commit()
print(f"Deleted {result} inactive users")
Working with Relationships
Creating Related Objects:
# Create a user with posts (one-to-many relationship)
with Session() as session:
# Create a new user
new_user = User(
username='blogger',
email='blogger@example.com',
password_hash='hashed_password_value'
)
# Create posts for this user
post1 = Post(
title="My First Post",
content="This is the content of my first post.",
author=new_user # Use the relationship
)
post2 = Post(
title="My Second Post",
content="This is the content of my second post.",
author=new_user # Use the relationship
)
# Add user to the session (posts will be cascade-added)
session.add(new_user)
# Commit the transaction
session.commit()
print(f"Created user with ID: {new_user.id}")
print(f"Created posts with IDs: {post1.id}, {post2.id}")
# Alternative approach using the relationship from the other side
with Session() as session:
# Create a new user
new_user = User(
username='another_blogger',
email='another@example.com',
password_hash='hashed_password_value'
)
# Create posts and add them to the user's posts collection
new_user.posts = [
Post(title="First Post", content="Content of first post."),
Post(title="Second Post", content="Content of second post.")
]
# Add user to the session
session.add(new_user)
# Commit the transaction
session.commit()
Querying with Relationships:
# Get a user and related posts (eager loading with joinedload)
from sqlalchemy.orm import joinedload
with Session() as session:
user = session.query(User).options(joinedload(User.posts)).filter_by(username='blogger').first()
if user:
print(f"User: {user.username}")
print(f"Post count: {len(user.posts)}")
for post in user.posts:
print(f"- {post.title}")
else:
print("User not found")
# Find posts with their authors
with Session() as session:
posts = session.query(Post).join(Post.author).filter(User.username == 'blogger').all()
print(f"Found {len(posts)} posts by blogger:")
for post in posts:
print(f"- {post.title} (by {post.author.username})")
# Find users with specific post criteria
with Session() as session:
# Find users who have published posts in the last week
from datetime import datetime, timedelta
last_week = datetime.utcnow() - timedelta(days=7)
users = session.query(User).join(User.posts).filter(
Post.is_published == True,
Post.created_at >= last_week
).distinct().all()
print(f"Found {len(users)} users who published in the last week:")
for user in users:
print(f"- {user.username}")
Transactions and Error Handling
Transaction Management:
# Automatic transaction management with context manager
try:
with Session() as session:
# Create a user
new_user = User(username='transaction_test', email='transaction@example.com')
session.add(new_user)
# Create a post for this user
new_post = Post(title="Test Post", content="Test content", author=new_user)
session.add(new_post)
# Commit happens automatically when the context exits without errors
except Exception as e:
print(f"An error occurred: {e}")
# Rollback happens automatically when an exception occurs
# Manual transaction management
session = Session()
try:
# Create a user
new_user = User(username='manual_transaction', email='manual@example.com')
session.add(new_user)
# Create a post for this user
new_post = Post(title="Manual Post", content="Test content", author=new_user)
session.add(new_post)
# Commit the transaction
session.commit()
except Exception as e:
# Rollback on error
session.rollback()
print(f"An error occurred, transaction rolled back: {e}")
finally:
# Always close the session
session.close()
# Nested transactions with savepoints
from sqlalchemy.orm import Session
with Session(engine) as session:
# Start an outer transaction
with session.begin():
# Create a user
new_user = User(username='savepoint_test', email='savepoint@example.com')
session.add(new_user)
# Create a nested transaction (savepoint)
try:
with session.begin_nested():
# This operation might fail
duplicate_user = User(username='savepoint_test', email='duplicate@example.com')
session.add(duplicate_user)
# This would raise an IntegrityError due to duplicate username
except Exception as e:
print(f"Inner transaction failed: {e}")
# The outer transaction is still active
# Continue with the outer transaction
new_post = Post(title="Savepoint Post", content="Test content", author=new_user)
session.add(new_post)
# Outer transaction commits automatically at the end of the context
# If the inner transaction failed, only its operations are rolled back
Advanced Querying with SQLAlchemy
Complex Filtering
Advanced Query Filters:
from sqlalchemy import and_, or_, not_
with Session() as session:
# Using and_, or_, not_ operators
query = session.query(User).filter(
and_(
User.is_active == True,
or_(
User.username.like('j%'),
User.email.like('%@gmail.com')
),
not_(User.username == 'admin')
)
)
users = query.all()
print(f"Found {len(users)} users matching complex criteria:")
for user in users:
print(f"- {user.username} ({user.email})")
# Alternative syntax using &, |, ~ operators
query = session.query(User).filter(
(User.is_active == True) &
((User.username.like('j%')) | (User.email.like('%@gmail.com'))) &
~(User.username == 'admin')
)
# Both queries produce the same SQL
Querying with Functions
Using SQL Functions in Queries:
from sqlalchemy import func, desc, case
with Session() as session:
# Count users by status
user_counts = session.query(
User.is_active,
func.count(User.id).label('user_count')
).group_by(User.is_active).all()
for status, count in user_counts:
print(f"{'Active' if status else 'Inactive'} users: {count}")
# Find users with the most posts
top_posters = session.query(
User,
func.count(Post.id).label('post_count')
).join(User.posts).group_by(User).order_by(desc('post_count')).limit(5).all()
print("\nTop posters:")
for user, post_count in top_posters:
print(f"- {user.username}: {post_count} posts")
# Using CASE statements
user_categories = session.query(
User.username,
case(
[
(func.count(Post.id) > 10, "Prolific"),
(func.count(Post.id) > 5, "Active"),
(func.count(Post.id) > 0, "Beginner")
],
else_="Inactive"
).label('category')
).outerjoin(User.posts).group_by(User.username).all()
print("\nUser categories:")
for username, category in user_categories:
print(f"- {username}: {category}")
Subqueries and Joins
Using Subqueries and Complex Joins:
from sqlalchemy import subquery
with Session() as session:
# Subquery: Find users who have made a post in the last month
from datetime import datetime, timedelta
last_month = datetime.utcnow() - timedelta(days=30)
recent_posters_subq = session.query(
Post.user_id
).filter(
Post.created_at >= last_month
).subquery()
# Main query: Get all details of these users
recent_posters = session.query(User).filter(
User.id.in_(recent_posters_subq)
).all()
print(f"Users who posted in the last month: {len(recent_posters)}")
# Complex join: Find users and their most recent post
most_recent_post_subq = session.query(
Post.user_id,
func.max(Post.created_at).label('max_date')
).group_by(Post.user_id).subquery('most_recent_post')
user_with_recent_post = session.query(
User,
Post
).join(
most_recent_post_subq,
User.id == most_recent_post_subq.c.user_id
).join(
Post,
(Post.user_id == most_recent_post_subq.c.user_id) &
(Post.created_at == most_recent_post_subq.c.max_date)
).all()
print("\nUsers with their most recent posts:")
for user, post in user_with_recent_post:
print(f"- {user.username}: '{post.title}' ({post.created_at})")
Pagination
Implementing Pagination:
def get_paginated_posts(session, page=1, per_page=10):
"""
Retrieve a paginated list of posts.
Args:
session: SQLAlchemy session
page: Page number (1-indexed)
per_page: Number of items per page
Returns:
Dictionary containing items, pagination metadata
"""
# Calculate offset
offset = (page - 1) * per_page
# Get items for current page
posts = session.query(Post).order_by(Post.created_at.desc()).offset(offset).limit(per_page).all()
# Get total count for pagination metadata
total = session.query(func.count(Post.id)).scalar()
# Calculate pagination metadata
total_pages = (total + per_page - 1) // per_page # Ceiling division
has_next = page < total_pages
has_prev = page > 1
return {
'items': posts,
'pagination': {
'page': page,
'per_page': per_page,
'total': total,
'total_pages': total_pages,
'has_next': has_next,
'has_prev': has_prev
}
}
# Example usage
with Session() as session:
# Get first page of posts
result = get_paginated_posts(session, page=1, per_page=5)
print(f"Page {result['pagination']['page']} of {result['pagination']['total_pages']}")
print(f"Showing {len(result['items'])} of {result['pagination']['total']} posts")
for post in result['items']:
print(f"- {post.title} ({post.created_at})")
# Navigation links
if result['pagination']['has_prev']:
print("< Previous Page")
if result['pagination']['has_next']:
print("Next Page >")
PostgreSQL-Specific Features
Using PostgreSQL Array Types
Working with Array Columns:
from sqlalchemy import Column, Integer, String, ARRAY
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
tags = Column(ARRAY(String)) # Array of strings
dimensions = Column(ARRAY(Integer)) # Array of integers
def __repr__(self):
return f"<Product(name='{self.name}', tags={self.tags})>"
# Create the table
Base.metadata.create_all(engine)
# Insert data with arrays
with Session() as session:
new_product = Product(
name="Ergonomic Keyboard",
tags=["electronics", "office", "ergonomic"],
dimensions=[45, 15, 2]
)
session.add(new_product)
session.commit()
print(f"Created product: {new_product.name}, ID: {new_product.id}")
# Query with array operators
with Session() as session:
# Find products with a specific tag (using ANY)
from sqlalchemy.sql.expression import any_
ergonomic_products = session.query(Product).filter(
any_(Product.tags) == "ergonomic"
).all()
print(f"Found {len(ergonomic_products)} ergonomic products")
# Check if array contains all values (using @>)
from sqlalchemy.dialects.postgresql import ARRAY
office_products = session.query(Product).filter(
Product.tags.contains(["office", "ergonomic"])
).all()
print(f"Found {len(office_products)} products tagged with both office AND ergonomic")
Using PostgreSQL JSON Types
Working with JSON/JSONB Columns:
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.declarative import declarative_base
import datetime
Base = declarative_base()
class Event(Base):
__tablename__ = 'events'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
data = Column(JSONB, nullable=False) # JSONB for better performance and indexing
created_at = Column(DateTime, default=datetime.datetime.utcnow)
def __repr__(self):
return f"<Event(name='{self.name}')>"
# Create the table
Base.metadata.create_all(engine)
# Insert data with JSON
with Session() as session:
new_event = Event(
name="User Login",
data={
"user_id": 123,
"device": {
"type": "mobile",
"os": "iOS",
"version": "15.2"
},
"location": {
"country": "US",
"city": "San Francisco"
}
}
)
session.add(new_event)
session.commit()
print(f"Created event: {new_event.name}, ID: {new_event.id}")
# Query with JSON operators
with Session() as session:
# JSON path match operator: ->
mobile_logins = session.query(Event).filter(
Event.data['device']['type'].astext == 'mobile'
).all()
print(f"Found {len(mobile_logins)} mobile login events")
# JSON containment operator: @>
from sqlalchemy.dialects.postgresql import JSONB
ios_logins = session.query(Event).filter(
Event.data.contains({"device": {"os": "iOS"}})
).all()
print(f"Found {len(ios_logins)} iOS login events")
# Update JSON data
from sqlalchemy.dialects.postgresql.json import JsonbConcatenationOperator
session.query(Event).filter(
Event.id == new_event.id
).update(
{Event.data: JsonbConcatenationOperator(
Event.data,
{"processed": True}
)},
synchronize_session=False
)
session.commit()
# Verify the update
updated_event = session.query(Event).get(new_event.id)
print(f"Updated event data: {updated_event.data}")
Full-Text Search
Implementing Full-Text Search:
from sqlalchemy import Column, Integer, String, Text, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy import func, text
Base = declarative_base()
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text, nullable=False)
search_vector = Column(TSVECTOR) # Column to store the search vector
__table_args__ = (
# Create GIN index on the search vector for fast searches
Index('idx_article_search_vector', 'search_vector', postgresql_using='gin'),
)
def __repr__(self):
return f"<Article(title='{self.title}')>"
# Create the table
Base.metadata.create_all(engine)
# Add trigger to automatically update search_vector (using raw SQL)
with engine.connect() as conn:
conn.execute(text("""
CREATE OR REPLACE FUNCTION article_search_vector_update() RETURNS trigger AS $$
BEGIN
NEW.search_vector =
setweight(to_tsvector('english', COALESCE(NEW.title, '')), 'A') ||
setweight(to_tsvector('english', COALESCE(NEW.content, '')), 'B');
RETURN NEW;
END
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS article_search_vector_update ON articles;
CREATE TRIGGER article_search_vector_update
BEFORE INSERT OR UPDATE ON articles
FOR EACH ROW EXECUTE FUNCTION article_search_vector_update();
"""))
# Insert sample data
with Session() as session:
articles = [
Article(
title="PostgreSQL Full-Text Search",
content="PostgreSQL offers powerful full-text search capabilities that are often overlooked. "
"This feature allows natural language queries against text content."
),
Article(
title="SQLAlchemy ORM Tutorial",
content="Learn how to use SQLAlchemy ORM to interact with your database. "
"This tutorial covers relationships, querying, and PostgreSQL features."
),
Article(
title="Web Development with Python",
content="Python is an excellent language for web development. "
"Frameworks like Flask and Django make it easy to build powerful web applications."
)
]
session.add_all(articles)
session.commit()
print(f"Added {len(articles)} sample articles")
# Perform full-text search
with Session() as session:
# Simple text search
search_term = "postgresql"
# Build the search query
search_query = session.query(
Article,
func.ts_rank(Article.search_vector, func.plainto_tsquery('english', search_term)).label('rank')
).filter(
Article.search_vector.op('@@')(func.plainto_tsquery('english', search_term))
).order_by(text('rank DESC'))
results = search_query.all()
print(f"\nSearch results for '{search_term}':")
for article, rank in results:
print(f"- {article.title} (Rank: {rank:.4f})")
# More complex search
search_term = "python web applications"
# Build the search query with phrase search
search_query = session.query(
Article,
func.ts_rank(Article.search_vector, func.to_tsquery('english', ' & '.join([f"{word}:*" for word in search_term.split()]))).label('rank')
).filter(
Article.search_vector.op('@@')(func.to_tsquery('english', ' & '.join([f"{word}:*" for word in search_term.split()])))
).order_by(text('rank DESC'))
results = search_query.all()
print(f"\nSearch results for '{search_term}':")
for article, rank in results:
print(f"- {article.title} (Rank: {rank:.4f})")
SQLAlchemy Best Practices
Performance Optimization
- Use Eager Loading: Load related objects in a single query to avoid the N+1 query problem
- Optimize Queries: Only select the columns you need instead of using SELECT *
- Use Bulk Operations: For inserting or updating multiple records, use bulk methods
- Create Proper Indexes: Add indexes for columns used in WHERE, JOIN, and ORDER BY clauses
- Use Connection Pooling: SQLAlchemy's connection pooling helps reuse database connections
- Profile Your Queries: Enable query logging to identify slow queries
Performance Optimization Examples:
from sqlalchemy.orm import joinedload, contains_eager, load_only
# N+1 Query Problem (BAD)
with Session() as session:
users = session.query(User).all()
# This will make a separate query for each user's posts
for user in users:
print(f"{user.username} has {len(user.posts)} posts")
# Solution: Eager Loading (GOOD)
with Session() as session:
# Load users and their posts in a single query
users = session.query(User).options(joinedload(User.posts)).all()
# No additional queries needed
for user in users:
print(f"{user.username} has {len(user.posts)} posts")
# Select only needed columns
with Session() as session:
# Instead of session.query(User).all()
users = session.query(User).options(load_only(User.id, User.username, User.email)).all()
# Bulk Insert
with Session() as session:
# Create 1000 sample users
users = [
User(username=f"user{i}", email=f"user{i}@example.com")
for i in range(1, 1001)
]
# Using add_all() vs bulk_save_objects()
# session.add_all(users) # Creates 1000 separate INSERT statements
# Bulk insert (more efficient)
session.bulk_save_objects(users)
session.commit()
Connection and Session Management
- Create a Session Factory: Don't create engine and session directly in route handlers
- Use Context Managers: Ensure sessions are properly closed
- Scoped Sessions: For web applications, use scoped sessions tied to request/response cycle
- Session Lifecycle: Be mindful of when objects are attached to sessions
Implementing a Session Factory Pattern:
# db.py - Database module
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
import os
# Get database URL from environment variable
DATABASE_URL = os.environ.get(
'DATABASE_URL',
'postgresql://pythonapp:secure_password@localhost:5432/python_app_db'
)
# Create engine
engine = create_engine(DATABASE_URL, pool_size=5, max_overflow=10)
# Create base class for models
Base = declarative_base()
# Create session factory
session_factory = sessionmaker(bind=engine)
# Create scoped session for web applications
# This ties sessions to the current thread or request
SessionLocal = scoped_session(session_factory)
# Helper function to get a database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Create all tables
def init_db():
Base.metadata.create_all(bind=engine)
# Example usage in a Flask application
"""
from flask import Flask
from db import get_db, init_db
app = Flask(__name__)
@app.before_first_request
def setup():
init_db()
@app.route('/users')
def list_users():
db = next(get_db())
users = db.query(User).all()
return {'users': [user.username for user in users]}
"""
# Example usage in a FastAPI application
"""
from fastapi import FastAPI, Depends
from db import get_db, init_db
from models import User
app = FastAPI()
@app.on_event("startup")
def startup_event():
init_db()
@app.get('/users')
def list_users(db = Depends(get_db)):
users = db.query(User).all()
return {'users': [user.username for user in users]}
"""
Structuring SQLAlchemy Code
- Separation of Concerns: Keep models, database setup, and business logic separate
- Repository Pattern: Encapsulate database operations in repository classes
- Don't Expose ORM Objects: Convert to DTOs/serializable objects before returning from APIs
- Model Validation: Validate data before creating or updating models
Repository Pattern Example:
# models.py - Define SQLAlchemy models
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from db import Base
import datetime
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
password_hash = Column(String(128), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
posts = relationship("Post", back_populates="author")
def __repr__(self):
return f"<User(username='{self.username}', email='{self.email}')>"
# repository.py - Repository classes for database operations
from sqlalchemy.orm import Session
from models import User
from typing import List, Optional, Dict, Any
class UserRepository:
def __init__(self, session: Session):
self.session = session
def get_by_id(self, user_id: int) -> Optional[User]:
return self.session.query(User).filter(User.id == user_id).first()
def get_by_username(self, username: str) -> Optional[User]:
return self.session.query(User).filter(User.username == username).first()
def get_all(self, skip: int = 0, limit: int = 100) -> List[User]:
return self.session.query(User).offset(skip).limit(limit).all()
def create(self, user_data: Dict[str, Any]) -> User:
user = User(**user_data)
self.session.add(user)
self.session.commit()
self.session.refresh(user)
return user
def update(self, user_id: int, user_data: Dict[str, Any]) -> Optional[User]:
user = self.get_by_id(user_id)
if user:
for key, value in user_data.items():
setattr(user, key, value)
self.session.commit()
self.session.refresh(user)
return user
def delete(self, user_id: int) -> bool:
user = self.get_by_id(user_id)
if user:
self.session.delete(user)
self.session.commit()
return True
return False
# schema.py - Pydantic models for validation and serialization
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime
class UserBase(BaseModel):
username: str
email: EmailStr
is_active: Optional[bool] = True
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
id: int
created_at: datetime
class Config:
orm_mode = True
# service.py - Business logic layer
from repository import UserRepository
from schema import UserCreate, UserUpdate, UserResponse
from sqlalchemy.orm import Session
from typing import List, Optional
import hashlib
class UserService:
def __init__(self, db: Session):
self.repository = UserRepository(db)
def get_user(self, user_id: int) -> Optional[UserResponse]:
user = self.repository.get_by_id(user_id)
if user:
return UserResponse.from_orm(user)
return None
def get_users(self, skip: int = 0, limit: int = 100) -> List[UserResponse]:
users = self.repository.get_all(skip, limit)
return [UserResponse.from_orm(user) for user in users]
def create_user(self, user_data: UserCreate) -> UserResponse:
# Check if username already exists
existing_user = self.repository.get_by_username(user_data.username)
if existing_user:
raise ValueError("Username already exists")
# Hash password
hashed_password = hashlib.sha256(user_data.password.encode()).hexdigest()
# Create user
user_dict = user_data.dict(exclude={"password"})
user_dict["password_hash"] = hashed_password
user = self.repository.create(user_dict)
return UserResponse.from_orm(user)
def update_user(self, user_id: int, user_data: UserUpdate) -> Optional[UserResponse]:
user = self.repository.update(user_id, user_data.dict(exclude_unset=True))
if user:
return UserResponse.from_orm(user)
return None
def delete_user(self, user_id: int) -> bool:
return self.repository.delete(user_id)
# Example usage
from db import SessionLocal
def main():
db = SessionLocal()
try:
# Create user service
user_service = UserService(db)
# Create a user
new_user = UserCreate(
username="johndoe",
email="john@example.com",
password="secure_password"
)
user = user_service.create_user(new_user)
print(f"Created user: {user.username}, ID: {user.id}")
# Get all users
users = user_service.get_users()
print(f"Total users: {len(users)}")
# Update user
updated_user = user_service.update_user(
user.id,
UserUpdate(email="john.doe@example.com")
)
print(f"Updated user email: {updated_user.email}")
# Delete user
deleted = user_service.delete_user(user.id)
print(f"User deleted: {deleted}")
finally:
db.close()
if __name__ == "__main__":
main()
Practical Activities
Activity 1: Building a Blog API with SQLAlchemy
Create a simple blog API using SQLAlchemy with the following features:
- Define models for User, Post, and Comment with appropriate relationships
- Implement repository classes for each model
- Create API endpoints for:
- User registration and authentication
- Creating, reading, updating, and deleting posts
- Adding and retrieving comments on posts
- Add validation for all input data
- Implement proper error handling and response formatting
Activity 2: Data Migration Script
Create a script that demonstrates data migration between different database schemas:
- Define an "old" schema with a simple users table
- Define a "new" schema with an enhanced users table (additional fields)
- Write a migration script that:
- Reads data from the old schema
- Transforms it as needed
- Writes it to the new schema
- Handles potential data conversion issues
Activity 3: PostgreSQL-Specific Features
Build an application that showcases PostgreSQL-specific features with SQLAlchemy:
- Implement a product catalog with:
- JSON/JSONB columns for storing complex product attributes
- Array columns for storing tags and categories
- Full-text search capabilities for product descriptions
- Create a user interface or API endpoints to interact with the catalog
- Implement advanced filtering and searching functionality
- Add performance metrics to measure query execution time
Key Takeaways
- ORM (Object-Relational Mapping) bridges the gap between object-oriented programming and relational databases
- SQLAlchemy is a powerful, flexible ORM for Python that supports both high-level object operations and low-level SQL commands
- The declarative approach makes it easy to define your database schema as Python classes
- SQLAlchemy's session provides a workspace for tracking changes to objects before committing them to the database
- Relationships in SQLAlchemy allow you to navigate between related objects without writing explicit JOIN queries
- The query API offers a rich set of methods for filtering, sorting, and retrieving data
- SQLAlchemy supports PostgreSQL-specific features like arrays, JSON/JSONB, and full-text search
- Following best practices like using the repository pattern can help keep your database code organized and maintainable
- Performance optimizations like eager loading and bulk operations can significantly improve application speed
- SQLAlchemy's flexible architecture allows you to choose the right level of abstraction for your specific needs
Further Learning Resources
- SQLAlchemy Documentation
- PostgreSQL Documentation
- "Essential SQLAlchemy" by Jason Myers and Rick Copeland
- "Building Database Applications with SQLAlchemy" by Robert Picard
- "Fluent Python" by Luciano Ramalho (for better understanding of Python and ORM concepts)