Database Integration with Flask-SQLAlchemy

Module 23: Web Frameworks II (Python) - Tuesday, Lecture 2

Why Do Web Applications Need Databases?

Before diving into Flask-SQLAlchemy, let's understand why databases are essential for web applications:

The Library Analogy

Think of a database as a library. A library organizes books (data) into shelves (tables) according to categories (schema). Librarians (database management systems) help find books quickly using the catalog system (indexes). They enforce library rules (constraints), ensure books are returned properly (transactions), and control who can access restricted collections (security). Similarly, databases organize, store, protect, and provide efficient access to your application's data.

graph TD A[Web Application] -->|Stores Data| B[(Database)] A -->|Retrieves Data| B A -->|Updates Data| B A -->|Deletes Data| B C[User 1] -->|Interacts with| A D[User 2] -->|Interacts with| A E[User 3] -->|Interacts with| A B -->|Ensures Persistence| F[Data Storage] B -->|Maintains Relationships| G[Data Relationships] B -->|Enforces Rules| H[Data Integrity] B -->|Optimizes Access| I[Efficient Queries]

Introduction to SQLAlchemy and Flask-SQLAlchemy

SQLAlchemy is a powerful SQL toolkit and Object-Relational Mapping (ORM) library for Python. It provides a full suite of well-established enterprise-level persistence patterns.

What is an ORM?

ORM (Object-Relational Mapping) is a programming technique that converts data between incompatible type systems in object-oriented programming languages and relational databases. It creates a "virtual object database" that can be used from within the programming language.

graph LR A[Python Object] -->|ORM Maps| B[(Relational Database)] B -->|ORM Translates| A subgraph Python A C[user = User(name='John')] end subgraph SQL B D[INSERT INTO users (name) VALUES ('John')] end

Flask-SQLAlchemy

Flask-SQLAlchemy is an extension for Flask that adds support for SQLAlchemy. It simplifies using SQLAlchemy with Flask by providing defaults and helpers that make common patterns easier to implement.

Key benefits of using Flask-SQLAlchemy include:

Setting Up Flask-SQLAlchemy

Let's start by installing and configuring Flask-SQLAlchemy in a Flask application:

Installation

pip install flask-sqlalchemy

For specific database engines, you'll also need to install the appropriate driver:

# For SQLite (usually pre-installed with Python)
# No additional driver needed

# For PostgreSQL
pip install psycopg2-binary

# For MySQL
pip install mysqlclient

# For SQL Server
pip install pyodbc

Basic Configuration

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Configure the database URI
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'

# Optional: Disable modification tracking (reduces overhead)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Create the SQLAlchemy instance
db = SQLAlchemy(app)

# Now you can define models and work with the database

Database URIs for Different Database Engines

Database URI Format Example
SQLite sqlite:///path/to/file.db sqlite:///site.db
PostgreSQL postgresql://username:password@host:port/database postgresql://user:pass@localhost:5432/mydb
MySQL mysql://username:password@host:port/database mysql://user:pass@localhost:3306/mydb
SQL Server mssql+pyodbc://username:password@host:port/database?driver=DRIVER mssql+pyodbc://user:pass@server/mydb?driver=SQL+Server

Real-World Configuration Best Practice

In a production environment, it's best to separate configuration from code and use environment variables for sensitive information:

import os

app = Flask(__name__)

# Get database URI from environment variable or use a default for development
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///dev.db')

# Other configuration settings
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = os.environ.get('SQLALCHEMY_ECHO', 'False').lower() == 'true'

Defining Models

Models are Python classes that represent tables in your database. With Flask-SQLAlchemy, you define models by creating classes that inherit from db.Model:

Basic Model Definition

class User(db.Model):
    __tablename__ = 'users'  # Optional: specify table name (defaults to lowercase class name)
    
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    def __repr__(self):
        return f'<User {self.username}>'

This defines a User model with four columns: id, username, email, and created_at.

Common Column Types

