Skip to main content

Python PostgreSQL with Python Practice Problems & Exercises

Practice: PostgreSQL with Python

11 problems4 Easy4 Medium3 Hard55–75 min
← Back to lesson

:::note Running These Problems Problems in this set use sqlite3 as a stand-in for PostgreSQL syntax concepts, so they run without a live Postgres server. Notes are provided where psycopg2/psycopg3 patterns differ. :::


#1Simulate a psycopg2-Style Connection PatternEasy
connectioncursorexecutefetchall

Practice the psycopg2 DB-API 2.0 pattern: cursor-based execution, parameterized inserts, and fetchall.

Solution:

import sqlite3

def postgres_style_query():
conn = sqlite3.connect(':memory:')
cur = conn.cursor()
cur.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL)")
cur.executemany(
"INSERT INTO products VALUES (?, ?, ?)",
[(1, 'Widget', 9.99), (2, 'Gadget', 49.99), (3, 'Doohickey', 24.99)]
)
conn.commit()
cur.execute("SELECT * FROM products ORDER BY id")
rows = cur.fetchall()
conn.close()
return rows
import sqlite3

# psycopg2 pattern uses the same DB-API 2.0 interface as sqlite3.
# conn = psycopg2.connect(dsn)
# cur = conn.cursor()
# cur.execute(sql, params)
# rows = cur.fetchall()
# conn.commit()
# conn.close()

# TODO: Using sqlite3 (same interface), connect to an in-memory DB,
# create a products table, insert 3 products, and return them all.

def postgres_style_query():
  pass

print(postgres_style_query())
Expected Output
[(1, 'Widget', 9.99), (2, 'Gadget', 49.99), (3, 'Doohickey', 24.99)]
Hints

Hint 1: DB-API 2.0 is the same for sqlite3 and psycopg2: connect, cursor, execute, fetchall.

Hint 2: Use cur = conn.cursor() then cur.execute(sql, params) — the psycopg2 way.

Hint 3: Always call conn.commit() after DML (INSERT/UPDATE/DELETE).


#2Named Parameters vs Positional ParametersEasy
named paramsparameterizedpsycopg2 style

Use named-style query parameters (:name in sqlite3, %(name)s in psycopg2) to filter a table.

Solution:

def filter_events(conn, category, min_capacity):
cur = conn.execute("""
SELECT name, capacity
FROM events
WHERE category = :category AND capacity >= :min_capacity
ORDER BY capacity DESC
""", {'category': category, 'min_capacity': min_capacity})
return cur.fetchall()
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE events (
  id INTEGER PRIMARY KEY,
  name TEXT,
  category TEXT,
  capacity INTEGER
)
""")
conn.executemany("INSERT INTO events VALUES (?,?,?,?)", [
  (1, 'PyCon', 'conference', 2000),
  (2, 'Django Workshop', 'workshop', 50),
  (3, 'Data Summit', 'conference', 1500),
  (4, 'SQL Bootcamp', 'workshop', 30),
  (5, 'AI Meetup', 'meetup', 100),
])
conn.commit()

# TODO: Filter events by category and min_capacity.
# Use named-style parameters: %(name)s in psycopg2 or :name in sqlite3.
# Return list of (name, capacity) tuples.

def filter_events(conn, category, min_capacity):
  pass

print(filter_events(conn, 'conference', 1000))
print(filter_events(conn, 'workshop', 40))
Expected Output
[('PyCon', 2000), ('Data Summit', 1500)]
[('Django Workshop', 50)]
Hints

Hint 1: sqlite3 named params use :name syntax: WHERE category = :category.

Hint 2: Pass a dict as the params: cur.execute(sql, {'category': category, 'min_capacity': min_capacity}).

Hint 3: psycopg2 uses %(name)s syntax instead — same concept, different placeholder.


#3RETURNING Clause SimulationEasy
RETURNINGINSERTlastrowid

Use the RETURNING clause (supported by PostgreSQL and SQLite 3.35+) to get auto-generated values back from an INSERT.

Solution:

def insert_message(conn, sender, body):
cur = conn.execute("""
INSERT INTO messages (sender, body)
VALUES (?, ?)
RETURNING id, created_at
""", (sender, body))
conn.commit()
return cur.fetchone()
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  sender TEXT,
  body TEXT,
  created_at TEXT DEFAULT (datetime('now'))
)
""")
conn.commit()

