Introduction to Python Database APIs
Python provides several ways to interact with databases. Understanding the different abstraction levels helps you choose the right approach for your project.
Analogy: Different Levels of Driving Assistance
The different levels of database APIs in Python are like different levels of driving assistance in cars:
- Level 1 (Database Drivers): Like driving a manual transmission car. You have complete control but need to handle every detail yourself: shifting gears, watching RPMs, etc.
- Level 2 (SQL Helpers): Like driving a car with some convenience features - automatic transmission, cruise control - you still control the car directly, but some repetitive tasks are handled for you.
- Level 3 (ORMs): Like using driver assistance systems - adaptive cruise control, lane keeping, etc. - you set high-level goals (where to go), and the system handles many low-level details, though you're still in control and can intervene when needed.
Each level trades direct control for convenience. Just as some driving situations call for manual control while others benefit from assistance, different database tasks may be better suited to different levels of abstraction.
Database Drivers (Level 1)
Database drivers provide direct access to the database, offering the most control but requiring detailed knowledge of SQL and the database system.
- psycopg2: The most widely used PostgreSQL adapter for Python
- psycopg2-binary: Pre-compiled binary version of psycopg2
- asyncpg: High-performance asynchronous PostgreSQL driver
- py-postgresql: Pure Python PostgreSQL driver
SQL Helpers (Level 2)
SQL helper libraries provide convenient wrappers around database drivers, reducing boilerplate while still using SQL directly.
- Records: SQL for Humans - simplified SQL database access
- Dataset: Dictionary-style database access that sits between raw SQL and a full ORM
- psycopg2 extras: Additional capabilities provided by psycopg2
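To make the idea of a SQL helper concrete, here is a minimal sketch of the kind of wrapper such libraries provide. The function name fetch_dicts is hypothetical and is not taken from Records or Dataset. The standard library's sqlite3 module stands in for a PostgreSQL driver only so the example runs without a server; note that sqlite3 uses ? as its placeholder where psycopg2 uses %s.

```python
import sqlite3


def fetch_dicts(conn, sql, params=()):
    """Run a query and return each row as a dict keyed by column name."""
    cur = conn.cursor()
    try:
        cur.execute(sql, params)
        # cursor.description is part of DB API 2.0: one entry per column,
        # with the column name in position 0
        columns = [col[0] for col in cur.description]
        return [dict(zip(columns, row)) for row in cur.fetchall()]
    finally:
        cur.close()


# Demo against an in-memory SQLite database (a stand-in for PostgreSQL)
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
demo_conn.execute("INSERT INTO users (username) VALUES (?)", ("johndoe",))
rows = fetch_dicts(
    demo_conn,
    "SELECT id, username FROM users WHERE username = ?",
    ("johndoe",),
)
print(rows)
```

The SQL is still written by hand; the helper only removes the repetitive cursor handling and row unpacking, which is the trade-off Level 2 describes.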
ORMs - Object-Relational Mappers (Level 3)
ORMs allow you to work with database data using Python classes and objects instead of writing SQL directly.
- SQLAlchemy: A comprehensive SQL toolkit and ORM, with a lower-level Core layer and a higher-level ORM layer
- Django ORM: The ORM included with the Django web framework
- Peewee: A small, simple, and expressive ORM
- Pony ORM: Provides a generator-based query syntax
- SQLObject: One of the original Python ORMs
- Tortoise ORM: An async ORM inspired by Django
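As a rough illustration of what an ORM automates, the sketch below maps table rows to Python objects by hand. It is a toy, not how SQLAlchemy or any library in the list above is implemented; the User and UserRepository names are hypothetical, and sqlite3 is used only so the example runs without a PostgreSQL server.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class User:
    id: int
    username: str
    email: str


class UserRepository:
    """Hand-written mapping between the users table and User objects."""

    def __init__(self, conn):
        self.conn = conn

    def add(self, username, email):
        cur = self.conn.execute(
            "INSERT INTO users (username, email) VALUES (?, ?)",
            (username, email),
        )
        # lastrowid holds the primary key SQLite assigned to the new row
        return User(cur.lastrowid, username, email)

    def get_by_username(self, username):
        row = self.conn.execute(
            "SELECT id, username, email FROM users WHERE username = ?",
            (username,),
        ).fetchone()
        return User(*row) if row else None


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE, email TEXT)"
)
repo = UserRepository(conn)
repo.add("johndoe", "john@example.com")
found = repo.get_by_username("johndoe")
print(found)
```

Calling code works with User objects and never sees SQL. A real ORM generates this mapping from the class definition and also handles relationships, change tracking, and query building.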
Working with psycopg2
psycopg2 is the most popular PostgreSQL adapter for Python. It's a robust, efficient, and feature-complete implementation of the Python DB API 2.0 specification.
Connecting to PostgreSQL
Basic Connection:
import psycopg2
# Method 1: Connection parameters as keyword arguments
conn = None
try:
    conn = psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost",
        port="5432"
    )
    print("Connection established successfully!")
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
finally:
    if conn is not None:
        conn.close()
        print("Connection closed.")
Connection String Method:
import psycopg2
# Method 2: Connection string
conn_string = "postgresql://pythonapp:secure_password@localhost:5432/python_app_db"
conn = None
try:
    conn = psycopg2.connect(conn_string)
    print("Connection established successfully!")
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
finally:
    if conn is not None:
        conn.close()
        print("Connection closed.")
Using with Statement for Automatic Cleanup:
import psycopg2
from contextlib import closing
# In psycopg2, "with conn:" only wraps a transaction (commit on success,
# rollback on error); it does NOT close the connection.
# contextlib.closing() adds the automatic close.
try:
    with closing(psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost",
        port="5432"
    )) as conn:
        print("Connection established successfully!")
    # Connection is closed when the with block exits
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
Connection Best Practices
- Externalize Credentials: Store connection parameters in a configuration file or environment variables, not in code
- Use Connection Pooling: For applications handling multiple requests, use a connection pool
- Handle Connection Errors: Always wrap connection attempts in try-except blocks
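- Close Connections: Ensure connections are closed after use, either by calling close() or by wrapping the connection in contextlib.closing(); in psycopg2, using a connection in a with statement ends the transaction but leaves the connection open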
- Use SSL: For production environments, consider enabling SSL connections
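The first and last practices above can be sketched as a small helper that builds connection parameters from environment variables. The function name and the APP_DB_* variable names are hypothetical choices for this example; sslmode is the standard libpq setting that psycopg2.connect() passes through.

```python
import os


def connection_params_from_env(environ=None):
    """Build psycopg2.connect() keyword arguments from environment variables."""
    env = os.environ if environ is None else environ
    params = {
        "dbname": env.get("APP_DB_NAME", "python_app_db"),
        "user": env.get("APP_DB_USER", "pythonapp"),
        "host": env.get("APP_DB_HOST", "localhost"),
        "port": env.get("APP_DB_PORT", "5432"),
        # libpq's sslmode setting; "require" refuses unencrypted connections
        "sslmode": env.get("APP_DB_SSLMODE", "prefer"),
    }
    # No default for the password: fail loudly instead of guessing
    try:
        params["password"] = env["APP_DB_PASSWORD"]
    except KeyError:
        raise RuntimeError("APP_DB_PASSWORD is not set") from None
    return params


# Demo with an explicit mapping instead of the real process environment
example = connection_params_from_env(
    {"APP_DB_PASSWORD": "s3cret", "APP_DB_HOST": "db.internal"}
)
print({key: value for key, value in example.items() if key != "password"})
```

With the variables exported in the shell or set by the deployment tooling, a script would connect with psycopg2.connect(**connection_params_from_env()), so no credentials appear in the source code.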
Executing SQL Queries
Basic Query Execution:
import psycopg2
try:
    # Establish connection
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        # Create a cursor
        with conn.cursor() as cur:
            # Execute a simple query
            cur.execute("SELECT * FROM app.users LIMIT 5")
            # Fetch all results
            rows = cur.fetchall()
            # Print results
            for row in rows:
                print(row)
except Exception as e:
    print(f"Error: {e}")
Working with Parameters:
import psycopg2
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Always use parameterized queries to prevent SQL injection
            username = "johndoe"
            # BAD: Don't do this!
            # cur.execute(f"SELECT * FROM app.users WHERE username = '{username}'")
            # GOOD: Use parameter placeholders
            cur.execute("SELECT * FROM app.users WHERE username = %s", (username,))
            # Fetch one row
            user = cur.fetchone()
            if user:
                print(f"Found user: {user}")
            else:
                print(f"No user found with username: {username}")
except Exception as e:
    print(f"Error: {e}")
Handling Different Data Types:
import psycopg2
import datetime
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Example with different parameter types
            cur.execute(
                """
                INSERT INTO app.orders (
                    customer_id,
                    items,
                    total_amount,
                    order_date,
                    is_shipped,
                    notes
                ) VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id
                """,
                (
                    42,  # Integer
                    ['Widget', 'Gadget', 'Doohickey'],  # Array
                    99.95,  # Float
                    datetime.datetime.now(),  # Timestamp
                    True,  # Boolean
                    "Priority shipping requested"  # Text
                )
            )
            # Get the ID of the inserted row
            order_id = cur.fetchone()[0]
            print(f"Created order with ID: {order_id}")
            # Commit the transaction
            conn.commit()
except Exception as e:
    print(f"Error: {e}")
Transactions in psycopg2
Transaction Management:
import psycopg2
try:
    # Establish connection
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        # By default, psycopg2 connections have autocommit turned off,
        # so every statement runs inside a transaction
        with conn.cursor() as cur:
            # First operation: RETURNING id makes the new account's ID
            # available to fetchone()
            cur.execute(
                "INSERT INTO app.accounts (user_id, balance) VALUES (%s, %s) RETURNING id",
                (1, 1000.00)
            )
            account_id = cur.fetchone()[0]
            # Second operation - both will succeed or fail together
            cur.execute(
                "INSERT INTO app.transactions (account_id, amount, description) VALUES (%s, %s, %s)",
                (account_id, 1000.00, "Initial deposit")
            )
        # Transaction is committed when the connection context exits
        # You can also explicitly commit:
        # conn.commit()
except Exception as e:
    # If an error occurs inside the with block, the transaction is rolled back automatically
    print(f"Error: {e}")
    # You can also explicitly rollback:
    # if 'conn' in locals():
    #     conn.rollback()
Explicit Transaction Control:
import psycopg2
try:
    conn = psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    )
    # Set autocommit mode to True - each statement will be a separate transaction
    conn.autocommit = True
    with conn.cursor() as cur:
        # This executes and commits immediately
        cur.execute("INSERT INTO app.logs (message) VALUES ('Autocommit mode')")
    # Now start an explicit transaction
    conn.autocommit = False
    try:
        with conn.cursor() as cur:
            # First operation
            cur.execute("UPDATE app.accounts SET balance = balance - 100 WHERE id = 1")
            # Check if we have sufficient funds
            cur.execute("SELECT balance FROM app.accounts WHERE id = 1")
            balance = cur.fetchone()[0]
            if balance < 0:
                # Insufficient funds - rollback
                conn.rollback()
                print("Transaction cancelled: Insufficient funds")
            else:
                # Second operation
                cur.execute(
                    "INSERT INTO app.transactions (account_id, amount, description) VALUES (%s, %s, %s)",
                    (1, -100.00, "Withdrawal")
                )
                # Commit the transaction
                conn.commit()
                print("Transaction completed successfully")
    except Exception as e:
        conn.rollback()
        print(f"Transaction error: {e}")
    conn.close()
except Exception as e:
    print(f"Connection error: {e}")
Fetching Results
Different Fetch Methods:
import psycopg2
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT id, username, email FROM app.users")
            # Method 1: fetchone() - retrieves one row at a time
            print("First user:")
            first_user = cur.fetchone()
            print(first_user)
            # Method 2: fetchmany(n) - retrieves n rows at a time
            print("\nNext 3 users:")
            next_three_users = cur.fetchmany(3)
            for user in next_three_users:
                print(user)
            # Method 3: fetchall() - retrieves all remaining rows
            print("\nAll remaining users:")
            remaining_users = cur.fetchall()
            for user in remaining_users:
                print(user)
except Exception as e:
    print(f"Error: {e}")
Working with Different Cursor Types:
import psycopg2
import psycopg2.extras
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        # Regular cursor - returns tuples
        with conn.cursor() as cur:
            cur.execute("SELECT id, username, email FROM app.users LIMIT 2")
            rows = cur.fetchall()
            print("Regular cursor results (tuples):")
            for row in rows:
                print(f"ID: {row[0]}, Username: {row[1]}, Email: {row[2]}")
        # Dictionary cursor - returns dictionary-like rows
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("SELECT id, username, email FROM app.users LIMIT 2")
            rows = cur.fetchall()
            print("\nDictCursor results (dictionary-like):")
            for row in rows:
                print(f"ID: {row['id']}, Username: {row['username']}, Email: {row['email']}")
        # Real dictionary cursor - returns actual dictionaries
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT id, username, email FROM app.users LIMIT 2")
            rows = cur.fetchall()
            print("\nRealDictCursor results (actual dictionaries):")
            for row in rows:
                print(row)  # This is a dictionary
        # Named tuple cursor - returns named tuples
        with conn.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cur:
            cur.execute("SELECT id, username, email FROM app.users LIMIT 2")
            rows = cur.fetchall()
            print("\nNamedTupleCursor results (named tuples):")
            for row in rows:
                print(f"ID: {row.id}, Username: {row.username}, Email: {row.email}")
except Exception as e:
    print(f"Error: {e}")
Server-Side Cursors
When dealing with large result sets, server-side cursors allow fetching results in chunks without loading everything into memory.
Using Server-Side Cursors:
import psycopg2
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        # Create a named server-side cursor
        # This doesn't fetch all rows at once, helping with memory usage
        with conn.cursor(name='large_result_set_cursor') as cur:
            # Execute a query that might return many rows
            cur.execute("SELECT * FROM app.logs")
            # Fetch in batches of 100 rows
            batch_size = 100
            batch_num = 0
            while True:
                rows = cur.fetchmany(batch_size)
                if not rows:
                    break
                batch_num += 1
                print(f"Processing batch {batch_num} ({len(rows)} rows)")
                # Process each row in the batch
                for row in rows:
                    # Do something with the row
                    pass
            print(f"Processed {batch_num} batches")
except Exception as e:
    print(f"Error: {e}")
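The batching loop used with the server-side cursor can be factored into a reusable generator. This is a minimal sketch: iter_batches is a hypothetical helper name, and sqlite3 stands in for PostgreSQL only so the example runs without a server. The generator works with any DB API cursor; with psycopg2 you would pass it a named cursor, since the memory savings come from the server-side cursor rather than from the generator itself.

```python
import sqlite3


def iter_batches(cursor, batch_size=100):
    """Yield lists of rows from an already-executed cursor, batch_size at a time."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            return
        yield rows


# Demo: 250 rows in an in-memory SQLite table (a stand-in for app.logs)
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE logs (id INTEGER PRIMARY KEY, message TEXT)")
demo.executemany(
    "INSERT INTO logs (message) VALUES (?)",
    [(f"Log message {i}",) for i in range(250)],
)
cur = demo.execute("SELECT id, message FROM logs ORDER BY id")
batch_sizes = [len(batch) for batch in iter_batches(cur, batch_size=100)]
print(batch_sizes)  # [100, 100, 50]
```

Keeping the loop in one place lets the processing code be written as a plain for loop over batches.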
psycopg2 Advanced Features
Working with PostgreSQL Data Types
Working with Arrays:
import psycopg2
from psycopg2.extras import execute_values
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Create a table with an array column
            cur.execute("""
                CREATE TABLE IF NOT EXISTS app.products (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    tags TEXT[],
                    stock_levels INTEGER[]
                )
            """)
            # Insert a record with array data
            cur.execute(
                "INSERT INTO app.products (name, tags, stock_levels) VALUES (%s, %s, %s)",
                (
                    "Ergonomic Keyboard",
                    ["electronics", "office", "ergonomic"],  # Text array
                    [45, 62, 13]  # Integer array
                )
            )
            # Query the array data
            cur.execute("SELECT * FROM app.products")
            products = cur.fetchall()
            for product in products:
                print(f"Product: {product[1]}")
                print(f"Tags: {product[2]}")
                print(f"Stock Levels: {product[3]}")
            # Array operators
            cur.execute(
                "SELECT * FROM app.products WHERE %s = ANY(tags)",
                ("office",)
            )
            office_products = cur.fetchall()
            print(f"\nOffice products: {len(office_products)}")
            # Update an array element (PostgreSQL arrays are 1-indexed)
            cur.execute(
                "UPDATE app.products SET stock_levels[1] = %s WHERE id = %s",
                (50, 1)
            )
            # Append to an array
            cur.execute(
                "UPDATE app.products SET tags = array_append(tags, %s) WHERE id = %s",
                ("bestseller", 1)
            )
            conn.commit()
except Exception as e:
    print(f"Error: {e}")
Working with JSON/JSONB:
import psycopg2
import json
from psycopg2.extras import Json
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Create a table with JSON columns
            cur.execute("""
                CREATE TABLE IF NOT EXISTS app.events (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    data JSONB NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Python dictionary to store as JSON
            event_data = {
                "user_id": 123,
                "action": "login",
                "device": {
                    "type": "mobile",
                    "os": "iOS",
                    "version": "15.2"
                },
                "location": {
                    "country": "US",
                    "city": "San Francisco"
                }
            }
            # Method 1: Convert to JSON string manually
            cur.execute(
                "INSERT INTO app.events (name, data) VALUES (%s, %s)",
                ("User Login", json.dumps(event_data))
            )
            # Method 2: Use psycopg2's Json adapter
            cur.execute(
                "INSERT INTO app.events (name, data) VALUES (%s, %s)",
                ("User Login", Json(event_data))
            )
            # Query JSON data
            cur.execute("SELECT id, name, data FROM app.events LIMIT 5")
            events = cur.fetchall()
            for event in events:
                event_id, event_name, event_data = event
                print(f"Event {event_id}: {event_name}")
                print(f"Data: {event_data}")
                print(f"User ID: {event_data.get('user_id')}")
                print(f"Device OS: {event_data.get('device', {}).get('os')}")
                print()
            # Query using JSON operators
            cur.execute("""
                SELECT * FROM app.events
                WHERE data->>'action' = %s
                AND data->'device'->>'type' = %s
            """, ('login', 'mobile'))
            mobile_logins = cur.fetchall()
            print(f"Mobile login events: {len(mobile_logins)}")
            # Update JSON data
            cur.execute("""
                UPDATE app.events
                SET data = data || %s::jsonb
                WHERE id = %s
            """, (json.dumps({"processed": True}), 1))
            conn.commit()
except Exception as e:
    print(f"Error: {e}")
Batch Operations
Efficient Batch Insertions:
import psycopg2
from psycopg2.extras import execute_values
import time
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Create a test table
            cur.execute("""
                CREATE TABLE IF NOT EXISTS app.logs (
                    id SERIAL PRIMARY KEY,
                    level VARCHAR(10) NOT NULL,
                    message TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Prepare a large dataset
            log_entries = [
                ('INFO', f'Log message {i}') for i in range(1, 10001)
            ]
            # Method 1: Loop and execute (slow: one round trip per row)
            start_time = time.time()
            for level, message in log_entries[:100]:  # Just do 100 for comparison
                cur.execute(
                    "INSERT INTO app.logs (level, message) VALUES (%s, %s)",
                    (level, message)
                )
            loop_time = time.time() - start_time
            print(f"Loop insertion time for 100 records: {loop_time:.2f} seconds")
            # Method 2: executemany (more concise, but in psycopg2 it still sends
            # one INSERT per row, so expect roughly the same per-row speed as the loop)
            start_time = time.time()
            cur.executemany(
                "INSERT INTO app.logs (level, message) VALUES (%s, %s)",
                log_entries[:1000]  # Do 1000 for better comparison
            )
            executemany_time = time.time() - start_time
            print(f"executemany time for 1000 records: {executemany_time:.2f} seconds")
            # Method 3: execute_values (fastest of the three: many rows per statement)
            start_time = time.time()
            execute_values(
                cur,
                "INSERT INTO app.logs (level, message) VALUES %s",
                log_entries,
                template="(%s, %s)"
            )
            execute_values_time = time.time() - start_time
            print(f"execute_values time for 10000 records: {execute_values_time:.2f} seconds")
            # Compare the performance
            print(f"execute_values is {(loop_time/100)/(execute_values_time/10000):.1f}x faster than loop per record")
            print(f"execute_values is {(executemany_time/1000)/(execute_values_time/10000):.1f}x faster than executemany per record")
            conn.commit()
except Exception as e:
    print(f"Error: {e}")
Copy Operations
Using COPY for Bulk Data Transfer:
import psycopg2
import csv
import io
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # Create a test table
            cur.execute("""
                CREATE TABLE IF NOT EXISTS app.employees (
                    id SERIAL PRIMARY KEY,
                    first_name VARCHAR(50) NOT NULL,
                    last_name VARCHAR(50) NOT NULL,
                    email VARCHAR(100),
                    department VARCHAR(50),
                    salary NUMERIC(10, 2)
                )
            """)
            # Create sample data
            sample_data = [
                ['John', 'Smith', 'john@example.com', 'Engineering', 85000],
                ['Jane', 'Doe', 'jane@example.com', 'Marketing', 75000],
                ['Bob', 'Johnson', 'bob@example.com', 'Finance', 90000],
                ['Alice', 'Williams', 'alice@example.com', 'Engineering', 82000],
                ['Charlie', 'Brown', 'charlie@example.com', 'HR', 65000],
            ] * 1000  # Repeat data to create a larger dataset
            # Method 1: Using a file-like object with copy_from
            # copy_from uses PostgreSQL's text format, not true CSV, so it only
            # suits values without embedded separators, quotes, or newlines
            csv_file = io.StringIO()
            csv_writer = csv.writer(csv_file)
            csv_writer.writerows(sample_data)
            csv_file.seek(0)
            # psycopg2 2.9+ quotes the table name given to copy_from, so a
            # schema-qualified name like 'app.employees' is rejected; set the
            # search path and pass the bare table name instead
            cur.execute("SET search_path TO app, public")
            cur.copy_from(
                csv_file,
                'employees',
                sep=',',
                columns=('first_name', 'last_name', 'email', 'department', 'salary')
            )
            # Check how many rows are in the table
            cur.execute("SELECT COUNT(*) FROM app.employees")
            count = cur.fetchone()[0]
            print(f"Table holds {count} rows after COPY")
            # Method 2: Using copy_expert with COPY
            # First, truncate the table
            cur.execute("TRUNCATE app.employees RESTART IDENTITY")
            # Prepare data again
            csv_file = io.StringIO()
            csv_writer = csv.writer(csv_file)
            csv_writer.writerows(sample_data)
            csv_file.seek(0)
            # Use copy_expert for more flexibility (real CSV parsing, schema-qualified names)
            cur.copy_expert(
                sql="COPY app.employees (first_name, last_name, email, department, salary) FROM STDIN WITH CSV",
                file=csv_file
            )
            # Check how many rows were inserted
            cur.execute("SELECT COUNT(*) FROM app.employees")
            count = cur.fetchone()[0]
            print(f"Inserted {count} rows using copy_expert")
            conn.commit()
except Exception as e:
    print(f"Error: {e}")
Connection Pooling
For applications that handle multiple requests or operations, connection pooling can significantly improve performance by reusing database connections instead of establishing new ones for each operation.
Connection Pooling with psycopg2.pool:
import psycopg2
from psycopg2 import pool
import time
import threading
# Create a connection pool
try:
    # Create a ThreadedConnectionPool
    # minconn: minimum number of connections
    # maxconn: maximum number of connections
    connection_pool = pool.ThreadedConnectionPool(
        minconn=1,
        maxconn=10,
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    )
    print("Connection pool created successfully")
    # Function to execute a query using a connection from the pool
    def execute_query(query, params=None):
        conn = None
        try:
            # Get a connection from the pool
            conn = connection_pool.getconn()
            # Use the connection
            with conn.cursor() as cur:
                cur.execute(query, params or ())
                return cur.fetchall()
        finally:
            # Return the connection to the pool
            if conn:
                connection_pool.putconn(conn)
    # Function to simulate a task that uses the database
    def task(task_id):
        print(f"Task {task_id} starting")
        result = execute_query(
            "SELECT pg_sleep(1), %s as task_id",
            (task_id,)
        )
        print(f"Task {task_id} completed")
    # Simulate multiple concurrent tasks
    threads = []
    start_time = time.time()
    # Create and start 5 threads
    for i in range(5):
        t = threading.Thread(target=task, args=(i,))
        threads.append(t)
        t.start()
    # Wait for all threads to complete
    for t in threads:
        t.join()
    end_time = time.time()
    print(f"All tasks completed in {end_time - start_time:.2f} seconds")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Close the connection pool
    if 'connection_pool' in locals():
        connection_pool.closeall()
        print("Connection pool closed")
Connection Pooling Best Practices
- Pool Size: Set appropriate minconn and maxconn values based on your application's concurrency requirements
- Connection Lifecycle: Always return connections to the pool (putconn) when done using them
- Error Handling: Handle connection errors and implement retries if necessary
- Pool Monitoring: For production applications, monitor pool usage and adjust parameters if needed
- Clean Shutdown: Call closeall() when shutting down your application
- Alternative Libraries: Consider specialized libraries like SQLAlchemy's connection pool or external poolers like PgBouncer for high-load applications
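The "Connection Lifecycle" practice above can be enforced with a small context manager, sketched below. The name pooled_connection is hypothetical. The helper relies only on the getconn() and putconn() methods that psycopg2's pool classes provide; the _FakePool and _FakeConnection classes are stand-ins so the example runs without a database.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from the pool and always hand it back."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()      # commit if the block finished without an error
    except Exception:
        conn.rollback()    # undo partial work before the connection is reused
        raise
    finally:
        pool.putconn(conn)


# Stand-ins for a real pool and connection, used only for this demo
class _FakeConnection:
    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")


class _FakePool:
    def __init__(self):
        self.conn = _FakeConnection()
        self.returned = 0

    def getconn(self):
        return self.conn

    def putconn(self, conn):
        self.returned += 1


fake_pool = _FakePool()
with pooled_connection(fake_pool) as conn:
    pass  # successful work: committed, then returned to the pool
try:
    with pooled_connection(fake_pool) as conn:
        raise ValueError("simulated failure")
except ValueError:
    pass  # failed work: rolled back, then returned to the pool
print(fake_pool.conn.events, fake_pool.returned)  # ['commit', 'rollback'] 2
```

With a real ThreadedConnectionPool, the same with block replaces the manual try/finally around getconn() and putconn().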
Error Handling and Logging
Exception Hierarchy
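psycopg2 follows the DB API 2.0 exception hierarchy. psycopg2.Error is the base class, with InterfaceError and DatabaseError below it; DatabaseError in turn has the subclasses DataError, OperationalError, IntegrityError, InternalError, ProgrammingError, and NotSupportedError. The psycopg2.errors module (available since psycopg2 2.8) adds one class per PostgreSQL error code, such as UniqueViolation or UndefinedTable, each deriving from one of these base classes.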
Detailed Error Handling:
import psycopg2
from psycopg2 import errors
try:
    with psycopg2.connect(
        dbname="python_app_db",
        user="pythonapp",
        password="secure_password",
        host="localhost"
    ) as conn:
        with conn.cursor() as cur:
            # After any failed statement PostgreSQL aborts the current transaction,
            # so each step commits on success and rolls back on error before moving on
            try:
                # Try creating a table
                cur.execute("""
                    CREATE TABLE app.users (
                        id SERIAL PRIMARY KEY,
                        username VARCHAR(50) UNIQUE NOT NULL,
                        email VARCHAR(100) UNIQUE NOT NULL
                    )
                """)
                conn.commit()
                print("Table created successfully")
            except errors.DuplicateTable:
                conn.rollback()
                print("Table already exists")
            try:
                # Try inserting a record
                cur.execute("""
                    INSERT INTO app.users (username, email)
                    VALUES (%s, %s)
                """, ('johndoe', 'john@example.com'))
                conn.commit()
                print("User inserted successfully")
            except errors.UniqueViolation as e:
                conn.rollback()
                print(f"Duplicate user: {e}")
            try:
                # Try querying a table that does not exist
                cur.execute("SELECT * FROM nonexistent_table")
            except errors.UndefinedTable:
                conn.rollback()
                print("The table does not exist")
            try:
                # Try a statement with a syntax error
                # (A wrong number of parameters is not reported this way: psycopg2
                # raises a plain TypeError or IndexError before contacting the server)
                cur.execute("SELEC * FROM app.users")
            except psycopg2.ProgrammingError as e:
                conn.rollback()
                print(f"Programming error: {e}")
            conn.commit()
except psycopg2.OperationalError as e:
    print(f"Connection error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
Implementing a Database Logger
Database Logging Class:
import psycopg2
import logging
import datetime
import traceback
from contextlib import closing
class DatabaseLogger:
    def __init__(self, connection_params):
        self.conn_params = connection_params
        self.setup_log_table()
    def setup_log_table(self):
        """Ensure the log table exists."""
        try:
            # closing() makes sure the connection is closed after each call;
            # "with conn:" alone would leave it open
            with closing(psycopg2.connect(**self.conn_params)) as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        CREATE TABLE IF NOT EXISTS app.application_logs (
                            id SERIAL PRIMARY KEY,
                            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            level VARCHAR(10) NOT NULL,
                            message TEXT NOT NULL,
                            details TEXT,
                            source VARCHAR(255)
                        )
                    """)
                conn.commit()
        except Exception as e:
            # Fall back to console logging if we can't set up the log table
            logging.error(f"Failed to set up log table: {e}")
    def log(self, level, message, details=None, source=None):
        """Log a message to the database."""
        try:
            with closing(psycopg2.connect(**self.conn_params)) as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        INSERT INTO app.application_logs (level, message, details, source)
                        VALUES (%s, %s, %s, %s)
                    """, (level, message, details, source))
                conn.commit()
        except Exception as e:
            # Fall back to console logging if database logging fails
            logging.error(f"Failed to write to log table: {e}")
            logging.error(f"Original log: [{level}] {message}")
    def info(self, message, source=None):
        """Log an info message."""
        self.log("INFO", message, source=source)
    def warning(self, message, source=None):
        """Log a warning message."""
        self.log("WARNING", message, source=source)
    def error(self, message, details=None, source=None):
        """Log an error message."""
        self.log("ERROR", message, details=details, source=source)
    def exception(self, message, exception, source=None):
        """Log an exception with stack trace (call from inside an except block)."""
        stack_trace = traceback.format_exc()
        details = f"Exception: {str(exception)}\n\nStack Trace:\n{stack_trace}"
        self.log("ERROR", message, details=details, source=source)
    def get_recent_logs(self, limit=50, level=None):
        """Retrieve recent logs from the database."""
        try:
            with closing(psycopg2.connect(**self.conn_params)) as conn:
                with conn.cursor() as cur:
                    query = """
                        SELECT id, timestamp, level, message, details, source
                        FROM app.application_logs
                    """
                    params = []
                    if level:
                        query += " WHERE level = %s"
                        params.append(level)
                    query += " ORDER BY timestamp DESC LIMIT %s"
                    params.append(limit)
                    cur.execute(query, params)
                    return cur.fetchall()
        except Exception as e:
            logging.error(f"Failed to retrieve logs: {e}")
            return []
# Example usage
if __name__ == "__main__":
    # Set up the database logger
    db_logger = DatabaseLogger({
        'dbname': 'python_app_db',
        'user': 'pythonapp',
        'password': 'secure_password',
        'host': 'localhost'
    })
    # Log some messages
    db_logger.info("Application started", "main.py")
    try:
        # Simulate an error
        result = 10 / 0
    except Exception as e:
        db_logger.exception("Division by zero error", e, "error_handling_example")
    db_logger.warning("Database connection pool running low", "db_module")
    # Retrieve recent error logs
    error_logs = db_logger.get_recent_logs(level="ERROR")
    print(f"Recent Errors ({len(error_logs)}):")
    for log in error_logs:
        print(f"[{log[1]}] {log[3]}")
Practical Activities
Activity 1: Basic Database Operations
Create a Python script that performs the following operations using psycopg2:
- Connects to your PostgreSQL database
- Creates a table of your choice (e.g., books, products, or students)
- Inserts at least 5 records into the table
- Updates one of the records
- Deletes one of the records
- Retrieves and displays all records
- Performs a filtered query and displays the results
- Uses proper error handling throughout
Activity 2: Transaction Management
Create a banking simulation with the following components:
- Create tables for accounts and transactions
- Implement a transfer function that:
- Deducts money from one account
- Adds money to another account
- Records the transaction
- Uses a transaction to ensure atomicity
- Handles potential errors (e.g., insufficient funds)
- Test the function with both successful and failed transfers
- Implement a transaction history function that retrieves all transactions for an account
Activity 3: Data Import and Export
Create a script that demonstrates efficient data import and export:
- Generate or download a CSV file with sample data (at least 1000 rows)
- Create a table matching the CSV structure
- Import the CSV data using psycopg2's copy_from or copy_expert
- Perform some aggregation queries on the imported data
- Export the results to a new CSV file
- Compare the performance of different import methods (loop, executemany, copy)
Key Takeaways
- Python offers multiple abstraction levels for database access, from low-level drivers to high-level ORMs
- psycopg2 is the most widely used PostgreSQL adapter for Python, offering a comprehensive feature set
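- Connection management is critical; in psycopg2 the with statement on a connection commits or rolls back the transaction but does not close the connection, so close connections explicitly (or use contextlib.closing)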
- Always use parameterized queries to prevent SQL injection vulnerabilities
- Different cursor types (dict cursor, named tuple cursor) provide convenient ways to access result data
- Transactions ensure data consistency for operations that should succeed or fail together
- Bulk operations like execute_values and COPY (copy_from, copy_expert) offer substantial performance improvements over row-by-row inserts; psycopg2's executemany is convenient but not noticeably faster than a loop
- Connection pooling is essential for multi-threaded or high-concurrency applications
- Proper error handling with specific exception types helps diagnose and resolve issues
- psycopg2 adapts a wide range of PostgreSQL data types to Python, including arrays and JSON/JSONB
Next Steps
In our next lecture, we'll explore Object-Relational Mapping (ORM) with SQLAlchemy, which provides a higher level of abstraction for database operations in Python.