Column Type Python Type Description
db.Integer int Regular integer
db.String(size) str String with optional maximum length
db.Text str Unlimited length text
db.DateTime datetime Date and time
db.Float float Floating-point value
db.Boolean bool Boolean value
db.Enum enum.Enum Set of enumerated values
db.JSON dict, list JSON-formatted data
db.LargeBinary bytes Binary data (use with caution)

Column Options

Relationships Between Models

Relational databases allow tables to be related to each other. Flask-SQLAlchemy makes it easy to define and work with these relationships:

Types of Relationships

graph LR A[User] -->|one-to-many| B[Post] C[Post] -->|many-to-one| D[Category] E[User] -->|one-to-one| F[Profile] G[Post] -->|many-to-many| H[Tag]

One-to-Many Relationship Example

class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    
    # Define the relationship to Post
    posts = db.relationship('Post', backref='author', lazy=True)
    
    def __repr__(self):
        return f'<User {self.username}>'

class Post(db.Model):
    __tablename__ = 'posts'
    
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    # Foreign key to User
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    
    def __repr__(self):
        return f'<Post {self.title}>'

In this example:

Many-to-Many Relationship Example

# Association table for the many-to-many relationship
post_tags = db.Table('post_tags',
    db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
    db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)

class Post(db.Model):
    __tablename__ = 'posts'
    
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    content = db.Column(db.Text, nullable=False)
    
    # Many-to-many relationship with Tag
    tags = db.relationship('Tag', secondary=post_tags, 
                         backref=db.backref('posts', lazy=True))

class Tag(db.Model):
    __tablename__ = 'tags'
    
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)

In this example:

One-to-One Relationship Example

class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    
    # One-to-one relationship with Profile (uselist=False makes it one-to-one)
    profile = db.relationship('Profile', backref='user', uselist=False, lazy=True)
    
    def __repr__(self):
        return f'<User {self.username}>'

class Profile(db.Model):
    __tablename__ = 'profiles'
    
    id = db.Column(db.Integer, primary_key=True)
    first_name = db.Column(db.String(50))
    last_name = db.Column(db.String(50))
    bio = db.Column(db.Text)
    
    # Foreign key to User - the unique constraint makes it one-to-one
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, unique=True)
    
    def __repr__(self):
        return f'<Profile {self.id}>'

In this example:

Creating and Setting Up the Database

After defining models, you need to create the database tables. Flask-SQLAlchemy provides methods to initialize the database:

Creating Tables

# In your Flask application
from app import app, db

# Create all tables
with app.app_context():
    db.create_all()
    
# Or from an interactive Python shell
from app import app, db
with app.app_context():
    db.create_all()

This creates all tables defined by your models that don't already exist in the database.

Important Note

db.create_all() only creates tables that don't exist and doesn't update existing tables. For schema migrations (changes to existing tables), you should use a tool like Flask-Migrate.

Dropping Tables

# Be careful with this!
with app.app_context():
    db.drop_all()

Using Flask-Migrate for Schema Migrations

For production applications, you should use Flask-Migrate (based on Alembic) to handle database schema changes:

# Installation
pip install Flask-Migrate

# In your Flask application
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
db = SQLAlchemy(app)
migrate = Migrate(app, db)

# Define models...

# Using Flask-Migrate from the command line
# Initialize migrations
$ flask db init

# Create a migration
$ flask db migrate -m "Initial migration"

# Apply the migration
$ flask db upgrade

# Revert the migration
$ flask db downgrade

Basic Database Operations (CRUD)

Flask-SQLAlchemy makes it easy to perform CRUD (Create, Read, Update, Delete) operations:

Creating Records (Create)

# Create a new user
new_user = User(username='john_doe', email='john@example.com')

# Add the user to the session
db.session.add(new_user)

# Commit the session to save changes
db.session.commit()

Reading Records (Read)

# Get all users
all_users = User.query.all()

# Get a user by primary key
user = User.query.get(1)

# Get a user by a specific attribute
user = User.query.filter_by(username='john_doe').first()

# Use more complex filters
users = User.query.filter(User.email.endswith('@example.com')).all()

# Order results
recent_users = User.query.order_by(User.created_at.desc()).limit(5).all()

# Pagination
users_page = User.query.paginate(page=2, per_page=20)
for user in users_page.items:
    print(user.username)

