PostgreSQL with Python
Reading time: ~40 minutes | Level: Intermediate → Engineering
Before reading further, predict what happens when this application is deployed:
```python
import psycopg2
import os

def get_user(user_id: int):
    # Opens a brand-new connection for every request
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
    row = cursor.fetchone()
    conn.close()
    return row
```
With one user hitting the API, everything works fine. You deploy. Traffic climbs. At around 100 concurrent requests, the app starts returning errors:
```
OperationalError: FATAL: remaining connection slots are reserved
for non-replication superuser connections
```
Or worse - the application hangs indefinitely, waiting for a connection that never arrives.
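What is happening? PostgreSQL has a hard limit on simultaneous connections (max_connections, default 100, with 3 of those slots reserved for superusers by default). Each psycopg2.connect() call opens a new TCP connection, and the server forks a dedicated backend process to serve it. Under load, 100 concurrent requests try to hold 100 connections at once; once the non-reserved slots are used up, PostgreSQL rejects every further connection with the error above. Connections are also expensive to create - TCP handshake, SSL negotiation, authentication, process fork on the server - typically 5-20 ms each time.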
The solution is connection pooling: maintain a fixed set of reusable connections and hand them out to requests as needed. When a request finishes, the connection goes back into the pool - not closed. Under load, requests wait briefly for a free connection rather than hammering the server with new ones.
This lesson explains why pooling is essential, how to implement it correctly with psycopg2, and how to handle everything PostgreSQL-specific that you will not find in SQLite: JSONB, arrays, UUIDs, bulk COPY, and the RETURNING clause.
What You Will Learn
- The difference between psycopg2 and psycopg3 and which to use for new projects
- Connecting with DSN strings and environment variables - credentials management done right
- Connection pooling with ThreadedConnectionPool - how it works, why it matters, how to size it
- Executing queries with PostgreSQL's %s placeholder
- Handling PostgreSQL-specific types: JSONB, arrays, UUID, timestamptz
- Bulk inserts with COPY - orders of magnitude faster than repeated INSERT
- Getting inserted data back with RETURNING
- Production-grade connection configuration: timeouts, keepalives, SSL
Prerequisites
- Python Foundation: functions, context managers (with), exceptions, environment variables
- Module 07, Lesson 01: SQL Fundamentals (SELECT, INSERT, UPDATE, DELETE, transactions)
- Module 07, Lesson 02: SQLite with Python (DB-API 2.0 interface)
- A running PostgreSQL instance (local Docker or managed service)
Part 1 - psycopg2 vs psycopg3: Which to Use
Python has two major PostgreSQL drivers. Both implement DB-API 2.0 and both are battle-tested in production, but they have different architectures and tradeoffs.
psycopg2
Released in 2010. The dominant PostgreSQL driver for Python for over a decade. Written in C as a wrapper around libpq (PostgreSQL's C client library).
```shell
pip install psycopg2-binary   # development / CI
pip install psycopg2          # production (builds from source: needs libpq-dev, Python headers and a C compiler)
```
psycopg2-binary bundles a pre-compiled libpq (and the SSL libraries it uses) so you can install it without system dependencies. For production deployments where you control the OS, the psycopg2 documentation recommends the source package psycopg2 (without -binary), which links against the system libpq so that libpq and OpenSSL receive security patches through your OS package manager.
Strengths:
- Extremely stable and battle-tested
- Massive ecosystem of tutorials, StackOverflow answers, and compatible libraries
- Synchronous and well-understood threading model
- Supported by all major frameworks and ORMs; it is SQLAlchemy's default PostgreSQL driver and the long-standing standard choice for Django
Limitations:
- Does not support async/await natively (aiopg builds asyncio support on top of psycopg2's low-level asynchronous mode, but it is a workaround)
- No active new development - maintenance mode only
- Type handling requires explicit adaptation for newer PostgreSQL types
psycopg3 (psycopg)
Released in 2021. A ground-up rewrite in pure Python (with an optional C extension for speed). The actively developed successor.
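```shell
pip install psycopg              # pure Python (needs libpq installed on the system)
pip install "psycopg[binary]"    # with pre-compiled C extension and bundled libpq (faster)
pip install "psycopg[pool]"      # adds the psycopg_pool package (ConnectionPool)
```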
Strengths:
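- Full async/await support - works natively with asyncio, FastAPI async, Starlette
- More consistent type handling out of the box - UUIDs and lists/arrays adapt automatically in both directions and JSON/JSONB values load as Python objects (sending a dict as JSON still requires a Json or Jsonb wrapper)
- Built-in ConnectionPool class (in psycopg_pool)
- Active development, including support for newer libpq features such as pipeline mode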
Limitations:
- Smaller ecosystem than psycopg2
- Some legacy codebases and ORMs still assume psycopg2's API
Which to Use
| Scenario | Recommendation |
|---|---|
| New synchronous project | psycopg2 (stable ecosystem) or psycopg3 (modern) |
| New async project (FastAPI, aiohttp) | psycopg3 - native async is a first-class feature |
| Existing psycopg2 codebase | Stay on psycopg2; migrate incrementally |
| Greenfield with Python 3.10+ | psycopg3 - it is the future |
:::note DB-API 2.0 Compatibility
This lesson uses psycopg2 for all examples since it remains the most widely deployed driver. The query execution patterns - execute(), fetchone(), fetchall(), %s placeholders - are identical in psycopg3. The main differences are in connection pooling, async support, and some type adapter APIs.
:::
Part 2 - Connection Setup: DSN Strings, Environment Variables, SSL
Connecting with a DSN String
PostgreSQL connections are described by a DSN (Data Source Name) - a URL-style string that encodes all connection parameters:
postgresql://user:password@host:port/dbname?sslmode=require
```python
import psycopg2

# From a DSN string
conn = psycopg2.connect("postgresql://appuser:secret@localhost:5432/myapp")

# Or as keyword arguments
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="myapp",
    user="appuser",
    password="secret",
)
```
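Passwords often contain characters such as @, : or / that have structural meaning in a URI. libpq accepts percent-encoded values in postgresql:// URIs, so a small helper can assemble a DSN safely from separate parts. This is a sketch under that assumption: build_dsn is a hypothetical helper (not part of psycopg2) and it only produces URI-style DSNs.

```python
from urllib.parse import quote


def build_dsn(user: str, password: str, host: str, port: int, dbname: str,
              sslmode: str = "require") -> str:
    """Hypothetical helper: assemble a postgresql:// URI from its parts.

    User, password and database name are percent-encoded so characters
    like '@', ':' and '/' cannot be mistaken for URI delimiters.
    """
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(dbname, safe='')}?sslmode={sslmode}"
    )


# Usage: the raw password contains '@', ':' and '/'
dsn = build_dsn("appuser", "p@ss:w/rd", "localhost", 5432, "myapp")
print(dsn)  # postgresql://appuser:p%40ss%3Aw%2Frd@localhost:5432/myapp?sslmode=require
```

In a real deployment the parts would come from environment variables or a secrets manager rather than literals.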
Never Hardcode Credentials
:::warning Store Credentials in Environment Variables, Not Source Code Database credentials in source code get committed to version control. Once in git history, they are effectively public - even if you delete them in a later commit. The entire history is visible to anyone who clones the repository.
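Wrong:
```python
import psycopg2

# The password is now in the repository, and in its history, for good
conn = psycopg2.connect("postgresql://appuser:secret@localhost:5432/myapp")
```
Correct:
```python
import os
import psycopg2

DATABASE_URL = os.environ["DATABASE_URL"]  # raises KeyError if not set - good
conn = psycopg2.connect(DATABASE_URL)
```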
For production systems, go further: use a secrets manager (AWS Secrets Manager, HashiCorp Vault, GCP Secret Manager) that rotates credentials automatically and provides an audit log of who accessed what and when. :::
Environment Variable Pattern
```python
import os
import psycopg2

def get_connection_params() -> dict:
    """
    Read connection parameters from environment variables.
    Supports both DATABASE_URL and individual PG* variables.
    """
    database_url = os.environ.get("DATABASE_URL")
    if database_url:
        return {"dsn": database_url}
    return {
        "host": os.environ.get("PGHOST", "localhost"),
        "port": int(os.environ.get("PGPORT", "5432")),
        "dbname": os.environ.get("PGDATABASE", "myapp"),
        "user": os.environ.get("PGUSER", "postgres"),
        "password": os.environ["PGPASSWORD"],  # required - no default
    }

# Both shapes can be passed straight to connect() as keyword arguments
conn = psycopg2.connect(**get_connection_params())
```
The PG* environment variable names are the PostgreSQL standard - psql and libpq also read them, so they work across your whole toolchain.
SSL Configuration
Always require SSL for connections to production databases:
```python
import psycopg2
import os

conn = psycopg2.connect(
    dsn=os.environ["DATABASE_URL"],
    sslmode="require",           # require an encrypted connection
    # sslmode="verify-full",     # also verify the server certificate (best practice)
)
```
| sslmode | Behaviour |
|---|---|
| disable | No SSL - plaintext only. Never use in production. |
| allow | Tries plaintext first; uses SSL only if the server insists on it. |
| prefer | Tries SSL first; falls back to plaintext. Default. |
| require | SSL required. Does not verify the certificate (unless a root CA file is present, in which case it behaves like verify-ca). |
| verify-ca | SSL + verify the certificate against a trusted CA. |
| verify-full | SSL + verify the certificate + verify the hostname matches it. Most secure. |
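Because the default prefer silently falls back to plaintext, it can be worth refusing to start when a production DSN does not pin a secure mode. A minimal sketch, assuming URI-style DSNs: dsn_sslmode and assert_secure_dsn are hypothetical helpers, and they ignore key=value DSNs, the PGSSLMODE environment variable, and any sslmode passed as a keyword argument to connect().

```python
from urllib.parse import parse_qs, urlsplit

# sslmode values that never fall back to a plaintext connection
SECURE_SSLMODES = {"require", "verify-ca", "verify-full"}


def dsn_sslmode(dsn: str, default: str = "prefer") -> str:
    """Return the sslmode query parameter of a postgresql:// URI.

    Falls back to `default` ('prefer' mirrors libpq's default) when the
    URI does not set it.
    """
    params = parse_qs(urlsplit(dsn).query)
    return params.get("sslmode", [default])[0]


def assert_secure_dsn(dsn: str) -> None:
    """Raise ValueError if the DSN could end up on a plaintext connection."""
    mode = dsn_sslmode(dsn)
    if mode not in SECURE_SSLMODES:
        raise ValueError(f"sslmode={mode!r} permits plaintext; use require or stronger")
```

Calling assert_secure_dsn(os.environ["DATABASE_URL"]) once at startup turns a silent downgrade into a loud configuration error.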
Part 3 - Connection Pooling
Why Pooling Matters
The pool lifecycle:
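- On startup, the pool opens minconn connections to the database
- When your code requests a connection, the pool hands out an idle one, or opens a new one if none are idle and fewer than maxconn are in use
- Your code uses it, then returns it to the pool - not closes it
- When a connection comes back and the pool already holds minconn idle connections, ThreadedConnectionPool closes the extra connection instead of keeping it
- If maxconn connections are already checked out, getconn() raises PoolError immediately - psycopg2's built-in pools do not make the caller wait (psycopg3's psycopg_pool does, up to a timeout)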
ThreadedConnectionPool
psycopg2 ships with a built-in pool designed for multi-threaded applications:
```python
import psycopg2.pool
import os

# Create once at application startup - not per-request
pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,   # open 2 connections immediately on startup
    maxconn=10,  # never exceed 10 simultaneous connections
    dsn=os.environ["DATABASE_URL"],
    sslmode="require",
)
```
ThreadedConnectionPool is thread-safe: getconn() and putconn() are protected by an internal lock, so multiple threads can borrow and return connections simultaneously without data races.
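Because ThreadedConnectionPool raises PoolError as soon as maxconn connections are checked out, a traffic burst turns into errors rather than short waits. One common workaround, sketched here as an assumption rather than a psycopg2 feature, wraps the pool in a threading.BoundedSemaphore sized to maxconn so callers block, up to a timeout, until a connection is free. BlockingPool and StubPool are hypothetical names; the wrapped object only needs getconn() and putconn(conn, close=...), so the usage below substitutes a stub for a real pool.

```python
import threading


class BlockingPool:
    """Hypothetical wrapper: make getconn() wait for a free slot instead of
    raising PoolError when every connection is checked out."""

    def __init__(self, pool, maxconn: int, timeout: float = 5.0):
        self._pool = pool                                  # e.g. a ThreadedConnectionPool
        self._slots = threading.BoundedSemaphore(maxconn)  # one slot per connection
        self._timeout = timeout

    def getconn(self):
        # Block until a slot frees up, or give up after `timeout` seconds
        if not self._slots.acquire(timeout=self._timeout):
            raise TimeoutError(f"no database connection free after {self._timeout}s")
        try:
            return self._pool.getconn()
        except Exception:
            self._slots.release()  # connecting failed: give the slot back
            raise

    def putconn(self, conn, close: bool = False):
        try:
            self._pool.putconn(conn, close=close)
        finally:
            self._slots.release()


class StubPool:
    """Stand-in for ThreadedConnectionPool so the sketch runs without a server."""

    def getconn(self):
        return object()

    def putconn(self, conn, close=False):
        pass


blocking = BlockingPool(StubPool(), maxconn=1, timeout=0.1)
first = blocking.getconn()   # takes the only slot
try:
    blocking.getconn()       # waits 0.1 s, then gives up
except TimeoutError as exc:
    print(exc)
blocking.putconn(first)      # frees the slot again
```

Keep the semaphore size equal to the pool's own maxconn: a larger value would let callers reach getconn() while the underlying pool is already exhausted.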
Using the Pool Safely
The most important rule with connection pools: always return the connection, even when an exception occurs. A connection that is borrowed but never returned is a leak - the pool eventually runs dry, and every later getconn() call fails with PoolError (or, with a pool that waits, blocks until it times out).
```python
import psycopg2.pool
import contextlib
import os

pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
)

@contextlib.contextmanager
def get_db():
    """
    Context manager that borrows a connection from the pool
    and guarantees it is returned - even on exception.
    """
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()    # commit if no exception was raised
    except Exception:
        conn.rollback()  # rollback on any error
        raise
    finally:
        pool.putconn(conn)  # ALWAYS return to pool

# Usage - clean and safe
def get_user(user_id: int):
    with get_db() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, name, email FROM users WHERE id = %s",
                (user_id,)
            )
            return cursor.fetchone()
```
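The commit/rollback/return guarantee can be checked without a database. The sketch below passes the pool in explicitly (a small change from the get_db above) and uses hypothetical FakePool and FakeConn stubs that only record what happens to them.

```python
import contextlib


class FakeConn:
    """Hypothetical stand-in for a psycopg2 connection: records calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


class FakePool:
    """Hypothetical stand-in for ThreadedConnectionPool: counts checkouts."""

    def __init__(self):
        self.conn = FakeConn()
        self.checked_out = 0

    def getconn(self):
        self.checked_out += 1
        return self.conn

    def putconn(self, conn, close=False):
        self.checked_out -= 1


@contextlib.contextmanager
def get_db(pool):
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)


pool = FakePool()
with get_db(pool):
    pass                                    # clean exit -> commit
try:
    with get_db(pool):
        raise RuntimeError("query failed")  # error -> rollback, then re-raised
except RuntimeError:
    pass
print(pool.conn.calls, pool.checked_out)    # ['commit', 'rollback'] 0
```

Both paths end with checked_out back at zero, which is exactly the no-leak property the finally clause exists to provide.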
Pool Sizing Formula
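Choosing maxconn is not arbitrary. A widely cited starting point, from the PostgreSQL wiki's guidance on the number of database connections, sizes the total number of actively working connections on the database server:
```
connections = (CPU cores on the DB server × 2) + effective spindle count
```
For a modern cloud database instance (for example, 4 vCPUs and SSD storage, counted as 1 effective spindle):
```
connections = 4 × 2 + 1 = 9 → use 10 as a round number
```
This number is a budget for the whole server, not for each pool. When an application runs several server processes (for example, 4 Gunicorn workers, each with its own pool), divide the budget between them:
```
pool_size_per_worker = connection_budget / number_of_workers
                     = 100 / 4 = 25 per worker   (budget = the max_connections default of 100)
                     = 10 / 4  ≈ 2 per worker    (budget = the formula's 10)
```
Leave headroom below max_connections for superuser-reserved slots, migrations, and admin sessions.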
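The sizing arithmetic fits in two small functions. Both names are hypothetical, and rounding down is a deliberate choice so that the workers together never exceed the budget.

```python
def db_connection_budget(cpu_cores: int, effective_spindles: int = 1) -> int:
    """Rule of thumb: active connections ~= cores * 2 + effective spindles."""
    return cpu_cores * 2 + effective_spindles


def per_worker_pool_size(budget: int, workers: int, reserved: int = 0) -> int:
    """Split a shared connection budget evenly across worker processes.

    `reserved` connections are set aside first (migrations, admin sessions).
    Rounds down, so the workers together never exceed the budget.
    """
    available = budget - reserved
    if available < workers:
        raise ValueError("budget too small: every worker needs at least one connection")
    return available // workers


print(db_connection_budget(4))       # 9
print(per_worker_pool_size(100, 4))  # 25
print(per_worker_pool_size(10, 4))   # 2
```

The result is what you would pass as maxconn to each worker's ThreadedConnectionPool.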
:::tip PgBouncer for Very High Concurrency If you have hundreds of application processes and cannot reduce connection count, use PgBouncer - a connection pooler that sits between your application and PostgreSQL. Your application connects to PgBouncer (which accepts thousands of connections), and PgBouncer maintains a small pool to the actual database. PgBouncer is the standard solution for Django and Flask deployments at scale. :::
Part 4 - Executing Queries
PostgreSQL Placeholder: %s
PostgreSQL drivers use %s as the positional placeholder - not ? as in SQLite. This is the most common mistake when switching between drivers.
```python
# SQLite - uses ?
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))

# PostgreSQL (psycopg2/psycopg3) - uses %s
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
```
:::danger Never Use Python's % String Formatting in SQL
%s looks identical to Python's old % string-formatting operator, but they are completely different things. psycopg2 intercepts %s in SQL strings and handles escaping safely at the driver level. Python's % operator does not - it produces a plain concatenated string that enables SQL injection.
```python
# SAFE - psycopg2 handles the binding
cursor.execute("SELECT * FROM users WHERE email = %s", (email,))

# DANGEROUS - SQL injection
cursor.execute("SELECT * FROM users WHERE email = %s" % email)  # NEVER

# DANGEROUS - same problem with f-strings
cursor.execute(f"SELECT * FROM users WHERE email = '{email}'")  # NEVER
```
Always pass parameters as the second argument tuple to execute(). Never concatenate or format them into the SQL string itself.
:::
Named Parameters
psycopg2 also supports named parameters using %(name)s:
```python
cursor.execute(
    "SELECT id, name FROM users WHERE email = %(email)s AND active = %(active)s",
    {"email": email, "active": True},  # a mapping, keyed by parameter name
)
```
execute(), fetchone(), fetchall()
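```python
with get_db() as conn:
    with conn.cursor() as cursor:
        # Single row - returns a tuple, or None if nothing matched
        cursor.execute("SELECT id, name, email FROM users WHERE id = %s", (1,))
        row = cursor.fetchone()
        if row:
            user_id, name, email = row

        # All rows - returns a list of tuples
        cursor.execute("SELECT id, name FROM users WHERE active = %s", (True,))
        rows = cursor.fetchall()

        # Iterate rows - convenient, but a regular (client-side) cursor has
        # already pulled the whole result set into memory during execute().
        # For truly large results, use a named server-side cursor instead:
        # conn.cursor(name="user_scan")
        cursor.execute("SELECT id, name FROM users")
        for row in cursor:
            process(row)

        # fetchmany - batch processing (re-execute: the loop above consumed every row)
        cursor.execute("SELECT id, name FROM users")
        while True:
            batch = cursor.fetchmany(500)
            if not batch:
                break
            process_batch(batch)
```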
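The fetchmany() loop is easy to package as a generator. iter_batches is a hypothetical helper; it only relies on the DB-API 2.0 fetchmany() method, so it works with psycopg2 cursors (combine it with a named cursor to keep memory bounded) and with the sqlite3 module, which the demo uses so it runs without a PostgreSQL server. Note the ? placeholders in the demo, as it is SQLite.

```python
import sqlite3
from typing import Any, Iterator


def iter_batches(cursor, size: int = 500) -> Iterator[list[Any]]:
    """Yield lists of up to `size` rows from an already-executed DB-API cursor."""
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            return
        yield batch


# Demo with sqlite3 so it runs without a PostgreSQL server
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [(f"user{i}",) for i in range(1200)])
cur = conn.execute("SELECT id, name FROM users")
print([len(batch) for batch in iter_batches(cur, 500)])  # [500, 500, 200]
```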
cursor.rowcount and cursor.statusmessage
```python
with get_db() as conn:
    with conn.cursor() as cursor:
        cursor.execute(
            "UPDATE users SET last_seen = NOW() WHERE id = %s",
            (user_id,)
        )
        print(cursor.rowcount)       # e.g. 1
        print(cursor.statusmessage)  # e.g. "UPDATE 1"
```
RealDictCursor - Column Access by Name
By default psycopg2 returns plain tuples. For named column access, use RealDictCursor:
```python
import psycopg2
import psycopg2.extras

with get_db() as conn:
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute("SELECT id, name, email FROM users WHERE id = %s", (1,))
        row = cursor.fetchone()
        print(row["name"])  # access by column name
```
Or apply it pool-wide by passing it at pool creation time:
```python
pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
    cursor_factory=psycopg2.extras.RealDictCursor,  # forwarded to every connect() call
)
```
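RealDictCursor is specific to psycopg2. When code has to stay driver-agnostic, for example when unit tests run against SQLite, the same dict rows can be built from cursor.description, which DB-API 2.0 defines as one sequence per column with the column name first. fetchall_as_dicts is a hypothetical helper, and the demo uses sqlite3 so it runs without a server.

```python
import sqlite3


def fetchall_as_dicts(cursor) -> list[dict]:
    """Return every remaining row as a {column_name: value} dict."""
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, email TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Ada', 'ada@example.com')")
cur = conn.execute("SELECT id, name, email FROM users")
rows = fetchall_as_dicts(cur)
print(rows)  # [{'id': 1, 'name': 'Ada', 'email': 'ada@example.com'}]
```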
Part 5 - PostgreSQL-Specific Types
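PostgreSQL has rich native types that have no SQLite equivalent. psycopg2 converts many of them between PostgreSQL wire types and Python objects automatically: arrays, dates and timestamps work out of the box, and JSON/JSONB values are parsed into Python objects when read. A few need help: UUID requires a one-time register_uuid() call, and Python dicts must be wrapped in Json() when passed as parameters.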
JSONB
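PostgreSQL's JSONB type stores JSON in a decomposed binary form. Unlike plain JSON, it can be indexed with GIN, supports containment and key-existence operators, and does not have to be re-parsed on every read - at the cost of slightly slower writes and not preserving key order or duplicate keys.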
```python
import psycopg2
import psycopg2.extras

# Optional on psycopg2 2.5.4+, which parses jsonb automatically; registering
# explicitly documents the intent and lets you pass a custom loads= function
psycopg2.extras.register_default_jsonb(globally=True)

with get_db() as conn:
    with conn.cursor() as cursor:
        # Insert - wrap the dict in psycopg2.extras.Json() for serialization
        metadata = {
            "tags": ["python", "backend"],
            "version": 2,
            "config": {"timeout": 30, "retries": 3},
        }
        cursor.execute(
            "INSERT INTO products (name, metadata) VALUES (%s, %s)",
            ("Widget", psycopg2.extras.Json(metadata)),
        )

        # ->> extracts a JSONB field as text
        cursor.execute(
            "SELECT name, metadata->>'version' AS version FROM products WHERE id = %s",
            (product_id,),
        )

        # @> containment - find products tagged 'python' (can use a GIN index)
        cursor.execute(
            "SELECT id, name FROM products WHERE metadata @> %s",
            (psycopg2.extras.Json({"tags": ["python"]}),),
        )
        rows = cursor.fetchall()
```
:::tip psycopg2.extras.Json vs json.dumps
psycopg2.extras.Json(data) is a type adapter: it serializes data with json.dumps (or a dumps function you pass in) and lets the driver quote the result safely. Passing json.dumps(data) as a parameter also works, since the driver still escapes the string, but Json() keeps serialization inside the adapter layer and makes the intent explicit. For JSONB columns, Json() is the cleaner choice.
:::
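json.dumps rejects Decimal, datetime and UUID values, which are exactly what psycopg2 returns for NUMERIC, timestamptz and UUID columns, so re-saving fetched data into JSONB can fail. Since Json accepts a dumps argument, one option is a json.dumps wrapper with a default hook. jsonb_dumps is a hypothetical name, and the conversions chosen here (Decimal to string to keep precision, datetime to ISO 8601, UUID to string) are assumptions you may want to change.

```python
import json
import uuid
from datetime import datetime, timezone
from decimal import Decimal
from functools import partial


def _jsonb_default(value):
    """Convert types json.dumps cannot handle natively."""
    if isinstance(value, Decimal):
        return str(value)  # keep exact precision as a string
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, uuid.UUID):
        return str(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


jsonb_dumps = partial(json.dumps, default=_jsonb_default)

doc = {"price": Decimal("9.99"), "seen": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)}
print(jsonb_dumps(doc))  # {"price": "9.99", "seen": "2024-01-02T03:04:05+00:00"}

# With psycopg2 (not executed here): psycopg2.extras.Json(doc, dumps=jsonb_dumps)
```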
Arrays
PostgreSQL arrays map naturally to Python lists. psycopg2 converts them automatically in both directions:
```python
# Schema uses: tags TEXT[], scores INTEGER[]
with get_db() as conn:
    with conn.cursor() as cursor:
        # Python list → PostgreSQL array (automatic)
        cursor.execute(
            "INSERT INTO articles (title, tags) VALUES (%s, %s)",
            ("Python Arrays", ["python", "postgresql", "tutorial"])
        )

        # = ANY() finds rows containing a specific value
        # (simple, but cannot use a GIN index; tags @> ARRAY[...] can)
        cursor.execute(
            "SELECT id, title FROM articles WHERE %s = ANY(tags)",
            ("postgresql",)
        )

        # Retrieve - PostgreSQL array → Python list (automatic)
        cursor.execute("SELECT title, tags FROM articles WHERE id = %s", (1,))
        row = cursor.fetchone()
        title, tags = row
        print(tags)  # ['python', 'postgresql', 'tutorial']

        # && operator - array overlap (any element in common)
        cursor.execute(
            "SELECT title FROM articles WHERE tags && %s",
            (["python", "django"],)
        )
```
UUID
PostgreSQL has a native UUID type. Register the adapter once and psycopg2 converts automatically:
```python
import uuid
import psycopg2.extras

# Register UUID adapter once at startup
psycopg2.extras.register_uuid()

with get_db() as conn:
    with conn.cursor() as cursor:
        new_id = uuid.uuid4()
        cursor.execute(
            "INSERT INTO sessions (id, user_id) VALUES (%s, %s)",
            (new_id, user_id)
        )

        # Returned as uuid.UUID - not a plain string
        cursor.execute("SELECT id, user_id FROM sessions WHERE id = %s", (new_id,))
        row = cursor.fetchone()
        session_id = row[0]
        print(type(session_id))  # <class 'uuid.UUID'>
        print(str(session_id))   # e.g. '550e8400-e29b-41d4-a716-446655440000'
```
Timestamps with Timezone (timestamptz)
Always use TIMESTAMP WITH TIME ZONE - PostgreSQL stores in UTC and converts on display:
```python
from datetime import datetime, timezone

with get_db() as conn:
    with conn.cursor() as cursor:
        # Insert a timezone-aware datetime
        now = datetime.now(timezone.utc)
        cursor.execute(
            "INSERT INTO events (user_id, occurred_at) VALUES (%s, %s)",
            (user_id, now)
        )

        # PostgreSQL NOW() returns the current timestamptz
        cursor.execute(
            "INSERT INTO logs (message, logged_at) VALUES (%s, NOW())",
            ("App started",)
        )

        # Retrieve - psycopg2 returns a timezone-aware datetime
        cursor.execute("SELECT occurred_at FROM events WHERE id = %s", (event_id,))
        row = cursor.fetchone()
        ts = row[0]
        print(type(ts))   # <class 'datetime.datetime'>
        print(ts.tzinfo)  # follows the session TimeZone, e.g. UTC
```
:::warning Use Timezone-Aware Datetimes
Always use datetime.now(timezone.utc) instead of datetime.utcnow(). The latter returns a naive datetime with no tzinfo - PostgreSQL treats it as the session timezone, which may not be UTC in all environments. datetime.utcnow() is deprecated in Python 3.12 precisely because naive "UTC" datetimes cause subtle, hard-to-reproduce bugs.
:::
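A cheap way to enforce this rule is to validate datetimes where query parameters are built. require_aware_utc is a hypothetical helper; normalising to UTC is a stylistic choice, since PostgreSQL stores a timestamptz as an absolute instant whatever offset it arrives with.

```python
from datetime import datetime, timedelta, timezone


def require_aware_utc(dt: datetime) -> datetime:
    """Reject naive datetimes; convert aware ones to UTC before binding them."""
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError("naive datetime: attach a tzinfo before sending it to PostgreSQL")
    return dt.astimezone(timezone.utc)


local = datetime(2024, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))
print(require_aware_utc(local))  # 2024-01-01 10:00:00+00:00
```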
Part 6 - COPY for Bulk Inserts
Why COPY Is So Much Faster
For large datasets, INSERT is slow because each row goes through:
- Query parsing and planning overhead
- Network round-trip cost per statement
- A separate write-ahead log (WAL) record per row
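COPY bypasses most of this - rows are streamed to the server as part of a single command instead of one statement per row, with dramatically reduced per-row overhead.
Speedups of one to two orders of magnitude are common for datasets of 100,000+ rows; the exact figure depends on row width, indexes, and network latency, so measure with the benchmark below.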
copy_expert() with CSV
```python
import io
import csv
import psycopg2

def bulk_insert_products(conn, products: list[dict]) -> int:
    """
    Insert many products using COPY FROM STDIN.
    Returns the number of rows inserted.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    for p in products:
        writer.writerow([
            p["name"],
            p["price"],
            p["category"],
            p.get("stock", 0),
        ])
    buffer.seek(0)

    with conn.cursor() as cursor:
        cursor.copy_expert(
            "COPY products (name, price, category, stock) FROM STDIN WITH CSV",
            buffer,
        )
        count = cursor.rowcount
    conn.commit()
    return count
```
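bulk_insert_products builds the entire CSV in memory before sending it. For very large imports you can stream instead: copy_expert() reads from the file object it is given in chunks (its size parameter sets the buffer size), so a small file-like wrapper around a generator keeps only one chunk in memory at a time. IteratorFile and csv_lines are hypothetical helpers, and the assumption that psycopg2 only needs read() (readline() is provided just in case) is what this sketch relies on.

```python
import csv
import io


class IteratorFile:
    """Hypothetical file-like adapter over an iterator of strings.

    Pulls items from the iterator only as needed, so the full payload is
    never held in memory at once.
    """

    def __init__(self, lines):
        self._lines = iter(lines)
        self._buffer = ""

    def _fill(self, done) -> None:
        # Append items from the iterator until done() is true or it runs out
        while not done():
            try:
                self._buffer += next(self._lines)
            except StopIteration:
                return

    def read(self, size: int = -1) -> str:
        # size < 0 means "everything that is left"
        self._fill(lambda: 0 <= size <= len(self._buffer))
        if size < 0:
            size = len(self._buffer)
        chunk, self._buffer = self._buffer[:size], self._buffer[size:]
        return chunk

    def readline(self, size: int = -1) -> str:
        # size is accepted for compatibility but ignored in this sketch
        self._fill(lambda: "\n" in self._buffer)
        end = self._buffer.find("\n") + 1 or len(self._buffer)
        chunk, self._buffer = self._buffer[:end], self._buffer[end:]
        return chunk


def csv_lines(products):
    """Yield one CSV-encoded line per product dict, generated lazily."""
    for p in products:
        buf = io.StringIO()
        csv.writer(buf).writerow([p["name"], p["price"], p["category"], p.get("stock", 0)])
        yield buf.getvalue()


# With psycopg2 (not executed here):
# with conn.cursor() as cursor:
#     cursor.copy_expert(
#         "COPY products (name, price, category, stock) FROM STDIN WITH CSV",
#         IteratorFile(csv_lines(products)),
#     )
```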
copy_from() - Simpler Tab-Delimited Format
For simple cases where the data is guaranteed to contain no tabs, newlines, or backslashes:
```python
import io

def bulk_insert_simple(conn, rows: list[tuple]) -> None:
    """rows is a list of (name, price) tuples."""
    buffer = io.StringIO()
    for name, price in rows:
        buffer.write(f"{name}\t{price}\n")
    buffer.seek(0)
    with conn.cursor() as cursor:
        cursor.copy_from(buffer, "products", columns=("name", "price"))
    conn.commit()
```
:::tip Use copy_expert() for Production
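copy_from() uses PostgreSQL's text format: a tab or newline inside a value shifts columns or splits rows (usually failing with an error such as "extra data after last expected column"), and backslashes are interpreted as escape sequences. copy_expert() with WITH (FORMAT csv) handles quoting and escaping correctly for arbitrary data. Prefer copy_expert() unless you escape values yourself or your data is guaranteed free of these characters.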
:::
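If you do use copy_from(), escaping each value removes the tab and newline hazard. In PostgreSQL's text format, backslash is the escape character: \t, \n, \r and \\ stand for tab, newline, carriage return and backslash, and \N is the default NULL marker. copy_text_field and copy_text_row are hypothetical helpers that follow those rules, assuming the default delimiter and NULL string.

```python
def copy_text_field(value) -> str:
    """Escape one value for COPY's text format (default delimiter and NULL)."""
    if value is None:
        return "\\N"  # default NULL marker
    text = str(value)
    return (
        text.replace("\\", "\\\\")  # escape backslashes first
        .replace("\t", "\\t")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def copy_text_row(values) -> str:
    """Join escaped values with tabs and terminate the row with a newline."""
    return "\t".join(copy_text_field(v) for v in values) + "\n"


print(repr(copy_text_row(["Tab\there", None, 9.99])))  # 'Tab\\there\t\\N\t9.99\n'
```

With these helpers, bulk_insert_simple would write buffer.write(copy_text_row((name, price))) instead of the raw f-string.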
Benchmark: INSERT vs COPY
```python
import io
import time

def benchmark(conn, n_rows: int = 10_000):
    rows = [
        (f"Product {i}", round(9.99 + i * 0.01, 2), "test", i % 100)
        for i in range(n_rows)
    ]

    # Method 1: executemany INSERT (psycopg2 sends one INSERT per row)
    start = time.perf_counter()
    with conn.cursor() as cur:
        cur.executemany(
            "INSERT INTO products (name, price, category, stock) VALUES (%s, %s, %s, %s)",
            rows,
        )
    conn.rollback()  # discard the rows so both methods start from the same state
    insert_time = time.perf_counter() - start

    # Method 2: COPY FROM STDIN (these values contain no tabs or newlines)
    start = time.perf_counter()
    buf = io.StringIO()
    for row in rows:
        buf.write("\t".join(str(v) for v in row) + "\n")
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_from(buf, "products", columns=("name", "price", "category", "stock"))
    conn.rollback()
    copy_time = time.perf_counter() - start

    print(f"INSERT: {insert_time:.2f}s | COPY: {copy_time:.2f}s | "
          f"Speedup: {insert_time / copy_time:.1f}x")
```
Part 7 - The RETURNING Clause
A common anti-pattern: insert a row, then immediately query it to get its auto-generated id or server-set created_at. That is two round-trips when one will do.
PostgreSQL's RETURNING clause returns column values from the rows affected by INSERT, UPDATE, or DELETE - in the same statement:
```python
# Anti-pattern: two round-trips
cursor.execute("INSERT INTO users (name, email) VALUES (%s, %s)", (name, email))
cursor.execute("SELECT id, created_at FROM users WHERE email = %s", (email,))
row = cursor.fetchone()

# Correct: one round-trip with RETURNING
cursor.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id, created_at",
    (name, email)
)
row = cursor.fetchone()
user_id, created_at = row
```
RETURNING works on UPDATE and DELETE too:
```python
# Get the full updated row back after a price change
cursor.execute(
    """
    UPDATE products
    SET price = price * 0.9,
        updated_at = NOW()
    WHERE id = %s
    RETURNING id, name, price, updated_at
    """,
    (product_id,),
)
updated = cursor.fetchone()

# Get IDs of all expired sessions being deleted
cursor.execute(
    "DELETE FROM sessions WHERE expires_at < NOW() RETURNING id, user_id",
)
expired = cursor.fetchall()
print(f"Removed {len(expired)} expired sessions")
```
Upsert with ON CONFLICT ... RETURNING
```python
cursor.execute(
    """
    INSERT INTO user_preferences (user_id, key, value)
    VALUES (%s, %s, %s)
    ON CONFLICT (user_id, key) DO UPDATE
        SET value = EXCLUDED.value,
            updated_at = NOW()
    RETURNING id, user_id, key, value, updated_at
    """,
    (user_id, "theme", "dark"),
)
row = cursor.fetchone()
```
Part 8 - Connection Configuration for Production
Connection Timeouts
Connections that hang indefinitely are a common failure mode in production. Always set explicit timeouts:
```python
pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
    connect_timeout=5,                     # give up connecting after 5 seconds
    options="-c statement_timeout=30000",  # kill queries running longer than 30 seconds (value in ms)
)
```
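Or set timeouts per transaction once the connection is established. Use SET LOCAL: a plain SET persists on the connection after it goes back to the pool, silently changing the timeouts for whoever borrows it next.
```python
with get_db() as conn:
    with conn.cursor() as cursor:
        # SET LOCAL lasts only until the current transaction commits or rolls back
        cursor.execute("SET LOCAL statement_timeout = '30s'")
        cursor.execute("SET LOCAL lock_timeout = '5s'")
        # your query here
```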
TCP Keepalives
Long-idle connections get silently dropped by firewalls, load balancers, and cloud NAT gateways. TCP keepalives probe idle connections and detect drops before your next query hangs for minutes:
```python
pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
    keepalives=1,            # enable TCP keepalives
    keepalives_idle=60,      # probe after 60 seconds of idle
    keepalives_interval=10,  # retransmit every 10 seconds
    keepalives_count=5,      # give up after 5 failed probes
)
```
Handling Broken Connections in the Pool
Connections can go stale - the database server restarts, or a firewall silently drops the TCP session. A robust pool wrapper validates connections before use:
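```python
import contextlib

import psycopg2


def _is_conn_alive(conn) -> bool:
    """Quick liveness check - executes a trivial query."""
    if conn.closed:  # nonzero once psycopg2 knows the connection is closed or broken
        return False
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
        return True
    except psycopg2.Error:
        return False


@contextlib.contextmanager
def get_db():
    conn = pool.getconn()
    # Validate before entering try/finally, so a failed replacement getconn()
    # never leads to putconn() being called twice on the discarded connection
    if not _is_conn_alive(conn):
        pool.putconn(conn, close=True)  # discard the dead connection
        conn = pool.getconn()           # get a fresh one
    try:
        yield conn
        conn.commit()
    except Exception:
        try:
            conn.rollback()
        except psycopg2.Error:
            pass  # the connection itself is broken; nothing to roll back
        raise
    finally:
        # A connection that broke during use is closed rather than reused
        pool.putconn(conn, close=bool(conn.closed))
```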
:::note putconn(conn, close=True)
Passing close=True to putconn() tells the pool to close and remove the connection rather than returning it to the idle set. The pool will open a new connection the next time one is needed. This is the correct way to evict a dead connection - do not call conn.close() directly on a pooled connection.
:::
Full Working Example: Product Catalogue with Connection Pool
This example assembles all the patterns from this lesson into a production-style service that you can run locally.
"""
product_catalogue.py
A production-style PostgreSQL-backed product catalogue.
Demonstrates: connection pooling, parameterized queries,
JSONB, arrays, UUID, RETURNING, and COPY bulk insert.
"""
import io
import csv
import json
import logging
import os
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import psycopg2
import psycopg2.extras
import psycopg2.pool
logger = logging.getLogger(__name__)
# Register type adapters once at module load
psycopg2.extras.register_uuid()
psycopg2.extras.register_default_jsonb(globally=True)
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
price NUMERIC(10, 2) NOT NULL CHECK (price >= 0),
category TEXT NOT NULL,
tags TEXT[] NOT NULL DEFAULT '{}',
metadata JSONB NOT NULL DEFAULT '{}',
stock INTEGER NOT NULL DEFAULT 0 CHECK (stock >= 0),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_products_category
ON products (category);
CREATE INDEX IF NOT EXISTS idx_products_tags
ON products USING GIN (tags);
CREATE INDEX IF NOT EXISTS idx_products_metadata
ON products USING GIN (metadata);
"""
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class Product:
id: Optional[uuid.UUID]
name: str
price: float
category: str
tags: list[str] = field(default_factory=list)
metadata: dict = field(default_factory=dict)
stock: int = 0
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@classmethod
def from_row(cls, row) -> "Product":
return cls(
id=row["id"],
name=row["name"],
price=float(row["price"]),
category=row["category"],
tags=list(row["tags"]) if row["tags"] else [],
metadata=dict(row["metadata"]) if row["metadata"] else {},
stock=row["stock"],
created_at=row["created_at"],
updated_at=row["updated_at"],
)
# ---------------------------------------------------------------------------
# Pool management
# ---------------------------------------------------------------------------
_pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None
def init_pool(
dsn: Optional[str] = None,
min_conn: int = 2,
max_conn: int = 10,
) -> None:
"""Create the connection pool. Call once at application startup."""
global _pool
dsn = dsn or os.environ["DATABASE_URL"]
_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=min_conn,
maxconn=max_conn,
dsn=dsn,
connect_timeout=5,
keepalives=1,
keepalives_idle=60,
keepalives_interval=10,
keepalives_count=5,
cursor_factory=psycopg2.extras.RealDictCursor,
options="-c statement_timeout=30000",
)
logger.info("Pool ready (min=%d, max=%d)", min_conn, max_conn)
def close_pool() -> None:
"""Close all connections. Call at application shutdown."""
if _pool:
_pool.closeall()
logger.info("Pool closed")
@contextmanager
def get_db():
"""Borrow a connection; commit on clean exit, rollback on exception."""
conn = _pool.getconn()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
_pool.putconn(conn)
# ---------------------------------------------------------------------------
# Schema setup
# ---------------------------------------------------------------------------
def init_db() -> None:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(SCHEMA_SQL)
# ---------------------------------------------------------------------------
# CRUD
# ---------------------------------------------------------------------------
def create_product(
name: str,
price: float,
category: str,
tags: Optional[list[str]] = None,
metadata: Optional[dict] = None,
stock: int = 0,
) -> Product:
"""Insert a product and return the full row via RETURNING."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO products (name, price, category, tags, metadata, stock)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id, name, price, category, tags, metadata,
stock, created_at, updated_at
""",
(
name,
price,
category,
tags or [],
psycopg2.extras.Json(metadata or {}),
stock,
),
)
return Product.from_row(cur.fetchone())
def get_product(product_id: uuid.UUID) -> Optional[Product]:
"""Fetch a single product by UUID. Returns None if not found."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, price, category, tags, metadata,
stock, created_at, updated_at
FROM products
WHERE id = %s
""",
(product_id,),
)
row = cur.fetchone()
return Product.from_row(row) if row else None
def list_products_by_category(category: str) -> list[Product]:
"""Return all products in a category ordered by price ascending."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, price, category, tags, metadata,
stock, created_at, updated_at
FROM products
WHERE category = %s
ORDER BY price ASC
""",
(category,),
)
return [Product.from_row(row) for row in cur.fetchall()]
def search_by_tag(tag: str) -> list[Product]:
"""Find all products containing a specific tag (uses GIN index)."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, price, category, tags, metadata,
stock, created_at, updated_at
FROM products
WHERE %s = ANY(tags)
ORDER BY name
""",
(tag,),
)
return [Product.from_row(row) for row in cur.fetchall()]
def search_by_metadata(criteria: dict) -> list[Product]:
"""Find products where JSONB metadata contains the given key-value pairs."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, name, price, category, tags, metadata,
stock, created_at, updated_at
FROM products
WHERE metadata @> %s
ORDER BY name
""",
(psycopg2.extras.Json(criteria),),
)
return [Product.from_row(row) for row in cur.fetchall()]
def update_stock(product_id: uuid.UUID, delta: int) -> Optional[Product]:
"""
Adjust stock by delta (positive = add, negative = remove).
Returns the updated product, or None if not found.
The CHECK constraint on the column prevents stock from going negative.
"""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE products
SET stock = stock + %s,
updated_at = NOW()
WHERE id = %s
RETURNING id, name, price, category, tags, metadata,
stock, created_at, updated_at
""",
(delta, product_id),
)
row = cur.fetchone()
return Product.from_row(row) if row else None
def delete_product(product_id: uuid.UUID) -> bool:
"""Delete a product. Returns True if a row was deleted."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM products WHERE id = %s RETURNING id",
(product_id,),
)
return cur.fetchone() is not None
# ---------------------------------------------------------------------------
# Bulk insert via COPY
# ---------------------------------------------------------------------------
def bulk_create_products(products: list[dict]) -> int:
    """
    Insert many products using COPY FROM STDIN - far faster than INSERT for
    large datasets. Returns the number of rows inserted.
    Each dict must have: name, price, category.
    Optional keys: tags (list[str]), metadata (dict), stock (int).
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_MINIMAL)
    for p in products:
        # Build a PostgreSQL array literal: {"tag1","tag2"}. Each element is
        # double-quoted, with backslashes and quotes escaped, so tags that
        # contain commas, braces, spaces, or quotes survive intact.
        tags_literal = "{" + ",".join(
            '"' + str(t).replace("\\", "\\\\").replace('"', '\\"') + '"'
            for t in p.get("tags", [])
        ) + "}"
        writer.writerow([
            p["name"],
            p["price"],
            p["category"],
            tags_literal,
            json.dumps(p.get("metadata", {})),
            p.get("stock", 0),
        ])
    buffer.seek(0)
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.copy_expert(
                """
                COPY products (name, price, category, tags, metadata, stock)
                FROM STDIN
                WITH (FORMAT csv)
                """,
                buffer,
            )
            return cur.rowcount

# ---------------------------------------------------------------------------
# Demo
# ---------------------------------------------------------------------------
def demo():
    logging.basicConfig(level=logging.INFO)
    init_pool(
        dsn="postgresql://postgres:postgres@localhost:5432/catalogue",
        min_conn=2,
        max_conn=10,
    )
    init_db()

    # Single inserts with RETURNING
    widget = create_product(
        name="Python Widget",
        price=29.99,
        category="software",
        tags=["python", "utility", "open-source"],
        metadata={"version": "1.0", "license": "MIT", "python_requires": ">=3.10"},
        stock=500,
    )
    print(f"Created: {widget.name} id={widget.id}")
    create_product(
        name="Async Gadget",
        price=49.99,
        category="software",
        tags=["python", "async", "networking"],
        metadata={"version": "2.1", "license": "Apache-2.0"},
        stock=250,
    )

    # Bulk insert via COPY
    catalogue = [
        {
            "name": f"Bulk Item {i}",
            "price": round(5.00 + i * 0.10, 2),
            "category": "bulk",
            "tags": ["imported", "wholesale"],
            "stock": max(0, 1000 - i),
        }
        for i in range(10_000)
    ]
    n = bulk_create_products(catalogue)
    print(f"Bulk inserted {n} items via COPY")

    # Query by category
    sw = list_products_by_category("software")
    print(f"\nSoftware products ({len(sw)}):")
    for p in sw:
        print(f" {p.name:<22} ${p.price:.2f} tags={p.tags}")

    # Search by tag - uses GIN index on tags column
    py_products = search_by_tag("python")
    print(f"\nProducts tagged 'python': {len(py_products)}")

    # Search by JSONB containment - uses GIN index on metadata column
    mit_products = search_by_metadata({"license": "MIT"})
    print(f"Products with MIT license: {len(mit_products)}")

    # Update stock - RETURNING gives us the updated row in one round-trip
    updated = update_stock(widget.id, -10)
    if updated:
        print(f"\nStock after -10: {updated.stock} updated_at={updated.updated_at}")

    close_pool()


if __name__ == "__main__":
    demo()
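Because COPY reads raw CSV, it helps to inspect the payload before sending it. The sketch below is a database-free, hypothetical build_copy_payload helper that mirrors the row-building logic of bulk_create_products(). It double-quotes each tags element (via a hypothetical pg_array_literal helper) so commas, braces, or quotes inside a tag cannot break the array literal. It uses "\n" line endings only so the printed output is easy to read; the csv module's default "\r\n" is also accepted by COPY in CSV format.

```python
import csv
import io
import json


def pg_array_literal(items: list[str]) -> str:
    """Quote every element so commas, braces, quotes and backslashes survive."""
    quoted = (
        '"' + item.replace("\\", "\\\\").replace('"', '\\"') + '"'
        for item in items
    )
    return "{" + ",".join(quoted) + "}"


def build_copy_payload(products: list[dict]) -> str:
    """Return the CSV text that COPY ... FROM STDIN WITH (FORMAT csv) would read."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
    for p in products:
        writer.writerow([
            p["name"],
            p["price"],
            p["category"],
            pg_array_literal(p.get("tags", [])),
            json.dumps(p.get("metadata", {})),  # JSON quotes get CSV-escaped as ""
            p.get("stock", 0),
        ])
    return buffer.getvalue()


payload = build_copy_payload([
    {"name": "Plain", "price": 5.0, "category": "bulk", "tags": ["a", "b"]},
    {"name": "Tricky", "price": 9.5, "category": "bulk", "tags": ["x,y"],
     "metadata": {"note": "x"}},
])
print(payload)
```

Printing the payload for a handful of rows is a quick way to spot quoting problems before a 10,000-row COPY fails halfway through.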
Local development setup with Docker and psql verification queries
# Start PostgreSQL 16 in Docker
docker run -d \
--name pg-dev \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=catalogue \
-p 5432:5432 \
postgres:16
# Wait until the server accepts connections (pg_isready exits non-zero until then)
until docker exec pg-dev pg_isready -U postgres; do sleep 1; done
# Set the connection string environment variable
export DATABASE_URL="postgresql://postgres:postgres@localhost:5432/catalogue"
# Install the driver and run the demo
pip install psycopg2-binary
python product_catalogue.py
# Connect with psql
psql "$DATABASE_URL"
-- Inspect the table schema
\d products
-- List all indexes
\di idx_products_*
-- JSONB key-existence query (? operator; can use a GIN index built with the default jsonb_ops operator class)
SELECT name, metadata->>'license' AS license
FROM products
WHERE metadata ? 'license'
ORDER BY name
LIMIT 5;
-- Array overlap query (uses GIN index)
SELECT name, tags
FROM products
WHERE tags && ARRAY['python', 'async']
ORDER BY name;
-- Category summary
SELECT category,
COUNT(*) AS n_products,
SUM(stock) AS total_stock,
ROUND(AVG(price), 2) AS avg_price
FROM products
GROUP BY category
ORDER BY n_products DESC;
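The @> operator used by search_by_metadata() asks whether the left JSONB value contains the right one. As a rough mental model, here is a simplified, hypothetical jsonb_contains function in plain Python. It is an illustration of the containment rules, not how PostgreSQL implements them, and it ignores the special case where a top-level array may contain a bare scalar.

```python
from typing import Any


def jsonb_contains(left: Any, right: Any) -> bool:
    """Simplified model of jsonb @> (illustration only, not PostgreSQL's implementation)."""
    if isinstance(right, dict):
        # Every key in right must exist in left, with a contained value.
        return isinstance(left, dict) and all(
            key in left and jsonb_contains(left[key], value)
            for key, value in right.items()
        )
    if isinstance(right, list):
        # Each element of right must be contained in some element of left;
        # order and duplicates do not matter.
        return isinstance(left, list) and all(
            any(jsonb_contains(item, wanted) for item in left)
            for wanted in right
        )
    # Scalars. bool is checked first because Python treats True == 1,
    # while JSON true and 1 are distinct values.
    if isinstance(left, bool) or isinstance(right, bool):
        return isinstance(left, bool) and isinstance(right, bool) and left == right
    if isinstance(left, (int, float)) and isinstance(right, (int, float)):
        return left == right  # JSON numbers compare by value, so 1 matches 1.0
    return type(left) is type(right) and left == right


metadata = {"version": "1.0", "license": "MIT", "python_requires": ">=3.10"}
print(jsonb_contains(metadata, {"license": "MIT"}))
print(jsonb_contains(metadata, {"license": "MIT", "version": "2.0"}))
```

The practical consequence: search_by_metadata({"license": "MIT"}) matches any product whose metadata has at least that pair, regardless of what other keys are present.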
Graded Practice Challenges
Beginner - Parameterized Price Range Query
Write a function get_products_in_price_range(min_price, max_price) that returns all products with a price between min_price and max_price (inclusive), ordered by price ascending.
Requirements:
- Use %s parameterized placeholders - no string formatting in SQL
- Return a list of Product objects using Product.from_row()
- Handle the case where no products exist in the range (return an empty list)
def get_products_in_price_range(
    min_price: float,
    max_price: float,
) -> list[Product]:
    """
    Return products in the given price range, cheapest first.
    BETWEEN is inclusive on both ends: price >= min AND price <= max.
    """
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, name, price, category, tags, metadata,
                       stock, created_at, updated_at
                FROM products
                WHERE price BETWEEN %s AND %s
                ORDER BY price ASC
                """,
                (min_price, max_price),
            )
            return [Product.from_row(row) for row in cur.fetchall()]


# Usage
budget = get_products_in_price_range(10.00, 50.00)
for p in budget:
    print(f"${p.price:.2f} {p.name}")
Key points:
- Both min_price and max_price are passed in the parameter tuple - never interpolated into the string
- BETWEEN %s AND %s is equivalent to price >= %s AND price <= %s - inclusive on both ends
- fetchall() returns an empty list when no rows match - no special case needed
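One edge case the solution leaves to the caller: in PostgreSQL, x BETWEEN a AND b is false whenever a > b, so swapped arguments silently return an empty list (BETWEEN SYMMETRIC is the variant that sorts the bounds for you). A small, hypothetical guard such as validate_price_range can catch that before the query runs. It also converts the bounds to Decimal, on the assumption that price is a NUMERIC column.

```python
from decimal import Decimal, InvalidOperation


def validate_price_range(min_price, max_price) -> tuple[Decimal, Decimal]:
    """Hypothetical guard: reject unusable bounds before they reach SQL."""
    try:
        # Go through str() so 29.99 becomes Decimal("29.99"), not the exact binary float
        low, high = Decimal(str(min_price)), Decimal(str(max_price))
    except InvalidOperation as exc:
        raise ValueError(f"prices must be numeric, got {min_price!r} and {max_price!r}") from exc
    if not (low.is_finite() and high.is_finite()):
        raise ValueError("prices must be finite numbers")
    if low > high:
        # BETWEEN would match nothing, which usually means the caller swapped the arguments
        raise ValueError(f"min_price {low} is greater than max_price {high}")
    return low, high


low, high = validate_price_range(10.00, 50.00)
print(low, high)
```

The validated pair can be passed straight through, for example get_products_in_price_range(*validate_price_range(lo, hi)), since psycopg2 adapts Decimal values to NUMERIC.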
Intermediate - Inventory Alert with Aggregation
Write a function get_low_stock_report(threshold) that returns a summary of every category whose total stock falls below threshold. Each item in the returned list should be a dict with:
- category - the category name
- total_stock - sum of stock across all products in the category
- product_count - number of products in the category
- min_stock - the lowest individual stock value in the category
- lowest_stock_product - the name of the product with the lowest stock
Use a single SQL query with a CTE, GROUP BY, HAVING, and a correlated subquery.
def get_low_stock_report(threshold: int = 100) -> list[dict]:
    """
    Return categories where total stock is below threshold.
    Uses a CTE with HAVING for group-level filtering and a correlated
    subquery to identify the lowest-stock product in each category.
    """
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                WITH category_stats AS (
                    SELECT
                        p.category,
                        SUM(p.stock) AS total_stock,
                        COUNT(*) AS product_count,
                        MIN(p.stock) AS min_stock,
                        (
                            SELECT p2.name
                            FROM products p2
                            WHERE p2.category = p.category
                            ORDER BY p2.stock ASC
                            LIMIT 1
                        ) AS lowest_stock_product
                    FROM products p
                    GROUP BY p.category
                    HAVING SUM(p.stock) < %s
                )
                SELECT
                    category,
                    total_stock,
                    product_count,
                    min_stock,
                    lowest_stock_product
                FROM category_stats
                ORDER BY total_stock ASC
                """,
                (threshold,),
            )
            return [dict(row) for row in cur.fetchall()]


# Usage
alerts = get_low_stock_report(threshold=500)
if not alerts:
    print("All categories have adequate stock")
else:
    header = f"{'Category':<15} {'Total':>7} {'Products':>8} Lowest product"
    print(header)
    print("-" * len(header))
    for a in alerts:
        print(
            f"{a['category']:<15} {a['total_stock']:>7} "
            f"{a['product_count']:>8} "
            f"{a['lowest_stock_product']} ({a['min_stock']} units)"
        )
Key points:
- The %s placeholder in HAVING SUM(p.stock) < %s is fully supported - HAVING accepts parameters just like WHERE
- The correlated subquery for lowest_stock_product executes once per group - acceptable for a small number of categories; for very large category counts, a window function (FIRST_VALUE) would be more efficient
- dict(row) works because the pool uses RealDictCursor, which returns dict-like RealDictRow objects
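The window-function alternative mentioned in the key points can be sketched as follows. It uses ROW_NUMBER() rather than FIRST_VALUE (both are standard window functions), so a single pass over the table yields the per-category totals and the lowest-stock row. To keep the example self-contained it runs against an in-memory SQLite database with a cut-down products table, which is an assumption made purely for the demo (window functions need SQLite 3.25 or later). With psycopg2 you would pass the same query to cursor.execute(), using %s in place of ?.

```python
import sqlite3

# Same SQL shape works in PostgreSQL; swap ? for %s when using psycopg2.
LOW_STOCK_SQL = """
WITH ranked AS (
    SELECT
        category,
        name,
        stock,
        SUM(stock) OVER (PARTITION BY category) AS total_stock,
        COUNT(*)   OVER (PARTITION BY category) AS product_count,
        ROW_NUMBER() OVER (
            PARTITION BY category ORDER BY stock ASC, name ASC
        ) AS rn
    FROM products
)
SELECT category, total_stock, product_count,
       stock AS min_stock, name AS lowest_stock_product
FROM ranked
WHERE rn = 1 AND total_stock < ?
ORDER BY total_stock ASC
"""


def low_stock_report(conn, threshold: int) -> list[tuple]:
    """Return (category, total_stock, product_count, min_stock, lowest_stock_product)."""
    return conn.execute(LOW_STOCK_SQL, (threshold,)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, category TEXT, stock INTEGER)")
conn.executemany(
    "INSERT INTO products (name, category, stock) VALUES (?, ?, ?)",
    [
        ("A", "gadget", 5), ("B", "gadget", 3), ("C", "gadget", 10),
        ("W1", "widget", 400), ("W2", "widget", 300),
        ("X", "bulk", 50),
    ],
)
for row in low_stock_report(conn, threshold=500):
    print(row)
```

Ordering the window by stock and then name also makes the choice of lowest_stock_product deterministic when two products tie on stock, which the LIMIT 1 subquery does not guarantee.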
Advanced - Connection Pool with Health Monitoring
Build a MonitoredPool class that wraps ThreadedConnectionPool and adds:
- A checkout_count metric - total connections ever borrowed across all threads
- A total_wait_ms metric - cumulative milliseconds spent waiting for connections
- An error_count metric - number of connections that caused a rollback
- A health_check() method that returns a status dict including a live database probe
- Automatic retry with exponential backoff when getconn() raises PoolError - up to 3 retries with 50 ms, 100 ms, and 200 ms delays between attempts (ThreadedConnectionPool.getconn() does not block when every connection is in use; it raises PoolError immediately)
import time
import threading
import contextlib
from dataclasses import dataclass, field
import psycopg2
import psycopg2.pool
import psycopg2.extras
import os


@dataclass
class _Metrics:
    """Thread-safe counters for pool observability."""
    checkout_count: int = 0
    total_wait_ms: float = 0.0
    error_count: int = 0
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def record_checkout(self, wait_ms: float) -> None:
        with self._lock:
            self.checkout_count += 1
            self.total_wait_ms += wait_ms

    def record_error(self) -> None:
        with self._lock:
            self.error_count += 1

    def snapshot(self) -> dict:
        with self._lock:
            avg = (
                self.total_wait_ms / self.checkout_count
                if self.checkout_count > 0 else 0.0
            )
            return {
                "checkout_count": self.checkout_count,
                "total_wait_ms": round(self.total_wait_ms, 2),
                "avg_wait_ms": round(avg, 2),
                "error_count": self.error_count,
            }


class MonitoredPool:
    """
    ThreadedConnectionPool with metrics collection and
    automatic retry on pool exhaustion.
    """

    _RETRY_DELAYS_MS = [50, 100, 200]  # doubling (exponential) backoff

    def __init__(self, min_conn: int, max_conn: int, **kwargs):
        self._pool = psycopg2.pool.ThreadedConnectionPool(
            min_conn, max_conn, **kwargs
        )
        self._metrics = _Metrics()
        self._min_conn = min_conn
        self._max_conn = max_conn

    def _getconn_with_retry(self):
        """Attempt getconn() with backoff on PoolError exhaustion."""
        last_exc = None
        for delay_ms in [0] + self._RETRY_DELAYS_MS:
            if delay_ms:
                time.sleep(delay_ms / 1000)
            try:
                return self._pool.getconn()
            except psycopg2.pool.PoolError as exc:
                last_exc = exc
        raise last_exc

    @contextlib.contextmanager
    def connection(self):
        """
        Borrow a connection with metrics tracking.
        Commits on success, rolls back on exception, always returns to pool.
        """
        t0 = time.perf_counter()
        conn = self._getconn_with_retry()
        wait_ms = (time.perf_counter() - t0) * 1000
        self._metrics.record_checkout(wait_ms)
        try:
            yield conn
            conn.commit()
        except Exception:
            self._metrics.record_error()
            try:
                conn.rollback()
            except psycopg2.Error:
                pass  # connection may be dead; putconn will handle cleanup
            raise
        finally:
            self._pool.putconn(conn)

    def health_check(self) -> dict:
        """
        Return pool metrics plus a live database connectivity probe.
        Safe to expose from a /healthz or /ready HTTP endpoint.
        """
        db_ok = False
        try:
            with self.connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")
            # Reached only if the probe query and its commit both succeeded
            db_ok = True
        except Exception:
            pass
        # Snapshot after the probe so this report includes the probe's own checkout
        metrics = self._metrics.snapshot()
        return {
            "status": "healthy" if db_ok else "degraded",
            "db_reachable": db_ok,
            "pool_min": self._min_conn,
            "pool_max": self._max_conn,
            **metrics,
        }

    def close(self) -> None:
        """Close all connections in the pool."""
        self._pool.closeall()


# --- Demo ---
if __name__ == "__main__":
    import json

    pool = MonitoredPool(
        min_conn=2,
        max_conn=5,
        dsn=os.environ.get(
            "DATABASE_URL",
            "postgresql://postgres:postgres@localhost:5432/catalogue",
        ),
        cursor_factory=psycopg2.extras.RealDictCursor,
    )

    def simulate_request(request_id: int) -> None:
        try:
            with pool.connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT COUNT(*) AS n FROM products")
                    row = cur.fetchone()
                    print(f"Request {request_id:>3}: {row['n']} products")
        except Exception as exc:
            print(f"Request {request_id:>3} failed: {exc}")

    threads = [
        threading.Thread(target=simulate_request, args=(i,))
        for i in range(20)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print("\nPool health report:")
    print(json.dumps(pool.health_check(), indent=2))
    pool.close()
Key points:
- _Metrics carries its own threading.Lock - counter updates are atomic without an outer lock on MonitoredPool
- The retry loop uses [0] + _RETRY_DELAYS_MS so the first attempt is immediate and the loop body stays uniform for all attempts
- health_check() reuses self.connection() - the liveness probe is a real checkout and shows up in metrics, making it a true end-to-end test
- _getconn_with_retry() is a separate method rather than inline logic - easier to unit-test in isolation
- psycopg2.Error (not Exception) is caught on the rollback path to avoid accidentally swallowing unrelated errors
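The hard-coded [50, 100, 200] list is an instance of doubling backoff, delay = base × 2^attempt. A common refinement, not part of the challenge spec, is "full jitter": sleep a random time between 0 and the computed delay, so threads that hit PoolError at the same instant do not all retry in lockstep. The sketch below uses hypothetical backoff_delays_ms and retry_with_backoff helpers with an injectable sleep, so the retry policy can be unit-tested without a database; FakePoolError and flaky_getconn are stand-ins for psycopg2.pool.PoolError and the pool's getconn().

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delays_ms(base_ms: float = 50, retries: int = 3, jitter: bool = False) -> list[float]:
    """Doubling delays (base, 2*base, 4*base, ...), optionally with full jitter."""
    delays = [base_ms * 2 ** attempt for attempt in range(retries)]
    if jitter:
        # Full jitter: pick uniformly between 0 and each computed delay.
        delays = [random.uniform(0, d) for d in delays]
    return delays


def retry_with_backoff(
    fn: Callable[[], T],
    retry_on: type[BaseException],
    delays_ms: list[float],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); on retry_on, sleep and retry once per entry in delays_ms."""
    last_exc = None
    for delay_ms in [0] + list(delays_ms):
        if delay_ms:
            sleep(delay_ms / 1000)
        try:
            return fn()
        except retry_on as exc:
            last_exc = exc
    raise last_exc  # every attempt failed


# Usage with stand-ins: fails twice, then succeeds on the third attempt.
class FakePoolError(Exception):
    pass


calls = []


def flaky_getconn() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise FakePoolError("connection pool exhausted")
    return "connection"


slept = []
conn = retry_with_backoff(flaky_getconn, FakePoolError, backoff_delays_ms(), sleep=slept.append)
print(conn, slept)
```

Inside MonitoredPool the equivalent call would be retry_with_backoff(self._pool.getconn, psycopg2.pool.PoolError, backoff_delays_ms(jitter=True)).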
Key Takeaways
- psycopg2 is the battle-tested standard driver; psycopg3 is the modern successor with native async/await - choose psycopg3 for new projects where async matters
- Store credentials in environment variables (DATABASE_URL, PGPASSWORD) or a secrets manager - never in source code or config files committed to version control
- psycopg2 and psycopg3 use %s placeholders (or %(name)s for named parameters) - not ? as in SQLite; always pass parameters as the second argument to execute(), never use f-strings or % string formatting
- Connection pooling with ThreadedConnectionPool is essential for multi-threaded production services - opening a new connection per request exhausts PostgreSQL's connection limit under load
- Size your pool with the rule of thumb connections = (DB CPU cores × 2) + effective spindle count - the result is a budget for the whole database server, not for each application process; for a 4-core instance that works out to roughly 9–10 connections in total, shared among all your application processes
- Use the RETURNING clause to retrieve inserted or updated row data in a single round-trip - eliminates the common insert-then-select anti-pattern
- COPY FROM STDIN via copy_expert() can be 50–200x faster than executemany() for large bulk inserts, depending on row width, indexes, and network latency
- PostgreSQL types map cleanly to Python: JSONB ↔ dict (via Json()), TEXT[] ↔ list, UUID ↔ uuid.UUID (after register_uuid()), TIMESTAMPTZ ↔ timezone-aware datetime
- Always configure connection timeouts (connect_timeout, statement_timeout) and TCP keepalives in production - silent connection drops from firewalls and cloud networking are among the most common database failure modes
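The pool-sizing rule of thumb is easy to turn into a startup-time calculation. A minimal sketch, assuming hypothetical pool_budget and per_process_maxconn helpers, an effective spindle count of 1 by default, and that you want the sum of every process's maxconn to stay within the formula's result:

```python
def pool_budget(db_cores: int, effective_spindles: int = 1) -> int:
    """(cores x 2) + effective spindle count; the default of 1 spindle is an assumption."""
    return db_cores * 2 + effective_spindles


def per_process_maxconn(budget: int, processes: int) -> int:
    """Floor-divide the budget across processes, giving each at least one connection.

    If processes > budget, that floor of 1 means the combined total exceeds the budget.
    """
    return max(1, budget // processes)


budget = pool_budget(db_cores=4)  # (4 * 2) + 1 = 9
print(budget, per_process_maxconn(budget, processes=3))
```

Treat the result as a starting point for load testing rather than a hard rule; measured wait times (such as avg_wait_ms from a health report) are a better guide than the formula alone.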
What's Next
Lesson 04 covers Transactions - the mechanism that makes multiple database operations atomic. You have already used transactions implicitly (conn.commit(), conn.rollback()), but production systems require explicit control: savepoints for partial rollbacks, isolation levels for concurrent reads (READ COMMITTED vs REPEATABLE READ vs SERIALIZABLE), and advisory locks for application-level coordination across processes.
Understanding transaction isolation is the difference between a database layer that occasionally loses or duplicates data under concurrent load and one that guarantees correctness regardless of how many application instances are running simultaneously.
