CRUD Operations with SQLAlchemy

Mastering Database Interactions in Flask Applications

Introduction to CRUD Operations

CRUD stands for Create, Read, Update, and Delete—the four basic operations for managing data in persistent storage. These operations form the foundation of most data-driven applications, from simple blogs to complex enterprise systems.

Flask-SQLAlchemy provides a clean, intuitive API for performing CRUD operations using Python objects rather than raw SQL. This object-oriented approach not only makes your code more readable and maintainable but also helps prevent common issues like SQL injection.

[Diagram: the CRUD cycle, Create → Read → Update → Delete → back to Create]

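Think of CRUD operations like managing a library of books: you add a new book to the catalog (Create), look a book up (Read), correct its details (Update), and remove it from the collection (Delete).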

Working with the Database Session

Before diving into CRUD operations, it's important to understand the SQLAlchemy session, which manages interactions with the database.

In Flask-SQLAlchemy, you access the session via the db.session object. The session is like a staging area where you prepare changes before committing them to the database.

[Diagram: Python objects are added to the session; the session commits them to the database or rolls them back; queries load database rows back into Python objects through the session]

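Key session operations include db.session.add() and add_all() to stage new objects, delete() to mark objects for removal, flush() to send pending changes to the database without ending the transaction, commit() to make them permanent, and rollback() to discard them.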

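The session follows the Unit of Work pattern, collecting all changes and applying them as a single transaction when you call commit(). This approach has several benefits: related changes succeed or fail together, SQLAlchemy can order the statements to satisfy foreign-key dependencies, and nothing is written until you flush or commit.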
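
The staging behavior can be seen in a small standalone sketch. It assumes plain SQLAlchemy (1.4 or later) with an in-memory SQLite database rather than Flask-SQLAlchemy's db.session, and a hypothetical one-column User model; the session methods are the same ones db.session exposes.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical model, for illustration only
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80), nullable=False)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Staged but never committed: rollback discards the pending object
    staged = User(username="staged")
    session.add(staged)
    pending_before = staged in session.new
    session.rollback()
    pending_after = staged in session

    # Staged and committed: the row is written and the ID is populated
    saved = User(username="saved")
    session.add(saved)
    session.commit()
    saved_id = saved.id
    total_rows = session.query(User).count()

print(pending_before, pending_after, saved_id, total_rows)
```

Only the committed object reaches the table; the rolled-back one never produces an INSERT.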

Create Operations

Creating new records involves instantiating model objects, adding them to the session, and committing the session.

Basic Create Operation

# Assuming we have a User model
user = User(username='john_doe', email='john@example.com')
db.session.add(user)
db.session.commit()  # Don't forget to commit!

After commit, SQLAlchemy updates the object with any database-generated values (like auto-incrementing IDs):

print(user.id)  # Now has a value from the database

Creating Related Objects

You can create related objects in several ways:

# Method 1: Create separately and link
user = User(username='jane_doe', email='jane@example.com')
db.session.add(user)
db.session.flush()  # Get the user ID without committing transaction

post = Post(title='My First Post', content='Hello, world!', user_id=user.id)
db.session.add(post)
db.session.commit()

# Method 2: Use the relationship
user = User(username='bob_smith', email='bob@example.com')
post = Post(title='Welcome', content='My blog post')

# Link objects via relationship
user.posts.append(post)  # Automatically sets post.user_id

db.session.add(user)  # The default save-update cascade also adds the related post
db.session.commit()

# Method 3: Create with relationship attribute
post = Post(title='Another Post', content='More content', 
            author=user)  # Using 'author' backref
db.session.add(post)
db.session.commit()

Bulk Insert Operations

For inserting many records efficiently:

# Create multiple objects
users = [
    User(username='user1', email='user1@example.com'),
    User(username='user2', email='user2@example.com'),
    User(username='user3', email='user3@example.com')
]

# Add all at once
db.session.add_all(users)
db.session.commit()

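For very large datasets, consider bulk_insert_mappings, which takes plain dictionaries and skips most per-object ORM overhead (SQLAlchemy 2.0 documents it as a legacy method and recommends db.session.execute(insert(User), user_data) instead):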

user_data = [
    {'username': f'bulk_user{i}', 'email': f'bulk{i}@example.com'}
    for i in range(1, 1001)  # Creating 1000 users
]

db.session.bulk_insert_mappings(User, user_data)
db.session.commit()

Read Operations

Reading data is performed through queries, which are built using the query attribute of model classes.

Basic Queries

# Get all users
all_users = User.query.all()

# Get first user
first_user = User.query.first()

# Get specific user by primary key
user = User.query.get(1)  # Returns None if not found

# Get or 404 (useful in Flask routes)
user = User.query.get_or_404(1)  # Raises 404 error if not found

Filtering Queries

# Filter by a column value
admin_users = User.query.filter_by(is_admin=True).all()

# More complex filter with expressions
from datetime import datetime
recent_users = User.query.filter(
    User.created_at > datetime(2023, 1, 1)
).all()

# Multiple conditions (AND)
filtered_users = User.query.filter(
    User.is_active == True,
    User.age >= 18
).all()

# OR conditions
from sqlalchemy import or_
results = User.query.filter(
    or_(User.username == 'admin', User.email.endswith('@admin.com'))
).all()

# LIKE queries (case sensitivity depends on the database: PostgreSQL's LIKE
# is case-sensitive, while SQLite's is case-insensitive for ASCII by default)
search_results = User.query.filter(
    User.username.like('%john%')
).all()

# Case-insensitive matching that works on any backend: ilike() renders as
# ILIKE on PostgreSQL and as lower(...) LIKE lower(...) elsewhere
search_results = User.query.filter(
    User.username.ilike('%john%')
).all()
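
To check how a case-insensitive filter is rendered for a given backend, the expression can be compiled against a dialect without connecting to any database. This is a minimal sketch using plain SQLAlchemy and a hypothetical User model; only the generated SQL text is inspected.

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects import postgresql, sqlite
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class User(Base):  # hypothetical model, for illustration only
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))


criterion = User.username.ilike("%john%")

# Compile the same expression for two backends without connecting to either
postgres_sql = str(criterion.compile(dialect=postgresql.dialect()))
sqlite_sql = str(criterion.compile(dialect=sqlite.dialect()))

print(postgres_sql)  # uses the native ILIKE operator
print(sqlite_sql)    # falls back to lower(...) LIKE lower(...)
```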

Ordering Results

# Order by username ascending
ordered_users = User.query.order_by(User.username).all()

# Order by creation date descending
newest_users = User.query.order_by(User.created_at.desc()).all()

# Multiple ordering criteria
users = User.query.order_by(User.is_admin.desc(), User.username).all()

Limiting Results and Pagination

# Limit to first 10 users
top_users = User.query.limit(10).all()

# Skip first 10, get next 10 (offset & limit)
page_2 = User.query.offset(10).limit(10).all()

# Using Flask-SQLAlchemy's paginate method
page = request.args.get('page', 1, type=int)
per_page = 10
pagination = User.query.paginate(page=page, per_page=per_page)

# Now you can access:
pagination.items  # List of items on this page
pagination.page  # Current page number
pagination.per_page  # Items per page
pagination.total  # Total number of items
pagination.pages  # Total number of pages
pagination.has_next  # True if there's a next page
pagination.has_prev  # True if there's a previous page
pagination.next_num  # Next page number
pagination.prev_num  # Previous page number
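
The relationship between page, per_page, offset, and the page count can be sketched in plain Python. The helper below is hypothetical and is not Flask-SQLAlchemy's implementation; it only illustrates the arithmetic behind the attributes listed above.

```python
import math


def page_window(page, per_page, total):
    """Return offset, page count, and neighbour flags for a 1-based page number."""
    pages = math.ceil(total / per_page) if total else 0
    return {
        "offset": (page - 1) * per_page,  # rows to skip
        "limit": per_page,                # rows to fetch
        "pages": pages,                   # total number of pages
        "has_prev": page > 1,
        "has_next": page < pages,
    }


window = page_window(page=2, per_page=10, total=35)
last_window = page_window(page=4, per_page=10, total=35)
print(window)
```

With 35 rows and 10 per page, page 2 skips the first 10 rows, and page 4 is the last page with no successor.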

Joins and Relationships

# Eager loading with joinedload (to avoid N+1 query problem)
from sqlalchemy.orm import joinedload

users_with_posts = User.query.options(joinedload(User.posts)).all()

# For each user, accessing posts won't trigger additional queries
for user in users_with_posts:
    print(f"{user.username} has {len(user.posts)} posts")

# Filter by related objects
users_with_posts = User.query.filter(User.posts.any()).all()

# Users with posts containing a specific title
users = User.query.filter(
    User.posts.any(Post.title.like('%Flask%'))
).all()

# Explicit joins (the ON clause is inferred from the foreign key)
results = db.session.query(User, Post).join(Post).filter(
    Post.title.like('%Flask%')
).all()

# Now results contains tuples of (user, post)

Aggregation and Grouping

from sqlalchemy import func

# Count all users
user_count = User.query.count()

# Count by group
user_counts = db.session.query(
    User.is_admin, func.count(User.id)
).group_by(User.is_admin).all()

# Result is list of tuples: [(False, 95), (True, 5)]

# More complex aggregation
from sqlalchemy import func, desc

# Get users with post counts, ordered by count
post_counts = db.session.query(
    User.username,
    func.count(Post.id).label('post_count')
).join(Post).group_by(User.id).order_by(desc('post_count')).all()

# Get average, min, max age
age_stats = db.session.query(
    func.avg(User.age).label('avg_age'),
    func.min(User.age).label('min_age'),
    func.max(User.age).label('max_age')
).one()

print(f"Age stats: avg={age_stats.avg_age}, min={age_stats.min_age}, max={age_stats.max_age}")

Update Operations

Updates involve modifying existing objects and committing the changes.

Updating a Single Object

# Get the object
user = User.query.get(1)

# Modify attributes
user.username = 'new_username'
user.email = 'new_email@example.com'

# Save changes
db.session.commit()

Behind the scenes, SQLAlchemy tracks changes to the object and generates the appropriate SQL UPDATE statement when you commit.

Bulk Updates

# Update all users with a specific condition
User.query.filter_by(is_active=False).update({'last_login': None})
db.session.commit()

# More complex update with expression
from sqlalchemy import text
User.query.filter(User.login_count > 100).update(
    {'rank': text('login_count / 10')},
    synchronize_session=False  # Don't try to update Python objects
)
db.session.commit()

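The synchronize_session parameter controls how SQLAlchemy updates Python objects that are already loaded in the session after a bulk update. False skips synchronization, so loaded objects keep their old values until they are expired or refreshed (a commit expires them by default). 'fetch' retrieves the primary keys of the affected rows and updates the matching objects. 'evaluate' applies the filter criteria in Python to the objects in the session and raises an error if the criteria cannot be evaluated that way.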
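
The effect of synchronize_session=False on an already-loaded object can be observed directly. This sketch assumes plain SQLAlchemy with in-memory SQLite and a hypothetical User model with an is_active column.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical model, for illustration only
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))
    is_active = Column(Boolean, default=True)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    user = User(username="alice", is_active=True)
    session.add(user)
    session.commit()
    loaded_value = user.is_active  # loads the row into memory

    # The bulk UPDATE goes straight to the database; loaded objects are untouched
    session.query(User).update({"is_active": False}, synchronize_session=False)
    stale_value = user.is_active  # still the old in-memory value

    session.refresh(user)  # re-read the row from the database
    fresh_value = user.is_active

print(loaded_value, stale_value, fresh_value)
```

If code after a bulk update keeps using loaded objects, refreshing them (or choosing 'fetch') avoids acting on stale values.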

Conditional Updates

# Atomic increment (avoids race conditions)
post = Post.query.get(1)
Post.query.filter_by(id=post.id).update(
    {Post.view_count: Post.view_count + 1}
)
db.session.commit()
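
The increment is safe under concurrency because the arithmetic happens inside the UPDATE statement rather than in Python. A minimal way to see this, assuming plain SQLAlchemy 1.4+ and a hypothetical Post model, is to print the SQL generated for the same expression:

```python
from sqlalchemy import Column, Integer, String, update
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Post(Base):  # hypothetical model, for illustration only
    __tablename__ = "post"
    id = Column(Integer, primary_key=True)
    title = Column(String(100))
    view_count = Column(Integer, default=0)


# The right-hand side is a column expression, not a Python integer
stmt = update(Post).where(Post.id == 1).values(view_count=Post.view_count + 1)
sql = str(stmt)
print(sql)
```

By contrast, post.view_count += 1 on a loaded object reads the value into Python first, so two concurrent requests can both write the same result.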

Delete Operations

Deleting records involves marking objects for deletion and committing the changes.

Deleting a Single Object

# Get the object
user = User.query.get(1)

# Mark for deletion
db.session.delete(user)

# Commit the deletion
db.session.commit()

Bulk Deletes

# Delete all inactive users
User.query.filter_by(is_active=False).delete()
db.session.commit()

# More complex condition
from datetime import datetime, timedelta
one_year_ago = datetime.utcnow() - timedelta(days=365)

deleted_count = User.query.filter(
    User.is_active == False,
    User.last_login < one_year_ago
).delete()  # Returns the number of rows deleted

db.session.commit()
print(f"Deleted {deleted_count} users")

Cascade Deletes

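Be careful with deletions when you have relationships. By default, SQLAlchemy does not delete related objects when the parent is deleted; it sets their foreign key to NULL instead, which fails if that column is declared NOT NULL. You can configure cascade behavior in the relationship definition: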

class User(db.Model):
    # ...
    # This will automatically delete all user's posts when user is deleted
    posts = db.relationship('Post', backref='author', lazy=True,
                           cascade='all, delete-orphan')
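
The following sketch exercises that cascade setting end to end. It assumes plain SQLAlchemy with in-memory SQLite and hypothetical User and Post models that mirror the relationship above.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class User(Base):  # hypothetical model, for illustration only
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))
    posts = relationship("Post", backref="author", cascade="all, delete-orphan")


class Post(Base):  # hypothetical model, for illustration only
    __tablename__ = "post"
    id = Column(Integer, primary_key=True)
    title = Column(String(100))
    user_id = Column(Integer, ForeignKey("user.id"))


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    user = User(username="alice", posts=[Post(title="first"), Post(title="second")])
    session.add(user)
    session.commit()
    posts_before = session.query(Post).count()

    # Removing a post from the collection orphans it, so delete-orphan deletes it
    user.posts.pop()
    session.commit()
    posts_after_orphan = session.query(Post).count()

    # Deleting the parent cascades to the remaining post
    session.delete(user)
    session.commit()
    posts_after_delete = session.query(Post).count()

print(posts_before, posts_after_orphan, posts_after_delete)
```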

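Common cascade options: save-update and merge are the defaults; delete removes children when the parent is deleted; delete-orphan also removes a child that is detached from its parent; all is shorthand for save-update, merge, refresh-expire, expunge, and delete.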

Soft Deletes

Instead of physically removing records, you can implement "soft deletes" by flagging records as deleted:

from datetime import datetime

class SoftDeleteMixin:
    deleted_at = db.Column(db.DateTime, nullable=True)
    
    def soft_delete(self):
        self.deleted_at = datetime.utcnow()
        
    @property
    def is_deleted(self):
        return self.deleted_at is not None
        
class User(db.Model, SoftDeleteMixin):
    # ...
    
    # Helper that returns a query excluding soft-deleted rows
    @classmethod
    def get_active_query(cls):
        return cls.query.filter_by(deleted_at=None)
        
# Usage
# Soft delete a user
user = User.query.get(1)
user.soft_delete()
db.session.commit()

# Query only active users
active_users = User.get_active_query().all()

Transactions and Error Handling

Transactions ensure that a group of database operations succeed or fail together, maintaining data integrity.

Basic Transaction Pattern

try:
    # Perform database operations
    user = User(username='transaction_test', email='test@example.com')
    db.session.add(user)
    
    post = Post(title='Transaction Post', content='Testing transactions', author=user)
    db.session.add(post)
    
    # If everything succeeded, commit the transaction
    db.session.commit()
    print("Transaction successful!")
    
except Exception as e:
    # If anything failed, roll back the transaction
    db.session.rollback()
    print(f"Transaction failed: {e}")
    # Handle the error appropriately
    
finally:
    # Ensure the session is properly managed
    db.session.close()

Using Context Managers

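You can use db.session.begin() as a context manager for cleaner transaction code. Note that begin() raises an error if the session has already started a transaction, which happens as soon as the session is used (for example, by an earlier query in the same request):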

try:
    with db.session.begin():
        # All operations in this block are part of the same transaction
        user = User(username='context_test', email='context@example.com')
        db.session.add(user)
        
        post = Post(title='Context Post', content='Testing context manager', author=user)
        db.session.add(post)
        
        # No need to call commit - it happens automatically if no exceptions occur
    
    print("Transaction successful!")
    
except Exception as e:
    # No need to call rollback - it happens automatically on exception
    print(f"Transaction failed: {e}")
    # Handle the error appropriately

Nested Transactions with Savepoints

For more complex scenarios, you can use savepoints to create nested transactions:

try:
    # Start main transaction
    with db.session.begin():
        # Create a user
        user = User(username='nested_test', email='nested@example.com')
        db.session.add(user)

        try:
            # begin_nested() creates a SAVEPOINT; leaving the block flushes and
            # releases it, or rolls back to it if an exception is raised
            with db.session.begin_nested():
                # Assuming title and content are declared nullable=False,
                # the flush fails with an integrity error
                post = Post(title=None, content=None)
                db.session.add(post)
        except Exception as e:
            # Only the work done since the savepoint is rolled back
            print(f"Couldn't create post: {e}, but user will still be created")

            # Create a valid post instead
            post = Post(title='Valid Post', content='Content', author=user)
            db.session.add(post)

    print("Transaction completed")

except Exception as e:
    # Errors outside the savepoint still roll back the entire transaction
    print(f"Entire transaction failed: {e}")

Advanced Query Techniques

Subqueries

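from sqlalchemy import func

# Subquery: one row per user with that user's post count (0 for users without posts)
post_counts = (
    db.session.query(func.count(Post.id).label('post_count'))
    .select_from(User)
    .outerjoin(Post, Post.user_id == User.id)
    .group_by(User.id)
    .subquery()
)

# Average number of posts per user
avg_posts = db.session.query(func.avg(post_counts.c.post_count)).scalar()

# Find users with more posts than average
prolific_users = (
    User.query.join(User.posts)
    .group_by(User.id)
    .having(func.count(Post.id) > avg_posts)
    .all()
)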

Common Table Expressions (CTEs)

from sqlalchemy import func, select
from sqlalchemy.sql import text

# This requires SQLAlchemy 1.4+ and a database that supports CTEs
# (select() takes columns as positional arguments; the list form was removed in 2.0)
post_counts = select(
    Post.user_id,
    func.count().label('post_count')
).group_by(Post.user_id).cte('post_counts')

users_with_counts = db.session.query(
    User,
    post_counts.c.post_count
).join(
    post_counts,
    User.id == post_counts.c.user_id
).order_by(post_counts.c.post_count.desc()).all()

Raw SQL Queries

When you need to run raw SQL (for performance or complex queries):

from sqlalchemy import select, text

# Execute raw SQL and get results
# (these examples assume the table is named "users")
sql = text("SELECT username, email FROM users WHERE id < :max_id ORDER BY username")
result = db.session.execute(sql, {'max_id': 100})

for row in result:
    print(f"Username: {row.username}, Email: {row.email}")

# Load model instances from raw SQL
sql = text("SELECT * FROM users WHERE username LIKE :pattern")
users = db.session.execute(
    select(User).from_statement(sql), {'pattern': '%admin%'}
).scalars().all()

# To stay safe from SQL injection, always pass values as bound parameters
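
The difference between a bound parameter and string formatting can be demonstrated with a classic injection payload. This is a standalone sketch using plain SQLAlchemy with in-memory SQLite; the users table and its rows are made up for the demonstration.

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")
payload = "' OR '1'='1"  # classic injection attempt

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)"))
    conn.execute(
        text("INSERT INTO users (username) VALUES (:username)"),
        [{"username": "admin"}, {"username": "alice"}],
    )

    # Bound parameter: the payload is compared as a plain string, so nothing matches
    safe_rows = conn.execute(
        text("SELECT username FROM users WHERE username = :name"),
        {"name": payload},
    ).all()

    # String formatting (do NOT do this): the payload rewrites the WHERE clause
    unsafe_sql = f"SELECT username FROM users WHERE username = '{payload}'"
    leaked_rows = conn.execute(text(unsafe_sql)).all()

print(len(safe_rows), len(leaked_rows))
```

The parameterized query returns no rows, while the formatted query returns every row in the table.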

Performance Optimization

Eager Loading vs. Lazy Loading

By default, relationships are lazy loaded (loaded only when accessed). This can lead to the "N+1 query problem" when you load many objects and access their relationships.

# N+1 problem example
users = User.query.limit(100).all()
for user in users:
    # This will trigger a separate query for each user
    print(f"{user.username} has {len(user.posts)} posts")

Solutions include:

from sqlalchemy.orm import joinedload, selectinload, subqueryload

# Eager loading with joinedload (creates a JOIN)
users = User.query.options(joinedload(User.posts)).limit(100).all()

# Eager loading with subqueryload (uses a subquery, better for collections)
users = User.query.options(subqueryload(User.posts)).limit(100).all()

# Eager loading with selectinload (uses a separate SELECT IN query)
users = User.query.options(selectinload(User.posts)).limit(100).all()
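
One way to confirm that an eager-loading option removes the N+1 pattern is to count the SELECT statements each approach emits. The sketch below assumes plain SQLAlchemy with in-memory SQLite, hypothetical User and Post models, and a hypothetical count_selects helper built on the before_cursor_execute engine event.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload

Base = declarative_base()


class User(Base):  # hypothetical model, for illustration only
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))
    posts = relationship("Post", backref="author")


class Post(Base):  # hypothetical model, for illustration only
    __tablename__ = "post"
    id = Column(Integer, primary_key=True)
    title = Column(String(100))
    user_id = Column(Integer, ForeignKey("user.id"))


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
statements = []


@event.listens_for(engine, "before_cursor_execute")
def record_statement(conn, cursor, statement, parameters, context, executemany):
    statements.append(statement)


with Session(engine) as session:
    session.add_all(
        [User(username=f"user{i}", posts=[Post(title=f"post{i}")]) for i in range(3)]
    )
    session.commit()


def count_selects(load_users):
    """Run load_users in a fresh session, touch every user's posts, count SELECTs."""
    statements.clear()
    with Session(engine) as session:
        for user in load_users(session):
            len(user.posts)
    return sum(1 for s in statements if s.lstrip().upper().startswith("SELECT"))


lazy_selects = count_selects(lambda s: s.query(User).all())
eager_selects = count_selects(
    lambda s: s.query(User).options(selectinload(User.posts)).all()
)
print(lazy_selects, eager_selects)
```

With three users, lazy loading issues one query for the users plus one per user, while selectinload issues two queries regardless of how many users are loaded.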

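Choose the appropriate loading method based on your data model and access patterns: joinedload suits many-to-one relationships, where the JOIN adds no extra rows, while selectinload is usually the better choice for collections; subqueryload is the older collection strategy and re-runs the original query as a subquery.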

Database Indexing

Properly indexed columns dramatically improve query performance:

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True, index=True)  # Indexed
    username = db.Column(db.String(80), unique=True, index=True)  # Indexed
    # Rarely queried column, no index needed
    bio = db.Column(db.Text)
    
    # Composite index for common query pattern
    __table_args__ = (
        db.Index('idx_user_active_created', 'is_active', 'created_at'),
    )
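
Whether a query actually uses an index can be checked by asking the database for its query plan. The sketch below uses Python's built-in sqlite3 module rather than SQLAlchemy, with a made-up user table and index name; other databases expose similar information through their own EXPLAIN commands.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user (id INTEGER PRIMARY KEY, email TEXT, bio TEXT)")

query = "EXPLAIN QUERY PLAN SELECT * FROM user WHERE email = ?"

# The last column of each plan row is a human-readable description of the step
plan_without_index = [row[-1] for row in conn.execute(query, ("a@example.com",))]

conn.execute("CREATE INDEX ix_user_email ON user (email)")
plan_with_index = [row[-1] for row in conn.execute(query, ("a@example.com",))]

print(plan_without_index)  # a full table scan
print(plan_with_index)     # a search that uses ix_user_email
conn.close()
```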

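Best practices for indexing: index the columns you filter, join, or sort on most often; match a composite index's column order to the queries that use it; and avoid indexing every column, since each index slows writes and takes space.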

Query Profiling

You can log the SQL that SQLAlchemy emits to spot redundant or expensive queries:

# Log every SQL statement (development only; this does not measure timing)
app.config['SQLALCHEMY_ECHO'] = True

# For more detailed profiling, use Flask-DebugToolbar or similar tools
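
To record only slow statements, a common approach is to time each statement with SQLAlchemy's cursor-execute events. The sketch below assumes plain SQLAlchemy with in-memory SQLite; the threshold name and the slow_queries list are hypothetical, and in a Flask-SQLAlchemy application the listeners would be attached to db.engine inside an application context.

```python
import time

from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite:///:memory:")

SLOW_QUERY_SECONDS = 0.0  # hypothetical threshold; 0 records everything for the demo
slow_queries = []


@event.listens_for(engine, "before_cursor_execute")
def start_timer(conn, cursor, statement, parameters, context, executemany):
    # A stack copes with statements that trigger further statements
    conn.info.setdefault("query_start", []).append(time.perf_counter())


@event.listens_for(engine, "after_cursor_execute")
def stop_timer(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.perf_counter() - conn.info["query_start"].pop()
    if elapsed >= SLOW_QUERY_SECONDS:
        slow_queries.append((statement, elapsed))


with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

for statement, elapsed in slow_queries:
    print(f"{elapsed:.6f}s  {statement}")
```

In practice the threshold would be set to something like a fraction of a second and the entries sent to the application logger.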

Real-world Example: CRUD Operations in a Flask Application

Let's implement a complete CRUD flow for a blog application:

from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blog.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'your-secret-key'

db = SQLAlchemy(app)

class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    def __repr__(self):
        return f"Post('{self.title}', '{self.created_at}')"

@app.route('/')
def home():
    # READ operation - Get all posts, newest first
    posts = Post.query.order_by(Post.created_at.desc()).all()
    return render_template('home.html', posts=posts)

@app.route('/post/<int:post_id>')
def post_detail(post_id):
    # READ operation - Get specific post
    post = Post.query.get_or_404(post_id)
    return render_template('post_detail.html', post=post)

@app.route('/post/new', methods=['GET', 'POST'])
def new_post():
    if request.method == 'POST':
        # CREATE operation
        title = request.form['title']
        content = request.form['content']
        
        # Validate data
        if not title or not content:
            flash('Title and content are required!', 'danger')
            return render_template('post_form.html')
        
        # Create and save the post
        post = Post(title=title, content=content)
        db.session.add(post)
        
        try:
            db.session.commit()
            flash('Post created successfully!', 'success')
            return redirect(url_for('post_detail', post_id=post.id))
        except Exception as e:
            db.session.rollback()
            flash(f'Error creating post: {str(e)}', 'danger')
            return render_template('post_form.html')
    
    # GET request - show the form
    return render_template('post_form.html')

@app.route('/post/<int:post_id>/edit', methods=['GET', 'POST'])
def edit_post(post_id):
    # Get the post to edit
    post = Post.query.get_or_404(post_id)
    
    if request.method == 'POST':
        # UPDATE operation
        title = request.form['title']
        content = request.form['content']
        
        # Validate data
        if not title or not content:
            flash('Title and content are required!', 'danger')
            return render_template('post_form.html', post=post)
        
        # Update the post
        post.title = title
        post.content = content
        
        try:
            db.session.commit()
            flash('Post updated successfully!', 'success')
            return redirect(url_for('post_detail', post_id=post.id))
        except Exception as e:
            db.session.rollback()
            flash(f'Error updating post: {str(e)}', 'danger')
            return render_template('post_form.html', post=post)
    
    # GET request - show the form with current data
    return render_template('post_form.html', post=post)

@app.route('/post/<int:post_id>/delete', methods=['POST'])
def delete_post(post_id):
    # DELETE operation
    post = Post.query.get_or_404(post_id)
    
    try:
        db.session.delete(post)
        db.session.commit()
        flash('Post deleted successfully!', 'success')
        return redirect(url_for('home'))
    except Exception as e:
        db.session.rollback()
        flash(f'Error deleting post: {str(e)}', 'danger')
        return redirect(url_for('post_detail', post_id=post.id))

if __name__ == '__main__':
    with app.app_context():
        db.create_all()  # Create tables (needs an application context)
    app.run(debug=True)

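This example demonstrates: list and detail views (Read), a form-backed create route, an edit route that updates a loaded object, a POST-only delete route, input validation, and rolling back the session when a commit fails.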

Practical Activity: Task Manager API

Let's apply what we've learned by creating a RESTful API for a task manager application:

  1. Create a Task model with fields for title, description, due date, status, and priority
  2. Implement API endpoints for listing, creating, retrieving, updating, and deleting tasks
  3. Add filtering and sorting capabilities
  4. Include proper error handling and validation

from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///tasks.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

class Task(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    due_date = db.Column(db.Date, nullable=True)
    status = db.Column(db.String(20), default='pending')
    priority = db.Column(db.Integer, default=1)  # 1=low, 2=medium, 3=high
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    def to_dict(self):
        return {
            'id': self.id,
            'title': self.title,
            'description': self.description,
            'due_date': self.due_date.isoformat() if self.due_date else None,
            'status': self.status,
            'priority': self.priority,
            'created_at': self.created_at.isoformat()
        }

@app.route('/api/tasks', methods=['GET'])
def get_tasks():
    # Get query parameters for filtering
    status = request.args.get('status')
    priority = request.args.get('priority', type=int)
    
    # Start with base query
    query = Task.query
    
    # Apply filters if provided
    if status:
        query = query.filter_by(status=status)
    if priority:
        query = query.filter_by(priority=priority)
    
    # Sort by due date, with tasks without due dates at the end
    tasks = query.order_by(
        # This sorts NULL due_dates last
        Task.due_date.is_(None),
        Task.due_date,
        Task.priority.desc()
    ).all()
    
    return jsonify([task.to_dict() for task in tasks])

@app.route('/api/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
    task = Task.query.get_or_404(task_id)
    return jsonify(task.to_dict())

@app.route('/api/tasks', methods=['POST'])
def create_task():
    data = request.get_json()
    
    # Validate required fields
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    
    # Parse due date if provided
    due_date = None
    if 'due_date' in data and data['due_date']:
        try:
            due_date = datetime.fromisoformat(data['due_date']).date()
        except ValueError:
            return jsonify({'error': 'Invalid due date format. Use ISO format (YYYY-MM-DD)'}), 400
    
    # Create new task
    task = Task(
        title=data['title'],
        description=data.get('description', ''),
        due_date=due_date,
        status=data.get('status', 'pending'),
        priority=data.get('priority', 1)
    )
    
    db.session.add(task)
    
    try:
        db.session.commit()
        return jsonify(task.to_dict()), 201
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500

@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
    task = Task.query.get_or_404(task_id)
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    # Update fields if provided
    if 'title' in data:
        task.title = data['title']
    if 'description' in data:
        task.description = data['description']
    if 'status' in data:
        task.status = data['status']
    if 'priority' in data:
        task.priority = data['priority']
    if 'due_date' in data:
        if data['due_date']:
            try:
                task.due_date = datetime.fromisoformat(data['due_date']).date()
            except ValueError:
                return jsonify({'error': 'Invalid due date format. Use ISO format (YYYY-MM-DD)'}), 400
        else:
            task.due_date = None
    
    try:
        db.session.commit()
        return jsonify(task.to_dict())
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500

@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
def delete_task(task_id):
    task = Task.query.get_or_404(task_id)
    
    try:
        db.session.delete(task)
        db.session.commit()
        return jsonify({'message': 'Task deleted successfully'}), 200
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500

# Additional endpoint for bulk operations
@app.route('/api/tasks/bulk/update_status', methods=['POST'])
def bulk_update_status():
    data = request.get_json()
    
    if not data or 'task_ids' not in data or 'status' not in data:
        return jsonify({'error': 'task_ids and status are required'}), 400
    
    task_ids = data['task_ids']
    new_status = data['status']
    
    try:
        # Bulk update
        updated = Task.query.filter(Task.id.in_(task_ids)).update(
            {'status': new_status},
            synchronize_session=False
        )
        
        db.session.commit()
        return jsonify({'message': f'Updated {updated} tasks'}), 200
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    with app.app_context():
        db.create_all()  # Create tables (needs an application context)
    app.run(debug=True)

Tasks for extending this API:

  1. Add pagination to the tasks list endpoint
  2. Implement search functionality (e.g., search tasks by title or description)
  3. Add user authentication and task ownership
  4. Implement task tags or categories with a many-to-many relationship
  5. Add task completion history with a one-to-many relationship

Key Takeaways

Further Learning Resources