Updating Records (Update)

# Get the user to update
user = User.query.get(1)

# Update attributes
user.username = 'new_username'
user.email = 'new_email@example.com'

# Commit the changes
db.session.commit()

Deleting Records (Delete)

# Get the user to delete
user = User.query.get(1)

# Mark for deletion
db.session.delete(user)

# Commit the deletion
db.session.commit()

Bulk Operations

# Bulk insert
users = [
    User(username='user1', email='user1@example.com'),
    User(username='user2', email='user2@example.com'),
    User(username='user3', email='user3@example.com')
]
db.session.add_all(users)
db.session.commit()

# Bulk update
User.query.filter(User.email.endswith('@example.com')).update(
    {User.active: True}, synchronize_session=False
)
db.session.commit()

# Bulk delete
User.query.filter(User.last_login < datetime(2020, 1, 1)).delete()
db.session.commit()

Advanced Querying Techniques

Flask-SQLAlchemy provides powerful querying capabilities inherited from SQLAlchemy:

Complex Filtering

from sqlalchemy import and_, or_, not_

# AND conditions
users = User.query.filter(
    and_(
        User.active == True,
        User.email.endswith('@example.com')
    )
).all()

# OR conditions
users = User.query.filter(
    or_(
        User.username == 'admin',
        User.email == 'admin@example.com'
    )
).all()

# NOT conditions
users = User.query.filter(
    not_(User.username.startswith('deleted_'))
).all()

# Combined conditions
users = User.query.filter(
    and_(
        User.active == True,
        or_(
            User.role == 'admin',
            User.role == 'moderator'
        )
    )
).all()

Joins

# Inner join
posts_with_authors = db.session.query(Post, User).join(User).all()

# Specific join condition
posts_with_authors = db.session.query(Post, User).join(
    User, Post.user_id == User.id
).all()

# Left outer join
posts_with_authors = db.session.query(Post, User).outerjoin(User).all()

# Multiple joins
results = db.session.query(Post, User, Category).join(
    User, Post.user_id == User.id
).join(
    Category, Post.category_id == Category.id
).all()

Aggregations and Grouping

from sqlalchemy import func

# Count
user_count = User.query.count()

# Specific count
active_users = User.query.filter(User.active == True).count()

# Sum
total_posts = db.session.query(func.sum(User.post_count)).scalar()

# Average
avg_age = db.session.query(func.avg(User.age)).scalar()

# Min/Max
newest_user = db.session.query(func.max(User.created_at)).scalar()
oldest_user = db.session.query(func.min(User.created_at)).scalar()

# Group by
posts_by_category = db.session.query(
    Category.name, func.count(Post.id)
).join(
    Post, Post.category_id == Category.id
).group_by(
    Category.name
).all()

Subqueries

from sqlalchemy import subquery

# Users with more than 5 posts
active_users = db.session.query(
    User.id
).join(
    Post
).group_by(
    User.id
).having(
    func.count(Post.id) > 5
).subquery()

# Use the subquery
users = User.query.join(
    active_users, User.id == active_users.c.id
).all()

Raw SQL Queries

When SQLAlchemy's ORM isn't sufficient, you can execute raw SQL:

# Using the engine directly
result = db.engine.execute('SELECT * FROM users WHERE username = :username', 
                         {'username': 'john_doe'})
for row in result:
    print(row)

# Using SQLAlchemy's text function
from sqlalchemy import text
result = db.session.execute(text('SELECT * FROM users WHERE username = :username'), 
                          {'username': 'john_doe'})
for row in result:
    print(row)

Best Practice: Prefer the ORM

While raw SQL is available, it's generally better to use the ORM when possible because:

  • The ORM handles database-specific SQL differences
  • The ORM provides protection against SQL injection
  • Changes to your models are automatically reflected in queries
  • The ORM maintains type safety between Python and SQL

Use raw SQL only when necessary for performance optimization or complex queries that are difficult to express in the ORM.

Using Models in Flask Views

Now, let's see how to integrate database operations with Flask views:

Listing Records