# PostgreSQL: INSERT INTO messages (sender, body) VALUES (%s, %s) RETURNING id, created_at
# SQLite 3.35+: INSERT INTO messages (sender, body) VALUES (?,?) RETURNING id, created_at
# TODO: Insert a message and return its auto-generated id and created_at timestamp.

def insert_message(conn, sender, body):
  pass

row = insert_message(conn, 'alice', 'Hello, world!')
print('id:', row[0])
print('has timestamp:', row[1] is not None)
Expected Output
id: 1
has timestamp: True
Hints

Hint 1: SQLite 3.35+ supports RETURNING: INSERT INTO ... VALUES (?,?) RETURNING id, created_at.

Hint 2: fetchone() after the INSERT RETURNING gives the returned row.

Hint 3: In psycopg2, the same syntax works: cur.execute(INSERT...RETURNING...) then cur.fetchone().


#4JSON Data Storage and QueryingEasy
JSONJSONBjson_extract

Query JSON-structured data using SQLite's json_extract (analogous to PostgreSQL's ->>).

Solution:

def users_older_than(conn, age):
cur = conn.execute("""
SELECT json_extract(data, '$.name')
FROM profiles
WHERE json_extract(data, '$.age') > ?
ORDER BY json_extract(data, '$.name')
""", (age,))
return [row[0] for row in cur.fetchall()]
import sqlite3
import json

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE profiles (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany("INSERT INTO profiles VALUES (?,?)", [
  (1, json.dumps({'name': 'Alice', 'age': 30, 'skills': ['Python', 'SQL']})),
  (2, json.dumps({'name': 'Bob', 'age': 25, 'skills': ['Go', 'Rust']})),
  (3, json.dumps({'name': 'Carol', 'age': 35, 'skills': ['Python', 'Kubernetes']})),
])
conn.commit()

# PostgreSQL: SELECT data->>'name' FROM profiles WHERE (data->>'age')::int > 28
# SQLite: SELECT json_extract(data, '$.name') FROM profiles WHERE json_extract(data, '$.age') > 28

# TODO: Return names of users older than 28 using json_extract.

def users_older_than(conn, age):
  pass

print(users_older_than(conn, 28))
Expected Output
['Alice', 'Carol']
Hints

Hint 1: SQLite json_extract(column, '$.field') is equivalent to PostgreSQL column->>'field'.

Hint 2: You can filter with WHERE json_extract(data, '$.age') > 28.

Hint 3: PostgreSQL JSONB uses -> for JSON objects and ->> for text, plus @> for containment.


#5Server-Side Cursor Simulation (Batch Fetching)Medium
fetchmanyserver-side cursorstreaming

Simulate PostgreSQL server-side cursors using fetchmany to stream a large result set in batches.

Solution:

def batch_process(conn, batch_size=100):
cur = conn.execute("SELECT * FROM big_table ORDER BY id")
batch_count = 0
total_rows = 0
while True:
rows = cur.fetchmany(batch_size)
if not rows:
break
batch_count += 1
total_rows += len(rows)
return batch_count, total_rows
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE big_table (id INTEGER PRIMARY KEY, val TEXT)")
conn.executemany(
  "INSERT INTO big_table VALUES (?,?)",
  [(i, f'value_{i}') for i in range(1, 1001)]
)
conn.commit()

# PostgreSQL server-side cursors use:
#   with conn.cursor('server_cursor') as cur:
#       cur.execute(sql)
#       while True:
#           rows = cur.fetchmany(100)
#           if not rows: break
#
# TODO: Simulate this pattern using fetchmany(100) to process
# 1000 rows in batches of 100. Return the count of batches
# and total rows processed.

def batch_process(conn, batch_size=100):
  pass

batches, total = batch_process(conn, 100)
print(f'Batches: {batches}, Total rows: {total}')
Expected Output
Batches: 10, Total rows: 1000
Hints

Hint 1: cursor.fetchmany(size) returns up to 'size' rows; returns [] when exhausted.

Hint 2: Loop: while True: rows = cur.fetchmany(100); if not rows: break.

Hint 3: This is exactly the pattern used with psycopg2 named cursors for streaming large result sets.


#6Connection Pool with SimpleConnectionPool PatternMedium
connection poolpsycopg2SimpleConnectionPool

Build a minimal connection pool that mirrors the psycopg2 SimpleConnectionPool API.

Solution:

import sqlite3
import threading
from queue import Queue

class MinimalPool:
def __init__(self, minconn, maxconn, db_path=':memory:'):
self._pool = Queue()
for _ in range(minconn):
conn = sqlite3.connect(db_path, check_same_thread=False)
self._pool.put(conn)

def getconn(self):
return self._pool.get()

def putconn(self, conn):
self._pool.put(conn)

def closeall(self):
while not self._pool.empty():
self._pool.get().close()
import sqlite3
import threading
from queue import Queue

# psycopg2 pools: psycopg2.pool.SimpleConnectionPool(minconn, maxconn, dsn)
# pool.getconn() — borrow a connection
# pool.putconn(conn) — return it
# pool.closeall() — close all

# TODO: Implement a minimal connection pool mirroring this interface.
# minconn connections are created at startup.
# getconn() blocks if all connections are in use.
# putconn(conn) returns the connection.

class MinimalPool:
  def __init__(self, minconn, maxconn, db_path=':memory:'):
      pass

  def getconn(self):
      pass

  def putconn(self, conn):
      pass

  def closeall(self):
      pass

def test_pool():
  pool = MinimalPool(2, 5)
  results = []
  lock = threading.Lock()

  def worker():
      conn = pool.getconn()
      try:
          cur = conn.execute("SELECT 42")
          val = cur.fetchone()[0]
          with lock:
              results.append(val)
      finally:
          pool.putconn(conn)

  threads = [threading.Thread(target=worker) for _ in range(8)]
  for t in threads:
      t.start()
  for t in threads:
      t.join()
  pool.closeall()
  print(f'Results: {len(results)} queries, all correct: {all(r == 42 for r in results)}')

test_pool()
Expected Output
Results: 8 queries, all correct: True
Hints

Hint 1: Use Queue to hold available connections — put() returns, get() borrows.

Hint 2: Create minconn connections in __init__ and put them all in the queue.

Hint 3: closeall() drains the queue and calls conn.close() on each.


#7COPY-Style Bulk LoadMedium
bulk insertexecutemanyCOPYperformance

Parse a CSV payload and bulk-load it into SQLite, mirroring the PostgreSQL COPY pattern.

Solution:

import csv
import io
import time

def bulk_load_csv(conn, csv_data):
reader = csv.DictReader(io.StringIO(csv_data.strip()))
rows = [
(int(r['sensor_id']), float(r['temperature']), float(r['humidity']), int(r['ts']))
for r in reader
]
start = time.perf_counter()
with conn:
conn.executemany(
"INSERT INTO readings VALUES (?,?,?,?)", rows
)
elapsed = time.perf_counter() - start
count = conn.execute("SELECT COUNT(*) FROM readings").fetchone()[0]
return count, elapsed
import sqlite3
import csv
import io
import time

# PostgreSQL COPY is the fastest bulk load:
#   cur.copy_expert("COPY table FROM STDIN WITH CSV HEADER", file_obj)
# For sqlite3, the closest equivalent is executemany inside a transaction.

# TODO: Parse the CSV data below and bulk-insert into a 'readings' table.
# Return total rows inserted and time taken.

CSV_DATA = """sensor_id,temperature,humidity,ts
1,22.5,60,1000
2,19.3,72,1001
1,23.1,58,1002
3,28.7,45,1003
2,18.9,75,1004
1,24.0,55,1005
3,29.2,43,1006
2,20.1,70,1007
"""

def bulk_load_csv(conn, csv_data):
  pass

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE readings (sensor_id INTEGER, temperature REAL, humidity REAL, ts INTEGER)")
count, elapsed = bulk_load_csv(conn, CSV_DATA)
print(f'Loaded {count} rows in {elapsed:.4f}s')
Expected Output
Loaded 8 rows in 0.XXXXs
Hints

Hint 1: Use csv.DictReader(io.StringIO(csv_data)) to parse the CSV.

Hint 2: Build a list of tuples, then use executemany inside a with conn: block.

Hint 3: SELECT COUNT(*) after insert to verify the row count.


#8Array-Like Storage with JSON ArraysMedium
arraysJSONjson_eachPostgreSQL arrays

Use json_each to simulate PostgreSQL array containment queries on JSON-stored tag lists.

Solution:

def posts_with_tag(conn, tag):
cur = conn.execute("""
SELECT DISTINCT p.title
FROM posts p, json_each(p.tags) je
WHERE je.value = ?
ORDER BY p.title
""", (tag,))
return [row[0] for row in cur.fetchall()]
import sqlite3
import json

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, tags TEXT)")
conn.executemany("INSERT INTO posts VALUES (?,?,?)", [
  (1, 'Python Tips', json.dumps(['python', 'programming', 'tips'])),
  (2, 'SQL Deep Dive', json.dumps(['sql', 'databases', 'performance'])),
  (3, 'Async Python', json.dumps(['python', 'async', 'concurrency'])),
  (4, 'PostgreSQL Arrays', json.dumps(['sql', 'postgresql', 'arrays'])),
])
conn.commit()

