Transactions and Data Integrity
Reading time: ~35 minutes | Level: Intermediate → Engineering
Before reading further, predict what happens in this scenario:
import psycopg2
conn = psycopg2.connect(DSN)
cur = conn.cursor()
# Step 1: Deduct $500 from Alice's account
cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
# Step 2: Add $500 to Bob's account
# --- network blip here - this line raises an exception ---
cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
conn.commit()
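If the debit is committed on its own before the failure (for example, under autocommit), Alice loses $500, Bob gains nothing, and $500 vanishes from the system entirely. With psycopg2's default transaction mode the debit is never committed, and that protection is what this lesson is about.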
This is not a hypothetical. Without transactions, any failure between two related writes produces inconsistent data - data that violates the rules your system is supposed to enforce. Transactions exist to prevent exactly this.
What You Will Learn
- What ACID actually means in practice - not textbook definitions, but the real guarantees
- How BEGIN, COMMIT, and ROLLBACK map to Python's DB-API
- The four isolation levels and what anomalies each one allows
- How to implement transaction-safe code using context managers and explicit error handling
- Savepoints - how to partially roll back within a transaction
- Deadlocks - what causes them, how to detect them, and how to handle them with retry logic
- Optimistic vs pessimistic locking - SELECT FOR UPDATE and version columns
- Why autocommit mode is dangerous for multi-step operations
Prerequisites
- Python Intermediate: Module 07, Lessons 01–03 (SQL Fundamentals, SQLite, PostgreSQL)
- Understanding of Python context managers (with statement)
- Basic familiarity with psycopg2 for connecting to PostgreSQL
Part 1 - ACID: What the Guarantees Actually Mean
ACID is four properties that a database transaction must satisfy. Every developer has heard the acronym. Few understand what each one means in practice.
Atomicity - All or Nothing
A transaction is a single unit of work. Either every operation in it succeeds and the changes are applied, or the transaction fails and none of the changes are applied. There is no partial success.
Transfer $500 from Alice to Bob:
BEGIN
UPDATE accounts SET balance = balance - 500 WHERE id = 1 -- debit
UPDATE accounts SET balance = balance + 500 WHERE id = 2 -- credit
COMMIT <-- both updates land, or neither does
If the second UPDATE fails, the database rolls back the first one automatically. The $500 is not lost.
Atomicity is the property that solves the opening puzzle.
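The all-or-nothing behaviour can be tried without a PostgreSQL server. The sketch below is an illustration only: it uses Python's built-in sqlite3 module with an in-memory database as a stand-in, and the transfer helper and its fail_midway flag are hypothetical names introduced for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER NOT NULL)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 1000), (2, 1000)])
conn.commit()


def transfer(conn, from_id, to_id, amount, fail_midway=False):
    """Debit then credit; roll back both if anything fails in between."""
    try:
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id))
        if fail_midway:
            # Hypothetical failure injected between the two related writes
            raise RuntimeError("simulated failure between debit and credit")
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id))
        conn.commit()
    except Exception:
        conn.rollback()  # undoes the debit as well
        raise


def balances(conn):
    return dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))


try:
    transfer(conn, 1, 2, 500, fail_midway=True)
except RuntimeError:
    pass
balances_after_failure = balances(conn)  # the debit was rolled back

transfer(conn, 1, 2, 500)
balances_after_success = balances(conn)  # both updates landed together
```

After the failed attempt both balances are unchanged; after the successful one the debit and the credit appear together.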
Consistency - Rules Are Always Enforced
A transaction transitions the database from one valid state to another valid state. The validity rules - constraints, foreign keys, check constraints, triggers - are enforced before the transaction can commit: most are checked as each statement runs, and deferrable constraints can be postponed until commit time. A transaction that would violate a constraint is rejected entirely.
-- A CHECK constraint: balance must never go below 0
ALTER TABLE accounts ADD CONSTRAINT non_negative CHECK (balance >= 0);
If Alice has less than $500 and a transfer tries to deduct $500, the constraint fires and the entire transaction is rolled back. The database stays consistent.
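A minimal sketch of a CHECK constraint rejecting an overdraft, again using in-memory SQLite as a stand-in for PostgreSQL. The starting balance of 300 is a made-up value for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    "  id INTEGER PRIMARY KEY,"
    "  balance INTEGER NOT NULL CHECK (balance >= 0))"
)
# Hypothetical starting state: account 1 holds less than the transfer amount
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 300), (2, 0)])
conn.commit()

constraint_fired = False
try:
    conn.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
    conn.commit()
except sqlite3.IntegrityError:
    # The debit would leave balance at -200, so the database refuses it
    constraint_fired = True
    conn.rollback()

final_balances = dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))
```

With psycopg2 the same violation surfaces as psycopg2.errors.CheckViolation, a subclass of psycopg2.IntegrityError.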
Isolation - Concurrent Transactions Do Not Interfere
Multiple transactions running at the same time should behave as if they ran one after another. In practice, databases trade off isolation for performance - which is where isolation levels come in (Part 3). But the goal is that transaction B cannot see transaction A's uncommitted changes.
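The "cannot see uncommitted changes" guarantee can be observed with two connections to the same database. This sketch uses a temporary SQLite file as a stand-in; SQLite gets there through file locking rather than PostgreSQL's row versioning, so treat it as an illustration of the visible behaviour only. The writer/reader names are made up for the demo.

```python
import os
import sqlite3
import tempfile


def read_balance(conn):
    cur = conn.execute("SELECT balance FROM accounts WHERE id = 1")
    row = cur.fetchone()
    cur.close()  # release the read lock before the writer commits
    return row[0]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "isolation_demo.db")

    writer = sqlite3.connect(path)
    writer.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER NOT NULL)")
    writer.execute("INSERT INTO accounts VALUES (1, 1000)")
    writer.commit()

    reader = sqlite3.connect(path)

    # The writer changes the row but has NOT committed yet
    writer.execute("UPDATE accounts SET balance = 500 WHERE id = 1")
    seen_before_commit = read_balance(reader)  # still the old value

    writer.commit()
    seen_after_commit = read_balance(reader)  # now the new value

    reader.close()
    writer.close()
```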
Durability - Committed Data Survives Crashes
Once a transaction commits, the data is permanent. If the server crashes one millisecond after a successful commit, the data is still there after restart. Databases implement this with write-ahead logs (WAL) - changes are written to disk before they are applied to the data pages.
:::note ACID and NoSQL
Not all databases offer full ACID transactions. Many NoSQL databases sacrifice isolation or durability for throughput. When working with Redis, Cassandra, or DynamoDB, check which guarantees apply - they vary per operation and per consistency setting.
:::
Part 2 - Transaction Control: BEGIN, COMMIT, ROLLBACK
SQL Commands
At the SQL level, a transaction looks like this:
BEGIN;
UPDATE accounts SET balance = balance - 500 WHERE id = 1;
UPDATE accounts SET balance = balance + 500 WHERE id = 2;
COMMIT;
If something goes wrong:
BEGIN;
UPDATE accounts SET balance = balance - 500 WHERE id = 1;
-- error occurs
ROLLBACK; -- undo the debit
How Python's DB-API Maps to This
Python's database drivers (psycopg2, sqlite3) follow the DB-API 2.0 specification (PEP 249), which has a specific transaction model:
- A connection starts in transaction mode by default - an implicit BEGIN is issued automatically before the first SQL statement
- conn.commit() issues a COMMIT; the next statement starts a new implicit transaction
- conn.rollback() issues a ROLLBACK; the next statement starts a new implicit transaction
import psycopg2
conn = psycopg2.connect(DSN)
cur = conn.cursor()
try:
    cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
    cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
    conn.commit() # COMMIT - both changes land
except Exception:
    conn.rollback() # ROLLBACK - neither change lands
    raise
finally:
    cur.close()
    conn.close()
This pattern works but is verbose. Python provides a cleaner approach via context managers.
The Connection Context Manager
# psycopg2's connection object is a context manager
# __exit__ commits on success, rolls back on exception
with psycopg2.connect(DSN) as conn:
    with conn.cursor() as cur:
        cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
        cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
# conn.__exit__ is called here - commits if no exception was raised
:::warning psycopg2 Context Manager Behaviour
In psycopg2, the with conn block does not close the connection on exit - it only commits or rolls back the transaction. You must close the connection separately. If you want automatic connection closing, call conn.close() in a finally block, wrap the connection in contextlib.closing(), or use a connection pool with its own lifecycle management.
:::
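Python's built-in sqlite3 connections behave the same way (the with block ends the transaction but leaves the connection open), so the difference is easy to see locally. The sketch below uses sqlite3 as a stand-in; is_open is a hypothetical helper for the demo. With psycopg2 the equivalent shape would be with closing(psycopg2.connect(DSN)) as conn: followed by a nested with conn: block.

```python
import sqlite3
from contextlib import closing


def is_open(conn):
    """Return True if the connection still accepts statements."""
    try:
        conn.execute("SELECT 1")
        return True
    except sqlite3.ProgrammingError:  # raised when the connection is closed
        return False


# Plain "with conn": the transaction is committed, the connection stays open
conn = sqlite3.connect(":memory:")
with conn:
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t VALUES (1)")
open_after_with = is_open(conn)
conn.close()

# closing() adds the missing close; the inner "with" still handles commit/rollback
with closing(sqlite3.connect(":memory:")) as conn2:
    with conn2:
        conn2.execute("CREATE TABLE t (x INTEGER)")
open_after_closing = is_open(conn2)
```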
Transaction State Machine
:::danger Autocommit Mode
By default, psycopg2 (and most Python drivers) operate in transaction mode - every statement is part of an implicit transaction that must be explicitly committed or rolled back.
You can disable this with conn.autocommit = True:
conn = psycopg2.connect(DSN)
conn.autocommit = True # every statement commits immediately
cur = conn.cursor()
cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
# ^ This commits immediately. No chance to roll back.
cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
# ^ If THIS fails, the debit from line above is already permanent.
With autocommit on, there is no transaction protection. Every statement is its own transaction. A failure between two related statements leaves the database in an inconsistent state with no way to recover.
When to use autocommit:
- CREATE DATABASE / DROP DATABASE - PostgreSQL requires these outside a transaction
- VACUUM in PostgreSQL - cannot run inside a transaction
- Long-running read-only analytical queries - to avoid holding a transaction open
Never use autocommit for multi-step write operations.
:::
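The difference between the two modes can be reproduced locally. This sketch uses sqlite3 as a stand-in, where isolation_level=None plays the role of psycopg2's conn.autocommit = True; run_transfer is a hypothetical helper that simulates a failure between the debit and the credit.

```python
import sqlite3


def run_transfer(autocommit: bool) -> dict:
    """Debit account 1, fail before the credit, then report the final balances."""
    conn = sqlite3.connect(":memory:", isolation_level=None if autocommit else "")
    conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
    conn.execute("INSERT INTO accounts VALUES (1, 1000), (2, 1000)")
    conn.commit()
    try:
        conn.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
        raise RuntimeError("simulated failure before the credit")
    except RuntimeError:
        # Under autocommit this is too late: the debit is already permanent
        conn.rollback()
    result = dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))
    conn.close()
    return result


with_autocommit = run_transfer(autocommit=True)      # debit survives, money is gone
with_transaction = run_transfer(autocommit=False)    # debit is rolled back
```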
Part 3 - Isolation Levels and Concurrency Anomalies
Isolation is the most nuanced of the ACID properties because it trades correctness for performance. SQL defines four isolation levels, each preventing a different set of concurrency anomalies.
The Three Anomalies
Dirty Read: Transaction B reads data written by Transaction A that has not yet committed. If A rolls back, B has read data that never existed.
Non-Repeatable Read: Transaction B reads a row. Transaction A updates and commits that row. Transaction B reads the same row again and gets a different value.
Phantom Read: Transaction B queries for rows matching a condition (e.g., WHERE balance > 1000). Transaction A inserts a new row that matches. Transaction B queries again and sees a new row - a "phantom".
The Four Isolation Levels
| Level | Dirty Read | Non-Repeatable Read | Phantom Read |
|---|---|---|---|
| READ UNCOMMITTED | Possible | Possible | Possible |
| READ COMMITTED | Prevented | Possible | Possible |
| REPEATABLE READ | Prevented | Prevented | Possible |
| SERIALIZABLE | Prevented | Prevented | Prevented |
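PostgreSQL's default is READ COMMITTED. SQLite defaults to SERIALIZABLE. The table shows the minimum guarantees required by the SQL standard; PostgreSQL is stricter in two places: it treats READ UNCOMMITTED as READ COMMITTED, and its REPEATABLE READ also prevents phantom reads.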
Setting Isolation Levels in Python
import psycopg2
from psycopg2 import extensions
conn = psycopg2.connect(DSN)
# Set isolation level before starting a transaction
conn.set_isolation_level(extensions.ISOLATION_LEVEL_REPEATABLE_READ)
# Or pass the level by name through set_session() (call it outside a transaction)
conn.set_session(isolation_level="SERIALIZABLE")
Using psycopg2's constants:
from psycopg2 import extensions
ISOLATION_LEVEL_AUTOCOMMIT = extensions.ISOLATION_LEVEL_AUTOCOMMIT
ISOLATION_LEVEL_READ_COMMITTED = extensions.ISOLATION_LEVEL_READ_COMMITTED
ISOLATION_LEVEL_REPEATABLE_READ= extensions.ISOLATION_LEVEL_REPEATABLE_READ
ISOLATION_LEVEL_SERIALIZABLE = extensions.ISOLATION_LEVEL_SERIALIZABLE
Concrete Example: Non-Repeatable Read
# Session A (Transaction 1) - using READ COMMITTED
conn_a = psycopg2.connect(DSN)
conn_a.set_isolation_level(extensions.ISOLATION_LEVEL_READ_COMMITTED)
cur_a = conn_a.cursor()
# Session B (Transaction 2) - runs concurrently
conn_b = psycopg2.connect(DSN)
cur_b = conn_b.cursor()
# T1 reads Alice's balance: 1000
cur_a.execute("SELECT balance FROM accounts WHERE id = 1")
print(cur_a.fetchone()[0]) # 1000
# T2 updates Alice's balance and commits
cur_b.execute("UPDATE accounts SET balance = 500 WHERE id = 1")
conn_b.commit()
# T1 reads again - gets a DIFFERENT value under READ COMMITTED
cur_a.execute("SELECT balance FROM accounts WHERE id = 1")
print(cur_a.fetchone()[0]) # 500 - non-repeatable read!
conn_a.rollback()
Under REPEATABLE READ, the second read by T1 would still return 1000 - the snapshot is fixed when the transaction runs its first query.
:::tip Choosing the Right Isolation Level
- READ COMMITTED (default): Use for most OLTP workloads. Good performance, no dirty reads. Accept that reads may see committed changes from other transactions mid-transaction.
- REPEATABLE READ: Use when a transaction must see a consistent snapshot - reporting queries that touch multiple tables, audit calculations, anything that aggregates multiple reads.
- SERIALIZABLE: Use for the highest correctness guarantee - financial calculations where even phantom reads would cause errors. Accept the performance overhead and risk of serialization failures (which must be retried).
:::
Part 4 - Implementing Transactions in Python
The Standard Pattern
import psycopg2
import logging
logger = logging.getLogger(__name__)
def transfer_funds(conn, from_account_id: int, to_account_id: int, amount: float) -> None:
"""
Transfer amount from one account to another.
Raises on any failure; the caller's conn.rollback() handles cleanup.
"""
with conn.cursor() as cur:
# Check source balance first
cur.execute(
"SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
(from_account_id,)
)
row = cur.fetchone()
if row is None:
raise ValueError(f"Account {from_account_id} not found")
balance = row[0]
if balance < amount:
raise ValueError(
f"Insufficient funds: account {from_account_id} "
f"has {balance:.2f}, needs {amount:.2f}"
)
# Deduct from source
cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_account_id)
)
# Add to destination
cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_account_id)
)
# Record the transfer in audit log
cur.execute(
"""
INSERT INTO transfer_log (from_account, to_account, amount, created_at)
VALUES (%s, %s, %s, NOW())
""",
(from_account_id, to_account_id, amount)
)
def execute_transfer(from_id: int, to_id: int, amount: float) -> None:
"""
Top-level function that manages the connection and transaction lifecycle.
"""
conn = psycopg2.connect(DSN)
try:
transfer_funds(conn, from_id, to_id, amount)
conn.commit()
logger.info("Transfer of %.2f from %d to %d committed", amount, from_id, to_id)
except Exception:
conn.rollback()
logger.exception("Transfer failed, rolled back")
raise
finally:
conn.close()
Using a Context Manager Wrapper
For cleaner code, define a transaction context manager:
from contextlib import contextmanager
import psycopg2
@contextmanager
def transaction(conn):
    """
    Context manager that commits on success and rolls back on exception.
    Leaves the connection open for reuse (e.g., from a pool).
    """
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
# Usage
conn = psycopg2.connect(DSN)
try:
    with transaction(conn):
        with conn.cursor() as cur:
            cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
            cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
    # transaction commits here - or rolls back if an exception escaped the block
finally:
    conn.close() # the wrapper leaves the connection open, so close it explicitly
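The same wrapper works with any DB-API connection, which makes it easy to check both the commit and the rollback path locally. A small sketch against in-memory SQLite (used here only as a stand-in for PostgreSQL):

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    """Commit on success, roll back if an exception escapes the block."""
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise


def balances(conn):
    return dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES (1, 1000), (2, 1000)")
conn.commit()

# Success path: both updates are committed when the block exits
with transaction(conn):
    conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
after_success = balances(conn)

# Failure path: the exception propagates and the debit is rolled back
try:
    with transaction(conn):
        conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
        raise ValueError("simulated failure")
except ValueError:
    pass
after_failure = balances(conn)
conn.close()
```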
Part 5 - Savepoints: Partial Rollback Within a Transaction
A savepoint is a named marker within a transaction. You can roll back to a savepoint without abandoning the entire transaction. This is useful when part of an operation is optional or speculative.
SQL Syntax
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
SAVEPOINT before_bonus;
UPDATE bonuses SET amount = amount + 10 WHERE account_id = 1;
-- if the bonus update fails, we can roll back to before_bonus
-- without losing the main deduction
ROLLBACK TO SAVEPOINT before_bonus;
-- or: RELEASE SAVEPOINT before_bonus; (to confirm and discard the marker)
COMMIT;
Savepoints in Python
def process_order_with_loyalty(conn, order_id: int, customer_id: int, product_id: int) -> dict:
    """
    Process an order. Attempt to apply loyalty points, but if the loyalty
    update fails, still commit the core order without it.
    """
    result = {"order_committed": False, "loyalty_applied": False}
    with conn.cursor() as cur:
        # Core order processing - must succeed
        cur.execute(
            "INSERT INTO orders (id, customer_id, status) VALUES (%s, %s, 'confirmed')",
            (order_id, customer_id)
        )
        # Decrement stock for the ordered product (keyed by product, not by order)
        cur.execute(
            "UPDATE inventory SET quantity = quantity - 1 WHERE product_id = %s",
            (product_id,)
        )
        # Set a savepoint before the optional loyalty operation
        cur.execute("SAVEPOINT before_loyalty")
        try:
            cur.execute(
                "UPDATE loyalty_points SET points = points + 100 WHERE customer_id = %s",
                (customer_id,)
            )
            cur.execute("RELEASE SAVEPOINT before_loyalty")
            result["loyalty_applied"] = True
        except psycopg2.Error:
            # Roll back only the loyalty part - the order survives
            cur.execute("ROLLBACK TO SAVEPOINT before_loyalty")
            cur.execute("RELEASE SAVEPOINT before_loyalty")
    conn.commit()
    result["order_committed"] = True
    return result
:::note Savepoint Nesting
Savepoints can be nested. Each SAVEPOINT name creates a marker; ROLLBACK TO SAVEPOINT name rewinds to that marker; RELEASE SAVEPOINT name discards the marker without rolling back. If you release a savepoint, all savepoints created after it are also released.
:::
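SQLite supports the same SAVEPOINT syntax, so the partial-rollback flow can be tried locally. In this sketch the CHECK (points <= 100) cap is a made-up rule that forces the optional step to fail. One difference worth knowing: SQLite keeps the transaction usable after a failed statement, whereas in PostgreSQL the ROLLBACK TO SAVEPOINT is what makes the transaction usable again.

```python
import sqlite3

# isolation_level=None: we issue BEGIN/COMMIT ourselves so the savepoint is nested
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE loyalty_points ("
    "  customer_id INTEGER PRIMARY KEY,"
    "  points INTEGER NOT NULL CHECK (points <= 100))"  # hypothetical cap for the demo
)
conn.execute("INSERT INTO loyalty_points VALUES (7, 50)")

loyalty_applied = False
conn.execute("BEGIN")
conn.execute("INSERT INTO orders VALUES (1, 'confirmed')")  # mandatory step
conn.execute("SAVEPOINT before_loyalty")
try:
    # 50 + 100 exceeds the cap, so this optional step fails
    conn.execute("UPDATE loyalty_points SET points = points + 100 WHERE customer_id = 7")
    conn.execute("RELEASE SAVEPOINT before_loyalty")
    loyalty_applied = True
except sqlite3.IntegrityError:
    # Undo only the work done since the savepoint; the order insert survives
    conn.execute("ROLLBACK TO SAVEPOINT before_loyalty")
    conn.execute("RELEASE SAVEPOINT before_loyalty")
conn.execute("COMMIT")

order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
points = conn.execute(
    "SELECT points FROM loyalty_points WHERE customer_id = 7"
).fetchone()[0]
```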
Part 6 - Deadlocks: Detection, Prevention, and Handling
What Is a Deadlock?
A deadlock occurs when two transactions are each waiting for the other to release a lock they need.
Transaction A holds a lock on Row 1
Transaction B holds a lock on Row 2
Transaction A wants a lock on Row 2 → waits for B
Transaction B wants a lock on Row 1 → waits for A
Both wait forever. Neither can proceed.
The database detects this cycle and kills one transaction, returning an error to the application.
Deadlock Sequence Diagram
The Cause: Lock Ordering
The deadlock above happened because:
- Transaction A locked accounts in order: id=1, then id=2
- Transaction B locked accounts in order: id=2, then id=1
The fix is always the same: lock resources in a consistent, canonical order.
def transfer_funds_safe(conn, account_a: int, account_b: int, amount: float) -> None:
    """
    Transfer amount from account_a to account_b.
    Always lock accounts in ascending ID order to prevent deadlocks.
    """
    # Sort IDs so both callers lock in the same order regardless of direction
    first_id, second_id = sorted([account_a, account_b])
    with conn.cursor() as cur:
        # Lock in sorted order - opposite-direction transfers can no longer deadlock each other
        cur.execute(
            "SELECT id, balance FROM accounts WHERE id IN %s ORDER BY id FOR UPDATE",
            ((first_id, second_id),)
        )
        rows = {row[0]: row[1] for row in cur.fetchall()}
        if account_a not in rows or account_b not in rows:
            raise ValueError("Account not found")
        from_balance = rows[account_a]
        if from_balance < amount:
            raise ValueError("Insufficient funds")
        cur.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, account_a)
        )
        cur.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, account_b)
        )
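The lock-ordering rule is not specific to databases. As a rough analogy, the sketch below uses in-process threading.Lock objects in place of row locks: two threads transfer in opposite directions, and because both always take the lower-numbered lock first, neither can end up holding one lock while waiting for the other. All names here are invented for the demo.

```python
import threading

balances = {1: 1000, 2: 1000}
locks = {account_id: threading.Lock() for account_id in balances}


def transfer(from_id: int, to_id: int, amount: int) -> None:
    if from_id == to_id:
        raise ValueError("from_id and to_id must differ")
    # Canonical order: always acquire the lock with the smaller ID first
    first, second = sorted((from_id, to_id))
    with locks[first], locks[second]:
        balances[from_id] -= amount
        balances[to_id] += amount


def worker(from_id: int, to_id: int, repeats: int) -> None:
    for _ in range(repeats):
        transfer(from_id, to_id, 1)


# Opposite directions: the classic setup for a lock-ordering deadlock
threads = [
    threading.Thread(target=worker, args=(1, 2, 2000), daemon=True),
    threading.Thread(target=worker, args=(2, 1, 2000), daemon=True),
]
for t in threads:
    t.start()
for t in threads:
    t.join(timeout=10)

finished = not any(t.is_alive() for t in threads)
```

If transfer acquired locks[from_id] before locks[to_id] instead, the two workers could each grab one lock and wait on the other indefinitely.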
Handling Deadlocks with Retry Logic
Even with good lock ordering, deadlocks can occur in complex systems. The database will always tell you - psycopg2 raises psycopg2.errors.DeadlockDetected. The correct response is to retry the transaction.
import psycopg2
import psycopg2.errors
import time
import random
import logging
logger = logging.getLogger(__name__)
def with_retry(fn, conn, max_attempts: int = 3, base_delay: float = 0.1):
    """
    Execute fn(conn) with automatic retry on deadlock.
    Uses exponential backoff with jitter to reduce contention storms.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            result = fn(conn)
            conn.commit()
            return result
        except psycopg2.errors.DeadlockDetected:
            conn.rollback()
            if attempt == max_attempts:
                logger.error("Deadlock persisted after %d attempts", max_attempts)
                raise
            # Exponential backoff: 0.1s, 0.2s, 0.4s... plus random jitter
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.05)
            logger.warning(
                "Deadlock on attempt %d/%d, retrying in %.3fs",
                attempt, max_attempts, delay
            )
            time.sleep(delay)
        except Exception:
            conn.rollback()
            raise
# Usage
def do_transfer(conn):
    # Keyword names must match transfer_funds_safe's parameters
    transfer_funds_safe(conn, account_a=1, account_b=2, amount=500)
conn = psycopg2.connect(DSN)
try:
    with_retry(do_transfer, conn)
finally:
    conn.close()
:::warning Deadlock vs Serialization Failure
PostgreSQL raises two similar errors:
- DeadlockDetected (40P01): two or more transactions are waiting in a cycle
- SerializationFailure (40001): two transactions conflict under REPEATABLE READ or SERIALIZABLE isolation
Both should be retried after a rollback. Neither indicates a bug - they are expected outcomes of concurrent access. Handle both with the same retry helper.
RETRYABLE_ERRORS = (
psycopg2.errors.DeadlockDetected,
psycopg2.errors.SerializationFailure,
)
:::
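One way to handle both errors with a single helper is to pass the retryable exception types as a tuple. The sketch below is self-contained and therefore uses two stand-in exception classes rather than the psycopg2 ones; run_with_retry and the Fake* classes are hypothetical names. In real code the callable would also roll back the connection before each retry, as with_retry does above.

```python
import random
import time


class FakeDeadlock(Exception):
    """Stand-in for psycopg2.errors.DeadlockDetected (demo only)."""


class FakeSerializationFailure(Exception):
    """Stand-in for psycopg2.errors.SerializationFailure (demo only)."""


RETRYABLE = (FakeDeadlock, FakeSerializationFailure)


def run_with_retry(fn, retryable=RETRYABLE, max_attempts=3, base_delay=0.01):
    """Call fn(); retry on any exception type in retryable, with backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * (2 ** (attempt - 1)) + random.uniform(0, base_delay))


calls = []


def flaky():
    """Fails with a different retryable error on each of the first two calls."""
    calls.append(1)
    if len(calls) == 1:
        raise FakeDeadlock()
    if len(calls) == 2:
        raise FakeSerializationFailure()
    return "committed"


outcome = run_with_retry(flaky, base_delay=0)


def always_deadlock():
    raise FakeDeadlock()


gave_up = False
try:
    run_with_retry(always_deadlock, base_delay=0)
except FakeDeadlock:
    gave_up = True  # the last failure is re-raised once attempts are exhausted
```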
Part 7 - Optimistic vs Pessimistic Locking
Pessimistic Locking: SELECT FOR UPDATE
Pessimistic locking assumes conflicts are likely. It locks rows at read time, preventing others from modifying them until the transaction commits.
def withdraw_pessimistic(conn, account_id: int, amount: float) -> float:
    with conn.cursor() as cur:
        # Lock the row immediately - no one else can UPDATE it until we COMMIT
        cur.execute(
            "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
            (account_id,)
        )
        row = cur.fetchone()
        if row is None:
            raise ValueError("Account not found")
        # NUMERIC columns arrive as Decimal; convert before mixing with a float amount
        balance = float(row[0])
        if balance < amount:
            raise ValueError("Insufficient funds")
        new_balance = balance - amount
        cur.execute(
            "UPDATE accounts SET balance = %s WHERE id = %s",
            (new_balance, account_id)
        )
    return new_balance
SELECT FOR UPDATE tells PostgreSQL: "I'm going to update this row - lock it now so no one else can change it before I do."
When to use pessimistic locking:
- High contention: many transactions compete for the same rows
- Short transactions: the lock hold time is small
- When correctness is more important than throughput
SELECT FOR UPDATE SKIP LOCKED - a variant useful for job queues:
def claim_next_job(conn) -> dict | None:
"""
Claim the next available job, skipping any already locked by another worker.
Perfect for worker pools processing a jobs table.
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, payload
FROM jobs
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED
"""
)
row = cur.fetchone()
if row is None:
return None
job_id, payload = row
cur.execute(
"UPDATE jobs SET status = 'processing' WHERE id = %s",
(job_id,)
)
return {"id": job_id, "payload": payload}
Optimistic Locking: Version Columns
Optimistic locking assumes conflicts are rare. Instead of locking rows upfront, it detects conflicts at write time using a version column.
-- Schema with a version column
CREATE TABLE accounts (
id SERIAL PRIMARY KEY,
balance NUMERIC(12, 2),
version INTEGER DEFAULT 1
);
def withdraw_optimistic(conn, account_id: int, amount: float) -> float:
    """
    Optimistic locking: read without locking, detect conflict at update time.
    Raises if someone else modified the row since we read it.
    """
    with conn.cursor() as cur:
        # Read without locking
        cur.execute(
            "SELECT balance, version FROM accounts WHERE id = %s",
            (account_id,)
        )
        row = cur.fetchone()
        if row is None:
            raise ValueError("Account not found")
        balance, version = row
        # NUMERIC columns arrive as Decimal; convert before mixing with a float amount
        balance = float(balance)
        if balance < amount:
            raise ValueError("Insufficient funds")
        new_balance = balance - amount
        # Update only if version hasn't changed
        cur.execute(
            """
            UPDATE accounts
            SET balance = %s, version = version + 1
            WHERE id = %s AND version = %s
            """,
            (new_balance, account_id, version)
        )
        if cur.rowcount == 0:
            # Someone else modified this row between our SELECT and UPDATE
            raise RuntimeError("Concurrent modification detected - retry the operation")
    return new_balance
When to use optimistic locking:
- Low contention: conflicts are rare
- Long-running operations: you do not want to hold locks for seconds
- Read-heavy workloads: most reads succeed without any conflict
| Approach | Lock held | Best for | Risk |
|---|---|---|---|
| Pessimistic (FOR UPDATE) | From SELECT to COMMIT | High contention, short transactions | Deadlocks, reduced throughput |
| Optimistic (version column) | Not held | Low contention, long reads | Retry overhead on conflict |
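The version check can be exercised without a PostgreSQL server. In this sketch (in-memory SQLite as a stand-in, with hypothetical helper names), two writers read the same version; the first write succeeds and bumps the version, so the second write matches zero rows and is detected as a conflict.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    "  id INTEGER PRIMARY KEY,"
    "  balance INTEGER NOT NULL,"
    "  version INTEGER NOT NULL DEFAULT 1)"
)
conn.execute("INSERT INTO accounts (id, balance) VALUES (1, 1000)")
conn.commit()


def read_account(conn, account_id):
    """Return (balance, version) without taking any lock."""
    return conn.execute(
        "SELECT balance, version FROM accounts WHERE id = ?", (account_id,)
    ).fetchone()


def write_balance(conn, account_id, new_balance, expected_version) -> bool:
    """Write only if the row still has the version we read; report success."""
    cur = conn.execute(
        "UPDATE accounts SET balance = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_balance, account_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1


# Two writers read the same state (version 1)
balance_a, version_a = read_account(conn, 1)
balance_b, version_b = read_account(conn, 1)

first_write_ok = write_balance(conn, 1, balance_a - 100, version_a)   # wins
second_write_ok = write_balance(conn, 1, balance_b - 200, version_b)  # stale version

final_balance, final_version = read_account(conn, 1)
```

The losing writer would normally re-read the row and try again, which is the retry overhead listed in the table.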
Full Example - Transaction-Safe Transfer with Retry
Here is a complete, production-quality implementation combining everything from this lesson:
import psycopg2
import psycopg2.errors
import psycopg2.extras
import time
import random
import logging
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Optional
logger = logging.getLogger(__name__)
DSN = "postgresql://user:password@localhost:5432/bankdb"
@dataclass
class TransferResult:
success: bool
from_new_balance: Optional[float] = None
to_new_balance: Optional[float] = None
error: Optional[str] = None
attempts: int = 1
@contextmanager
def get_connection():
"""Connection context manager - always closes the connection."""
conn = psycopg2.connect(DSN, cursor_factory=psycopg2.extras.RealDictCursor)
try:
yield conn
finally:
conn.close()
def _do_transfer(cur, from_id: int, to_id: int, amount: float) -> tuple[float, float]:
"""
Inner transfer logic - runs inside an already-open transaction.
Returns (from_new_balance, to_new_balance).
Lock in sorted ID order to prevent deadlocks.
"""
# Always lock in ascending ID order
lock_ids = tuple(sorted([from_id, to_id]))
cur.execute(
"SELECT id, balance FROM accounts WHERE id = ANY(%s) ORDER BY id FOR UPDATE",
(list(lock_ids),)
)
rows = {row["id"]: row["balance"] for row in cur.fetchall()}
if from_id not in rows:
raise ValueError(f"Source account {from_id} not found")
if to_id not in rows:
raise ValueError(f"Destination account {to_id} not found")
from_balance = rows[from_id]
to_balance = rows[to_id]
if from_balance < amount:
raise ValueError(
f"Insufficient funds: balance={from_balance:.2f}, requested={amount:.2f}"
)
cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_id)
)
cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_id)
)
cur.execute(
"""
INSERT INTO transfer_log (from_account, to_account, amount, created_at)
VALUES (%s, %s, %s, NOW())
""",
(from_id, to_id, amount)
)
# NUMERIC columns arrive as Decimal, which cannot be mixed with float in arithmetic
return float(from_balance) - amount, float(to_balance) + amount
RETRYABLE = (
psycopg2.errors.DeadlockDetected,
psycopg2.errors.SerializationFailure,
)
def transfer_funds(
from_id: int,
to_id: int,
amount: float,
max_attempts: int = 3,
base_delay: float = 0.05,
) -> TransferResult:
"""
Transfer funds between accounts with full transaction safety.
Retries automatically on deadlock or serialization failure.
"""
if amount <= 0:
return TransferResult(success=False, error="Amount must be positive")
for attempt in range(1, max_attempts + 1):
try:
with get_connection() as conn:
with conn.cursor() as cur:
from_bal, to_bal = _do_transfer(cur, from_id, to_id, amount)
conn.commit()
logger.info(
"Transfer %.2f from account %d to %d committed (attempt %d)",
amount, from_id, to_id, attempt
)
return TransferResult(
success=True,
from_new_balance=from_bal,
to_new_balance=to_bal,
attempts=attempt,
)
except RETRYABLE as exc:
logger.warning(
"Retryable error on attempt %d/%d: %s",
attempt, max_attempts, type(exc).__name__
)
if attempt == max_attempts:
return TransferResult(
success=False,
error=f"Failed after {max_attempts} attempts: {exc}",
attempts=attempt,
)
delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, base_delay)
time.sleep(delay)
except ValueError as exc:
# Business logic error - do not retry
logger.warning("Transfer rejected: %s", exc)
return TransferResult(success=False, error=str(exc), attempts=attempt)
except Exception as exc:
logger.exception("Unexpected error during transfer")
return TransferResult(success=False, error=str(exc), attempts=attempt)
# Should not reach here
return TransferResult(success=False, error="Unknown failure", attempts=max_attempts)
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
result = transfer_funds(from_id=1, to_id=2, amount=250.00)
if result.success:
print(f"Transfer complete. "
f"From balance: {result.from_new_balance:.2f}, "
f"To balance: {result.to_new_balance:.2f}")
else:
print(f"Transfer failed: {result.error}")
Common Mistakes
Mistake 1 - Forgetting to Commit
# Wrong - changes are never persisted
conn = psycopg2.connect(DSN)
cur = conn.cursor()
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
conn.close() # session ends, implicit ROLLBACK - row never saved!
# Right
conn = psycopg2.connect(DSN)
cur = conn.cursor()
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
conn.commit() # explicitly persist
conn.close()
Mistake 2 - Catching Exceptions Without Rolling Back
# Wrong - the transaction is now in an error state
try:
    cur.execute("UPDATE ...")
except Exception as e:
    print(f"Error: {e}")
    # No rollback! The connection is now in a broken state.
cur.execute("SELECT ...") # raises InFailedSqlTransaction!
# Right
try:
    cur.execute("UPDATE ...")
except Exception as e:
    conn.rollback() # reset the transaction state first
    print(f"Error: {e}")
# Now safe to issue new statements
Mistake 3 - Long-Running Transactions
# Wrong - this holds locks for the entire loop
conn = psycopg2.connect(DSN)
cur = conn.cursor()
for record in huge_list:
    cur.execute("INSERT INTO ...")
conn.commit() # one commit at the very end - holds locks for minutes
# Better - batch commits (each batch is atomic; the load as a whole no longer is)
conn = psycopg2.connect(DSN)
cur = conn.cursor()
for i, record in enumerate(huge_list, start=1):
    cur.execute("INSERT INTO ...")
    if i % 1000 == 0:
        conn.commit() # commit every 1000 rows
conn.commit() # final commit for the remaining rows
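An alternative to counting rows inside the loop is to split the input into fixed-size chunks and commit once per chunk. A minimal sketch, assuming an in-memory SQLite table as a stand-in and a hypothetical chunks helper:

```python
import sqlite3
from itertools import islice


def chunks(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")

records = ((i, f"event-{i}") for i in range(2500))  # made-up input data

commit_count = 0
for chunk in chunks(records, 1000):
    conn.executemany("INSERT INTO events VALUES (?, ?)", chunk)
    conn.commit()  # one short transaction per chunk
    commit_count += 1

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.close()
```

Because the last chunk is simply shorter, no separate final commit is needed. The trade-off is the same as above: a failure partway through leaves the earlier chunks committed.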
Graded Practice Challenges
Beginner - Predict the Behaviour
What happens in each scenario? Explain whether data is lost, corrupted, or safe:
Scenario A:
conn = psycopg2.connect(DSN)
cur = conn.cursor()
cur.execute("UPDATE accounts SET balance = 1000 WHERE id = 1")
cur.execute("UPDATE accounts SET balance = 2000 WHERE id = 2")
# Power cut here - server crashes before commit
Scenario B:
conn = psycopg2.connect(DSN)
conn.autocommit = True
cur = conn.cursor()
cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
# Exception raised here - division by zero in application code
cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
Show Answer
Scenario A: The data is safe. Neither update was committed before the crash, so crash recovery finds no commit record for the transaction in the WAL (Write-Ahead Log) and discards its changes. Both accounts remain at their pre-crash values. This is the Atomicity guarantee at work: an uncommitted transaction leaves no trace.
Scenario B: The data is corrupted. With autocommit on, the first UPDATE commits immediately and permanently. When the exception fires, the second UPDATE never runs. Alice loses $500 and Bob gains nothing. There is no way to recover this without a manual correction. This is exactly the scenario autocommit makes dangerous.
Intermediate - Implement a Savepoint Pattern
You are building an e-commerce order system. Implement process_order(conn, order_id, customer_id, product_id, price, use_coupon) that:
- Inserts the order row (must succeed)
- Deducts inventory (must succeed)
- Applies a coupon discount if use_coupon=True (optional - if the coupon table raises an error, the order still commits without the discount)
Use savepoints to make the coupon step optional without abandoning the first two steps.
Show Reference Solution
import psycopg2
import logging
logger = logging.getLogger(__name__)
def process_order(
conn,
order_id: int,
customer_id: int,
product_id: int,
price: float,
use_coupon: bool = False,
) -> dict:
"""
Process an order atomically. Coupon application is optional via savepoint.
"""
result = {
"order_id": order_id,
"price_charged": price,
"coupon_applied": False,
}
with conn.cursor() as cur:
# Step 1: Insert order (mandatory)
cur.execute(
"""
INSERT INTO orders (id, customer_id, product_id, price, status)
VALUES (%s, %s, %s, %s, 'confirmed')
""",
(order_id, customer_id, product_id, price)
)
# Step 2: Deduct inventory (mandatory)
cur.execute(
"UPDATE inventory SET quantity = quantity - 1 WHERE product_id = %s",
(product_id,)
)
if cur.rowcount == 0:
raise ValueError(f"Product {product_id} not found in inventory")
# Step 3: Apply coupon (optional)
if use_coupon:
cur.execute("SAVEPOINT before_coupon")
try:
cur.execute(
"""
UPDATE coupons
SET used = TRUE
WHERE customer_id = %s AND used = FALSE
RETURNING discount_amount
""",
(customer_id,)
)
row = cur.fetchone()
if row:
discount = row[0]
final_price = price - discount
cur.execute(
"UPDATE orders SET price = %s WHERE id = %s",
(final_price, order_id)
)
result["price_charged"] = final_price
result["coupon_applied"] = True
cur.execute("RELEASE SAVEPOINT before_coupon")
else:
# No valid coupon found - release savepoint, continue without discount
cur.execute("RELEASE SAVEPOINT before_coupon")
except psycopg2.Error as exc:
logger.warning("Coupon application failed: %s - continuing without", exc)
cur.execute("ROLLBACK TO SAVEPOINT before_coupon")
cur.execute("RELEASE SAVEPOINT before_coupon")
conn.commit()
return result
Advanced - Design a Deadlock-Free Job Queue
Design a JobQueue class backed by PostgreSQL that:
- enqueue(payload: dict) -> int - inserts a job, returns job ID
- claim_job() -> dict | None - atomically claims the next pending job for a worker (no two workers can claim the same job)
- complete_job(job_id: int, result: dict) - marks a job done with its result
- fail_job(job_id: int, error: str) - marks a job failed
- Handles the case where a worker crashes mid-job (jobs stuck in 'processing' for >5 minutes are automatically reclaimed)
Show Reference Solution
import psycopg2
import psycopg2.extras
import json
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
id SERIAL PRIMARY KEY,
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
result JSONB,
error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
claimed_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_jobs_status_created ON jobs (status, created_at)
WHERE status = 'pending';
"""
class JobQueue:
def __init__(self, conn):
self.conn = conn
self._ensure_schema()
def _ensure_schema(self):
with self.conn.cursor() as cur:
cur.execute(SCHEMA)
self.conn.commit()
def enqueue(self, payload: dict) -> int:
with self.conn.cursor() as cur:
cur.execute(
"INSERT INTO jobs (payload) VALUES (%s) RETURNING id",
(json.dumps(payload),)
)
job_id = cur.fetchone()[0]
self.conn.commit()
return job_id
def claim_job(self) -> dict | None:
"""
Atomically claim one pending job.
SKIP LOCKED ensures two workers never claim the same job.
Also reclaims jobs stuck in 'processing' for more than 5 minutes.
"""
with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
WITH candidate AS (
SELECT id FROM jobs
WHERE
status = 'pending'
OR (
status = 'processing'
AND claimed_at < NOW() - INTERVAL '5 minutes'
)
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET status = 'processing', claimed_at = NOW()
FROM candidate
WHERE jobs.id = candidate.id
RETURNING jobs.id, jobs.payload
"""
)
row = cur.fetchone()
if row is None:
self.conn.commit()
return None
self.conn.commit()
return {"id": row["id"], "payload": row["payload"]}
def complete_job(self, job_id: int, result: dict) -> None:
with self.conn.cursor() as cur:
cur.execute(
"""
UPDATE jobs
SET status = 'done',
result = %s,
finished_at = NOW()
WHERE id = %s AND status = 'processing'
""",
(json.dumps(result), job_id)
)
if cur.rowcount == 0:
logger.warning("complete_job: job %d not found or not in processing", job_id)
self.conn.commit()
def fail_job(self, job_id: int, error: str) -> None:
with self.conn.cursor() as cur:
cur.execute(
"""
UPDATE jobs
SET status = 'failed',
error = %s,
finished_at = NOW()
WHERE id = %s
""",
(error, job_id)
)
self.conn.commit()
# ── Worker loop example ───────────────────────────────────────────────────────
def worker_loop(conn):
queue = JobQueue(conn)
while True:
job = queue.claim_job()
if job is None:
break # no more jobs
try:
# Do the actual work
result = {"processed": True, "input_keys": list(job["payload"].keys())}
queue.complete_job(job["id"], result)
except Exception as exc:
queue.fail_job(job["id"], str(exc))
Key design decisions:
- SKIP LOCKED makes claim_job non-blocking - workers skip rows already locked by another worker
- The WITH candidate CTE ensures the SELECT and UPDATE are atomic
- Stuck job reclamation uses claimed_at < NOW() - INTERVAL '5 minutes' inline - no separate cleanup job needed
- All state transitions go through single UPDATE statements - no multi-step state machine that could be interrupted
Key Takeaways
- ACID guarantees four things: Atomicity (all-or-nothing), Consistency (rules enforced), Isolation (concurrent transactions do not interfere), Durability (committed data survives crashes)
- Python's DB-API issues an implicit BEGIN before the first statement; you must explicitly call conn.commit() or conn.rollback()
- Autocommit mode removes transaction protection - never use it for multi-step write operations
- Isolation levels trade correctness for performance: READ COMMITTED is the default; SERIALIZABLE is the strongest guarantee
- Use a context manager (with transaction(conn)) to ensure commit/rollback always happens even if an exception escapes
- Savepoints let you partially roll back within a transaction - useful for optional operations that should not abort the main transaction on failure
- Deadlocks are caused by lock ordering conflicts - always acquire locks in a consistent, canonical order (e.g., sorted by primary key)
- Handle DeadlockDetected and SerializationFailure with retry logic and exponential backoff
- Pessimistic locking (SELECT FOR UPDATE) prevents conflicts upfront; optimistic locking (version columns) detects them at write time
- SELECT FOR UPDATE SKIP LOCKED is the correct pattern for building deadlock-free job queues
What's Next
Lesson 05 - Indexing and Query Optimization covers why some queries take milliseconds and others take minutes on the same hardware.
You will learn how B-tree indexes work internally, how to read EXPLAIN ANALYZE output from Python, the leftmost prefix rule for composite indexes, and when adding an index actually makes performance worse. Understanding indexes is what separates developers who write slow queries from those who write fast ones.