@app.route('/users')
def list_users():
    # Get query parameters
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    
    # Query with pagination
    pagination = User.query.order_by(User.username).paginate(
        page=page, per_page=per_page, error_out=False
    )
    
    # Render template with data
    return render_template(
        'users/index.html',
        users=pagination.items,
        pagination=pagination
    )

Showing a Single Record

@app.route('/users/')
def show_user(user_id):
    # Get the user or return 404
    user = User.query.get_or_404(user_id)
    
    # Get related data
    recent_posts = Post.query.filter_by(user_id=user.id).order_by(
        Post.created_at.desc()
    ).limit(5).all()
    
    # Render template with data
    return render_template(
        'users/show.html',
        user=user,
        recent_posts=recent_posts
    )

Creating a Record

@app.route('/users/new', methods=['GET', 'POST'])
def create_user():
    form = UserForm()
    
    if form.validate_on_submit():
        # Create a new user from form data
        user = User(
            username=form.username.data,
            email=form.email.data,
            password=generate_password_hash(form.password.data)
        )
        
        try:
            # Save to database
            db.session.add(user)
            db.session.commit()
            
            # Flash success message
            flash('User created successfully!', 'success')
            
            # Redirect to the user's page
            return redirect(url_for('show_user', user_id=user.id))
        except IntegrityError:
            # Handle unique constraint violations
            db.session.rollback()
            flash('Username or email already exists!', 'danger')
    
    # GET request or form validation failed
    return render_template('users/new.html', form=form)

Updating a Record

@app.route('/users//edit', methods=['GET', 'POST'])
def edit_user(user_id):
    # Get the user or return 404
    user = User.query.get_or_404(user_id)
    
    # Create form and populate with user data
    form = UserForm(obj=user)
    
    if form.validate_on_submit():
        # Update user from form data
        form.populate_obj(user)
        
        try:
            # Save changes
            db.session.commit()
            
            # Flash success message
            flash('User updated successfully!', 'success')
            
            # Redirect to the user's page
            return redirect(url_for('show_user', user_id=user.id))
        except IntegrityError:
            # Handle unique constraint violations
            db.session.rollback()
            flash('Username or email already exists!', 'danger')
    
    # GET request or form validation failed
    return render_template('users/edit.html', form=form, user=user)

Deleting a Record

@app.route('/users//delete', methods=['POST'])
def delete_user(user_id):
    # Get the user or return 404
    user = User.query.get_or_404(user_id)
    
    try:
        # Delete the user
        db.session.delete(user)
        db.session.commit()
        
        # Flash success message
        flash('User deleted successfully!', 'success')
    except Exception as e:
        # Handle errors
        db.session.rollback()
        flash(f'Error deleting user: {str(e)}', 'danger')
    
    # Redirect to users list
    return redirect(url_for('list_users'))

Model Methods and Properties

You can add custom methods and properties to your models to encapsulate business logic:

Instance Methods

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(128))
    posts = db.relationship('Post', backref='author', lazy=True)
    
    def set_password(self, password):
        """Set the user's password hash."""
        self.password_hash = generate_password_hash(password)
    
    def check_password(self, password):
        """Check if the password matches the hash."""
        return check_password_hash(self.password_hash, password)
    
    def get_recent_posts(self, limit=5):
        """Get the user's most recent posts."""
        return Post.query.filter_by(user_id=self.id).order_by(
            Post.created_at.desc()
        ).limit(limit).all()
    
    def has_role(self, role_name):
        """Check if the user has a specific role."""
        return any(role.name == role_name for role in self.roles)

Class Methods

class User(db.Model):
    # ... other columns and methods ...
    
    @classmethod
    def find_by_username(cls, username):
        """Find a user by username."""
        return cls.query.filter_by(username=username).first()
    
    @classmethod
    def find_by_email(cls, email):
        """Find a user by email."""
        return cls.query.filter_by(email=email).first()
    
    @classmethod
    def get_statistics(cls):
        """Get user statistics."""
        from sqlalchemy import func
        
        stats = {}
        stats['total'] = cls.query.count()
        stats['active'] = cls.query.filter_by(active=True).count()
        stats['new_today'] = cls.query.filter(
            cls.created_at >= datetime.today().date()
        ).count()
        stats['average_age'] = db.session.query(
            func.avg(cls.age)
        ).scalar()
        
        return stats