# PostgreSQL arrays: SELECT title FROM posts WHERE 'python' = ANY(tags)
# SQLite equivalent: use json_each to explode the JSON array

# TODO: Find all posts that contain a given tag.
# Return a sorted list of titles.

def posts_with_tag(conn, tag):
  pass

print(posts_with_tag(conn, 'python'))
print(posts_with_tag(conn, 'sql'))
Expected Output
['Async Python', 'Python Tips']
['PostgreSQL Arrays', 'SQL Deep Dive']
Hints

Hint 1: json_each(tags) is a SQLite table-valued function that expands a JSON array into rows.

Hint 2: JOIN json_each(posts.tags) je ON je.value = ? finds posts containing the tag.

Hint 3: PostgreSQL's equivalent is WHERE tag = ANY(tags_array_column).


#9Async Query Pattern with asyncioHard
asyncioasync databaseaiosqliteasyncpg

Simulate async PostgreSQL queries using asyncio and run_in_executor for non-blocking SQLite access.

Solution:

import asyncio
import sqlite3
import functools

async def async_fetch_users(conn, min_age):
loop = asyncio.get_event_loop()
def _fetch():
cur = conn.execute(
"SELECT name, age FROM users WHERE age >= ? ORDER BY name",
(min_age,)
)
return cur.fetchall()
return await loop.run_in_executor(None, _fetch)
import asyncio
import sqlite3

