PostgreSQL with Python

Reading time: ~40 minutes | Level: Intermediate → Engineering

Before reading further, predict what happens when this application is deployed:

import psycopg2
import os

def get_user(user_id: int):
    # Opens a brand-new connection for every request
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
    row = cursor.fetchone()
    conn.close()
    return row

With one user hitting the API, everything works fine. You deploy. Traffic climbs. At around 100 concurrent requests, the app starts returning errors:

OperationalError: FATAL: remaining connection slots are reserved
for non-replication superuser connections

Or worse - the application hangs indefinitely, waiting for a connection that never arrives.

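What is happening? PostgreSQL has a hard limit on simultaneous connections (max_connections, default: 100). Each psycopg2.connect() call opens a new TCP connection to the database server, and the server forks a dedicated backend process for it. Under load, 100 concurrent requests hold 100 connections at once, and PostgreSQL rejects whatever comes next; because a few slots are reserved for superusers, ordinary clients are turned away slightly before the limit. Connections are also expensive to create - TCP handshake, SSL negotiation, authentication, process fork on the server - often costing 5–20 ms each time.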

The solution is connection pooling: maintain a bounded set of reusable connections and hand them out to requests as needed. When a request finishes, the connection goes back into the pool instead of being closed. Under load, requests share that bounded set rather than hammering the server with new connections.

This lesson explains why pooling is essential, how to implement it correctly with psycopg2, and how to handle everything PostgreSQL-specific that you will not find in SQLite: JSONB, arrays, UUIDs, bulk COPY, and the RETURNING clause.

What You Will Learn

  • The difference between psycopg2 and psycopg3 and which to use for new projects
  • Connecting with DSN strings and environment variables - credentials management done right
  • Connection pooling with ThreadedConnectionPool - how it works, why it matters, how to size it
  • Executing queries with PostgreSQL's %s placeholder
  • Handling PostgreSQL-specific types: JSONB, arrays, UUID, timestamptz
  • Bulk inserts with COPY - orders of magnitude faster than repeated INSERT
  • Getting inserted data back with RETURNING
  • Production-grade connection configuration: timeouts, keepalives, SSL

Prerequisites

  • Python Foundation: functions, context managers (with), exceptions, environment variables
  • Module 07, Lesson 01: SQL Fundamentals (SELECT, INSERT, UPDATE, DELETE, transactions)
  • Module 07, Lesson 02: SQLite with Python (DB-API 2.0 interface)
  • A running PostgreSQL instance (local Docker or managed service)

Part 1 - psycopg2 vs psycopg3: Which to Use

Python has two major PostgreSQL drivers. Both implement DB-API 2.0. Both are production-battle-tested. They have different architectures and tradeoffs.

psycopg2

First released in the mid-2000s. The dominant PostgreSQL driver for Python for well over a decade. Written in C as a wrapper around libpq (PostgreSQL's C client library).

pip install psycopg2-binary # development / CI
pip install psycopg2 # production (requires libpq-dev on the system)

psycopg2-binary bundles a pre-compiled libpq so you can install it without system dependencies. For production deployments where you control the OS, psycopg2 (without -binary) links against the system libpq for better performance and security patch coverage.

Strengths:

  • Extremely stable and battle-tested
  • Massive ecosystem of tutorials, StackOverflow answers, and compatible libraries
  • Synchronous and well-understood threading model
  • Supported by the major frameworks and ORMs (Django, and SQLAlchemy as used from Flask or synchronous FastAPI)

Limitations:

  • Does not support async/await natively (its asynchronous mode and wrappers such as aiopg are workarounds)
  • No active new development - maintenance mode only
  • Type handling requires explicit adaptation for newer PostgreSQL types

psycopg3 (psycopg)

Released in 2021. A ground-up rewrite in pure Python (with an optional C extension for speed). The actively developed successor.

pip install psycopg # pure Python (uses the system libpq)
pip install "psycopg[binary]" # with C extension (faster); quotes keep the shell from expanding [ ]
pip install "psycopg[pool]" # with the psycopg_pool connection pool package

Strengths:

  • Full async/await support - works natively with asyncio, FastAPI async, Starlette
  • Better type handling out of the box - server-side parameter binding, and lists and UUIDs adapt without registration (dicts still need the Json/Jsonb wrappers)
  • Built-in ConnectionPool class (in psycopg_pool)
  • Active development with PostgreSQL 15+ features

Limitations:

  • Smaller ecosystem than psycopg2
  • Some legacy codebases and ORMs still assume psycopg2's API

Which to Use

| Scenario | Recommendation |
|---|---|
| New synchronous project | psycopg2 (stable ecosystem) or psycopg3 (modern) |
| New async project (FastAPI, aiohttp) | psycopg3 - native async is a first-class feature |
| Existing psycopg2 codebase | Stay on psycopg2; migrate incrementally |
| Greenfield with Python 3.10+ | psycopg3 - it is the future |

:::note DB-API 2.0 Compatibility This lesson uses psycopg2 for all examples since it remains the most widely deployed driver. The query execution patterns - execute(), fetchone(), fetchall(), %s placeholders - carry over to psycopg3 almost unchanged. The main differences are in connection pooling, async support, server-side parameter binding, and some type adapter APIs. :::

Part 2 - Connection Setup: DSN Strings, Environment Variables, SSL

Connecting with a DSN String

PostgreSQL connections are described by a DSN (Data Source Name) - a URL-style string that encodes all connection parameters:

postgresql://user:password@host:port/dbname?sslmode=require

import psycopg2

# From a DSN string
conn = psycopg2.connect("postgresql://appuser:secret@localhost:5432/myapp")

# Or as keyword arguments
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="myapp",
    user="appuser",
    password="secret",
)
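One practical wrinkle with the URL form: a password containing characters such as @, / or : has to be percent-encoded, otherwise the URI is parsed incorrectly. The following is a minimal sketch using only the standard library; build_dsn is a hypothetical helper name, not part of psycopg2.

```python
from urllib.parse import quote


def build_dsn(user: str, password: str, host: str, port: int, dbname: str) -> str:
    """Assemble a postgresql:// URI, percent-encoding the variable parts."""
    # safe="" forces '/', '@' and ':' to be encoded as well
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    dbname_enc = quote(dbname, safe="")
    return f"postgresql://{user_enc}:{password_enc}@{host}:{port}/{dbname_enc}"


# Hypothetical credentials, for illustration only
print(build_dsn("appuser", "p@ss/w:rd", "localhost", 5432, "myapp"))
# postgresql://appuser:p%40ss%2Fw%3Ard@localhost:5432/myapp
```

Passing keyword arguments to psycopg2.connect(), as in the second form above, avoids the encoding question entirely.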

Never Hardcode Credentials

:::warning Store Credentials in Environment Variables, Not Source Code Database credentials in source code get committed to version control. Once in git history, they are effectively public - even if you delete them in a later commit. The entire history is visible to anyone who clones the repository.

Wrong:

conn = psycopg2.connect("postgresql://admin:s3cr3t@db.example.com/app")

Correct:

import os
import psycopg2

DATABASE_URL = os.environ["DATABASE_URL"] # raises KeyError if not set - good
conn = psycopg2.connect(DATABASE_URL)

For production systems, go further: use a secrets manager (AWS Secrets Manager, HashiCorp Vault, GCP Secret Manager) that rotates credentials automatically and provides an audit log of who accessed what and when. :::

Environment Variable Pattern

import os
import psycopg2

def get_connection_params() -> dict:
    """
    Read connection parameters from environment variables.
    Supports both DATABASE_URL and individual PG* variables.
    """
    database_url = os.environ.get("DATABASE_URL")
    if database_url:
        return {"dsn": database_url}

    return {
        "host": os.environ.get("PGHOST", "localhost"),
        "port": int(os.environ.get("PGPORT", "5432")),
        "dbname": os.environ.get("PGDATABASE", "myapp"),
        "user": os.environ.get("PGUSER", "postgres"),
        "password": os.environ["PGPASSWORD"],  # required - no default
    }

The PG* environment variable names are the PostgreSQL standard - psql and libpq also read them, so they work across your whole toolchain.

SSL Configuration

Always require SSL for connections to production databases:

import psycopg2
import os

conn = psycopg2.connect(
    dsn=os.environ["DATABASE_URL"],
    sslmode="require",  # require encrypted connection
    # sslmode="verify-full",  # also verify the server certificate (best practice)
)

| sslmode | Behaviour |
|---|---|
| disable | No SSL - plaintext only. Never use in production. |
| allow | Plaintext first; SSL only if the server insists on it. |
| prefer | SSL if available, plaintext fallback. Default. |
| require | SSL required. Does not verify certificate. |
| verify-ca | SSL + verify cert against trusted CA. |
| verify-full | SSL + verify cert + verify hostname. Most secure. |
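Because prefer is the default, a DATABASE_URL that omits sslmode can silently fall back to plaintext. One way to guard against that is to rewrite the URL before connecting. This is a sketch under the assumption that the DSN is in URL form; with_sslmode is a hypothetical helper name.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_sslmode(dsn: str, mode: str = "verify-full") -> str:
    """Return the DSN with its sslmode query parameter set to `mode`."""
    parts = urlsplit(dsn)
    params = dict(parse_qsl(parts.query))
    params["sslmode"] = mode  # overrides any weaker value already present
    return urlunsplit(parts._replace(query=urlencode(params)))


print(with_sslmode("postgresql://appuser@db.example.com:5432/myapp?sslmode=disable", "require"))
# postgresql://appuser@db.example.com:5432/myapp?sslmode=require
```

The same effect can be had by passing sslmode as a keyword argument next to dsn, as in the snippet above; rewriting the URL is mainly useful when the DSN is handed to other tools as well.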

Part 3 - Connection Pooling

Why Pooling Matters

The pool lifecycle:

  1. On startup, the pool opens minconn connections to the database
  2. When your code requests a connection, the pool hands out an idle one
  3. Your code uses it, then returns it to the pool - not closes it
  4. If no idle connection is available and fewer than maxconn are open, the pool opens a new one
  5. If all maxconn connections are already borrowed, getconn() raises PoolError immediately - psycopg2's pools do not make the caller wait

ThreadedConnectionPool

psycopg2 ships with a built-in pool designed for multi-threaded applications:

import psycopg2.pool
import os

# Create once at application startup - not per-request
pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,  # open 2 connections immediately on startup
    maxconn=10,  # never exceed 10 simultaneous connections
    dsn=os.environ["DATABASE_URL"],
    sslmode="require",
)

ThreadedConnectionPool is thread-safe: getconn() and putconn() are protected by an internal lock, so multiple threads can borrow and return connections simultaneously without data races.

Using the Pool Safely

The most important rule with connection pools: always return the connection, even when an exception occurs. A connection that is borrowed but never returned is a leak - the pool will eventually exhaust its connections and every new getconn() call will fail with PoolError.

import psycopg2.pool
import contextlib
import os

pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
)


@contextlib.contextmanager
def get_db():
    """
    Context manager that borrows a connection from the pool
    and guarantees it is returned - even on exception.
    """
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()  # commit if no exception was raised
    except Exception:
        conn.rollback()  # rollback on any error
        raise
    finally:
        pool.putconn(conn)  # ALWAYS return to pool


# Usage - clean and safe
def get_user(user_id: int):
    with get_db() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, name, email FROM users WHERE id = %s",
                (user_id,)
            )
            return cursor.fetchone()
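psycopg2's pools raise PoolError as soon as maxconn connections are borrowed; they do not queue callers. If you would rather have requests wait briefly for a free connection, one option is to guard the pool with a semaphore. The sketch below is an illustration under that assumption, not part of psycopg2: BlockingPool is a hypothetical wrapper that works with any object exposing getconn() and putconn().

```python
import threading
from contextlib import contextmanager


class BlockingPool:
    """Wrap a pool so callers wait for a free connection instead of failing."""

    def __init__(self, pool, maxconn: int, timeout: float = 5.0):
        self._pool = pool
        self._timeout = timeout
        # One permit per connection the wrapped pool is allowed to hand out
        self._permits = threading.BoundedSemaphore(maxconn)

    @contextmanager
    def connection(self):
        # Wait up to `timeout` seconds for another thread to return a connection
        if not self._permits.acquire(timeout=self._timeout):
            raise TimeoutError("no free database connection within timeout")
        try:
            conn = self._pool.getconn()
            try:
                yield conn
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            finally:
                self._pool.putconn(conn)
        finally:
            self._permits.release()
```

Usage would look like `db = BlockingPool(pool, maxconn=10)` followed by `with db.connection() as conn: ...`. The maxconn passed to the wrapper should match the maxconn of the wrapped pool, and the timeout keeps a leaked connection from stalling callers indefinitely.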

Pool Sizing Formula

Choosing maxconn is not arbitrary. A widely cited starting point, published on the PostgreSQL wiki, is:

maxconn = (number of CPU cores on DB server) × 2 + number of effective spindles

For a modern cloud database instance (for example, 4 vCPUs, SSD storage):

maxconn = 4 × 2 + 1 = 9 → use 10 as a round number

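For an application with multiple server processes (for example, 4 Gunicorn workers each with their own pool), every worker's pool counts against the same server-side limit. Divide the connection budget across workers, leaving headroom for superuser-reserved slots, migrations, and monitoring. This gives an upper bound per worker; the formula above usually argues for fewer:

pool_size_per_worker = (max_connections - reserved_headroom) / number_of_workers
= (100 - 10) / 4 = 22 per worker (rounded down)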
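The arithmetic above can be captured in a small function. This is a sketch: pool_size_per_worker and the default headroom of 10 connections are illustrative assumptions, and the result is a starting point to validate under load rather than a guarantee.

```python
from typing import Optional


def pool_size_per_worker(
    max_connections: int,
    workers: int,
    reserved: int = 10,
    db_cores: Optional[int] = None,
    spindles: int = 1,
) -> int:
    """Suggest a per-worker maxconn from the server limit and worker count."""
    if workers <= 0:
        raise ValueError("workers must be positive")
    # Connections the application may use in total
    budget = max_connections - reserved
    if db_cores is not None:
        # Cap at the cores * 2 + spindles rule of thumb
        budget = min(budget, db_cores * 2 + spindles)
    return max(1, budget // workers)


print(pool_size_per_worker(100, 4))              # 22
print(pool_size_per_worker(100, 4, db_cores=4))  # 2
```

The second call shows how quickly the core-based rule dominates: a 4-vCPU database wants roughly 9 active connections in total, so four workers get about 2 each.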

:::tip PgBouncer for Very High Concurrency If you have hundreds of application processes and cannot reduce connection count, use PgBouncer - a connection pooler that sits between your application and PostgreSQL. Your application connects to PgBouncer (which accepts thousands of connections), and PgBouncer maintains a small pool to the actual database. PgBouncer is the standard solution for Django and Flask deployments at scale. :::

Connection Pool Lifecycle Diagram

Part 4 - Executing Queries

PostgreSQL Placeholder: %s

psycopg2 and psycopg3 use %s as the positional placeholder - not ? as in SQLite (other PostgreSQL drivers differ again; asyncpg, for example, uses $1). This is the most common mistake when switching between drivers.

# SQLite - uses ?
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))

# PostgreSQL (psycopg2/psycopg3) - uses %s
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))

:::danger Never Use Python's % String Formatting in SQL %s looks identical to Python's old % string-formatting operator, but they are completely different things. psycopg2 intercepts %s in SQL strings and handles escaping safely at the driver level. Python's % operator does not - it produces a plain concatenated string that enables SQL injection.

# SAFE - psycopg2 handles the binding
cursor.execute("SELECT * FROM users WHERE email = %s", (email,))

# DANGEROUS - SQL injection
cursor.execute("SELECT * FROM users WHERE email = %s" % email) # NEVER

# DANGEROUS - same problem with f-strings
cursor.execute(f"SELECT * FROM users WHERE email = '{email}'") # NEVER

Always pass parameters as the second argument tuple to execute(). Never concatenate or format them into the SQL string itself. :::
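To see the difference without a PostgreSQL server, the same experiment can be run against an in-memory SQLite database. Note the swap: this sketch uses sqlite3 and its ? placeholder purely as a stand-in, and the table and payload are made up for illustration. The mechanism is the same one psycopg2's %s binding protects against.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [("alice@example.com",), ("bob@example.com",)],
)

# Attacker-controlled input
malicious = "nobody@example.com' OR '1'='1"

# String formatting: the payload becomes part of the SQL text
unsafe_sql = "SELECT email FROM users WHERE email = '%s'" % malicious
leaked = conn.execute(unsafe_sql).fetchall()

# Parameter binding: the payload is compared as a plain value
safe = conn.execute("SELECT email FROM users WHERE email = ?", (malicious,)).fetchall()

print(len(leaked))  # 2 - every row matched
print(len(safe))    # 0 - no user has that literal email
```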

Named Parameters

psycopg2 also supports named parameters using %(name)s:

cursor.execute(
    "SELECT id, name FROM users WHERE email = %(email)s AND active = %(active)s",
    {"email": "alice@example.com", "active": True}
)

execute(), fetchone(), fetchall()

with get_db() as conn:
    with conn.cursor() as cursor:

        # Single row - returns a tuple or None
        cursor.execute("SELECT id, name, email FROM users WHERE id = %s", (1,))
        row = cursor.fetchone()
        if row:
            user_id, name, email = row

        # All rows - returns a list of tuples
        cursor.execute("SELECT id, name FROM users WHERE active = %s", (True,))
        rows = cursor.fetchall()

        # Iterate row by row. Note: a default (client-side) cursor has already
        # transferred the whole result set by the time execute() returns, so this
        # saves a list copy, not the transfer. Use a named cursor for true streaming.
        cursor.execute("SELECT id, name FROM users")
        for row in cursor:
            process(row)

        # fetchmany - batch processing (re-execute: the loop above consumed the rows)
        cursor.execute("SELECT id, name FROM users")
        while True:
            batch = cursor.fetchmany(500)
            if not batch:
                break
            process_batch(batch)
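For result sets too large to hold in memory, psycopg2 offers server-side cursors: passing a name to conn.cursor() keeps the result on the server and fetches it in chunks of itersize rows. The sketch below wraps that in a generator; stream_rows is a hypothetical helper, and it assumes the connection is not in autocommit mode, since a named cursor lives inside a transaction.

```python
import uuid
from typing import Any, Iterator, Sequence


def stream_rows(
    conn,
    query: str,
    params: Sequence[Any] = (),
    batch_size: int = 2000,
) -> Iterator[tuple]:
    """Yield rows one at a time while fetching from the server in batches."""
    # A named cursor is a server-side cursor; the name must be unique per connection
    name = f"stream_{uuid.uuid4().hex}"
    with conn.cursor(name=name) as cursor:
        cursor.itersize = batch_size  # rows fetched per network round-trip
        cursor.execute(query, params)
        for row in cursor:
            yield row
```

A caller would write `for row in stream_rows(conn, "SELECT id, name FROM users"): ...` inside a get_db() block, so that memory use stays bounded by batch_size rather than by the size of the table.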

cursor.rowcount and cursor.statusmessage

with get_db() as conn:
    with conn.cursor() as cursor:
        cursor.execute(
            "UPDATE users SET last_seen = NOW() WHERE id = %s",
            (user_id,)
        )
        print(cursor.rowcount)  # e.g. 1
        print(cursor.statusmessage)  # e.g. "UPDATE 1"

RealDictCursor - Column Access by Name

By default psycopg2 returns plain tuples. For named column access, use RealDictCursor:

import psycopg2
import psycopg2.extras

with get_db() as conn:
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute("SELECT id, name, email FROM users WHERE id = %s", (1,))
        row = cursor.fetchone()
        print(row["name"])  # access by column name
        print(dict(row))  # {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}

Or apply it pool-wide by passing it at pool creation time:

pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
    cursor_factory=psycopg2.extras.RealDictCursor,
)

Part 5 - PostgreSQL-Specific Types

PostgreSQL has rich native types that have no SQLite equivalent. psycopg2 includes adapters that convert between PostgreSQL wire types and Python objects - arrays and timestamps work automatically, while some types (such as UUID) require explicit registration at startup.

JSONB

PostgreSQL's JSONB type stores JSON in a decomposed binary form. It can be indexed (GIN), queried with operators, and is faster to process than plain JSON, at the cost of slightly slower input.

import psycopg2
import psycopg2.extras

# Register the jsonb typecaster once at startup
# (recent psycopg2 versions already do this by default; the call is harmless)
psycopg2.extras.register_default_jsonb(globally=True)

with get_db() as conn:
    with conn.cursor() as cursor:
        # Insert - use psycopg2.extras.Json() for automatic serialization
        metadata = {
            "tags": ["python", "backend"],
            "version": 2,
            "config": {"timeout": 30, "retries": 3}
        }
        cursor.execute(
            "INSERT INTO products (name, metadata) VALUES (%s, %s)",
            ("Widget", psycopg2.extras.Json(metadata))
        )

        # ->> extracts a JSONB field as text
        cursor.execute(
            "SELECT name, metadata->>'version' AS version FROM products WHERE id = %s",
            (product_id,)
        )

        # @> containment - find products tagged 'python'
        cursor.execute(
            "SELECT id, name FROM products WHERE metadata @> %s",
            (psycopg2.extras.Json({"tags": ["python"]}),)
        )
        rows = cursor.fetchall()

:::tip psycopg2.extras.Json vs json.dumps psycopg2.extras.Json(data) is a type adapter that handles serialization for you and accepts a custom dumps function. Passing json.dumps(data) as a parameter also works, because psycopg2 quotes the resulting string like any other and PostgreSQL casts it to JSONB, but it hides the intent and relies on that implicit cast. For JSONB columns, Json() is the cleaner choice. :::

Arrays

PostgreSQL arrays map naturally to Python lists. psycopg2 converts them automatically in both directions:

# Schema uses: tags TEXT[], scores INTEGER[]
with get_db() as conn:
    with conn.cursor() as cursor:
        # Python list → PostgreSQL array (automatic)
        cursor.execute(
            "INSERT INTO articles (title, tags) VALUES (%s, %s)",
            ("Python Arrays", ["python", "postgresql", "tutorial"])
        )

        # ANY() to find rows containing a specific value
        cursor.execute(
            "SELECT id, title FROM articles WHERE %s = ANY(tags)",
            ("postgresql",)
        )

        # Retrieve - PostgreSQL array → Python list (automatic)
        cursor.execute("SELECT title, tags FROM articles WHERE id = %s", (1,))
        row = cursor.fetchone()
        title, tags = row
        print(tags)  # ['python', 'postgresql', 'tutorial']

        # && operator - array overlap (any element in common)
        cursor.execute(
            "SELECT title FROM articles WHERE tags && %s",
            (["python", "django"],)
        )

UUID

PostgreSQL has a native UUID type. Register the adapter once and psycopg2 converts automatically:

import uuid
import psycopg2.extras

# Register UUID adapter once at startup
psycopg2.extras.register_uuid()

with get_db() as conn:
    with conn.cursor() as cursor:
        new_id = uuid.uuid4()
        cursor.execute(
            "INSERT INTO sessions (id, user_id) VALUES (%s, %s)",
            (new_id, user_id)
        )

        # Returned as uuid.UUID - not a plain string
        cursor.execute("SELECT id, user_id FROM sessions WHERE id = %s", (new_id,))
        row = cursor.fetchone()
        session_id = row[0]
        print(type(session_id))  # <class 'uuid.UUID'>
        print(str(session_id))  # e.g. '550e8400-e29b-41d4-a716-446655440000'

Timestamps with Timezone (timestamptz)

Always use TIMESTAMP WITH TIME ZONE - PostgreSQL stores in UTC and converts on display:

from datetime import datetime, timezone

with get_db() as conn:
    with conn.cursor() as cursor:
        # Insert timezone-aware datetime
        now = datetime.now(timezone.utc)
        cursor.execute(
            "INSERT INTO events (user_id, occurred_at) VALUES (%s, %s)",
            (user_id, now)
        )

        # PostgreSQL NOW() returns the current timestamptz
        cursor.execute(
            "INSERT INTO logs (message, logged_at) VALUES (%s, NOW())",
            ("App started",)
        )

        # Retrieve - psycopg2 returns a timezone-aware datetime
        cursor.execute("SELECT occurred_at FROM events WHERE id = %s", (event_id,))
        row = cursor.fetchone()
        ts = row[0]
        print(type(ts))  # <class 'datetime.datetime'>
        print(ts.tzinfo)  # offset of the session time zone, e.g. UTC when TimeZone is UTC

:::warning Use Timezone-Aware Datetimes Always use datetime.now(timezone.utc) instead of datetime.utcnow(). The latter returns a naive datetime with no tzinfo - PostgreSQL treats it as the session timezone, which may not be UTC in all environments. datetime.utcnow() is deprecated in Python 3.12 precisely because naive "UTC" datetimes cause subtle, hard-to-reproduce bugs. :::
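One way to enforce this rule at the application boundary is to reject naive datetimes before they reach the driver. The helper below is a minimal sketch; require_aware is a hypothetical name, and normalising to UTC is a design choice rather than something PostgreSQL requires.

```python
from datetime import datetime, timedelta, timezone


def require_aware(dt: datetime) -> datetime:
    """Reject naive datetimes; normalise aware ones to UTC."""
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError("naive datetime: attach a timezone before storing")
    return dt.astimezone(timezone.utc)


# 12:00 at UTC+2 is the same instant as 10:00 UTC
local = datetime(2024, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))
print(require_aware(local))  # 2024-01-01 10:00:00+00:00
```

Calling require_aware(datetime(2024, 1, 1, 12, 0)) raises ValueError, which surfaces the bug at the call site instead of as a shifted timestamp in the database.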

Part 6 - COPY for Bulk Inserts

Why COPY Is So Much Faster

For large datasets, INSERT is slow because each row goes through:

  • Query parsing and planning overhead
  • Network round-trip cost per statement
  • A separate write-ahead log (WAL) record per row

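COPY bypasses most of this - rows are streamed to the server within a single command, which is parsed and planned once, with dramatically reduced per-row overhead.

Speedups of one to two orders of magnitude over row-by-row INSERT are common for datasets of 100,000+ rows; the exact figure depends on row width, indexes, and network latency, so measure on your own data.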

copy_expert() with CSV

import io
import csv
import psycopg2

def bulk_insert_products(conn, products: list[dict]) -> int:
    """
    Insert many products using COPY FROM STDIN.
    Returns the number of rows inserted.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    for p in products:
        writer.writerow([
            p["name"],
            p["price"],
            p["category"],
            p.get("stock", 0),
        ])

    buffer.seek(0)

    with conn.cursor() as cursor:
        cursor.copy_expert(
            "COPY products (name, price, category, stock) FROM STDIN WITH CSV",
            buffer,
        )
        count = cursor.rowcount

    conn.commit()
    return count

copy_from() - Simpler Tab-Delimited Format

For simple cases where the data contains no tab, newline, or backslash characters:

import io

def bulk_insert_simple(conn, rows: list[tuple]) -> None:
    """rows is a list of (name, price) tuples."""
    buffer = io.StringIO()
    for name, price in rows:
        buffer.write(f"{name}\t{price}\n")
    buffer.seek(0)

    with conn.cursor() as cursor:
        cursor.copy_from(buffer, "products", columns=("name", "price"))
    conn.commit()

:::tip Use copy_expert() for Production copy_from() uses tab-delimited text format - a tab, newline, or backslash in your data will shift columns, corrupt values, or abort the COPY. copy_expert() with WITH (FORMAT csv) handles quoting and escaping correctly for arbitrary data. Prefer copy_expert() unless your data is guaranteed free of those characters. :::
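The quoting behaviour that makes CSV the safer choice can be checked without a database. The sketch below builds a COPY-ready buffer with the standard csv module and reads it back to confirm that tabs, commas, quotes, and newlines survive; rows_to_csv_buffer is a hypothetical helper and the sample rows are made up.

```python
import csv
import io
from typing import Iterable, Sequence


def rows_to_csv_buffer(rows: Iterable[Sequence[object]]) -> io.StringIO:
    """Serialise rows to an in-memory CSV file, rewound and ready for COPY."""
    buffer = io.StringIO()
    # csv.writer quotes any field containing a delimiter, quote, or newline
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    buffer.seek(0)
    return buffer


rows = [
    ("Tab\tin name", 9.99),
    ('Quote "and", comma', 5),
    ("Line\nbreak", 1),
]

# Round-trip through csv.reader to show nothing was lost or split
parsed = list(csv.reader(rows_to_csv_buffer(rows)))
print(parsed[0])  # ['Tab\tin name', '9.99']
print(len(parsed))  # 3
```

A buffer produced this way can be passed to cursor.copy_expert() with a COPY ... FROM STDIN WITH (FORMAT csv) statement. One caveat of this simple version: None and the empty string are both written as an empty field, which COPY reads as NULL.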

Benchmark: INSERT vs COPY

import io
import time

def benchmark(conn, n_rows: int = 10_000):
    rows = [
        (f"Product {i}", round(9.99 + i * 0.01, 2), "test", i % 100)
        for i in range(n_rows)
    ]

    # Method 1: executemany INSERT
    start = time.perf_counter()
    with conn.cursor() as cur:
        cur.executemany(
            "INSERT INTO products (name, price, category, stock) VALUES (%s, %s, %s, %s)",
            rows,
        )
    conn.rollback()
    insert_time = time.perf_counter() - start

    # Method 2: COPY FROM STDIN
    start = time.perf_counter()
    buf = io.StringIO()
    for row in rows:
        buf.write("\t".join(str(v) for v in row) + "\n")
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_from(buf, "products", columns=("name", "price", "category", "stock"))
    conn.rollback()
    copy_time = time.perf_counter() - start

    print(f"INSERT: {insert_time:.2f}s | COPY: {copy_time:.2f}s | "
          f"Speedup: {insert_time / copy_time:.1f}x")

Part 7 - The RETURNING Clause

A common anti-pattern: insert a row, then immediately query it to get its auto-generated id or server-set created_at. That is two round-trips when one will do.

PostgreSQL's RETURNING clause returns column values from the rows affected by INSERT, UPDATE, or DELETE - in the same statement:

# Anti-pattern: two round-trips
cursor.execute("INSERT INTO users (name, email) VALUES (%s, %s)", (name, email))
cursor.execute("SELECT id, created_at FROM users WHERE email = %s", (email,))
row = cursor.fetchone()

# Correct: one round-trip with RETURNING
cursor.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id, created_at",
    (name, email)
)
row = cursor.fetchone()
user_id, created_at = row

RETURNING works on UPDATE and DELETE too:

# Get the full updated row back after a price change
cursor.execute(
    """
    UPDATE products
    SET price = price * 0.9,
        updated_at = NOW()
    WHERE id = %s
    RETURNING id, name, price, updated_at
    """,
    (product_id,),
)
updated = cursor.fetchone()

# Get IDs of all expired sessions being deleted
cursor.execute(
    "DELETE FROM sessions WHERE expires_at < NOW() RETURNING id, user_id",
)
expired = cursor.fetchall()
print(f"Removed {len(expired)} expired sessions")

Upsert with ON CONFLICT ... RETURNING

cursor.execute(
    """
    INSERT INTO user_preferences (user_id, key, value)
    VALUES (%s, %s, %s)
    ON CONFLICT (user_id, key) DO UPDATE
    SET value = EXCLUDED.value,
        updated_at = NOW()
    RETURNING id, user_id, key, value, updated_at
    """,
    (user_id, "theme", "dark"),
)
row = cursor.fetchone()

Part 8 - Connection Configuration for Production

Connection Timeouts

Connections that hang indefinitely are a common failure mode in production. Always set explicit timeouts:

pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
    connect_timeout=5,  # give up connecting after 5 seconds
    options="-c statement_timeout=30000",  # kill queries running longer than 30 seconds (value in ms)
)

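Or set timeouts per transaction after the connection is established:

with get_db() as conn:
    with conn.cursor() as cursor:
        # SET LOCAL lasts until the end of the current transaction; a plain SET
        # would stay on the pooled connection for whoever borrows it next
        cursor.execute("SET LOCAL statement_timeout = '30s'")
        cursor.execute("SET LOCAL lock_timeout = '5s'")
        # your query here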

TCP Keepalives

Long-idle connections get silently dropped by firewalls, load balancers, and cloud NAT gateways. TCP keepalives probe idle connections and detect drops before your next query hangs for minutes:

pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=os.environ["DATABASE_URL"],
    keepalives=1,  # enable TCP keepalives
    keepalives_idle=60,  # probe after 60 seconds of idle
    keepalives_interval=10,  # retransmit every 10 seconds
    keepalives_count=5,  # give up after 5 failed probes
)

Handling Broken Connections in the Pool

Connections can go stale - the database server restarts, or a firewall silently drops the TCP session. A robust pool wrapper validates connections before use:

def _is_conn_alive(conn) -> bool:
    """Quick liveness check - executes a trivial query."""
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
        return True
    except psycopg2.Error:
        return False


@contextlib.contextmanager
def get_db():
    conn = pool.getconn()
    try:
        if not _is_conn_alive(conn):
            pool.putconn(conn, close=True)  # discard the dead connection
            conn = pool.getconn()  # get a fresh one
        yield conn
        conn.commit()
    except Exception:
        try:
            conn.rollback()
        except psycopg2.Error:
            pass
        raise
    finally:
        pool.putconn(conn)
:::note putconn(conn, close=True) Passing close=True to putconn() tells the pool to close and remove the connection rather than returning it to the idle set. The pool will open a new connection the next time one is needed. This is the correct way to evict a dead connection - do not call conn.close() directly on a pooled connection. :::
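A liveness check narrows the window for stale connections but cannot close it: a connection can still drop between the check and the query. A common complement is to retry the whole unit of work a bounded number of times. The following is a generic sketch, not a psycopg2 feature; run_with_retry is a hypothetical helper, and it is only safe for idempotent operations.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def run_with_retry(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Call operation(), retrying on the given exceptions with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Back off: base_delay, then 2x, 4x, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")
```

With psycopg2 the call might look like `run_with_retry(lambda: get_user(42), (psycopg2.OperationalError,))`, so that each attempt borrows a fresh connection through get_db().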

Full Working Example: Product Catalogue with Connection Pool

This example assembles all the patterns from this lesson into a production-style service that you can run locally.

"""
product_catalogue.py

A production-style PostgreSQL-backed product catalogue.
Demonstrates: connection pooling, parameterized queries,
JSONB, arrays, UUID, RETURNING, and COPY bulk insert.
"""
import io
import csv
import json
import logging
import os
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

import psycopg2
import psycopg2.extras
import psycopg2.pool

logger = logging.getLogger(__name__)

# Register type adapters once at module load
psycopg2.extras.register_uuid()
psycopg2.extras.register_default_jsonb(globally=True)


# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS products (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name TEXT NOT NULL,
    price NUMERIC(10, 2) NOT NULL CHECK (price >= 0),
    category TEXT NOT NULL,
    tags TEXT[] NOT NULL DEFAULT '{}',
    metadata JSONB NOT NULL DEFAULT '{}',
    stock INTEGER NOT NULL DEFAULT 0 CHECK (stock >= 0),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_products_category
    ON products (category);
CREATE INDEX IF NOT EXISTS idx_products_tags
    ON products USING GIN (tags);
CREATE INDEX IF NOT EXISTS idx_products_metadata
    ON products USING GIN (metadata);
"""


# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------

@dataclass
class Product:
    id: Optional[uuid.UUID]
    name: str
    price: float
    category: str
    tags: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    stock: int = 0
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    @classmethod
    def from_row(cls, row) -> "Product":
        return cls(
            id=row["id"],
            name=row["name"],
            price=float(row["price"]),
            category=row["category"],
            tags=list(row["tags"]) if row["tags"] else [],
            metadata=dict(row["metadata"]) if row["metadata"] else {},
            stock=row["stock"],
            created_at=row["created_at"],
            updated_at=row["updated_at"],
        )


# ---------------------------------------------------------------------------
# Pool management
# ---------------------------------------------------------------------------

_pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None


def init_pool(
    dsn: Optional[str] = None,
    min_conn: int = 2,
    max_conn: int = 10,
) -> None:
    """Create the connection pool. Call once at application startup."""
    global _pool
    dsn = dsn or os.environ["DATABASE_URL"]
    _pool = psycopg2.pool.ThreadedConnectionPool(
        minconn=min_conn,
        maxconn=max_conn,
        dsn=dsn,
        connect_timeout=5,
        keepalives=1,
        keepalives_idle=60,
        keepalives_interval=10,
        keepalives_count=5,
        cursor_factory=psycopg2.extras.RealDictCursor,
        options="-c statement_timeout=30000",
    )
    logger.info("Pool ready (min=%d, max=%d)", min_conn, max_conn)


def close_pool() -> None:
    """Close all connections. Call at application shutdown."""
    if _pool:
        _pool.closeall()
        logger.info("Pool closed")


@contextmanager
def get_db():
    """Borrow a connection; commit on clean exit, rollback on exception."""
    conn = _pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        _pool.putconn(conn)


# ---------------------------------------------------------------------------
# Schema setup
# ---------------------------------------------------------------------------

def init_db() -> None:
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(SCHEMA_SQL)


# ---------------------------------------------------------------------------
# CRUD
# ---------------------------------------------------------------------------

def create_product(
    name: str,
    price: float,
    category: str,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict] = None,
    stock: int = 0,
) -> Product:
    """Insert a product and return the full row via RETURNING."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO products (name, price, category, tags, metadata, stock)
                VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id, name, price, category, tags, metadata,
                          stock, created_at, updated_at
                """,
                (
                    name,
                    price,
                    category,
                    tags or [],
                    psycopg2.extras.Json(metadata or {}),
                    stock,
                ),
            )
            return Product.from_row(cur.fetchone())


def get_product(product_id: uuid.UUID) -> Optional[Product]:
    """Fetch a single product by UUID. Returns None if not found."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, name, price, category, tags, metadata,
                       stock, created_at, updated_at
                FROM products
                WHERE id = %s
                """,
                (product_id,),
            )
            row = cur.fetchone()
            return Product.from_row(row) if row else None


def list_products_by_category(category: str) -> list[Product]:
    """Return all products in a category ordered by price ascending."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, name, price, category, tags, metadata,
                       stock, created_at, updated_at
                FROM products
                WHERE category = %s
                ORDER BY price ASC
                """,
                (category,),
            )
            return [Product.from_row(row) for row in cur.fetchall()]


def search_by_tag(tag: str) -> list[Product]:
    """Find all products containing a specific tag (uses GIN index)."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, name, price, category, tags, metadata,
                       stock, created_at, updated_at
                FROM products
                WHERE tags @> %s  -- containment can use the GIN index; = ANY(tags) cannot
                ORDER BY name
                """,
                ([tag],),  # a one-element list is sent as a text[] array
            )
            return [Product.from_row(row) for row in cur.fetchall()]


def search_by_metadata(criteria: dict) -> list[Product]:
    """Find products where JSONB metadata contains the given key-value pairs."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, name, price, category, tags, metadata,
                       stock, created_at, updated_at
                FROM products
                WHERE metadata @> %s
                ORDER BY name
                """,
                (psycopg2.extras.Json(criteria),),
            )
            return [Product.from_row(row) for row in cur.fetchall()]


def update_stock(product_id: uuid.UUID, delta: int) -> Optional[Product]:
    """
    Adjust stock by delta (positive = add, negative = remove).
    Returns the updated product, or None if not found.
    The CHECK constraint on the column prevents stock from going negative.
    """
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE products
                SET stock = stock + %s,
                    updated_at = NOW()
                WHERE id = %s
                RETURNING id, name, price, category, tags, metadata,
                          stock, created_at, updated_at
                """,
                (delta, product_id),
            )
            row = cur.fetchone()
            return Product.from_row(row) if row else None


def delete_product(product_id: uuid.UUID) -> bool:
    """Delete a product. Returns True if a row was deleted."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "DELETE FROM products WHERE id = %s RETURNING id",
                (product_id,),
            )
            return cur.fetchone() is not None


# ---------------------------------------------------------------------------
# Bulk insert via COPY
# ---------------------------------------------------------------------------

def _pg_array_literal(items: list[str]) -> str:
    """Build a PostgreSQL array literal such as {"a","b"}, quoting every element."""
    # Inside a quoted array element, backslash and double quote must be escaped
    escaped = (s.replace("\\", "\\\\").replace('"', '\\"') for s in items)
    return "{" + ",".join(f'"{s}"' for s in escaped) + "}"


def bulk_create_products(products: list[dict]) -> int:
    """
    Insert many products using COPY FROM STDIN - far faster than INSERT for
    large datasets. Returns the number of rows inserted.

    Each dict must have: name, price, category.
    Optional keys: tags (list[str]), metadata (dict), stock (int).
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_MINIMAL)

    for p in products:
        # Quote each tag so commas, braces, and quotes inside a tag stay intact
        tags_literal = _pg_array_literal(p.get("tags", []))
        writer.writerow([
            p["name"],
            p["price"],
            p["category"],
            tags_literal,
            json.dumps(p.get("metadata", {})),
            p.get("stock", 0),
        ])

    buffer.seek(0)

    with get_db() as conn:
        with conn.cursor() as cur:
            cur.copy_expert(
                """
                COPY products (name, price, category, tags, metadata, stock)
                FROM STDIN
                WITH (FORMAT csv)
                """,
                buffer,
            )
            return cur.rowcount


# ---------------------------------------------------------------------------
# Demo
# ---------------------------------------------------------------------------

def demo():
    logging.basicConfig(level=logging.INFO)

    init_pool(
        dsn="postgresql://postgres:postgres@localhost:5432/catalogue",
        min_conn=2,
        max_conn=10,
    )
    init_db()

    # Single inserts with RETURNING
    widget = create_product(
        name="Python Widget",
        price=29.99,
        category="software",
        tags=["python", "utility", "open-source"],
        metadata={"version": "1.0", "license": "MIT", "python_requires": ">=3.10"},
        stock=500,
    )
    print(f"Created: {widget.name} id={widget.id}")

    create_product(
        name="Async Gadget",
        price=49.99,
        category="software",
        tags=["python", "async", "networking"],
        metadata={"version": "2.1", "license": "Apache-2.0"},
        stock=250,
    )

    # Bulk insert via COPY
    catalogue = [
        {
            "name": f"Bulk Item {i}",
            "price": round(5.00 + i * 0.10, 2),
            "category": "bulk",
            "tags": ["imported", "wholesale"],
            "stock": max(0, 1000 - i),
        }
        for i in range(10_000)
    ]
    n = bulk_create_products(catalogue)
    print(f"Bulk inserted {n} items via COPY")

    # Query by category
    sw = list_products_by_category("software")
    print(f"\nSoftware products ({len(sw)}):")
    for p in sw:
        print(f" {p.name:<22} ${p.price:.2f} tags={p.tags}")

    # Search by tag - uses GIN index on tags column
    py_products = search_by_tag("python")
    print(f"\nProducts tagged 'python': {len(py_products)}")

    # Search by JSONB containment - uses GIN index on metadata column
    mit_products = search_by_metadata({"license": "MIT"})
    print(f"Products with MIT license: {len(mit_products)}")

    # Update stock - RETURNING gives us the updated row in one round-trip
    updated = update_stock(widget.id, -10)
    if updated:
        print(f"\nStock after -10: {updated.stock} updated_at={updated.updated_at}")

    close_pool()


if __name__ == "__main__":
    demo()
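
The COPY path sends each tags value through two layers of quoting: PostgreSQL's array-literal syntax and then CSV. The sketch below checks both layers without a database by writing one row and parsing the CSV back. The helper names `pg_array_literal` and `to_copy_csv` are hypothetical and not part of the listing above.

```python
import csv
import io
import json


def pg_array_literal(items: list[str]) -> str:
    """Render a list of strings as a PostgreSQL text[] literal.

    Every element is double-quoted, with backslashes and double quotes
    backslash-escaped, so commas, braces, and spaces inside a tag are safe.
    """
    quoted = (
        '"' + item.replace("\\", "\\\\").replace('"', '\\"') + '"'
        for item in items
    )
    return "{" + ",".join(quoted) + "}"


def to_copy_csv(products: list[dict]) -> io.StringIO:
    """Build the CSV buffer that COPY ... WITH (FORMAT csv) would read."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_MINIMAL)
    for p in products:
        writer.writerow([
            p["name"],
            p["price"],
            p["category"],
            pg_array_literal(p.get("tags", [])),
            json.dumps(p.get("metadata", {})),
            p.get("stock", 0),
        ])
    buffer.seek(0)
    return buffer


sample = [{
    "name": "Desk, oak",
    "price": 120.0,
    "category": "furniture",
    "tags": ["solid wood", 'size:60"', "a,b"],
}]
buffer = to_copy_csv(sample)

# Undo the CSV layer only: the array literal should come back unchanged.
fields = next(csv.reader(buffer))
print(fields[3])
```

The csv module handles the outer layer on its own (it quotes any field containing a comma or a double quote), so only the array literal needs hand-written escaping.
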
Local development setup with Docker and psql verification queries
# Start PostgreSQL 16 in Docker
docker run -d \
  --name pg-dev \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=catalogue \
  -p 5432:5432 \
  postgres:16

# Wait for readiness
docker exec pg-dev pg_isready -U postgres

# Set the connection string environment variable
export DATABASE_URL="postgresql://postgres:postgres@localhost:5432/catalogue"

# Install the driver and run the demo
pip install psycopg2-binary
python product_catalogue.py
-- Connect with psql
psql $DATABASE_URL

-- Inspect the table schema
\d products

-- List all indexes
\di idx_products_*

-- JSONB key-existence query (the ? operator can use a GIN index built with the default jsonb_ops operator class)
SELECT name, metadata->>'license' AS license
FROM products
WHERE metadata ? 'license'
ORDER BY name
LIMIT 5;

-- Array overlap query (uses GIN index)
SELECT name, tags
FROM products
WHERE tags && ARRAY['python', 'async']
ORDER BY name;

-- Category summary
SELECT category,
COUNT(*) AS n_products,
SUM(stock) AS total_stock,
ROUND(AVG(price), 2) AS avg_price
FROM products
GROUP BY category
ORDER BY n_products DESC;

Graded Practice Challenges

Beginner - Parameterized Price Range Query

Write a function get_products_in_price_range(min_price, max_price) that returns all products with a price between min_price and max_price (inclusive), ordered by price ascending.

Requirements:

  • Use %s parameterized placeholders - no string formatting in SQL
  • Return a list of Product objects using Product.from_row()
  • Handle the case where no products exist in the range (return an empty list)
Solution:

def get_products_in_price_range(
    min_price: float,
    max_price: float,
) -> list[Product]:
    """
    Return products in the given price range, cheapest first.
    BETWEEN is inclusive on both ends: price >= min AND price <= max.
    """
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, name, price, category, tags, metadata,
                       stock, created_at, updated_at
                FROM products
                WHERE price BETWEEN %s AND %s
                ORDER BY price ASC
                """,
                (min_price, max_price),
            )
            return [Product.from_row(row) for row in cur.fetchall()]


# Usage
budget = get_products_in_price_range(10.00, 50.00)
for p in budget:
    print(f"${p.price:.2f} {p.name}")

Key points:

  • Both min_price and max_price are passed in the parameter tuple - never interpolated into the string
  • BETWEEN %s AND %s is equivalent to price >= %s AND price <= %s - inclusive on both ends
  • fetchall() returns an empty list when no rows match - no special-case needed
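
The inclusive bounds and the empty-list behaviour can be checked without a PostgreSQL server. The sketch below uses the standard-library sqlite3 module purely as a stand-in, so it uses ? placeholders rather than %s; the table contents and the helper name `names_in_range` are made up for illustration.

```python
import sqlite3

# In-memory stand-in database; psycopg2 would use %s placeholders instead of ?.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, price REAL)")
conn.executemany(
    "INSERT INTO products (name, price) VALUES (?, ?)",
    [("Cable", 10.00), ("Mouse", 25.50), ("Keyboard", 50.00), ("Monitor", 199.00)],
)


def names_in_range(min_price: float, max_price: float) -> list[str]:
    """Return product names priced within [min_price, max_price], cheapest first."""
    rows = conn.execute(
        "SELECT name FROM products WHERE price BETWEEN ? AND ? ORDER BY price ASC",
        (min_price, max_price),
    ).fetchall()
    return [name for (name,) in rows]


in_range = names_in_range(10.00, 50.00)    # both boundary rows are included
no_match = names_in_range(500.00, 600.00)  # empty list, not None
print(in_range)
print(no_match)
```
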

Intermediate - Inventory Alert with Aggregation

Write a function get_low_stock_report(threshold) that returns a summary of every category whose total stock falls below threshold. Each item in the returned list should be a dict with:

  • category - the category name
  • total_stock - sum of stock across all products in the category
  • product_count - number of products in the category
  • min_stock - the lowest individual stock value in the category
  • lowest_stock_product - the name of the product with the lowest stock

Use a single SQL query with a CTE, GROUP BY, HAVING, and a correlated subquery.

Solution:

def get_low_stock_report(threshold: int = 100) -> list[dict]:
    """
    Return categories where total stock is below threshold.
    Uses a CTE with HAVING for group-level filtering and a correlated
    subquery to identify the lowest-stock product in each category.
    """
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                WITH category_stats AS (
                    SELECT
                        p.category,
                        SUM(p.stock) AS total_stock,
                        COUNT(*) AS product_count,
                        MIN(p.stock) AS min_stock,
                        (
                            SELECT p2.name
                            FROM products p2
                            WHERE p2.category = p.category
                            ORDER BY p2.stock ASC
                            LIMIT 1
                        ) AS lowest_stock_product
                    FROM products p
                    GROUP BY p.category
                    HAVING SUM(p.stock) < %s
                )
                SELECT
                    category,
                    total_stock,
                    product_count,
                    min_stock,
                    lowest_stock_product
                FROM category_stats
                ORDER BY total_stock ASC
                """,
                (threshold,),
            )
            return [dict(row) for row in cur.fetchall()]


# Usage
alerts = get_low_stock_report(threshold=500)
if not alerts:
    print("All categories have adequate stock")
else:
    header = f"{'Category':<15} {'Total':>7} {'Products':>8} Lowest product"
    print(header)
    print("-" * len(header))
    for a in alerts:
        print(
            f"{a['category']:<15} {a['total_stock']:>7} "
            f"{a['product_count']:>8} "
            f"{a['lowest_stock_product']} ({a['min_stock']} units)"
        )

Key points:

  • The %s placeholder in HAVING SUM(p.stock) < %s is fully supported - HAVING accepts parameters just like WHERE
  • The correlated subquery for lowest_stock_product executes once per group - acceptable for a small number of categories; for very large category counts, a window function (FIRST_VALUE) would be more efficient
  • dict(row) works because the pool uses RealDictCursor, which returns dict-like RealDictRow objects
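
One possible shape for the FIRST_VALUE alternative mentioned in the key points is sketched below. It is not the lesson's solution: the query is run against an in-memory sqlite3 database (assuming SQLite 3.25 or newer for window-function support) so it can be tried without a server, and the sample rows are invented. With psycopg2 the placeholder would be %s instead of ?.

```python
import sqlite3

# FIRST_VALUE labels every row with its category's lowest-stock product,
# then a plain GROUP BY aggregates; no per-group subquery is needed.
LOW_STOCK_SQL = """
WITH ranked AS (
    SELECT category, stock,
           FIRST_VALUE(name) OVER (
               PARTITION BY category ORDER BY stock ASC, name ASC
           ) AS lowest_stock_product
    FROM products
)
SELECT category,
       SUM(stock) AS total_stock,
       COUNT(*)   AS product_count,
       MIN(stock) AS min_stock,
       MIN(lowest_stock_product) AS lowest_stock_product
FROM ranked
GROUP BY category
HAVING SUM(stock) < ?
ORDER BY total_stock ASC
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, category TEXT, stock INTEGER)")
conn.executemany(
    "INSERT INTO products (name, category, stock) VALUES (?, ?, ?)",
    [
        ("Atlas", "books", 5),
        ("Novel", "books", 10),
        ("Hammer", "tools", 400),
        ("Saw", "tools", 700),
    ],
)

report = conn.execute(LOW_STOCK_SQL, (100,)).fetchall()
print(report)
```

Within a partition FIRST_VALUE yields the same name for every row, so wrapping it in MIN() only collapses identical values. The extra `name ASC` sort key makes ties on stock deterministic.
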

Advanced - Connection Pool with Health Monitoring

Build a MonitoredPool class that wraps ThreadedConnectionPool and adds:

  1. A checkout_count metric - total connections ever borrowed across all threads
  2. A total_wait_ms metric - cumulative milliseconds spent waiting for connections
  3. An error_count metric - number of checkouts that ended in a rollback
  4. A health_check() method that returns a status dict including a live database probe
  5. Automatic retry with exponential backoff when getconn() raises PoolError - up to 3 retries with 50 ms, 100 ms, and 200 ms delays between attempts
Solution:

import contextlib
import os
import threading
import time
from dataclasses import dataclass, field

import psycopg2
import psycopg2.extras
import psycopg2.pool


@dataclass
class _Metrics:
    """Thread-safe counters for pool observability."""
    checkout_count: int = 0
    total_wait_ms: float = 0.0
    error_count: int = 0
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def record_checkout(self, wait_ms: float) -> None:
        with self._lock:
            self.checkout_count += 1
            self.total_wait_ms += wait_ms

    def record_error(self) -> None:
        with self._lock:
            self.error_count += 1

    def snapshot(self) -> dict:
        with self._lock:
            avg = (
                self.total_wait_ms / self.checkout_count
                if self.checkout_count > 0 else 0.0
            )
            return {
                "checkout_count": self.checkout_count,
                "total_wait_ms": round(self.total_wait_ms, 2),
                "avg_wait_ms": round(avg, 2),
                "error_count": self.error_count,
            }


class MonitoredPool:
    """
    ThreadedConnectionPool with metrics collection and
    automatic retry on pool exhaustion.
    """

    _RETRY_DELAYS_MS = [50, 100, 200]  # exponential-ish backoff

    def __init__(self, min_conn: int, max_conn: int, **kwargs):
        self._pool = psycopg2.pool.ThreadedConnectionPool(
            min_conn, max_conn, **kwargs
        )
        self._metrics = _Metrics()
        self._min_conn = min_conn
        self._max_conn = max_conn

    def _getconn_with_retry(self):
        """Attempt getconn() with backoff on PoolError exhaustion."""
        last_exc = None
        for delay_ms in [0] + self._RETRY_DELAYS_MS:
            if delay_ms:
                time.sleep(delay_ms / 1000)
            try:
                return self._pool.getconn()
            except psycopg2.pool.PoolError as exc:
                last_exc = exc
        raise last_exc

    @contextlib.contextmanager
    def connection(self):
        """
        Borrow a connection with metrics tracking.
        Commits on success, rolls back on exception, always returns to pool.
        """
        t0 = time.perf_counter()
        conn = self._getconn_with_retry()
        wait_ms = (time.perf_counter() - t0) * 1000
        self._metrics.record_checkout(wait_ms)

        try:
            yield conn
            conn.commit()
        except Exception:
            self._metrics.record_error()
            try:
                conn.rollback()
            except psycopg2.Error:
                pass  # connection may be dead; putconn will handle cleanup
            raise
        finally:
            self._pool.putconn(conn)

    def health_check(self) -> dict:
        """
        Return pool metrics plus a live database connectivity probe.
        Safe to expose from a /healthz or /ready HTTP endpoint.
        """
        metrics = self._metrics.snapshot()
        db_ok = False
        try:
            with self.connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")
                    db_ok = True
        except Exception:
            pass

        return {
            "status": "healthy" if db_ok else "degraded",
            "db_reachable": db_ok,
            "pool_min": self._min_conn,
            "pool_max": self._max_conn,
            **metrics,
        }

    def close(self) -> None:
        """Close all connections in the pool."""
        self._pool.closeall()


# --- Demo ---
if __name__ == "__main__":
    import json

    pool = MonitoredPool(
        min_conn=2,
        max_conn=5,
        dsn=os.environ.get(
            "DATABASE_URL",
            "postgresql://postgres:postgres@localhost:5432/catalogue",
        ),
        cursor_factory=psycopg2.extras.RealDictCursor,
    )

    def simulate_request(request_id: int) -> None:
        try:
            with pool.connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT COUNT(*) AS n FROM products")
                    row = cur.fetchone()
                    print(f"Request {request_id:>3}: {row['n']} products")
        except Exception as exc:
            print(f"Request {request_id:>3} failed: {exc}")

    threads = [
        threading.Thread(target=simulate_request, args=(i,))
        for i in range(20)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print("\nPool health report:")
    print(json.dumps(pool.health_check(), indent=2))

    pool.close()

Key points:

  • _Metrics carries its own threading.Lock - counter updates are atomic without an outer lock on MonitoredPool
  • The retry loop uses [0] + _RETRY_DELAYS_MS so the first attempt is immediate and the loop body stays uniform for all attempts
  • health_check() reuses self.connection() - the liveness probe is a real checkout and shows up in metrics, making it a true end-to-end test
  • _getconn_with_retry() is a separate method rather than inline logic - easier to unit-test in isolation
  • psycopg2.Error (not Exception) is caught on the rollback path to avoid accidentally swallowing unrelated errors
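
To illustrate the point about testing the retry logic in isolation, here is a minimal sketch of the same backoff loop written as a free function with an injectable sleep, exercised against a fake pool. `PoolExhausted`, `call_with_backoff`, and `FlakyPool` are hypothetical names standing in for psycopg2.pool.PoolError, `_getconn_with_retry()`, and the real pool, so the example runs without psycopg2 or a database.

```python
import time
from typing import Callable, Sequence


class PoolExhausted(Exception):
    """Stand-in for psycopg2.pool.PoolError so the sketch needs no database."""


def call_with_backoff(
    fn: Callable[[], object],
    delays_ms: Sequence[int] = (50, 100, 200),
    sleep: Callable[[float], None] = time.sleep,
) -> object:
    """Call fn immediately, then once more after each delay while it raises PoolExhausted."""
    last_exc = None
    for delay_ms in (0, *delays_ms):
        if delay_ms:
            sleep(delay_ms / 1000)
        try:
            return fn()
        except PoolExhausted as exc:
            last_exc = exc
    raise last_exc


class FlakyPool:
    """Fake pool whose getconn() fails a fixed number of times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def getconn(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise PoolExhausted("connection pool exhausted")
        return "conn"


slept: list[float] = []  # records requested delays instead of really sleeping
pool = FlakyPool(failures=2)
result = call_with_backoff(pool.getconn, sleep=slept.append)
print(result, pool.calls, slept)
```

Passing `slept.append` as the sleep function keeps the test instant and lets it assert on the exact delays that were requested.
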

Key Takeaways

  • psycopg2 is the battle-tested standard driver; psycopg3 is the modern successor with native async/await - choose psycopg3 for new projects where async matters
  • Store credentials in environment variables (DATABASE_URL, PGPASSWORD) or a secrets manager - never in source code or config files committed to version control
  • psycopg2 and psycopg3 use %s placeholders, not ? as in Python's sqlite3 module; always pass parameters as the second argument to execute(), and never build SQL with f-strings or % string formatting
  • Connection pooling with ThreadedConnectionPool is mandatory for multi-threaded production services - opening a new connection per request exhausts PostgreSQL's connection limit under load
  • Size your pool with the formula maxconn = (DB CPU cores × 2) + spindles; for a 4-core cloud instance, 10 connections per application process is a solid starting point, provided the total across all processes stays below PostgreSQL's max_connections
  • Use the RETURNING clause to retrieve inserted or updated row data in a single round-trip - eliminates the common insert-then-select anti-pattern
  • COPY FROM STDIN via copy_expert() can be 50–200x faster than executemany() for large bulk inserts, because it streams every row in a single command instead of issuing one INSERT per row
  • PostgreSQL types map cleanly to Python: JSONB ↔ dict (via Json()), TEXT[] ↔ list, UUID ↔ uuid.UUID (after register_uuid()), TIMESTAMPTZ ↔ timezone-aware datetime
  • Always configure connection timeouts (connect_timeout, statement_timeout) and TCP keepalives in production - silent connection drops from firewalls and cloud networking are among the most common database failure modes
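
The sizing formula and the timeout advice above can be written down as two small helpers. This is a sketch under stated assumptions: the function names are hypothetical, the default of one spindle (a single SSD-backed volume) and all timeout and keepalive values are illustrative rather than recommendations from this lesson. The dictionary keys themselves are standard libpq connection parameters, which psycopg2.connect() and ThreadedConnectionPool accept as keyword arguments.

```python
def suggested_max_conn(db_cpu_cores: int, spindles: int = 1) -> int:
    """Starting-point pool size from the formula (DB CPU cores x 2) + spindles."""
    return db_cpu_cores * 2 + spindles


def production_connect_kwargs(
    connect_timeout_s: int = 5,
    statement_timeout_ms: int = 30_000,
) -> dict:
    """Build keyword arguments carrying timeouts and TCP keepalive settings."""
    return {
        # Seconds to wait while establishing the connection.
        "connect_timeout": connect_timeout_s,
        # Server-side cap on any single statement, set for the session at connect time.
        "options": f"-c statement_timeout={statement_timeout_ms}",
        "keepalives": 1,            # enable TCP keepalive probes
        "keepalives_idle": 30,      # idle seconds before the first probe
        "keepalives_interval": 10,  # seconds between probes
        "keepalives_count": 5,      # unanswered probes before the link is considered dead
    }


size = suggested_max_conn(db_cpu_cores=4)
kwargs = production_connect_kwargs()
print(size)
print(kwargs["options"])
```

For a 4-core server with one spindle the formula gives 9, which is where the round figure of about 10 connections comes from.
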

What's Next

Lesson 04 covers Transactions - the mechanism that makes multiple database operations atomic. You have already used transactions implicitly (conn.commit(), conn.rollback()), but production systems require explicit control: savepoints for partial rollbacks, isolation levels for concurrent reads (READ COMMITTED vs REPEATABLE READ vs SERIALIZABLE), and advisory locks for application-level coordination across processes.

Understanding transaction isolation is the difference between a database layer that occasionally loses or duplicates data under concurrent load and one that guarantees correctness regardless of how many application instances are running simultaneously.

© 2026 EngineersOfAI. All rights reserved.