Properties

class User(db.Model):
    # ... other columns and methods ...
    
    @property
    def full_name(self):
        """Get the user's full name."""
        return f"{self.first_name} {self.last_name}"
    
    @property
    def is_active(self):
        """Check if the user is active."""
        return self.status == 'active'
    
    @property
    def age(self):
        """Calculate the user's age from their birth date."""
        if not self.birth_date:
            return None
        today = datetime.today()
        return today.year - self.birth_date.year - (
            (today.month, today.day) < (self.birth_date.month, self.birth_date.day)
        )

Event Listeners

from sqlalchemy import event

class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    content = db.Column(db.Text, nullable=False)
    slug = db.Column(db.String(120), unique=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    
    def generate_slug(self):
        """Generate a slug from the title."""
        self.slug = slugify(self.title)

# Event listeners
@event.listens_for(Post.title, 'set')
def set_post_slug(target, value, oldvalue, initiator):
    """Generate a slug when the title is set."""
    if value and (not target.slug or value != oldvalue):
        target.slug = slugify(value)
        
@event.listens_for(Post, 'before_insert')
def post_before_insert(mapper, connection, target):
    """Ensure the slug exists before inserting."""
    if not target.slug:
        target.generate_slug()

Real-World Example: E-commerce Product Catalog

Let's examine a comprehensive example of an e-commerce product catalog with categories, products, and reviews:

erDiagram CATEGORY ||--o{ PRODUCT : contains PRODUCT ||--o{ PRODUCT_IMAGE : has PRODUCT ||--o{ REVIEW : receives USER ||--o{ REVIEW : writes PRODUCT }o--o{ TAG : tagged_with CATEGORY { int id PK string name string slug string description datetime created_at } PRODUCT { int id PK string name string slug text description decimal price int stock bool active int category_id FK datetime created_at datetime updated_at } PRODUCT_IMAGE { int id PK int product_id FK string image_path bool is_primary int display_order } REVIEW { int id PK int product_id FK int user_id FK int rating text content bool approved datetime created_at } USER { int id PK string username string email string password_hash } TAG { int id PK string name string slug }

Model Definitions

from datetime import datetime
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from slugify import slugify

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///ecommerce.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# Association table for Product-Tag (many-to-many)
product_tags = db.Table('product_tags',
    db.Column('product_id', db.Integer, db.ForeignKey('products.id'), primary_key=True),
    db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)

class Category(db.Model):
    __tablename__ = 'categories'
    
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False)
    description = db.Column(db.String(255))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    # Relationships
    products = db.relationship('Product', backref='category', lazy=True)
    
    def __repr__(self):
        return f'<Category {self.name}>'
    
    @staticmethod
    def generate_slug(target, value, oldvalue, initiator):
        target.slug = slugify(value)

# Event listener for slug generation
db.event.listen(Category.name, 'set', Category.generate_slug)

class Product(db.Model):
    __tablename__ = 'products'
    
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    slug = db.Column(db.String(100), unique=True, nullable=False)
    description = db.Column(db.Text)
    price = db.Column(db.Numeric(10, 2), nullable=False)
    stock = db.Column(db.Integer, default=0)
    active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Foreign keys
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))
    
    # Relationships
    images = db.relationship('ProductImage', backref='product', lazy=True, cascade='all, delete-orphan')
    reviews = db.relationship('Review', backref='product', lazy=True, cascade='all, delete-orphan')
    tags = db.relationship('Tag', secondary=product_tags, backref=db.backref('products', lazy=True))
    
    def __repr__(self):
        return f'<Product {self.name}>'
    
    @staticmethod
    def generate_slug(target, value, oldvalue, initiator):
        target.slug = slugify(value)
    
    @property
    def avg_rating(self):
        """Calculate the average rating from reviews."""
        if not self.reviews:
            return 0
        return sum(review.rating for review in self.reviews) / len(self.reviews)
    
    @property
    def primary_image(self):
        """Get the primary image for the product."""
        primary = next((img for img in self.images if img.is_primary), None)
        if primary:
            return primary
        return self.images[0] if self.images else None
    
    @property
    def in_stock(self):
        """Check if the product is in stock."""
        return self.stock > 0
    
    @classmethod
    def search(cls, query, limit=20):
        """Search products by name or description."""
        return cls.query.filter(
            db.or_(
                cls.name.ilike(f'%{query}%'),
                cls.description.ilike(f'%{query}%')
            )
        ).limit(limit).all()

# Event listener for slug generation
db.event.listen(Product.name, 'set', Product.generate_slug)

class ProductImage(db.Model):
    __tablename__ = 'product_images'
    
    id = db.Column(db.Integer, primary_key=True)
    image_path = db.Column(db.String(255), nullable=False)
    is_primary = db.Column(db.Boolean, default=False)
    display_order = db.Column(db.Integer, default=0)
    
    # Foreign keys
    product_id = db.Column(db.Integer, db.ForeignKey('products.id'), nullable=False)
    
    def __repr__(self):
        return f'<ProductImage {self.id}>'

class Tag(db.Model):
    __tablename__ = 'tags'
    
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False)
    
    def __repr__(self):
        return f'<Tag {self.name}>'
    
    @staticmethod
    def generate_slug(target, value, oldvalue, initiator):
        target.slug = slugify(value)

# Event listener for slug generation
db.event.listen(Tag.name, 'set', Tag.generate_slug)

class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(128))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    # Relationships
    reviews = db.relationship('Review', backref='user', lazy=True, cascade='all, delete-orphan')
    
    def __repr__(self):
        return f'<User {self.username}>'