# In production with asyncpg:
#   conn = await asyncpg.connect(dsn)
#   rows = await conn.fetch("SELECT * FROM table WHERE id = $1", id)
#
# aiosqlite provides the same async interface for SQLite.
# We'll simulate the async pattern using run_in_executor.

# TODO: Write an async function that runs a blocking sqlite3 query
# in an executor (simulating asyncpg's non-blocking behavior).
# Query all rows from a users table concurrently.

async def async_fetch_users(conn, min_age):
  pass

async def main():
  conn = sqlite3.connect(':memory:')
  conn.execute("CREATE TABLE users (id INTEGER, name TEXT, age INTEGER)")
  conn.executemany("INSERT INTO users VALUES (?,?,?)", [
      (1, 'Alice', 30), (2, 'Bob', 22), (3, 'Carol', 35), (4, 'Dave', 19)
  ])
  conn.commit()

  results = await asyncio.gather(
      async_fetch_users(conn, 25),
      async_fetch_users(conn, 30),
  )
  for r in results:
      print(r)
  conn.close()

asyncio.run(main())
Expected Output
[('Alice', 30), ('Carol', 35)]
[('Alice', 30), ('Carol', 35)]
Hints

Hint 1: loop.run_in_executor(None, blocking_fn) runs a blocking function without blocking the event loop.

Hint 2: Use functools.partial or a lambda to pass arguments to the executor function.

Hint 3: asyncpg uses $1, $2 placeholders; aiosqlite uses ? — same concept, different placeholder.


#10Retry Logic for Transient Database ErrorsHard
retryexponential backoffOperationalErrorresilience

