Python PostgreSQL with Python Practice Problems & Exercises
Practice: PostgreSQL with Python
← Back to lesson:::note Running These Problems Problems in this set use sqlite3 as a stand-in for PostgreSQL syntax concepts, so they run without a live Postgres server. Notes are provided where psycopg2/psycopg3 patterns differ. :::
Practice the psycopg2 DB-API 2.0 pattern: cursor-based execution, parameterized inserts, and fetchall.
Solution:
import sqlite3
def postgres_style_query():
conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL)")
cur.executemany(
"INSERT INTO products VALUES (?, ?, ?)",
[(1, 'Widget', 9.99), (2, 'Gadget', 49.99), (3, 'Doohickey', 24.99)]
)
conn.commit()
cur.execute("SELECT * FROM products ORDER BY id")
rows = cur.fetchall()
conn.close()
return rows
import sqlite3
# psycopg2 pattern uses the same DB-API 2.0 interface as sqlite3.
# conn = psycopg2.connect(dsn)
# cur = conn.cursor()
# cur.execute(sql, params)
# rows = cur.fetchall()
# conn.commit()
# conn.close()
# TODO: Using sqlite3 (same interface), connect to an in-memory DB,
# create a products table, insert 3 products, and return them all.
def postgres_style_query():
pass
print(postgres_style_query())
Expected Output
[(1, 'Widget', 9.99), (2, 'Gadget', 49.99), (3, 'Doohickey', 24.99)]Hints
Hint 1: DB-API 2.0 is the same for sqlite3 and psycopg2: connect, cursor, execute, fetchall.
Hint 2: Use cur = conn.cursor() then cur.execute(sql, params) — the psycopg2 way.
Hint 3: Always call conn.commit() after DML (INSERT/UPDATE/DELETE).
Use named-style query parameters (:name in sqlite3, %(name)s in psycopg2) to filter a table.
Solution:
def filter_events(conn, category, min_capacity):
cur = conn.execute("""
SELECT name, capacity
FROM events
WHERE category = :category AND capacity >= :min_capacity
ORDER BY capacity DESC
""", {'category': category, 'min_capacity': min_capacity})
return cur.fetchall()
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE events (
id INTEGER PRIMARY KEY,
name TEXT,
category TEXT,
capacity INTEGER
)
""")
conn.executemany("INSERT INTO events VALUES (?,?,?,?)", [
(1, 'PyCon', 'conference', 2000),
(2, 'Django Workshop', 'workshop', 50),
(3, 'Data Summit', 'conference', 1500),
(4, 'SQL Bootcamp', 'workshop', 30),
(5, 'AI Meetup', 'meetup', 100),
])
conn.commit()
# TODO: Filter events by category and min_capacity.
# Use named-style parameters: %(name)s in psycopg2 or :name in sqlite3.
# Return list of (name, capacity) tuples.
def filter_events(conn, category, min_capacity):
pass
print(filter_events(conn, 'conference', 1000))
print(filter_events(conn, 'workshop', 40))
Expected Output
[('PyCon', 2000), ('Data Summit', 1500)]
[('Django Workshop', 50)]Hints
Hint 1: sqlite3 named params use :name syntax: WHERE category = :category.
Hint 2: Pass a dict as the params: cur.execute(sql, {'category': category, 'min_capacity': min_capacity}).
Hint 3: psycopg2 uses %(name)s syntax instead — same concept, different placeholder.
Use the RETURNING clause (supported by PostgreSQL and SQLite 3.35+) to get auto-generated values back from an INSERT.
Solution:
def insert_message(conn, sender, body):
cur = conn.execute("""
INSERT INTO messages (sender, body)
VALUES (?, ?)
RETURNING id, created_at
""", (sender, body))
conn.commit()
return cur.fetchone()
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT,
body TEXT,
created_at TEXT DEFAULT (datetime('now'))
)
""")
conn.commit()
# PostgreSQL: INSERT INTO messages (sender, body) VALUES (%s, %s) RETURNING id, created_at
# SQLite 3.35+: INSERT INTO messages (sender, body) VALUES (?,?) RETURNING id, created_at
# TODO: Insert a message and return its auto-generated id and created_at timestamp.
def insert_message(conn, sender, body):
pass
row = insert_message(conn, 'alice', 'Hello, world!')
print('id:', row[0])
print('has timestamp:', row[1] is not None)
Expected Output
id: 1
has timestamp: TrueHints
Hint 1: SQLite 3.35+ supports RETURNING: INSERT INTO ... VALUES (?,?) RETURNING id, created_at.
Hint 2: fetchone() after the INSERT RETURNING gives the returned row.
Hint 3: In psycopg2, the same syntax works: cur.execute(INSERT...RETURNING...) then cur.fetchone().
Query JSON-structured data using SQLite's json_extract (analogous to PostgreSQL's ->>).
Solution:
def users_older_than(conn, age):
cur = conn.execute("""
SELECT json_extract(data, '$.name')
FROM profiles
WHERE json_extract(data, '$.age') > ?
ORDER BY json_extract(data, '$.name')
""", (age,))
return [row[0] for row in cur.fetchall()]
import sqlite3
import json
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE profiles (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO profiles VALUES (?,?)", [
(1, json.dumps({'name': 'Alice', 'age': 30, 'skills': ['Python', 'SQL']})),
(2, json.dumps({'name': 'Bob', 'age': 25, 'skills': ['Go', 'Rust']})),
(3, json.dumps({'name': 'Carol', 'age': 35, 'skills': ['Python', 'Kubernetes']})),
])
conn.commit()
# PostgreSQL: SELECT data->>'name' FROM profiles WHERE (data->>'age')::int > 28
# SQLite: SELECT json_extract(data, '$.name') FROM profiles WHERE json_extract(data, '$.age') > 28
# TODO: Return names of users older than 28 using json_extract.
def users_older_than(conn, age):
pass
print(users_older_than(conn, 28))
Expected Output
['Alice', 'Carol']Hints
Hint 1: SQLite json_extract(column, '$.field') is equivalent to PostgreSQL column->>'field'.
Hint 2: You can filter with WHERE json_extract(data, '$.age') > 28.
Hint 3: PostgreSQL JSONB uses -> for JSON objects and ->> for text, plus @> for containment.
Simulate PostgreSQL server-side cursors using fetchmany to stream a large result set in batches.
Solution:
def batch_process(conn, batch_size=100):
cur = conn.execute("SELECT * FROM big_table ORDER BY id")
batch_count = 0
total_rows = 0
while True:
rows = cur.fetchmany(batch_size)
if not rows:
break
batch_count += 1
total_rows += len(rows)
return batch_count, total_rows
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE big_table (id INTEGER PRIMARY KEY, val TEXT)")
conn.executemany(
"INSERT INTO big_table VALUES (?,?)",
[(i, f'value_{i}') for i in range(1, 1001)]
)
conn.commit()
# PostgreSQL server-side cursors use:
# with conn.cursor('server_cursor') as cur:
# cur.execute(sql)
# while True:
# rows = cur.fetchmany(100)
# if not rows: break
#
# TODO: Simulate this pattern using fetchmany(100) to process
# 1000 rows in batches of 100. Return the count of batches
# and total rows processed.
def batch_process(conn, batch_size=100):
pass
batches, total = batch_process(conn, 100)
print(f'Batches: {batches}, Total rows: {total}')
Expected Output
Batches: 10, Total rows: 1000Hints
Hint 1: cursor.fetchmany(size) returns up to 'size' rows; returns [] when exhausted.
Hint 2: Loop: while True: rows = cur.fetchmany(100); if not rows: break.
Hint 3: This is exactly the pattern used with psycopg2 named cursors for streaming large result sets.
Build a minimal connection pool that mirrors the psycopg2 SimpleConnectionPool API.
Solution:
import sqlite3
import threading
from queue import Queue
class MinimalPool:
def __init__(self, minconn, maxconn, db_path=':memory:'):
self._pool = Queue()
for _ in range(minconn):
conn = sqlite3.connect(db_path, check_same_thread=False)
self._pool.put(conn)
def getconn(self):
return self._pool.get()
def putconn(self, conn):
self._pool.put(conn)
def closeall(self):
while not self._pool.empty():
self._pool.get().close()
import sqlite3
import threading
from queue import Queue
# psycopg2 pools: psycopg2.pool.SimpleConnectionPool(minconn, maxconn, dsn)
# pool.getconn() — borrow a connection
# pool.putconn(conn) — return it
# pool.closeall() — close all
# TODO: Implement a minimal connection pool mirroring this interface.
# minconn connections are created at startup.
# getconn() blocks if all connections are in use.
# putconn(conn) returns the connection.
class MinimalPool:
def __init__(self, minconn, maxconn, db_path=':memory:'):
pass
def getconn(self):
pass
def putconn(self, conn):
pass
def closeall(self):
pass
def test_pool():
pool = MinimalPool(2, 5)
results = []
lock = threading.Lock()
def worker():
conn = pool.getconn()
try:
cur = conn.execute("SELECT 42")
val = cur.fetchone()[0]
with lock:
results.append(val)
finally:
pool.putconn(conn)
threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
t.start()
for t in threads:
t.join()
pool.closeall()
print(f'Results: {len(results)} queries, all correct: {all(r == 42 for r in results)}')
test_pool()
Expected Output
Results: 8 queries, all correct: TrueHints
Hint 1: Use Queue to hold available connections — put() returns, get() borrows.
Hint 2: Create minconn connections in __init__ and put them all in the queue.
Hint 3: closeall() drains the queue and calls conn.close() on each.
Parse a CSV payload and bulk-load it into SQLite, mirroring the PostgreSQL COPY pattern.
Solution:
import csv
import io
import time
def bulk_load_csv(conn, csv_data):
reader = csv.DictReader(io.StringIO(csv_data.strip()))
rows = [
(int(r['sensor_id']), float(r['temperature']), float(r['humidity']), int(r['ts']))
for r in reader
]
start = time.perf_counter()
with conn:
conn.executemany(
"INSERT INTO readings VALUES (?,?,?,?)", rows
)
elapsed = time.perf_counter() - start
count = conn.execute("SELECT COUNT(*) FROM readings").fetchone()[0]
return count, elapsed
import sqlite3
import csv
import io
import time
# PostgreSQL COPY is the fastest bulk load:
# cur.copy_expert("COPY table FROM STDIN WITH CSV HEADER", file_obj)
# For sqlite3, the closest equivalent is executemany inside a transaction.
# TODO: Parse the CSV data below and bulk-insert into a 'readings' table.
# Return total rows inserted and time taken.
CSV_DATA = """sensor_id,temperature,humidity,ts
1,22.5,60,1000
2,19.3,72,1001
1,23.1,58,1002
3,28.7,45,1003
2,18.9,75,1004
1,24.0,55,1005
3,29.2,43,1006
2,20.1,70,1007
"""
def bulk_load_csv(conn, csv_data):
pass
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE readings (sensor_id INTEGER, temperature REAL, humidity REAL, ts INTEGER)")
count, elapsed = bulk_load_csv(conn, CSV_DATA)
print(f'Loaded {count} rows in {elapsed:.4f}s')
Expected Output
Loaded 8 rows in 0.XXXXsHints
Hint 1: Use csv.DictReader(io.StringIO(csv_data)) to parse the CSV.
Hint 2: Build a list of tuples, then use executemany inside a with conn: block.
Hint 3: SELECT COUNT(*) after insert to verify the row count.
Use json_each to simulate PostgreSQL array containment queries on JSON-stored tag lists.
Solution:
def posts_with_tag(conn, tag):
cur = conn.execute("""
SELECT DISTINCT p.title
FROM posts p, json_each(p.tags) je
WHERE je.value = ?
ORDER BY p.title
""", (tag,))
return [row[0] for row in cur.fetchall()]
import sqlite3
import json
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, tags TEXT)")
conn.executemany("INSERT INTO posts VALUES (?,?,?)", [
(1, 'Python Tips', json.dumps(['python', 'programming', 'tips'])),
(2, 'SQL Deep Dive', json.dumps(['sql', 'databases', 'performance'])),
(3, 'Async Python', json.dumps(['python', 'async', 'concurrency'])),
(4, 'PostgreSQL Arrays', json.dumps(['sql', 'postgresql', 'arrays'])),
])
conn.commit()
# PostgreSQL arrays: SELECT title FROM posts WHERE 'python' = ANY(tags)
# SQLite equivalent: use json_each to explode the JSON array
# TODO: Find all posts that contain a given tag.
# Return a sorted list of titles.
def posts_with_tag(conn, tag):
pass
print(posts_with_tag(conn, 'python'))
print(posts_with_tag(conn, 'sql'))
Expected Output
['Async Python', 'Python Tips']
['PostgreSQL Arrays', 'SQL Deep Dive']Hints
Hint 1: json_each(tags) is a SQLite table-valued function that expands a JSON array into rows.
Hint 2: JOIN json_each(posts.tags) je ON je.value = ? finds posts containing the tag.
Hint 3: PostgreSQL's equivalent is WHERE tag = ANY(tags_array_column).
Simulate async PostgreSQL queries using asyncio and run_in_executor for non-blocking SQLite access.
Solution:
import asyncio
import sqlite3
import functools
async def async_fetch_users(conn, min_age):
loop = asyncio.get_event_loop()
def _fetch():
cur = conn.execute(
"SELECT name, age FROM users WHERE age >= ? ORDER BY name",
(min_age,)
)
return cur.fetchall()
return await loop.run_in_executor(None, _fetch)
import asyncio
import sqlite3
# In production with asyncpg:
# conn = await asyncpg.connect(dsn)
# rows = await conn.fetch("SELECT * FROM table WHERE id = $1", id)
#
# aiosqlite provides the same async interface for SQLite.
# We'll simulate the async pattern using run_in_executor.
# TODO: Write an async function that runs a blocking sqlite3 query
# in an executor (simulating asyncpg's non-blocking behavior).
# Query all rows from a users table concurrently.
async def async_fetch_users(conn, min_age):
pass
async def main():
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?,?,?)", [
(1, 'Alice', 30), (2, 'Bob', 22), (3, 'Carol', 35), (4, 'Dave', 19)
])
conn.commit()
results = await asyncio.gather(
async_fetch_users(conn, 25),
async_fetch_users(conn, 30),
)
for r in results:
print(r)
conn.close()
asyncio.run(main())
Expected Output
[('Alice', 30), ('Carol', 35)]
[('Alice', 30), ('Carol', 35)]Hints
Hint 1: loop.run_in_executor(None, blocking_fn) runs a blocking function without blocking the event loop.
Hint 2: Use functools.partial or a lambda to pass arguments to the executor function.
Hint 3: asyncpg uses $1, $2 placeholders; aiosqlite uses ? — same concept, different placeholder.
Implement an exponential backoff retry decorator for transient database connection errors.
Solution:
import functools
import time
import random
def retry_query(max_retries=3, base_delay=0.01):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_err = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except sqlite3.OperationalError as e:
last_err = e
wait = base_delay * (2 ** attempt) + random.random() * 0.001
print(f"Retry {attempt + 1}/{max_retries} after {wait:.4f}s: {e}")
time.sleep(wait)
raise last_err
return wrapper
return decorator
import sqlite3
import time
import random
# PostgreSQL transient errors (connection timeout, deadlock) require retry logic.
# Pattern: exponential backoff with jitter.
# TODO: Write a retry_query decorator that:
# - Retries on sqlite3.OperationalError up to max_retries times
# - Uses exponential backoff: wait = base_delay * (2 ** attempt) + random jitter
# - Raises the error after all retries are exhausted
# - Logs each retry attempt
import functools
def retry_query(max_retries=3, base_delay=0.01):
pass
attempt_counter = [0]
@retry_query(max_retries=3, base_delay=0.001)
def flaky_query(conn):
attempt_counter[0] += 1
if attempt_counter[0] < 3:
raise sqlite3.OperationalError("database is locked")
return conn.execute("SELECT 1").fetchone()[0]
conn = sqlite3.connect(':memory:')
result = flaky_query(conn)
print(f'Result: {result}, Attempts: {attempt_counter[0]}')
Expected Output
Result: 1, Attempts: 3Hints
Hint 1: Use functools.wraps(func) to preserve function metadata.
Hint 2: Catch sqlite3.OperationalError inside a for loop over range(max_retries).
Hint 3: time.sleep(base_delay * (2 ** attempt) + random.random() * 0.001) adds jitter.
Build a database health check function that collects table count, row totals, integrity status, and response time.
Solution:
import time
def health_check(conn):
start = time.perf_counter()
try:
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
).fetchall()
table_count = len(tables)
total_rows = 0
for (tname,) in tables:
count = conn.execute(f"SELECT COUNT(*) FROM \"{tname}\"").fetchone()[0]
total_rows += count
page_count = conn.execute("PRAGMA page_count").fetchone()[0]
page_size = conn.execute("PRAGMA page_size").fetchone()[0]
db_size = page_count * page_size
integrity = conn.execute("PRAGMA integrity_check").fetchone()[0]
integrity_ok = (integrity == 'ok')
elapsed_ms = (time.perf_counter() - start) * 1000
return {
'status': 'ok',
'table_count': table_count,
'total_rows': total_rows,
'db_size_bytes': db_size,
'integrity_ok': integrity_ok,
'response_time_ms': round(elapsed_ms, 3),
}
except Exception as e:
return {'status': 'error', 'error': str(e)}
import sqlite3
import time
# PostgreSQL health checks query pg_stat_activity, pg_database_size, etc.
# SQLite equivalent uses PRAGMA commands.
# TODO: Build a health_check() function that returns a dict with:
# - 'status': 'ok' or 'error'
# - 'table_count': number of user tables
# - 'total_rows': total rows across all user tables
# - 'db_size_bytes': approximate database size
# - 'integrity_ok': result of PRAGMA integrity_check
# - 'response_time_ms': time to run the check
def health_check(conn):
pass
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL)")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO orders VALUES (?,?)", [(i, i * 10.5) for i in range(1, 51)])
conn.executemany("INSERT INTO products VALUES (?,?)", [(i, f'Product {i}') for i in range(1, 21)])
conn.commit()
report = health_check(conn)
for k, v in report.items():
print(f'{k}: {v}')
Expected Output
status: ok
table_count: 2
total_rows: 70
db_size_bytes: 0
integrity_ok: True
response_time_ms: X.XXXHints
Hint 1: SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' lists user tables.
Hint 2: Loop over table names and sum SELECT COUNT(*) for each to get total rows.
Hint 3: PRAGMA integrity_check returns 'ok' if the database is healthy.
Hint 4: PRAGMA page_count and PRAGMA page_size give an approximate size in bytes.