class Review(db.Model):
    __tablename__ = 'reviews'
    
    id = db.Column(db.Integer, primary_key=True)
    rating = db.Column(db.Integer, nullable=False)
    content = db.Column(db.Text)
    approved = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    # Foreign keys
    product_id = db.Column(db.Integer, db.ForeignKey('products.id'), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    
    def __repr__(self):
        return f'<Review {self.id}>'
        
    @property
    def rating_percent(self):
        """Convert the rating to a percentage."""
        return (self.rating / 5) * 100

Example View Functions

# Product listing
@app.route('/products')
def product_list():
    # Get filter parameters
    category_slug = request.args.get('category')
    tag_slug = request.args.get('tag')
    sort_by = request.args.get('sort', 'name')
    page = request.args.get('page', 1, type=int)
    per_page = 12
    
    # Base query
    query = Product.query.filter_by(active=True)
    
    # Apply category filter
    if category_slug:
        category = Category.query.filter_by(slug=category_slug).first_or_404()
        query = query.filter_by(category_id=category.id)
    
    # Apply tag filter
    if tag_slug:
        tag = Tag.query.filter_by(slug=tag_slug).first_or_404()
        query = query.join(product_tags).join(Tag).filter(Tag.id == tag.id)
    
    # Apply sorting
    if sort_by == 'price_low':
        query = query.order_by(Product.price.asc())
    elif sort_by == 'price_high':
        query = query.order_by(Product.price.desc())
    elif sort_by == 'newest':
        query = query.order_by(Product.created_at.desc())
    else:  # Default to name
        query = query.order_by(Product.name.asc())
    
    # Paginate results
    products = query.paginate(page=page, per_page=per_page)
    
    # Get all categories for the sidebar
    categories = Category.query.all()
    
    # Get popular tags
    popular_tags = Tag.query.join(product_tags).group_by(Tag.id).order_by(
        db.func.count().desc()
    ).limit(10).all()
    
    return render_template(
        'products/index.html',
        products=products,
        categories=categories,
        popular_tags=popular_tags,
        current_category=category_slug,
        current_tag=tag_slug,
        current_sort=sort_by
    )

# Product detail
@app.route('/product/')
def product_detail(slug):
    # Get the product or return 404
    product = Product.query.filter_by(slug=slug, active=True).first_or_404()
    
    # Get approved reviews
    reviews = Review.query.filter_by(
        product_id=product.id, approved=True
    ).order_by(Review.created_at.desc()).all()
    
    # Get related products (same category)
    related_products = Product.query.filter(
        Product.category_id == product.category_id,
        Product.id != product.id,
        Product.active == True
    ).limit(4).all()
    
    # Create review form
    form = ReviewForm()
    
    return render_template(
        'products/detail.html',
        product=product,
        reviews=reviews,
        related_products=related_products,
        form=form
    )

# Search products
@app.route('/search')
def search_products():
    query = request.args.get('q', '')
    if not query:
        return redirect(url_for('product_list'))
    
    page = request.args.get('page', 1, type=int)
    per_page = 12
    
    # Search for products matching the query
    products = Product.query.filter(
        Product.active == True,
        db.or_(
            Product.name.ilike(f'%{query}%'),
            Product.description.ilike(f'%{query}%')
        )
    ).paginate(page=page, per_page=per_page)
    
    return render_template(
        'products/search.html',
        products=products,
        query=query
    )

# Submit review
@app.route('/product//review', methods=['POST'])
def submit_review(slug):
    # Get the product or return 404
    product = Product.query.filter_by(slug=slug).first_or_404()
    
    form = ReviewForm()
    
    if form.validate_on_submit():
        # Create a new review
        review = Review(
            rating=form.rating.data,
            content=form.content.data,
            product_id=product.id,
            user_id=current_user.id,
            approved=False  # Require approval
        )
        
        db.session.add(review)
        db.session.commit()
        
        flash('Thank you for your review! It will be visible after approval.', 'success')
    
    return redirect(url_for('product_detail', slug=slug))

Best Practices and Performance Considerations

Here are some best practices and performance tips for working with Flask-SQLAlchemy:

Database Connection Management

# Using teardown functions to close sessions
@app.teardown_appcontext
def shutdown_session(exception=None):
    db.session.remove()

Query Optimization

# Example: Eager loading to avoid N+1 queries
posts = Post.query.options(
    db.joinedload(Post.author),
    db.joinedload(Post.category)
).all()

# Example: Select specific columns
user_emails = db.session.query(
    User.id, User.email
).filter(
    User.active == True
).all()

# Example: Creating an index
class User(db.Model):
    # ...
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)

