Introduction to CRUD Operations
CRUD stands for Create, Read, Update, and Delete—the four basic operations for managing data in persistent storage. These operations form the foundation of most data-driven applications, from simple blogs to complex enterprise systems.
Flask-SQLAlchemy provides a clean, intuitive API for performing CRUD operations using Python objects rather than raw SQL. This object-oriented approach not only makes your code more readable and maintainable but also helps prevent common issues like SQL injection.
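To make the SQL injection point concrete, here is a minimal sketch that uses Python's built-in sqlite3 module rather than Flask-SQLAlchemy, so it runs without any setup. The users table and the malicious input are made up for illustration. It contrasts splicing user input into the SQL text with passing it as a bound parameter, which is how SQLAlchemy sends values when you build queries from Python expressions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany("INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)])

# Hypothetical attacker-controlled input
malicious = "nobody' OR '1'='1"

# Unsafe: the input becomes part of the SQL text and changes the WHERE logic
unsafe_sql = f"SELECT username FROM users WHERE username = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the input travels as a bound parameter and is compared as a plain value
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?", (malicious,)
).fetchall()

print(len(unsafe_rows))  # the injected condition matches every row
print(len(safe_rows))    # no user has that literal name
```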
Think of CRUD operations like managing a library of books:
- Create: Adding a new book to the library
- Read: Looking up books by title, author, or browsing the collection
- Update: Changing details about a book, like its location or condition
- Delete: Removing a book from the library when it's no longer needed
Working with the Database Session
Before diving into CRUD operations, it's important to understand the SQLAlchemy session, which manages interactions with the database.
In Flask-SQLAlchemy, you access the session through the db.session object. The session is like a staging area where you prepare changes before committing them to the database.
Key session operations include:
- db.session.add(obj): Add a new or modified object to the session
- db.session.add_all([obj1, obj2, ...]): Add multiple objects to the session
- db.session.delete(obj): Mark an object for deletion
- db.session.commit(): Persist all changes to the database
- db.session.rollback(): Revert uncommitted changes
- db.session.flush(): Send pending changes to the database without ending the transaction; they stay uncommitted (and can still be rolled back) until commit() is called
The session follows the Unit of Work pattern: it collects all changes and applies them as a single transaction when you call commit(). This approach has several benefits:
- Changes are atomic—either all succeed or all fail
- Improved performance by batching database operations
- Ability to review and validate changes before committing
- Option to roll back if something goes wrong
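The atomicity benefit can be seen at the database level with a small sketch. It uses the standard-library sqlite3 module as a stand-in for the session (the users table is hypothetical): two inserts share one transaction, the second violates a UNIQUE constraint, and the rollback discards the first insert as well.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")
conn.commit()

try:
    # Both inserts belong to the same (implicitly opened) transaction
    conn.execute("INSERT INTO users (email) VALUES (?)", ("a@example.com",))
    conn.execute("INSERT INTO users (email) VALUES (?)", ("a@example.com",))  # duplicate
    conn.commit()
    outcome = "committed"
except sqlite3.IntegrityError:
    # Rolling back undoes the first insert too: all or nothing
    conn.rollback()
    outcome = "rolled back"

remaining = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(outcome, remaining)
```

With db.session, the same idea applies: nothing staged in the session becomes permanent until commit() succeeds.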
Create Operations
Creating new records involves instantiating model objects, adding them to the session, and committing the session.
Basic Create Operation
# Assuming we have User model
user = User(username='john_doe', email='john@example.com')
db.session.add(user)
db.session.commit() # Don't forget to commit!
After commit, SQLAlchemy updates the object with any database-generated values (like auto-incrementing IDs):
print(user.id) # Now has a value from the database
Creating Related Objects
You can create related objects in several ways:
# Method 1: Create separately and link
user = User(username='jane_doe', email='jane@example.com')
db.session.add(user)
db.session.flush() # Get the user ID without committing transaction
post = Post(title='My First Post', content='Hello, world!', user_id=user.id)
db.session.add(post)
db.session.commit()
# Method 2: Use the relationship
user = User(username='bob_smith', email='bob@example.com')
post = Post(title='Welcome', content='My blog post')
# Link objects via relationship
user.posts.append(post) # Automatically sets post.user_id
db.session.add(user) # The default save-update cascade adds the related post too
db.session.commit()
# Method 3: Create with relationship attribute
post = Post(title='Another Post', content='More content',
author=user) # Using 'author' backref
db.session.add(post)
db.session.commit()
Bulk Insert Operations
For inserting many records efficiently:
# Create multiple objects
users = [
User(username='user1', email='user1@example.com'),
User(username='user2', email='user2@example.com'),
User(username='user3', email='user3@example.com')
]
# Add all at once
db.session.add_all(users)
db.session.commit()
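For very large datasets, consider bulk_insert_mappings, which skips most of the per-object ORM overhead (SQLAlchemy 2.0 marks it as a legacy method; passing the list of dictionaries to db.session.execute() together with an insert(User) statement is the newer equivalent):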
user_data = [
{'username': f'bulk_user{i}', 'email': f'bulk{i}@example.com'}
for i in range(1, 1001) # Creating 1000 users
]
db.session.bulk_insert_mappings(User, user_data)
db.session.commit()
Read Operations
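Reading data is performed through queries, which are built using the query attribute of model classes. (Flask-SQLAlchemy 3.x documents Model.query as a legacy interface and recommends db.session.execute(db.select(...)) for new code, but the examples below still work.)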
Basic Queries
# Get all users
all_users = User.query.all()
# Get first user
first_user = User.query.first()
# Get specific user by primary key
user = User.query.get(1) # Returns None if not found
# Get or 404 (useful in Flask routes)
user = User.query.get_or_404(1) # Raises 404 error if not found
Filtering Queries
# Filter by a column value
admin_users = User.query.filter_by(is_admin=True).all()
# More complex filter with expressions
from datetime import datetime
recent_users = User.query.filter(
    User.created_at > datetime(2023, 1, 1)
).all()
# Multiple conditions (AND)
filtered_users = User.query.filter(
User.is_active == True,
User.age >= 18
).all()
# OR conditions
from sqlalchemy import or_
results = User.query.filter(
or_(User.username == 'admin', User.email.endswith('@admin.com'))
).all()
# LIKE queries (case sensitivity depends on the database: case-sensitive in
# PostgreSQL, case-insensitive for ASCII characters in SQLite by default)
search_results = User.query.filter(
    User.username.like('%john%')
).all()
# ILIKE (case-insensitive): SQLAlchemy renders a native ILIKE on PostgreSQL
# and falls back to lower(...) LIKE lower(...) on other backends, so no
# per-database branching is needed
search_results = User.query.filter(
    User.username.ilike('%john%')
).all()
Ordering Results
# Order by username ascending
ordered_users = User.query.order_by(User.username).all()
# Order by creation date descending
newest_users = User.query.order_by(User.created_at.desc()).all()
# Multiple ordering criteria
users = User.query.order_by(User.is_admin.desc(), User.username).all()
Limiting Results and Pagination
# Limit to first 10 users
top_users = User.query.limit(10).all()
# Skip first 10, get next 10 (offset & limit)
page_2 = User.query.offset(10).limit(10).all()
# Using Flask-SQLAlchemy's paginate method
page = request.args.get('page', 1, type=int)
per_page = 10
pagination = User.query.paginate(page=page, per_page=per_page)
# Now you can access:
pagination.items # List of items on this page
pagination.page # Current page number
pagination.per_page # Items per page
pagination.total # Total number of items
pagination.pages # Total number of pages
pagination.has_next # True if there's a next page
pagination.has_prev # True if there's a previous page
pagination.next_num # Next page number
pagination.prev_num # Previous page number
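The pagination attributes above follow from simple arithmetic on the total row count. The helper below is a hypothetical pure-Python sketch of that arithmetic (page_window is not part of Flask-SQLAlchemy); it roughly mirrors how page, per_page, and total relate to the OFFSET/LIMIT values and the has_next/has_prev flags.

```python
import math

def page_window(total, page, per_page):
    """Derive offset/limit and navigation flags for a 1-based page number."""
    pages = math.ceil(total / per_page) if total else 0
    has_prev = page > 1
    has_next = page < pages
    return {
        "offset": (page - 1) * per_page,  # rows to skip
        "limit": per_page,                # rows to fetch
        "pages": pages,
        "has_prev": has_prev,
        "has_next": has_next,
        "prev_num": page - 1 if has_prev else None,
        "next_num": page + 1 if has_next else None,
    }

second_page = page_window(total=95, page=2, per_page=10)
last_page = page_window(total=95, page=10, per_page=10)
print(second_page)
print(last_page)
```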
Joins and Relationships
# Eager loading with joinedload (to avoid N+1 query problem)
from sqlalchemy.orm import joinedload
users_with_posts = User.query.options(joinedload(User.posts)).all()
# For each user, accessing posts won't trigger additional queries
for user in users_with_posts:
    print(f"{user.username} has {len(user.posts)} posts")
# Filter by related objects
users_with_posts = User.query.filter(User.posts.any()).all()
# Users with posts containing a specific title
users = User.query.filter(
User.posts.any(Post.title.like('%Flask%'))
).all()
# Explicit joins (join() is a method on the query, so no extra import is needed)
results = db.session.query(User, Post).join(Post).filter(
Post.title.like('%Flask%')
).all()
# Now results contains tuples of (user, post)
Aggregation and Grouping
from sqlalchemy import func
# Count all users
user_count = User.query.count()
# Count by group
user_counts = db.session.query(
User.is_admin, func.count(User.id)
).group_by(User.is_admin).all()
# Result is list of tuples: [(False, 95), (True, 5)]
# More complex aggregation
from sqlalchemy import func, desc
# Get users with post counts, ordered by count
post_counts = db.session.query(
User.username,
func.count(Post.id).label('post_count')
).join(Post).group_by(User.id).order_by(desc('post_count')).all()
# Get average, min, max age
age_stats = db.session.query(
func.avg(User.age).label('avg_age'),
func.min(User.age).label('min_age'),
func.max(User.age).label('max_age')
).one()
print(f"Age stats: avg={age_stats.avg_age}, min={age_stats.min_age}, max={age_stats.max_age}")
Update Operations
Updates involve modifying existing objects and committing the changes.
Updating a Single Object
# Get the object
user = User.query.get(1)
# Modify attributes
user.username = 'new_username'
user.email = 'new_email@example.com'
# Save changes
db.session.commit()
Behind the scenes, SQLAlchemy tracks changes to the object and generates the appropriate SQL UPDATE statement when you commit.
Bulk Updates
# Update all users with a specific condition
User.query.filter_by(is_active=False).update({'last_login': None})
db.session.commit()
# More complex update with a SQL expression evaluated by the database
User.query.filter(User.login_count > 100).update(
    {User.rank: User.login_count / 10},
    synchronize_session=False  # Don't try to update Python objects
)
db.session.commit()
The synchronize_session parameter controls how SQLAlchemy updates Python objects after a bulk update:
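- 'evaluate': Evaluates the update criteria in Python against objects already in the session (the default in SQLAlchemy 1.x; SQLAlchemy 2.0 defaults to 'auto', which chooses between 'fetch' and 'evaluate' depending on the backend)
- 'fetch': Fetches the affected rows from the database to refresh matching objects
- False: Doesn't update Python objects (fastest, but objects in the session may hold stale values until they are expired or refreshed)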
Conditional Updates
# Atomic increment (avoids race conditions)
post = Post.query.get(1)
Post.query.filter_by(id=post.id).update(
{Post.view_count: Post.view_count + 1}
)
db.session.commit()
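Why the in-database increment matters can be shown with a small simulation. The sketch below uses the standard-library sqlite3 module and a hypothetical posts table: two "requests" that each read the counter and write back value + 1 lose one update, while two UPDATE statements that compute view_count + 1 inside the database do not.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, view_count INTEGER NOT NULL)")
conn.execute("INSERT INTO posts (id, view_count) VALUES (1, 0)")

def read_count():
    return conn.execute("SELECT view_count FROM posts WHERE id = 1").fetchone()[0]

# Read-modify-write: both requests read 0 before either one writes
seen_by_a = read_count()
seen_by_b = read_count()
conn.execute("UPDATE posts SET view_count = ? WHERE id = 1", (seen_by_a + 1,))
conn.execute("UPDATE posts SET view_count = ? WHERE id = 1", (seen_by_b + 1,))
lost_update_result = read_count()

# Atomic increment: the database computes the new value from the current one
conn.execute("UPDATE posts SET view_count = 0 WHERE id = 1")
conn.execute("UPDATE posts SET view_count = view_count + 1 WHERE id = 1")
conn.execute("UPDATE posts SET view_count = view_count + 1 WHERE id = 1")
atomic_result = read_count()

print(lost_update_result, atomic_result)
```

The expression {Post.view_count: Post.view_count + 1} corresponds to the second form: the addition happens in SQL, not in Python.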
Delete Operations
Deleting records involves marking objects for deletion and committing the changes.
Deleting a Single Object
# Get the object
user = User.query.get(1)
# Mark for deletion
db.session.delete(user)
# Commit the deletion
db.session.commit()
Bulk Deletes
# Delete all inactive users
User.query.filter_by(is_active=False).delete()
db.session.commit()
# More complex condition
from datetime import datetime, timedelta
one_year_ago = datetime.utcnow() - timedelta(days=365)
inactive_old_users = User.query.filter(
User.is_active == False,
User.last_login < one_year_ago
).delete()
db.session.commit()
print(f"Deleted {inactive_old_users} users")
Cascade Deletes
Be careful with deletions when you have relationships. By default, SQLAlchemy does not delete related objects when the parent is deleted; it sets their foreign key to NULL instead, which fails if that column is NOT NULL. You can configure cascade behavior in the relationship definition:
class User(db.Model):
    # ...
    # This will automatically delete all user's posts when user is deleted
    posts = db.relationship('Post', backref='author', lazy=True,
                            cascade='all, delete-orphan')
Common cascade options:
- save-update: Objects attached to a parent are added to the session along with the parent
- delete: When parent is deleted, delete children
- delete-orphan: When child is removed from parent, delete child
- all: Shorthand for 'save-update, merge, refresh-expire, expunge, delete'
Soft Deletes
Instead of physically removing records, you can implement "soft deletes" by flagging records as deleted:
from datetime import datetime
class SoftDeleteMixin:
    deleted_at = db.Column(db.DateTime, nullable=True)
    def soft_delete(self):
        self.deleted_at = datetime.utcnow()
    @property
    def is_deleted(self):
        return self.deleted_at is not None
class User(db.Model, SoftDeleteMixin):
    # ...
    # Class method that returns a query excluding soft-deleted rows
    @classmethod
    def get_active_query(cls):
        return cls.query.filter_by(deleted_at=None)
# Usage
# Soft delete a user
user = User.query.get(1)
user.soft_delete()
db.session.commit()
# Query only active users
active_users = User.get_active_query().all()
Transactions and Error Handling
Transactions ensure that a group of database operations succeed or fail together, maintaining data integrity.
Basic Transaction Pattern
try:
    # Perform database operations
    user = User(username='transaction_test', email='test@example.com')
    db.session.add(user)
    post = Post(title='Transaction Post', content='Testing transactions', author=user)
    db.session.add(post)
    # If everything succeeded, commit the transaction
    db.session.commit()
    print("Transaction successful!")
except Exception as e:
    # If anything failed, roll back the transaction
    db.session.rollback()
    print(f"Transaction failed: {e}")
    # Handle the error appropriately
finally:
    # Ensure the session is properly managed
    db.session.close()
Using Context Managers
You can use db.session.begin() as a context manager for cleaner transaction code. Note that begin() raises an error if the session has already started a transaction, which happens implicitly on first use (for example, after an earlier query in the same request), so this pattern suits code that opens the session's first transaction:
try:
    with db.session.begin():
        # All operations in this block are part of the same transaction
        user = User(username='context_test', email='context@example.com')
        db.session.add(user)
        post = Post(title='Context Post', content='Testing context manager', author=user)
        db.session.add(post)
        # No need to call commit - it happens automatically if no exceptions occur
    print("Transaction successful!")
except Exception as e:
    # No need to call rollback - it happens automatically on exception
    print(f"Transaction failed: {e}")
    # Handle the error appropriately
Nested Transactions with Savepoints
For more complex scenarios, you can use savepoints to create nested transactions:
try:
    # Start main transaction
    with db.session.begin():
        # Create a user
        user = User(username='nested_test', email='nested@example.com')
        db.session.add(user)
        try:
            # Create a savepoint; it is released on success and rolled back on error
            with db.session.begin_nested():
                # Try to create a post without the required title/content
                # (None violates NOT NULL; an empty string would not)
                post = Post(title=None, content=None)
                db.session.add(post)
                # Flush so the INSERT runs, and fails, inside the savepoint
                db.session.flush()
        except Exception as e:
            # Savepoint rollback - just this part fails
            print(f"Couldn't create post: {e}, but user will still be created")
            # Create a valid post instead
            post = Post(title='Valid Post', content='Content', author=user)
            db.session.add(post)
    print("Transaction completed")
except Exception as e:
    print(f"Entire transaction failed: {e}")
# Even though we handle the savepoint exception, other exceptions would
# still roll back the entire transaction
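At the SQL level, a nested transaction is a SAVEPOINT inside the outer transaction. The following sketch issues those statements by hand with the standard-library sqlite3 module (the tables and the savepoint name post_attempt are hypothetical) to show that rolling back to the savepoint discards only the failed step, while the earlier insert survives to the final COMMIT.

```python
import sqlite3

# isolation_level=None lets us issue BEGIN/SAVEPOINT/COMMIT explicitly
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")

conn.execute("BEGIN")
conn.execute("INSERT INTO users (username) VALUES ('nested_test')")

conn.execute("SAVEPOINT post_attempt")
try:
    conn.execute("INSERT INTO posts (title) VALUES (NULL)")  # violates NOT NULL
    conn.execute("RELEASE SAVEPOINT post_attempt")
    savepoint_failed = False
except sqlite3.IntegrityError:
    # Undo only the work done since the savepoint, then discard it
    conn.execute("ROLLBACK TO SAVEPOINT post_attempt")
    conn.execute("RELEASE SAVEPOINT post_attempt")
    savepoint_failed = True

conn.execute("INSERT INTO posts (title) VALUES ('Valid Post')")
conn.execute("COMMIT")

user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
post_titles = [row[0] for row in conn.execute("SELECT title FROM posts")]
print(savepoint_failed, user_count, post_titles)
```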
Advanced Query Techniques
Subqueries
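from sqlalchemy import func
# Find users with more posts than average
# Step 1: count posts per user (users without posts count as 0)
per_user = db.session.query(
    func.count(Post.id).label('post_count')
).select_from(User).outerjoin(
    Post, Post.user_id == User.id
).group_by(User.id).subquery()
# Step 2: average those per-user counts
avg_posts = db.session.query(func.avg(per_user.c.post_count)).scalar()
# Step 3: keep users whose post count exceeds the average
prolific_users = User.query.join(User.posts).group_by(User.id).having(
    func.count(Post.id) > avg_posts
).all()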
Common Table Expressions (CTEs)
from sqlalchemy import func, select
from sqlalchemy.sql import text
# The positional select() form below requires SQLAlchemy 1.4+ (the older
# select([...]) list form was removed in 2.0) and a database that supports CTEs
post_counts = select(
    Post.user_id,
    func.count().label('post_count')
).group_by(Post.user_id).cte('post_counts')
users_with_counts = db.session.query(
User,
post_counts.c.post_count
).join(
post_counts,
User.id == post_counts.c.user_id
).order_by(post_counts.c.post_count.desc()).all()
Raw SQL Queries
When you need to run raw SQL (for performance or complex queries):
from sqlalchemy import text
# Execute raw SQL and get results
sql = text("SELECT username, email FROM users WHERE id < :max_id ORDER BY username")
result = db.session.execute(sql, {'max_id': 100})
for row in result:
    print(f"Username: {row.username}, Email: {row.email}")
# Load model instances from raw SQL
sql = text("SELECT * FROM users WHERE username LIKE :pattern")
users = User.query.from_statement(sql).params(pattern='%admin%').all()
# To guard against SQL injection, always pass values as bound parameters
# (as above) instead of formatting them into the SQL string
Performance Optimization
Eager Loading vs. Lazy Loading
By default, relationships are lazy loaded (loaded only when accessed). This can lead to the "N+1 query problem" when you load many objects and access their relationships.
# N+1 problem example
users = User.query.limit(100).all()
for user in users:
    # This will trigger a separate query for each user
    print(f"{user.username} has {len(user.posts)} posts")
Solutions include:
from sqlalchemy.orm import joinedload, selectinload, subqueryload
# Eager loading with joinedload (creates a JOIN)
users = User.query.options(joinedload(User.posts)).limit(100).all()
# Eager loading with subqueryload (uses a subquery, better for collections)
users = User.query.options(subqueryload(User.posts)).limit(100).all()
# Eager loading with selectinload (uses a separate SELECT IN query)
users = User.query.options(selectinload(User.posts)).limit(100).all()
Choose the appropriate loading method based on your data model and access patterns:
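- joinedload: Best for one-to-one or small collections
- subqueryload: Good for many-to-many or larger collections (largely superseded by selectinload in current SQLAlchemy)
- selectinload: Efficient for loading many collections; in most cases the simplest choice for eager-loading collections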
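The difference in query counts can be made visible without an ORM. This sketch uses the standard-library sqlite3 module with hypothetical users and posts tables, and a small run() helper (also hypothetical) that records every statement. It first follows the lazy-loading access pattern, then the two-query pattern that selectinload produces.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, user_id INTEGER, title TEXT);
    INSERT INTO users (id, username) VALUES (1, 'ann'), (2, 'ben'), (3, 'cat');
    INSERT INTO posts (user_id, title) VALUES (1, 'a1'), (1, 'a2'), (2, 'b1');
""")

executed = []  # every statement sent through run()

def run(sql, params=()):
    executed.append(sql)
    return conn.execute(sql, params).fetchall()

# Lazy-loading pattern: one query for the users, then one more per user
users = run("SELECT id, username FROM users ORDER BY id")
lazy_counts = {
    name: len(run("SELECT id FROM posts WHERE user_id = ?", (uid,)))
    for uid, name in users
}
lazy_queries = len(executed)

# selectinload-style pattern: one query for users, one IN query for all posts
executed.clear()
users = run("SELECT id, username FROM users ORDER BY id")
ids = [uid for uid, _ in users]
placeholders = ", ".join("?" for _ in ids)
post_rows = run(f"SELECT user_id FROM posts WHERE user_id IN ({placeholders})", ids)
names = dict(users)
eager_counts = {name: 0 for name in names.values()}
for (uid,) in post_rows:
    eager_counts[names[uid]] += 1
eager_queries = len(executed)

print(lazy_queries, eager_queries)
```

With three users the gap is small, but the lazy pattern grows by one query per user while the IN-based pattern stays at two.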
Database Indexing
Properly indexed columns dramatically improve query performance:
from datetime import datetime
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True, index=True) # Indexed
    username = db.Column(db.String(80), unique=True, index=True) # Indexed
    # Rarely queried column, no index needed
    bio = db.Column(db.Text)
    # Columns covered by the composite index below
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Composite index for common query pattern
    __table_args__ = (
        db.Index('idx_user_active_created', 'is_active', 'created_at'),
    )
Best practices for indexing:
- Index columns used in WHERE, JOIN, and ORDER BY clauses
- Consider composite indexes for common query patterns
- Don't overindex - each index has a maintenance cost
- Index foreign keys
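One way to check whether an index is actually used is to ask the database for its query plan. The sketch below does this with the standard-library sqlite3 module and a hypothetical users table; EXPLAIN QUERY PLAN is SQLite-specific (other databases have their own EXPLAIN variants), and the exact wording of the plan text varies between SQLite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, bio TEXT)")

def plan(sql):
    """Return the human-readable detail column of EXPLAIN QUERY PLAN."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[3] for row in rows)

lookup = "SELECT bio FROM users WHERE email = 'a@example.com'"

# Without an index, SQLite has to scan the whole table
plan_without_index = plan(lookup)

# After indexing the filtered column, the plan switches to an index search
conn.execute("CREATE INDEX idx_users_email ON users (email)")
plan_with_index = plan(lookup)

print(plan_without_index)
print(plan_with_index)
```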
Query Profiling
You can log the SQL that SQLAlchemy emits to spot slow or redundant queries:
# Enable query logging (in development); this echoes every statement, not only slow ones
app.config['SQLALCHEMY_ECHO'] = True
# For timing information and more detailed profiling, use Flask-DebugToolbar or similar tools
Real-world Example: CRUD Operations in a Flask Application
Let's implement a complete CRUD flow for a blog application:
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///blog.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'your-secret-key'
db = SQLAlchemy(app)
class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    def __repr__(self):
        return f"Post('{self.title}', '{self.created_at}')"
@app.route('/')
def home():
    # READ operation - Get all posts, newest first
    posts = Post.query.order_by(Post.created_at.desc()).all()
    return render_template('home.html', posts=posts)
@app.route('/post/<int:post_id>')
def post_detail(post_id):
    # READ operation - Get specific post
    post = Post.query.get_or_404(post_id)
    return render_template('post_detail.html', post=post)
@app.route('/post/new', methods=['GET', 'POST'])
def new_post():
    if request.method == 'POST':
        # CREATE operation
        title = request.form['title']
        content = request.form['content']
        # Validate data
        if not title or not content:
            flash('Title and content are required!', 'danger')
            return render_template('post_form.html')
        # Create and save the post
        post = Post(title=title, content=content)
        db.session.add(post)
        try:
            db.session.commit()
            flash('Post created successfully!', 'success')
            return redirect(url_for('post_detail', post_id=post.id))
        except Exception as e:
            db.session.rollback()
            flash(f'Error creating post: {str(e)}', 'danger')
            return render_template('post_form.html')
    # GET request - show the form
    return render_template('post_form.html')
@app.route('/post/<int:post_id>/edit', methods=['GET', 'POST'])
def edit_post(post_id):
    # Get the post to edit
    post = Post.query.get_or_404(post_id)
    if request.method == 'POST':
        # UPDATE operation
        title = request.form['title']
        content = request.form['content']
        # Validate data
        if not title or not content:
            flash('Title and content are required!', 'danger')
            return render_template('post_form.html', post=post)
        # Update the post
        post.title = title
        post.content = content
        try:
            db.session.commit()
            flash('Post updated successfully!', 'success')
            return redirect(url_for('post_detail', post_id=post.id))
        except Exception as e:
            db.session.rollback()
            flash(f'Error updating post: {str(e)}', 'danger')
            return render_template('post_form.html', post=post)
    # GET request - show the form with current data
    return render_template('post_form.html', post=post)
@app.route('/post/<int:post_id>/delete', methods=['POST'])
def delete_post(post_id):
    # DELETE operation
    post = Post.query.get_or_404(post_id)
    try:
        db.session.delete(post)
        db.session.commit()
        flash('Post deleted successfully!', 'success')
        return redirect(url_for('home'))
    except Exception as e:
        db.session.rollback()
        flash(f'Error deleting post: {str(e)}', 'danger')
        return redirect(url_for('post_detail', post_id=post.id))
if __name__ == '__main__':
    # create_all() needs an application context in Flask-SQLAlchemy 3.x
    with app.app_context():
        db.create_all() # Create tables
    app.run(debug=True)
This example demonstrates:
- Creating posts with validation and error handling
- Reading posts with both list and detail views
- Updating posts with pre-populated forms
- Deleting posts through a POST-only route
- Proper transaction management
- User feedback with flash messages
Practical Activity: Task Manager API
Let's apply what we've learned by creating a RESTful API for a task manager application:
- Create a Task model with fields for title, description, due date, status, and priority
- Implement API endpoints for listing, creating, retrieving, updating, and deleting tasks
- Add filtering and sorting capabilities
- Include proper error handling and validation
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///tasks.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
class Task(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    due_date = db.Column(db.Date, nullable=True)
    status = db.Column(db.String(20), default='pending')
    priority = db.Column(db.Integer, default=1) # 1=low, 2=medium, 3=high
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    def to_dict(self):
        return {
            'id': self.id,
            'title': self.title,
            'description': self.description,
            'due_date': self.due_date.isoformat() if self.due_date else None,
            'status': self.status,
            'priority': self.priority,
            'created_at': self.created_at.isoformat()
        }
@app.route('/api/tasks', methods=['GET'])
def get_tasks():
    # Get query parameters for filtering
    status = request.args.get('status')
    priority = request.args.get('priority', type=int)
    # Start with base query
    query = Task.query
    # Apply filters if provided
    if status:
        query = query.filter_by(status=status)
    if priority is not None:
        query = query.filter_by(priority=priority)
    # Sort by due date, with tasks without due dates at the end
    tasks = query.order_by(
        # This sorts NULL due_dates last
        Task.due_date.is_(None),
        Task.due_date,
        Task.priority.desc()
    ).all()
    return jsonify([task.to_dict() for task in tasks])
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
    task = Task.query.get_or_404(task_id)
    return jsonify(task.to_dict())
@app.route('/api/tasks', methods=['POST'])
def create_task():
    data = request.get_json()
    # Validate required fields
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    # Parse due date if provided
    due_date = None
    if 'due_date' in data and data['due_date']:
        try:
            due_date = datetime.fromisoformat(data['due_date']).date()
        except (ValueError, TypeError):
            # TypeError covers non-string values such as numbers
            return jsonify({'error': 'Invalid due date format. Use ISO format (YYYY-MM-DD)'}), 400
    # Create new task
    task = Task(
        title=data['title'],
        description=data.get('description', ''),
        due_date=due_date,
        status=data.get('status', 'pending'),
        priority=data.get('priority', 1)
    )
    db.session.add(task)
    try:
        db.session.commit()
        return jsonify(task.to_dict()), 201
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
    task = Task.query.get_or_404(task_id)
    data = request.get_json()
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    # Update fields if provided
    if 'title' in data:
        task.title = data['title']
    if 'description' in data:
        task.description = data['description']
    if 'status' in data:
        task.status = data['status']
    if 'priority' in data:
        task.priority = data['priority']
    if 'due_date' in data:
        if data['due_date']:
            try:
                task.due_date = datetime.fromisoformat(data['due_date']).date()
            except (ValueError, TypeError):
                # TypeError covers non-string values such as numbers
                return jsonify({'error': 'Invalid due date format. Use ISO format (YYYY-MM-DD)'}), 400
        else:
            task.due_date = None
    try:
        db.session.commit()
        return jsonify(task.to_dict())
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
def delete_task(task_id):
    task = Task.query.get_or_404(task_id)
    try:
        db.session.delete(task)
        db.session.commit()
        return jsonify({'message': 'Task deleted successfully'}), 200
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500
# Additional endpoint for bulk operations
@app.route('/api/tasks/bulk/update_status', methods=['POST'])
def bulk_update_status():
    data = request.get_json()
    if not data or 'task_ids' not in data or 'status' not in data:
        return jsonify({'error': 'task_ids and status are required'}), 400
    task_ids = data['task_ids']
    new_status = data['status']
    try:
        # Bulk update
        updated = Task.query.filter(Task.id.in_(task_ids)).update(
            {'status': new_status},
            synchronize_session=False
        )
        db.session.commit()
        return jsonify({'message': f'Updated {updated} tasks'}), 200
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    # create_all() needs an application context in Flask-SQLAlchemy 3.x
    with app.app_context():
        db.create_all()
    app.run(debug=True)
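The create and update endpoints both parse due_date with datetime.fromisoformat. As a possible refactoring, the parsing could live in one helper; parse_due_date below is a hypothetical function, not part of the API above. It is a sketch that returns the parsed date together with an error message, and it treats non-string input (which raises TypeError rather than ValueError) as invalid too.

```python
from datetime import date, datetime

ERROR = "Invalid due date format. Use ISO format (YYYY-MM-DD)"

def parse_due_date(value):
    """Return (date_or_None, error_or_None) for a client-supplied due date."""
    if value is None or value == "":
        return None, None  # no due date supplied
    try:
        return datetime.fromisoformat(value).date(), None
    except (ValueError, TypeError):
        # ValueError: malformed string; TypeError: not a string at all
        return None, ERROR

plain = parse_due_date("2024-03-15")
with_time = parse_due_date("2024-03-15T10:30:00")
wrong_format = parse_due_date("15/03/2024")
wrong_type = parse_due_date(20240315)
missing = parse_due_date("")
print(plain, with_time, wrong_format, wrong_type, missing)
```

A handler could then return the error with a 400 status whenever the second element is not None.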
Tasks for extending this API:
- Add pagination to the tasks list endpoint
- Implement search functionality (e.g., search tasks by title or description)
- Add user authentication and task ownership
- Implement task tags or categories with a many-to-many relationship
- Add task completion history with a one-to-many relationship
Key Takeaways
- CRUD operations (Create, Read, Update, Delete) form the foundation of data manipulation in web applications
- The SQLAlchemy session manages changes to objects and transactions with the database
- Query building provides a flexible way to retrieve and filter data
- Transactions ensure data integrity by grouping operations that must succeed or fail together
- Performance optimization techniques include proper indexing and eager loading of relationships
- Error handling is essential for robust database operations
- Flask-SQLAlchemy simplifies the implementation of CRUD operations in Flask applications