Implement an exponential backoff retry decorator for transient database connection errors.

Solution:

import functools
import time
import random

def retry_query(max_retries=3, base_delay=0.01):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_err = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except sqlite3.OperationalError as e:
last_err = e
wait = base_delay * (2 ** attempt) + random.random() * 0.001
print(f"Retry {attempt + 1}/{max_retries} after {wait:.4f}s: {e}")
time.sleep(wait)
raise last_err
return wrapper
return decorator
import sqlite3
import time
import random

# PostgreSQL transient errors (connection timeout, deadlock) require retry logic.
# Pattern: exponential backoff with jitter.

# TODO: Write a retry_query decorator that:
# - Retries on sqlite3.OperationalError up to max_retries times
# - Uses exponential backoff: wait = base_delay * (2 ** attempt) + random jitter
# - Raises the error after all retries are exhausted
# - Logs each retry attempt

import functools

def retry_query(max_retries=3, base_delay=0.01):
  pass

attempt_counter = [0]

@retry_query(max_retries=3, base_delay=0.001)
def flaky_query(conn):
  attempt_counter[0] += 1
  if attempt_counter[0] < 3:
      raise sqlite3.OperationalError("database is locked")
  return conn.execute("SELECT 1").fetchone()[0]

conn = sqlite3.connect(':memory:')
result = flaky_query(conn)
print(f'Result: {result}, Attempts: {attempt_counter[0]}')
Expected Output
Result: 1, Attempts: 3
Hints

Hint 1: Use functools.wraps(func) to preserve function metadata.

Hint 2: Catch sqlite3.OperationalError inside a for loop over range(max_retries).

Hint 3: time.sleep(base_delay * (2 ** attempt) + random.random() * 0.001) adds jitter.


#11Database Health Check and Metrics CollectorHard
health checkPRAGMApg_statmetrics

Build a database health check function that collects table count, row totals, integrity status, and response time.

Solution:

import time

def health_check(conn):
start = time.perf_counter()
try:
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
).fetchall()
table_count = len(tables)

total_rows = 0
for (tname,) in tables:
count = conn.execute(f"SELECT COUNT(*) FROM \"{tname}\"").fetchone()[0]
total_rows += count

page_count = conn.execute("PRAGMA page_count").fetchone()[0]
page_size = conn.execute("PRAGMA page_size").fetchone()[0]
db_size = page_count * page_size

integrity = conn.execute("PRAGMA integrity_check").fetchone()[0]
integrity_ok = (integrity == 'ok')

elapsed_ms = (time.perf_counter() - start) * 1000
return {
'status': 'ok',
'table_count': table_count,
'total_rows': total_rows,
'db_size_bytes': db_size,
'integrity_ok': integrity_ok,
'response_time_ms': round(elapsed_ms, 3),
}
except Exception as e:
return {'status': 'error', 'error': str(e)}
import sqlite3
import time

# PostgreSQL health checks query pg_stat_activity, pg_database_size, etc.
# SQLite equivalent uses PRAGMA commands.

# TODO: Build a health_check() function that returns a dict with:
# - 'status': 'ok' or 'error'
# - 'table_count': number of user tables
# - 'total_rows': total rows across all user tables
# - 'db_size_bytes': approximate database size
# - 'integrity_ok': result of PRAGMA integrity_check
# - 'response_time_ms': time to run the check

def health_check(conn):
  pass

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL)")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO orders VALUES (?,?)", [(i, i * 10.5) for i in range(1, 51)])
conn.executemany("INSERT INTO products VALUES (?,?)", [(i, f'Product {i}') for i in range(1, 21)])
conn.commit()

report = health_check(conn)
for k, v in report.items():
  print(f'{k}: {v}')
Expected Output
status: ok
table_count: 2
total_rows: 70
db_size_bytes: 0
integrity_ok: True
response_time_ms: X.XXX
Hints

Hint 1: SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' lists user tables.

Hint 2: Loop over table names and sum SELECT COUNT(*) for each to get total rows.

Hint 3: PRAGMA integrity_check returns 'ok' if the database is healthy.

Hint 4: PRAGMA page_count and PRAGMA page_size give an approximate size in bytes.

© 2026 EngineersOfAI. All rights reserved.