Python and PostgreSQL Integration

Connecting Python Applications to PostgreSQL Databases

Introduction to Python Database APIs

Python provides several ways to interact with databases. Understanding the different abstraction levels helps you choose the right approach for your project.

Python Database Access

  • Level 1 (Database Drivers): psycopg2, asyncpg
  • Level 2 (SQL Helpers): Records, Dataset, SQL Helper
  • Level 3 (ORMs): SQLAlchemy, Django ORM, Peewee

Analogy: Different Levels of Driving Assistance

The different levels of database APIs in Python are like different levels of driving assistance in cars:

  • Level 1 (Database Drivers): Like driving a manual transmission car. You have complete control but need to handle every detail yourself: shifting gears, watching RPMs, etc.
  • Level 2 (SQL Helpers): Like driving a car with some convenience features - automatic transmission, cruise control - you still control the car directly, but some repetitive tasks are handled for you.
  • Level 3 (ORMs): Like using driver assistance systems - adaptive cruise control, lane keeping, etc. - you set high-level goals (where to go), and the system handles many low-level details, though you're still in control and can intervene when needed.

Each level trades direct control for convenience. Just as some driving situations call for manual control while others benefit from assistance, different database tasks may be better suited to different levels of abstraction.

Database Drivers (Level 1)

Database drivers provide direct access to the database, offering the most control but requiring detailed knowledge of SQL and the database system.

SQL Helpers (Level 2)

SQL helper libraries provide convenient wrappers around database drivers, reducing boilerplate while still using SQL directly.

ORMs - Object-Relational Mappers (Level 3)

ORMs allow you to work with database data using Python classes and objects instead of writing SQL directly.
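
To make the three levels concrete, here is a minimal sketch that runs the same lookup three ways. It uses the standard library's sqlite3 module as a stand-in driver so it runs without a PostgreSQL server. The users table, the query_dicts helper, and the User class are hypothetical and only illustrate the style of each level, not the API of any particular library.

```python
import sqlite3
from dataclasses import dataclass

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("INSERT INTO users (username) VALUES ('johndoe')")

# Level 1 style: raw driver calls, rows come back as plain tuples
cur = conn.cursor()
cur.execute("SELECT id, username FROM users WHERE username = ?", ("johndoe",))
level1_row = cur.fetchone()


# Level 2 style: a thin helper that still takes SQL but returns dictionaries
def query_dicts(connection, sql, params=()):
    cursor = connection.execute(sql, params)
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


level2_rows = query_dicts(conn, "SELECT id, username FROM users")


# Level 3 style: callers work with objects and never see the SQL
@dataclass
class User:
    id: int
    username: str

    @classmethod
    def get_by_username(cls, connection, username):
        row = connection.execute(
            "SELECT id, username FROM users WHERE username = ?", (username,)
        ).fetchone()
        return cls(*row) if row else None


level3_user = User.get_by_username(conn, "johndoe")
print(level1_row, level2_rows, level3_user)
conn.close()
```

Note that sqlite3 uses ? as its placeholder, while psycopg2 uses %s; the overall pattern is otherwise the same.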

Working with psycopg2

psycopg2 is the most popular PostgreSQL adapter for Python. It's a robust, efficient, and feature-complete implementation of the Python DB API 2.0 specification.

Connecting to PostgreSQL

Basic Connection:


import psycopg2

# Method 1: Connection parameters as keyword arguments
try:
    conn = psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost",
        port="5432"
    )
    print("Connection established successfully!")
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
finally:
    if 'conn' in locals():
        conn.close()
        print("Connection closed.")
          

Connection String Method:


import psycopg2

# Method 2: Connection string
conn_string = "postgresql://pythonapp:secure_password@localhost:5432/python_app_db"

try:
    conn = psycopg2.connect(conn_string)
    print("Connection established successfully!")
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
finally:
    if 'conn' in locals():
        conn.close()
        print("Connection closed.")
          

Using the with Statement for Transaction Handling:


import psycopg2

# In psycopg2, 'with conn' wraps a transaction: it commits on success and
# rolls back on an exception. It does NOT close the connection.
conn = None
try:
    conn = psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost",
        port="5432"
    )
    with conn:
        print("Connection established successfully!")
        # The transaction is committed when this block exits normally
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
finally:
    # Close explicitly; the with block only ended the transaction
    if conn is not None:
        conn.close()
          

Connection Best Practices

  • Externalize Credentials: Store connection parameters in a configuration file or environment variables, not in code
  • Use Connection Pooling: For applications handling multiple requests, use a connection pool
  • Handle Connection Errors: Always wrap connection attempts in try-except blocks
  • Close Connections: Ensure connections are closed after use (in psycopg2 the with statement ends the transaction but leaves the connection open, so call close() explicitly)
  • Use SSL: For production environments, consider enabling SSL connections
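
As one way to follow the first and last practices above, the sketch below builds the keyword arguments for psycopg2.connect() from environment variables. The function name connection_params_from_env and the fallback values are assumptions for illustration; the variable names (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD, PGSSLMODE) are the ones libpq itself recognizes.

```python
import os


def connection_params_from_env(env=None):
    """Build keyword arguments for psycopg2.connect() from environment variables."""
    env = os.environ if env is None else env
    params = {
        "dbname": env.get("PGDATABASE", "python_app_db"),
        "user": env.get("PGUSER", "pythonapp"),
        "host": env.get("PGHOST", "localhost"),
        "port": int(env.get("PGPORT", "5432")),
        # Refuse unencrypted connections unless explicitly overridden
        "sslmode": env.get("PGSSLMODE", "require"),
    }
    # Only pass a password if one is set, so a ~/.pgpass file can still be used
    password = env.get("PGPASSWORD")
    if password:
        params["password"] = password
    return params


# A plain dict stands in for os.environ so the example runs anywhere
params = connection_params_from_env({"PGHOST": "db.internal", "PGPORT": "6432"})
print(params)

# Usage (requires psycopg2 and a reachable server):
# conn = psycopg2.connect(**connection_params_from_env())
```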

Executing SQL Queries

Basic Query Execution:


import psycopg2

try:
    # Establish connection
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        # Create a cursor
        with conn.cursor() as cur:
            # Execute a simple query
            cur.execute("SELECT * FROM app.users LIMIT 5")
            
            # Fetch all results
            rows = cur.fetchall()
            
            # Print results
            for row in rows:
                print(row)
                
except Exception as e:
    print(f"Error: {e}")
          

Working with Parameters:


import psycopg2

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Always use parameterized queries to prevent SQL injection
            username = "johndoe"
            
            # BAD: Don't do this!
            # cur.execute(f"SELECT * FROM app.users WHERE username = '{username}'")
            
            # GOOD: Use parameter placeholders
            cur.execute("SELECT * FROM app.users WHERE username = %s", (username,))
            
            # Fetch one row
            user = cur.fetchone()
            
            if user:
                print(f"Found user: {user}")
            else:
                print(f"No user found with username: {username}")
                
except Exception as e:
    print(f"Error: {e}")
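
To show why the placeholder form matters, the following sketch runs the same malicious input through both approaches. It uses sqlite3 as a stand-in so it runs without a server (sqlite3 uses ? where psycopg2 uses %s); the table and the input string are made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)", [("johndoe",), ("admin",)]
)

malicious = "nobody' OR '1'='1"

# String formatting: the input becomes part of the SQL text itself
unsafe_rows = conn.execute(
    f"SELECT username FROM users WHERE username = '{malicious}'"
).fetchall()

# Placeholder: the input is passed to the driver as a value
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?", (malicious,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))
conn.close()
```

The formatted query matches every row because the injected OR clause is always true, while the parameterized query looks for a user literally named "nobody' OR '1'='1" and finds none.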
          

Handling Different Data Types:


import psycopg2
import datetime

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Example with different parameter types
            cur.execute(
                """
                INSERT INTO app.orders (
                    customer_id, 
                    items, 
                    total_amount, 
                    order_date,
                    is_shipped,
                    notes
                ) VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id
                """,
                (
                    42,                                 # Integer
                    ['Widget', 'Gadget', 'Doohickey'],  # Array
                    99.95,                              # Float
                    datetime.datetime.now(),            # Timestamp
                    True,                               # Boolean
                    "Priority shipping requested"       # Text
                )
            )
            
            # Get the ID of the inserted row
            order_id = cur.fetchone()[0]
            print(f"Created order with ID: {order_id}")
            
            # Commit the transaction
            conn.commit()
                
except Exception as e:
    print(f"Error: {e}")
          

Transactions in psycopg2

Transaction Management:


import psycopg2

try:
    # Establish connection
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        # By default, psycopg2 runs in autocommit=False mode
        # This means all operations are within a transaction
        
        with conn.cursor() as cur:
            # First operation - RETURNING id makes the new account ID available
            cur.execute(
                "INSERT INTO app.accounts (user_id, balance) VALUES (%s, %s) RETURNING id",
                (1, 1000.00)
            )
            account_id = cur.fetchone()[0]

            # Second operation - both will succeed or fail together
            cur.execute(
                "INSERT INTO app.transactions (account_id, amount, description) VALUES (%s, %s, %s)",
                (account_id, 1000.00, "Initial deposit")
            )
        
        # Transaction is committed when the connection context exits
        # You can also explicitly commit:
        # conn.commit()
    
except Exception as e:
    # If an error occurs, the transaction is automatically rolled back
    print(f"Error: {e}")
    
    # You can also explicitly rollback:
    # if 'conn' in locals():
    #     conn.rollback()
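
The all-or-nothing behavior can be observed without a PostgreSQL server. The sketch below uses sqlite3 as a stand-in; its connection context manager behaves like psycopg2's here (commit on success, rollback on error, connection left open). The table definitions are simplified assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance REAL NOT NULL)")
conn.execute(
    "CREATE TABLE transactions (account_id INTEGER NOT NULL, amount REAL NOT NULL)"
)
conn.commit()

try:
    with conn:  # commits on success, rolls back if the block raises
        cur = conn.execute("INSERT INTO accounts (balance) VALUES (?)", (1000.0,))
        account_id = cur.lastrowid
        # Violates NOT NULL, so the whole block is rolled back
        conn.execute(
            "INSERT INTO transactions (account_id, amount) VALUES (?, ?)",
            (account_id, None),
        )
except sqlite3.IntegrityError as exc:
    print(f"Rolled back: {exc}")

# The first INSERT was undone together with the failed second one
account_count = conn.execute("SELECT COUNT(*) FROM accounts").fetchone()[0]
print(account_count)
conn.close()
```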
          

Explicit Transaction Control:


import psycopg2

try:
    # Set autocommit mode to True - each statement will be a separate transaction
    conn = psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    )
    conn.autocommit = True
    
    with conn.cursor() as cur:
        # This executes and commits immediately
        # level is included because app.logs declares it NOT NULL
        cur.execute("INSERT INTO app.logs (level, message) VALUES ('INFO', 'Autocommit mode')")
    
    # Now start an explicit transaction
    conn.autocommit = False
    
    try:
        with conn.cursor() as cur:
            # First operation
            cur.execute("UPDATE app.accounts SET balance = balance - 100 WHERE id = 1")
            
            # Check if we have sufficient funds
            cur.execute("SELECT balance FROM app.accounts WHERE id = 1")
            balance = cur.fetchone()[0]
            
            if balance < 0:
                # Insufficient funds - rollback
                conn.rollback()
                print("Transaction cancelled: Insufficient funds")
            else:
                # Second operation
                cur.execute(
                    "INSERT INTO app.transactions (account_id, amount, description) VALUES (%s, %s, %s)",
                    (1, -100.00, "Withdrawal")
                )
                # Commit the transaction
                conn.commit()
                print("Transaction completed successfully")
    except Exception as e:
        conn.rollback()
        print(f"Transaction error: {e}")
    
    conn.close()
    
except Exception as e:
    print(f"Connection error: {e}")
          

Fetching Results

Different Fetch Methods:


import psycopg2

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT id, username, email FROM app.users")
            
            # Method 1: fetchone() - retrieves one row at a time
            print("First user:")
            first_user = cur.fetchone()
            print(first_user)
            
            # Method 2: fetchmany(n) - retrieves n rows at a time
            print("\nNext 3 users:")
            next_three_users = cur.fetchmany(3)
            for user in next_three_users:
                print(user)
            
            # Method 3: fetchall() - retrieves all remaining rows
            print("\nAll remaining users:")
            remaining_users = cur.fetchall()
            for user in remaining_users:
                print(user)
            
except Exception as e:
    print(f"Error: {e}")
          

Working with Different Cursor Types:


import psycopg2
import psycopg2.extras

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        # Regular cursor - returns tuples
        with conn.cursor() as cur:
            cur.execute("SELECT id, username, email FROM app.users LIMIT 2")
            rows = cur.fetchall()
            print("Regular cursor results (tuples):")
            for row in rows:
                print(f"ID: {row[0]}, Username: {row[1]}, Email: {row[2]}")
        
        # Dictionary cursor - returns dictionaries
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("SELECT id, username, email FROM app.users LIMIT 2")
            rows = cur.fetchall()
            print("\nDictCursor results (dictionary-like):")
            for row in rows:
                print(f"ID: {row['id']}, Username: {row['username']}, Email: {row['email']}")
        
        # Real dictionary cursor - returns actual dictionaries
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT id, username, email FROM app.users LIMIT 2")
            rows = cur.fetchall()
            print("\nRealDictCursor results (actual dictionaries):")
            for row in rows:
                print(row)  # This is a dictionary
        
        # Named tuple cursor - returns named tuples
        with conn.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cur:
            cur.execute("SELECT id, username, email FROM app.users LIMIT 2")
            rows = cur.fetchall()
            print("\nNamedTupleCursor results (named tuples):")
            for row in rows:
                print(f"ID: {row.id}, Username: {row.username}, Email: {row.email}")
        
except Exception as e:
    print(f"Error: {e}")
          

Server-Side Cursors

When dealing with large result sets, server-side cursors allow fetching results in chunks without loading everything into memory.

Using Server-Side Cursors:


import psycopg2

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        # Create a named server-side cursor
        # This doesn't fetch all rows at once, helping with memory usage
        with conn.cursor(name='large_result_set_cursor') as cur:
            # Execute a query that might return many rows
            cur.execute("SELECT * FROM app.logs")
            
            # Fetch in batches of 100 rows
            batch_size = 100
            batch_num = 0
            
            while True:
                rows = cur.fetchmany(batch_size)
                
                if not rows:
                    break
                
                batch_num += 1
                print(f"Processing batch {batch_num} ({len(rows)} rows)")
                
                # Process each row in the batch
                for row in rows:
                    # Do something with the row
                    pass
            
            print(f"Processed {batch_num} batches")
            
except Exception as e:
    print(f"Error: {e}")
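
The fetchmany loop above can be wrapped in a small generator so the processing code stays flat. This is a sketch, with iter_batches as a hypothetical helper name; it works with any DB-API cursor and is demonstrated with sqlite3 so it runs without a server.

```python
import sqlite3


def iter_batches(cursor, batch_size=100):
    """Yield lists of rows from any DB-API cursor until it is exhausted."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            return
        yield rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (id INTEGER PRIMARY KEY, message TEXT)")
conn.executemany(
    "INSERT INTO logs (message) VALUES (?)", [(f"msg {i}",) for i in range(250)]
)

cur = conn.execute("SELECT id, message FROM logs ORDER BY id")
batch_sizes = [len(batch) for batch in iter_batches(cur, batch_size=100)]
print(batch_sizes)
conn.close()
```

With psycopg2, the same helper could be passed a named cursor, which is what keeps the unfetched rows on the server.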
          

psycopg2 Advanced Features

Working with PostgreSQL Data Types

Working with Arrays:


import psycopg2

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Create a table with an array column
            cur.execute("""
                CREATE TABLE IF NOT EXISTS app.products (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    tags TEXT[],
                    stock_levels INTEGER[]
                )
            """)
            
            # Insert a record with array data
            cur.execute(
                "INSERT INTO app.products (name, tags, stock_levels) VALUES (%s, %s, %s)",
                (
                    "Ergonomic Keyboard",
                    ["electronics", "office", "ergonomic"],  # Text array
                    [45, 62, 13]  # Integer array
                )
            )
            
            # Query the array data
            cur.execute("SELECT * FROM app.products")
            products = cur.fetchall()
            
            for product in products:
                print(f"Product: {product[1]}")
                print(f"Tags: {product[2]}")
                print(f"Stock Levels: {product[3]}")
                
            # Array operators
            cur.execute(
                "SELECT * FROM app.products WHERE %s = ANY(tags)",
                ("office",)
            )
            office_products = cur.fetchall()
            print(f"\nOffice products: {len(office_products)}")
            
            # Update an array element (PostgreSQL arrays are 1-based by default)
            cur.execute(
                "UPDATE app.products SET stock_levels[1] = %s WHERE id = %s",
                (50, 1)
            )
            
            # Append to an array
            cur.execute(
                "UPDATE app.products SET tags = array_append(tags, %s) WHERE id = %s",
                ("bestseller", 1)
            )
            
            conn.commit()
            
except Exception as e:
    print(f"Error: {e}")
          

Working with JSON/JSONB:


import psycopg2
import json
from psycopg2.extras import Json

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Create a table with JSON columns
            cur.execute("""
                CREATE TABLE IF NOT EXISTS app.events (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    data JSONB NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            
            # Python dictionary to store as JSON
            event_data = {
                "user_id": 123,
                "action": "login",
                "device": {
                    "type": "mobile",
                    "os": "iOS",
                    "version": "15.2"
                },
                "location": {
                    "country": "US",
                    "city": "San Francisco"
                }
            }
            
            # Method 1: Convert to JSON string manually
            cur.execute(
                "INSERT INTO app.events (name, data) VALUES (%s, %s)",
                ("User Login", json.dumps(event_data))
            )
            
            # Method 2: Use psycopg2's Json adapter
            cur.execute(
                "INSERT INTO app.events (name, data) VALUES (%s, %s)",
                ("User Login", Json(event_data))
            )
            
            # Query JSON data
            cur.execute("SELECT id, name, data FROM app.events LIMIT 5")
            events = cur.fetchall()
            
            for event in events:
                event_id, event_name, event_data = event
                print(f"Event {event_id}: {event_name}")
                print(f"Data: {event_data}")
                print(f"User ID: {event_data.get('user_id')}")
                print(f"Device OS: {event_data.get('device', {}).get('os')}")
                print()
                
            # Query using JSON operators
            cur.execute("""
                SELECT * FROM app.events 
                WHERE data->>'action' = %s 
                AND data->'device'->>'type' = %s
            """, ('login', 'mobile'))
            
            mobile_logins = cur.fetchall()
            print(f"Mobile login events: {len(mobile_logins)}")
            
            # Update JSON data
            cur.execute("""
                UPDATE app.events 
                SET data = data || %s::jsonb
                WHERE id = %s
            """, (json.dumps({"processed": True}), 1))
            
            conn.commit()
            
except Exception as e:
    print(f"Error: {e}")
          

Batch Operations

Efficient Batch Insertions:


import psycopg2
from psycopg2.extras import execute_values
import time

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Create a test table
            cur.execute("""
                CREATE TABLE IF NOT EXISTS app.logs (
                    id SERIAL PRIMARY KEY,
                    level VARCHAR(10) NOT NULL,
                    message TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            
            # Prepare a large dataset
            log_entries = [
                ('INFO', f'Log message {i}') for i in range(1, 10001)
            ]
            
            # Method 1: Loop and execute (slow)
            start_time = time.time()
            
            for level, message in log_entries[:100]:  # Just do 100 for comparison
                cur.execute(
                    "INSERT INTO app.logs (level, message) VALUES (%s, %s)",
                    (level, message)
                )
            
            loop_time = time.time() - start_time
            print(f"Loop insertion time for 100 records: {loop_time:.2f} seconds")
            
            # Method 2: executemany (more concise, but in psycopg2 it is
            # not faster than calling execute() in a loop)
            start_time = time.time()
            
            cur.executemany(
                "INSERT INTO app.logs (level, message) VALUES (%s, %s)",
                log_entries[:1000]  # Do 1000 for better comparison
            )
            
            executemany_time = time.time() - start_time
            print(f"executemany time for 1000 records: {executemany_time:.2f} seconds")
            
            # Method 3: execute_values (best)
            start_time = time.time()
            
            execute_values(
                cur,
                "INSERT INTO app.logs (level, message) VALUES %s",
                log_entries,
                template="(%s, %s)"
            )
            
            execute_values_time = time.time() - start_time
            print(f"execute_values time for 10000 records: {execute_values_time:.2f} seconds")
            
            # Compare the performance
            print(f"execute_values is {(loop_time/100)/(execute_values_time/10000):.1f}x faster than loop per record")
            print(f"execute_values is {(executemany_time/1000)/(execute_values_time/10000):.1f}x faster than executemany per record")
            
            conn.commit()
            
except Exception as e:
    print(f"Error: {e}")
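
The speedup from execute_values comes largely from sending many rows per statement instead of one. The sketch below illustrates that idea only; it is not psycopg2's implementation. It uses sqlite3 as a stand-in, and insert_in_pages is a hypothetical helper that builds one multi-row INSERT per page of rows.

```python
import sqlite3


def insert_in_pages(conn, rows, page_size=100):
    """Insert rows using one multi-row INSERT statement per page."""
    statements = 0
    for start in range(0, len(rows), page_size):
        page = rows[start:start + page_size]
        # One "(?, ?)" group per row, with all values flattened into one list
        placeholders = ", ".join(["(?, ?)"] * len(page))
        flat_params = [value for row in page for value in row]
        conn.execute(
            f"INSERT INTO logs (level, message) VALUES {placeholders}",
            flat_params,
        )
        statements += 1
    return statements


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (level TEXT NOT NULL, message TEXT NOT NULL)")
log_entries = [("INFO", f"Log message {i}") for i in range(1, 1001)]

statement_count = insert_in_pages(conn, log_entries, page_size=100)
row_count = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
print(statement_count, row_count)
conn.close()
```

A thousand rows go to the database in ten statements rather than a thousand, which is where most of the saved round trips come from.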
          

Copy Operations

Using COPY for Bulk Data Transfer:


import psycopg2
import csv
import io

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Create a test table
            cur.execute("""
                CREATE TABLE IF NOT EXISTS app.employees (
                    id SERIAL PRIMARY KEY,
                    first_name VARCHAR(50) NOT NULL,
                    last_name VARCHAR(50) NOT NULL,
                    email VARCHAR(100),
                    department VARCHAR(50),
                    salary NUMERIC(10, 2)
                )
            """)
            
            # Create sample data
            sample_data = [
                ['John', 'Smith', 'john@example.com', 'Engineering', 85000],
                ['Jane', 'Doe', 'jane@example.com', 'Marketing', 75000],
                ['Bob', 'Johnson', 'bob@example.com', 'Finance', 90000],
                ['Alice', 'Williams', 'alice@example.com', 'Engineering', 82000],
                ['Charlie', 'Brown', 'charlie@example.com', 'HR', 65000],
            ] * 1000  # Repeat data to create a larger dataset
            
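            # Method 1: Using a file-like object with copy_from
            # copy_from uses COPY's text format, not CSV, so this only works
            # because no field contains commas, quotes, or newlines.
            csv_file = io.StringIO()
            csv_writer = csv.writer(csv_file)
            csv_writer.writerows(sample_data)
            csv_file.seek(0)

            # psycopg2 2.9+ quotes the table name, so a schema-qualified name
            # such as 'app.employees' is rejected; set the search_path instead.
            cur.execute("SET LOCAL search_path TO app, public")
            cur.copy_from(
                csv_file,
                'employees',
                sep=',',
                columns=('first_name', 'last_name', 'email', 'department', 'salary')
            )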
            
            # Check how many rows were inserted
            cur.execute("SELECT COUNT(*) FROM app.employees")
            count = cur.fetchone()[0]
            print(f"Inserted {count} rows using COPY")
            
            # Method 2: Using copy_expert with COPY
            # First, truncate the table
            cur.execute("TRUNCATE app.employees RESTART IDENTITY")
            
            # Prepare data again
            csv_file = io.StringIO()
            csv_writer = csv.writer(csv_file)
            csv_writer.writerows(sample_data)
            csv_file.seek(0)
            
            # Use copy_expert for more flexibility
            cur.copy_expert(
                sql="COPY app.employees (first_name, last_name, email, department, salary) FROM STDIN WITH CSV",
                file=csv_file
            )
            
            # Check how many rows were inserted
            cur.execute("SELECT COUNT(*) FROM app.employees")
            count = cur.fetchone()[0]
            print(f"Inserted {count} rows using copy_expert")
            
            conn.commit()
            
except Exception as e:
    print(f"Error: {e}")
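
The in-memory file preparation used by both COPY methods can be factored into a helper. The sketch below (rows_to_csv_buffer is a hypothetical name) also shows what the csv module does with a field that contains a comma, which is the case where the CSV option of copy_expert matters.

```python
import csv
import io


def rows_to_csv_buffer(rows):
    """Serialize rows into an in-memory CSV file positioned at the start."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    buffer.seek(0)
    return buffer


rows = [
    ["Jane", "Doe", "jane@example.com", "Marketing", 75000],
    ["Bob", "O'Neil, Jr.", "bob@example.com", "Finance", 90000],
]
csv_text = rows_to_csv_buffer(rows).getvalue()
print(csv_text)
```

The second row's last name is wrapped in double quotes because it contains the delimiter. A CSV-aware COPY parses that correctly, whereas a plain comma-separated text load would split it into two fields.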
          

Connection Pooling

For applications that handle multiple requests or operations, connection pooling can significantly improve performance by reusing database connections instead of establishing new ones for each operation.

Connection Pooling with psycopg2.pool:


import psycopg2
from psycopg2 import pool
import time
import threading

# Create a connection pool
try:
    # Create a ThreadedConnectionPool
    # minconn: minimum number of connections
    # maxconn: maximum number of connections
    connection_pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=10,
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    )
    
    print("Connection pool created successfully")
    
    # Function to execute a query using a connection from the pool
    def execute_query(query, params=None):
        conn = None
        try:
            # Get a connection from the pool
            conn = connection_pool.getconn()
            
            # Use the connection
            with conn.cursor() as cur:
                cur.execute(query, params or ())
                return cur.fetchall()
        finally:
            # Return the connection to the pool
            if conn:
                connection_pool.putconn(conn)
    
    # Function to simulate a task that uses the database
    def task(task_id):
        print(f"Task {task_id} starting")
        result = execute_query(
            "SELECT pg_sleep(1), %s as task_id", 
            (task_id,)
        )
        print(f"Task {task_id} completed")
    
    # Simulate multiple concurrent tasks
    threads = []
    
    start_time = time.time()
    
    # Create and start 5 threads
    for i in range(5):
        t = threading.Thread(target=task, args=(i,))
        threads.append(t)
        t.start()
    
    # Wait for all threads to complete
    for t in threads:
        t.join()
    
    end_time = time.time()
    print(f"All tasks completed in {end_time - start_time:.2f} seconds")
    
except Exception as e:
    print(f"Error: {e}")
finally:
    # Close the connection pool
    if 'connection_pool' in locals():
        connection_pool.closeall()
        print("Connection pool closed")
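
The getconn/putconn pairing in execute_query can be packaged as a context manager so callers cannot forget to return the connection. This is a sketch: pooled_connection is a hypothetical helper, and FakePool and FakeConnection are stand-ins so the example runs without a database. With psycopg2 you would pass the ThreadedConnectionPool instead.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection, commit or roll back, and always return it."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)


# Stand-ins so the sketch runs without a database (hypothetical classes)
class FakeConnection:
    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")


class FakePool:
    def __init__(self):
        self.conn = FakeConnection()
        self.returned = 0

    def getconn(self):
        return self.conn

    def putconn(self, conn):
        self.returned += 1


fake_pool = FakePool()

with pooled_connection(fake_pool) as conn:
    pass  # run queries here

try:
    with pooled_connection(fake_pool) as conn:
        raise ValueError("simulated failure")
except ValueError:
    pass

print(fake_pool.conn.events, fake_pool.returned)
```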
          

Connection Pooling Best Practices

  • Pool Size: Set appropriate minconn and maxconn values based on your application's concurrency requirements
  • Connection Lifecycle: Always return connections to the pool (putconn) when done using them
  • Error Handling: Handle connection errors and implement retries if necessary
  • Pool Monitoring: For production applications, monitor pool usage and adjust parameters if needed
  • Clean Shutdown: Call closeall() when shutting down your application
  • Alternative Libraries: Consider specialized libraries like SQLAlchemy's connection pool or external poolers like PgBouncer for high-load applications
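
For the retry advice above, one possible shape is a small helper with exponential backoff. The name with_retries, its parameters, and the flaky_connect stand-in are assumptions for illustration. With psycopg2 you would typically pass retry_on=psycopg2.OperationalError and an operation that acquires a connection.

```python
import time


def with_retries(operation, retry_on, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call operation(); on a retry_on exception, wait and try again."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts, let the caller see the error
            # Double the delay after each failed attempt
            sleep(base_delay * 2 ** (attempt - 1))


# Demonstration with a stand-in operation that fails twice, then succeeds
calls = {"count": 0}
delays = []


def flaky_connect():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("server not ready")
    return "connected"


# delays.append replaces time.sleep so the demo records waits instead of sleeping
result = with_retries(flaky_connect, retry_on=ConnectionError, sleep=delays.append)
print(result, calls["count"], delays)
```

Only retry errors that are plausibly transient, such as a dropped connection. Retrying a constraint violation or a syntax error just repeats the failure.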

Error Handling and Logging

Exception Hierarchy

psycopg2 defines a set of exceptions that correspond to different PostgreSQL error conditions:

All psycopg2 exceptions derive from psycopg2.Error, which is a subclass of Exception. DatabaseError derives from Error and has these subclasses:

  • OperationalError
  • IntegrityError
  • DataError
  • ProgrammingError
  • InternalError
  • NotSupportedError
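
Because these classes form a hierarchy, the order of checks matters: test for the most specific class first and fall back to the general ones. The sketch below demonstrates this with sqlite3, which follows the same DB API 2.0 exception layout, so it runs without a server. The classify helper and its labels are hypothetical. One difference to keep in mind: sqlite3 reports a missing table as OperationalError, whereas psycopg2 reports it as a ProgrammingError subclass.

```python
import sqlite3

# Ordered from most specific to most general
CATEGORIES = [
    (sqlite3.IntegrityError, "constraint violation"),
    (sqlite3.OperationalError, "operational problem"),
    (sqlite3.DatabaseError, "other database error"),
    (sqlite3.Error, "driver error"),
]


def classify(exc):
    """Return the label of the first category the exception belongs to."""
    for exc_type, label in CATEGORIES:
        if isinstance(exc, exc_type):
            return label
    return "not a database error"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT UNIQUE NOT NULL)")
conn.execute("INSERT INTO users VALUES ('johndoe')")

outcomes = []
for sql in (
    "INSERT INTO users VALUES ('johndoe')",  # duplicate value
    "SELECT * FROM nonexistent_table",       # missing table
):
    try:
        conn.execute(sql)
    except sqlite3.Error as exc:
        outcomes.append(classify(exc))

print(outcomes)
conn.close()
```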

Detailed Error Handling:


import psycopg2
from psycopg2 import errors

try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            try:
                # Try creating a table
                cur.execute("""
                    CREATE TABLE app.users (
                        id SERIAL PRIMARY KEY,
                        username VARCHAR(50) UNIQUE NOT NULL,
                        email VARCHAR(100) UNIQUE NOT NULL
                    )
                """)
                conn.commit()
                print("Table created successfully")
            except errors.DuplicateTable:
                # A failed statement aborts the transaction; roll back before continuing
                conn.rollback()
                print("Table already exists")

            try:
                # Try inserting a record
                cur.execute("""
                    INSERT INTO app.users (username, email) 
                    VALUES (%s, %s)
                """, ('johndoe', 'john@example.com'))
                conn.commit()
                print("User inserted successfully")
            except errors.UniqueViolation as e:
                conn.rollback()
                print(f"Duplicate user: {e}")

            try:
                # Try a query against a table that does not exist
                cur.execute("SELECT * FROM nonexistent_table")
            except errors.UndefinedTable:
                conn.rollback()
                print("The table does not exist")

            try:
                # Try a query with too many parameters
                cur.execute("SELECT * FROM app.users WHERE id = %s", (1, 2))
            except (TypeError, psycopg2.ProgrammingError) as e:
                # A parameter-count mismatch is usually caught client-side as TypeError
                conn.rollback()
                print(f"Parameter error: {e}")

            conn.commit()
            
except psycopg2.OperationalError as e:
    print(f"Connection error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
          

Implementing a Database Logger

Database Logging Class:


import psycopg2
import logging
import datetime
import traceback

class DatabaseLogger:
    def __init__(self, connection_params):
        self.conn_params = connection_params
        self.setup_log_table()
    
    def setup_log_table(self):
        """Ensure the log table exists."""
        try:
            with psycopg2.connect(**self.conn_params) as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        CREATE TABLE IF NOT EXISTS app.application_logs (
                            id SERIAL PRIMARY KEY,
                            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            level VARCHAR(10) NOT NULL,
                            message TEXT NOT NULL,
                            details TEXT,
                            source VARCHAR(255)
                        )
                    """)
                conn.commit()
            # 'with conn' only ends the transaction, so close the connection explicitly
            conn.close()
        except Exception as e:
            # Fall back to console logging if we can't set up the log table
            logging.error(f"Failed to set up log table: {e}")
    
    def log(self, level, message, details=None, source=None):
        """Log a message to the database."""
        try:
            conn = psycopg2.connect(**self.conn_params)
            try:
                with conn.cursor() as cur:
                    cur.execute("""
                        INSERT INTO app.application_logs (level, message, details, source)
                        VALUES (%s, %s, %s, %s)
                    """, (level, message, details, source))
                conn.commit()
            finally:
                # Close explicitly so each log call does not leak a connection.
                conn.close()
        except Exception as e:
            # Fall back to console logging if database logging fails
            logging.error(f"Failed to write to log table: {e}")
            logging.error(f"Original log: [{level}] {message}")
    
    def info(self, message, source=None):
        """Log an info message."""
        self.log("INFO", message, source=source)
    
    def warning(self, message, source=None):
        """Log a warning message."""
        self.log("WARNING", message, source=source)
    
    def error(self, message, details=None, source=None):
        """Log an error message."""
        self.log("ERROR", message, details=details, source=source)
    
    def exception(self, message, exception, source=None):
        """Log an exception with stack trace."""
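        # Format the traceback of the exception that was passed in, so this
        # also works when called outside the except block.
        stack_trace = "".join(traceback.format_exception(
            type(exception), exception, exception.__traceback__))
        details = f"Exception: {exception}\n\nStack Trace:\n{stack_trace}"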
        self.log("ERROR", message, details=details, source=source)
    
    def get_recent_logs(self, limit=50, level=None):
        """Retrieve recent logs from the database."""
        try:
            conn = psycopg2.connect(**self.conn_params)
            try:
                with conn.cursor() as cur:
                    query = """
                        SELECT id, timestamp, level, message, details, source
                        FROM app.application_logs
                    """
                    params = []
                    
                    if level:
                        query += " WHERE level = %s"
                        params.append(level)
                    
                    query += " ORDER BY timestamp DESC LIMIT %s"
                    params.append(limit)
                    
                    cur.execute(query, params)
                    return cur.fetchall()
            finally:
                # Closing the connection also ends the read-only transaction.
                conn.close()
        except Exception as e:
            logging.error(f"Failed to retrieve logs: {e}")
            return []

# Example usage
if __name__ == "__main__":
    # Set up the database logger
    db_logger = DatabaseLogger({
        'dbname': 'python_app_db',
        'user': 'pythonapp',
        'password': 'secure_password',
        'host': 'localhost'
    })
    
    # Log some messages
    db_logger.info("Application started", "main.py")
    
    try:
        # Simulate an error
        result = 10 / 0
    except Exception as e:
        db_logger.exception("Division by zero error", e, "error_handling_example")
    
    db_logger.warning("Database connection pool running low", "db_module")
    
    # Retrieve recent error logs
    error_logs = db_logger.get_recent_logs(level="ERROR")
    print(f"Recent Errors ({len(error_logs)}):")
    for log in error_logs:
        print(f"[{log[1]}] {log[3]}")
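
If the rest of an application already uses Python's standard logging module, the class above can be plugged into it with a small handler instead of being called directly. The following is a minimal sketch, not part of the original class: DatabaseLogHandler is a hypothetical name, and it assumes only that the wrapped object has a log(level, message, details=None, source=None) method like DatabaseLogger.

```python
import logging
import traceback


class DatabaseLogHandler(logging.Handler):
    """Forward standard logging records to a DatabaseLogger-like object."""

    def __init__(self, db_logger, level=logging.NOTSET):
        super().__init__(level)
        self.db_logger = db_logger

    def emit(self, record):
        try:
            details = None
            if record.exc_info:
                # Include the traceback when logger.exception() was used.
                details = "".join(traceback.format_exception(*record.exc_info))
            self.db_logger.log(
                record.levelname,
                record.getMessage(),
                details=details,
                source=record.name,  # the logger name doubles as the source
            )
        except Exception:
            # Let the logging module report handler failures its usual way.
            self.handleError(record)
```

Attaching it with logging.getLogger("db_module").addHandler(DatabaseLogHandler(db_logger)) would route that logger's records into app.application_logs while other handlers, such as console output, keep working unchanged.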
          

Practical Activities

Activity 1: Basic Database Operations

Create a Python script that performs the following operations using psycopg2:

  1. Connects to your PostgreSQL database
  2. Creates a table of your choice (e.g., books, products, or students)
  3. Inserts at least 5 records into the table
  4. Updates one of the records
  5. Deletes one of the records
  6. Retrieves and displays all records
  7. Performs a filtered query and displays the results
  8. Uses proper error handling throughout

Activity 2: Transaction Management

Create a banking simulation with the following components:

  1. Create tables for accounts and transactions
  2. Implement a transfer function that:
    • Deducts money from one account
    • Adds money to another account
    • Records the transaction
    • Uses a transaction to ensure atomicity
    • Handles potential errors (e.g., insufficient funds)
  3. Test the function with both successful and failed transfers
  4. Implement a transaction history function that retrieves all transactions for an account
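
As a starting point for the transfer function, here is one possible sketch. It assumes a psycopg2 connection is passed in, and the table and column names (accounts.id, accounts.balance, transactions.from_account, transactions.to_account, transactions.amount) as well as the InsufficientFunds exception are hypothetical choices that your own schema may not match.

```python
from decimal import Decimal


class InsufficientFunds(Exception):
    """Raised when the source account cannot cover the transfer."""


def transfer(conn, from_id, to_id, amount):
    """Move `amount` between two accounts in a single transaction."""
    amount = Decimal(str(amount))
    if amount <= 0:
        raise ValueError("Transfer amount must be positive")

    # With psycopg2, "with conn" commits if the block succeeds and rolls
    # back if it raises, so all the writes succeed or fail together.
    with conn:
        with conn.cursor() as cur:
            # Lock the source row so concurrent transfers cannot overdraw it.
            cur.execute(
                "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                (from_id,),
            )
            row = cur.fetchone()
            if row is None:
                raise LookupError(f"Account {from_id} not found")
            if row[0] < amount:
                raise InsufficientFunds(f"Account {from_id} has only {row[0]}")

            cur.execute(
                "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                (amount, from_id),
            )
            cur.execute(
                "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                (amount, to_id),
            )
            if cur.rowcount != 1:
                raise LookupError(f"Account {to_id} not found")

            # Record the transfer in the same transaction as the updates.
            cur.execute(
                "INSERT INTO transactions (from_account, to_account, amount) "
                "VALUES (%s, %s, %s)",
                (from_id, to_id, amount),
            )
```

Raising an exception inside the block is what triggers the rollback, so a failed transfer leaves both balances untouched. The sketch does not close the connection or guard against deadlocks between two opposite transfers; both are left as part of the exercise.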

Activity 3: Data Import and Export

Create a script that demonstrates efficient data import and export:

  1. Generate or download a CSV file with sample data (at least 1000 rows)
  2. Create a table matching the CSV structure
  3. Import the CSV data using psycopg2's copy_from or copy_expert
  4. Perform some aggregation queries on the imported data
  5. Export the results to a new CSV file
  6. Compare the performance of different import methods (loop, executemany, copy)
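
The generate-and-import steps could be sketched as follows. This is only an outline under stated assumptions: the app.sales_import table and its three columns are hypothetical, the sample data is synthetic, and conn is expected to be a psycopg2 connection whose cursor provides copy_expert.

```python
import csv
import io


def make_sample_csv(rows=1000):
    """Build an in-memory CSV with a header and `rows` synthetic records."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["id", "category", "amount"])
    for i in range(1, rows + 1):
        writer.writerow([i, f"cat_{i % 5}", f"{i * 1.5:.2f}"])
    buf.seek(0)  # rewind so COPY reads from the first line
    return buf


def load_csv(conn, fileobj):
    """Bulk-load CSV data into the hypothetical app.sales_import table."""
    copy_sql = (
        "COPY app.sales_import (id, category, amount) "
        "FROM STDIN WITH (FORMAT csv, HEADER true)"
    )
    # "with conn" commits the load on success and rolls it back on error.
    with conn:
        with conn.cursor() as cur:
            # copy_expert streams the file to the server in one COPY command.
            cur.copy_expert(copy_sql, fileobj)
```

For the performance comparison, the same rows can be inserted with a plain loop and with executemany, timing each run with time.perf_counter and truncating the table between runs so every method starts from the same state.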

Key Takeaways

Next Steps

In our next lecture, we'll explore Object-Relational Mapping (ORM) with SQLAlchemy, which provides a higher level of abstraction for database operations in Python.