Database Transactions

# Example: Explicit transaction with try/except
try:
    # Start a transaction
    db.session.begin()
    
    # Multiple operations in one transaction
    user = User(username='johndoe', email='john@example.com')
    db.session.add(user)
    
    profile = Profile(user_id=user.id, bio='Software developer')
    db.session.add(profile)
    
    # Commit the transaction
    db.session.commit()
except Exception as e:
    # Rollback on error
    db.session.rollback()
    flash(f'Error: {str(e)}', 'danger')
    # Re-raise or handle accordingly
    raise

Security Considerations

# BAD - vulnerable to SQL injection
username = request.args.get('username')
users = db.engine.execute(f"SELECT * FROM users WHERE username = '{username}'")

# GOOD - using parameter binding
users = User.query.filter_by(username=username).all()

# Or with raw SQL but still safe
from sqlalchemy import text
users = db.engine.execute(text("SELECT * FROM users WHERE username = :username"), 
                        {"username": username})

Practice Activity

Create a Flask application for a simple task management system:

  1. Create the following models:
    • User (id, username, email, password_hash)
    • Project (id, name, description, owner_id)
    • Task (id, title, description, status, priority, due_date, project_id, assigned_to)
  2. Define relationships between the models:
    • One-to-many relationship between User and Project (one user can have many projects)
    • One-to-many relationship between Project and Task (one project can have many tasks)
    • Many-to-one relationship between Task and User (many tasks can be assigned to one user)
  3. Implement basic CRUD operations for Tasks:
    • List all tasks (with pagination)
    • Show task details
    • Create a new task (with a form)
    • Edit an existing task
    • Delete a task
  4. Add custom methods to the Task model:
    • A method to check if the task is overdue
    • A method to mark the task as complete
    • A property that returns the time remaining until the due date

Bonus challenges:

Further Topics to Explore