Transactions and Data Integrity

Reading time: ~35 minutes | Level: Intermediate → Engineering

Before reading further, predict what happens in this scenario:

import psycopg2

conn = psycopg2.connect(DSN)
cur = conn.cursor()

# Step 1: Deduct $500 from Alice's account
cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")

# Step 2: Add $500 to Bob's account
# --- network blip here - this line raises an exception ---
cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")

conn.commit()

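If each statement were committed the moment it ran (autocommit, covered in Part 2), Alice would lose $500, Bob would get nothing, and the $500 would vanish from the system entirely. With psycopg2's defaults the script is saved by an implicit transaction: the exception prevents conn.commit() from running, so the debit is never committed.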

This is not a hypothetical. Without transactions, any failure between two related writes produces inconsistent data - data that violates the rules your system is supposed to enforce. Transactions exist to prevent exactly this.

What You Will Learn

  • What ACID actually means in practice - not textbook definitions, but the real guarantees
  • How BEGIN, COMMIT, and ROLLBACK map to Python's DB-API
  • The four isolation levels and what anomalies each one allows
  • How to implement transaction-safe code using context managers and explicit error handling
  • Savepoints - how to partially roll back within a transaction
  • Deadlocks - what causes them, how to detect them, and how to handle them with retry logic
  • Optimistic vs pessimistic locking - SELECT FOR UPDATE and version columns
  • Why autocommit mode is dangerous for multi-step operations

Prerequisites

  • Python Intermediate: Module 07, Lessons 01–03 (SQL Fundamentals, SQLite, PostgreSQL)
  • Understanding of Python context managers (with statement)
  • Basic familiarity with psycopg2 for connecting to PostgreSQL

Part 1 - ACID: What the Guarantees Actually Mean

ACID is four properties that a database transaction must satisfy. Every developer has heard the acronym. Few understand what each one means in practice.

Atomicity - All or Nothing

A transaction is a single unit of work. Either every operation in it succeeds and the changes are applied, or the transaction fails and none of the changes are applied. There is no partial success.

Transfer $500 from Alice to Bob:
BEGIN
UPDATE accounts SET balance = balance - 500 WHERE id = 1 -- debit
UPDATE accounts SET balance = balance + 500 WHERE id = 2 -- credit
COMMIT <-- both updates land, or neither does

If the second UPDATE fails, the database rolls back the first one automatically. The $500 is not lost.

Atomicity is the property that solves the opening puzzle.
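To see the all-or-nothing behaviour without a PostgreSQL server, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in (an assumption made for portability; note that sqlite3 uses ? placeholders where psycopg2 uses %s). The helper names make_bank, transfer, and balances are hypothetical, and the fail_midway flag simulates the failure between the two writes.

```python
import sqlite3


def make_bank() -> sqlite3.Connection:
    """Create an in-memory database with two accounts holding 1000 each."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
    conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 1000), (2, 1000)])
    conn.commit()
    return conn


def transfer(conn, from_id, to_id, amount, fail_midway=False):
    """Debit and credit inside one transaction; roll back both on any error."""
    try:
        conn.execute(
            "UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id)
        )
        if fail_midway:
            raise RuntimeError("simulated failure between the two writes")
        conn.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id)
        )
        conn.commit()
    except Exception:
        conn.rollback()  # the debit is undone together with everything else
        raise


def balances(conn) -> dict:
    """Return {account_id: balance} for every account."""
    return dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id").fetchall())
```

Calling transfer(conn, 1, 2, 500, fail_midway=True) raises, and balances(conn) afterwards still reports 1000 for both accounts: the debit was never committed.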

Consistency - Rules Are Always Enforced

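A transaction transitions the database from one valid state to another valid state. The validity rules - constraints, foreign keys, check constraints, triggers - are enforced by the database itself: most are checked as each statement runs, and constraints declared as deferred are checked at commit time. A statement that violates a rule fails, and the transaction it belongs to cannot commit.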

-- A CHECK constraint: balance must never go below 0
ALTER TABLE accounts ADD CONSTRAINT non_negative CHECK (balance >= 0);

If Alice only has $200 and you try to deduct $500, the constraint fires, the UPDATE fails, and the transaction has to be rolled back. The database stays consistent.
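The same rule can be tried out locally. The sketch below assumes sqlite3 as a stand-in for PostgreSQL and declares the CHECK constraint inline in CREATE TABLE rather than with ALTER TABLE; try_debit is a hypothetical helper name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    " id INTEGER PRIMARY KEY,"
    " balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.execute("INSERT INTO accounts VALUES (1, 200)")
conn.commit()


def try_debit(conn, account_id: int, amount: int) -> bool:
    """Return True if the debit committed, False if a constraint rejected it."""
    try:
        conn.execute(
            "UPDATE accounts SET balance = balance - ? WHERE id = ?",
            (amount, account_id),
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        # The CHECK constraint refused the new balance; discard the transaction.
        conn.rollback()
        return False


def balance_of(conn, account_id: int) -> int:
    """Read the current committed balance."""
    return conn.execute(
        "SELECT balance FROM accounts WHERE id = ?", (account_id,)
    ).fetchone()[0]
```

The application never has to re-implement the rule: the database rejects the overdraft, and the code only decides how to react to the error.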

Isolation - Concurrent Transactions Do Not Interfere

Multiple transactions running at the same time should behave as if they ran one after another. In practice, databases trade off isolation for performance - which is where isolation levels come in (Part 3). But the goal is that transaction B cannot see transaction A's uncommitted changes.
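The "B cannot see A's uncommitted changes" goal can be observed with two connections to the same database. This is a sketch under assumptions: it uses sqlite3 with a temporary file (in-memory SQLite databases are private to one connection) instead of PostgreSQL, and demo_isolation is a hypothetical name.

```python
import os
import sqlite3
import tempfile


def demo_isolation():
    """Return the balance a second connection sees before and after the writer commits."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "bank.db")
        writer = sqlite3.connect(path)
        reader = sqlite3.connect(path)
        try:
            writer.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
            writer.execute("INSERT INTO accounts VALUES (1, 1000)")
            writer.commit()

            # The writer changes the row but has not committed yet.
            writer.execute("UPDATE accounts SET balance = 500 WHERE id = 1")
            before = reader.execute(
                "SELECT balance FROM accounts WHERE id = 1"
            ).fetchall()[0][0]

            # Once the writer commits, the change becomes visible to others.
            writer.commit()
            after = reader.execute(
                "SELECT balance FROM accounts WHERE id = 1"
            ).fetchall()[0][0]
            return before, after
        finally:
            reader.close()
            writer.close()
```

The reader sees the old balance while the update is pending and the new one only after the commit.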

Durability - Committed Data Survives Crashes

Once a transaction commits, the data is permanent. If the server crashes one millisecond after a successful commit, the data is still there after restart. Databases implement this with write-ahead logs (WAL) - changes are recorded in a log that is flushed to disk before the commit is acknowledged, so they can be replayed after a crash even if the data pages were never written.

:::note ACID and NoSQL
Not all databases offer full ACID transactions. Many NoSQL databases sacrifice isolation or durability for throughput. When working with Redis, Cassandra, or DynamoDB, check which guarantees apply - they vary per operation and per consistency setting.
:::

Part 2 - Transaction Control: BEGIN, COMMIT, ROLLBACK

SQL Commands

At the SQL level, a transaction looks like this:

BEGIN;
UPDATE accounts SET balance = balance - 500 WHERE id = 1;
UPDATE accounts SET balance = balance + 500 WHERE id = 2;
COMMIT;

If something goes wrong:

BEGIN;
UPDATE accounts SET balance = balance - 500 WHERE id = 1;
-- error occurs
ROLLBACK; -- undo the debit

How Python's DB-API Maps to This

Python's database drivers (psycopg2, sqlite3) follow the DB-API 2.0 specification (PEP 249), which has a specific transaction model:

  • A connection starts in transaction mode by default - an implicit BEGIN is issued automatically before the first SQL statement
  • conn.commit() issues a COMMIT; the next statement opens a new implicit transaction
  • conn.rollback() issues a ROLLBACK; the next statement opens a new implicit transaction

import psycopg2

conn = psycopg2.connect(DSN)
cur = conn.cursor()

try:
    cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
    cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
    conn.commit()  # COMMIT - both changes land
except Exception:
    conn.rollback()  # ROLLBACK - neither change lands
    raise
finally:
    cur.close()
    conn.close()

This pattern works but is verbose. Python provides a cleaner approach via context managers.

The Connection Context Manager

# psycopg2's connection object is a context manager
# __exit__ commits on success, rolls back on exception

with psycopg2.connect(DSN) as conn:
    with conn.cursor() as cur:
        cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
        cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
# conn.__exit__ is called here - commits if no exception was raised

:::warning psycopg2 Context Manager Behaviour
In psycopg2, the with conn block does not close the connection on exit - it only commits or rolls back the transaction. You must close the connection separately: call conn.close() in a finally block, wrap the connection in contextlib.closing, or use a connection pool with its own lifecycle management.
:::
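A minimal sketch of the contextlib.closing approach follows. It assumes sqlite3 as a stand-in, whose connection context manager likewise commits or rolls back without closing; the same wrapping should carry over to a psycopg2 connection because closing() only requires a close() method. The is_closed helper is hypothetical.

```python
import sqlite3
from contextlib import closing

with closing(sqlite3.connect(":memory:")) as conn:
    with conn:  # commits on success, rolls back on exception; does NOT close
        conn.execute("CREATE TABLE t (x INTEGER)")
        conn.execute("INSERT INTO t VALUES (1)")
    # The inner block has exited, yet the connection is still usable here.
    still_open_count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
# closing() has now called conn.close().


def is_closed(connection) -> bool:
    """Report whether a sqlite3 connection refuses further statements."""
    try:
        connection.execute("SELECT 1")
        return False
    except sqlite3.ProgrammingError:
        return True
```

The outer with owns the connection lifetime, and the inner with owns the transaction, which keeps the two responsibilities visibly separate.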

Transaction State Machine
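A connection moves between two states: idle, and inside a transaction. The first write opens a transaction implicitly, and commit or rollback returns the connection to idle. The sketch below watches those transitions using sqlite3's in_transaction attribute (sqlite3 is an assumption here, standing in for psycopg2; the states list is purely illustrative).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")

states = []  # (label, connection is inside a transaction?)
states.append(("idle after setup", conn.in_transaction))

conn.execute("INSERT INTO accounts VALUES (1, 1000)")  # implicit BEGIN happens here
states.append(("after first write", conn.in_transaction))

conn.commit()
states.append(("after commit", conn.in_transaction))

conn.execute("UPDATE accounts SET balance = 0 WHERE id = 1")  # implicit BEGIN again
states.append(("after second write", conn.in_transaction))

conn.rollback()
states.append(("after rollback", conn.in_transaction))

final_balance = conn.execute("SELECT balance FROM accounts WHERE id = 1").fetchone()[0]
```

The committed insert survives, while the rolled-back update leaves the balance untouched.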

:::danger Autocommit Mode
By default, psycopg2 (and most Python drivers) operate in transaction mode - every statement is part of an implicit transaction that must be explicitly committed or rolled back.

You can disable this with conn.autocommit = True:

conn = psycopg2.connect(DSN)
conn.autocommit = True # every statement commits immediately

cur = conn.cursor()
cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
# ^ This commits immediately. No chance to roll back.
cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
# ^ If THIS fails, the debit from line above is already permanent.

With autocommit on, there is no transaction protection. Every statement is its own transaction. A failure between two related statements leaves the database in an inconsistent state with no way to recover.

When to use autocommit:

  • CREATE DATABASE / DROP DATABASE - PostgreSQL requires these outside a transaction
  • VACUUM in PostgreSQL - cannot run inside a transaction
  • Long-running read-only analytical queries - to avoid holding transaction overhead

Never use autocommit for multi-step write operations.
:::
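The failure mode is easy to reproduce locally. This sketch assumes sqlite3 opened with isolation_level=None, which is that module's autocommit mode, as a stand-in for conn.autocommit = True in psycopg2; risky_transfer is a hypothetical name.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit: no implicit BEGIN
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 1000), (2, 1000)])


def risky_transfer(conn, fail_midway: bool) -> None:
    """Two related writes with no transaction around them."""
    conn.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
    if fail_midway:
        raise RuntimeError("simulated failure between the two writes")
    conn.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")


try:
    risky_transfer(conn, fail_midway=True)
except RuntimeError:
    conn.rollback()  # no effect: the debit was already committed on its own

after_failure = dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id").fetchall())
```

The rollback in the except block cannot help, because there is no open transaction left to undo.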

Part 3 - Isolation Levels and Concurrency Anomalies

Isolation is the most nuanced of the ACID properties because it trades correctness for performance. SQL defines four isolation levels, each preventing a different set of concurrency anomalies.

The Three Anomalies

Dirty Read: Transaction B reads data written by Transaction A that has not yet committed. If A rolls back, B has read data that never existed.

Non-Repeatable Read: Transaction B reads a row. Transaction A updates and commits that row. Transaction B reads the same row again and gets a different value.

Phantom Read: Transaction B queries for rows matching a condition (e.g., WHERE balance > 1000). Transaction A inserts a new row that matches. Transaction B queries again and sees a new row - a "phantom".

The Four Isolation Levels

| Level | Dirty Read | Non-Repeatable Read | Phantom Read |
| --- | --- | --- | --- |
| READ UNCOMMITTED | Possible | Possible | Possible |
| READ COMMITTED | Prevented | Possible | Possible |
| REPEATABLE READ | Prevented | Prevented | Possible |
| SERIALIZABLE | Prevented | Prevented | Prevented |

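PostgreSQL's default is READ COMMITTED. SQLite defaults to SERIALIZABLE. The table lists the minimum guarantees of the SQL standard; PostgreSQL is stricter in two places: it treats READ UNCOMMITTED as READ COMMITTED, and its REPEATABLE READ also prevents phantom reads.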

Setting Isolation Levels in Python

import psycopg2
from psycopg2 import extensions

conn = psycopg2.connect(DSN)

# Set isolation level before starting a transaction
conn.set_isolation_level(extensions.ISOLATION_LEVEL_REPEATABLE_READ)

# Or use set_session, which also accepts the level as a string
conn.set_session(isolation_level="SERIALIZABLE")

Using psycopg2's constants:

from psycopg2 import extensions

ISOLATION_LEVEL_AUTOCOMMIT = extensions.ISOLATION_LEVEL_AUTOCOMMIT
ISOLATION_LEVEL_READ_COMMITTED = extensions.ISOLATION_LEVEL_READ_COMMITTED
ISOLATION_LEVEL_REPEATABLE_READ= extensions.ISOLATION_LEVEL_REPEATABLE_READ
ISOLATION_LEVEL_SERIALIZABLE = extensions.ISOLATION_LEVEL_SERIALIZABLE

Concrete Example: Non-Repeatable Read

# Session A (Transaction 1) - using READ COMMITTED
conn_a = psycopg2.connect(DSN)
conn_a.set_isolation_level(extensions.ISOLATION_LEVEL_READ_COMMITTED)
cur_a = conn_a.cursor()

# Session B (Transaction 2) - runs concurrently
conn_b = psycopg2.connect(DSN)
cur_b = conn_b.cursor()

# T1 reads Alice's balance: 1000
cur_a.execute("SELECT balance FROM accounts WHERE id = 1")
print(cur_a.fetchone()[0]) # 1000

# T2 updates Alice's balance and commits
cur_b.execute("UPDATE accounts SET balance = 500 WHERE id = 1")
conn_b.commit()

# T1 reads again - gets a DIFFERENT value under READ COMMITTED
cur_a.execute("SELECT balance FROM accounts WHERE id = 1")
print(cur_a.fetchone()[0]) # 500 - non-repeatable read!

conn_a.rollback()

Under REPEATABLE READ, the second read by T1 would still return 1000 - the snapshot is fixed when the transaction runs its first query and does not change until the transaction ends.

:::tip Choosing the Right Isolation Level

  • READ COMMITTED (default): Use for most OLTP workloads. Good performance, no dirty reads. Accept that reads may see committed changes from other transactions mid-transaction.
  • REPEATABLE READ: Use when a transaction must see a consistent snapshot - reporting queries that touch multiple tables, audit calculations, anything that aggregates multiple reads.
  • SERIALIZABLE: Use for the highest correctness guarantee - financial calculations where even phantom reads would cause errors. Accept the performance overhead and risk of serialization failures (which must be retried).
:::

Part 4 - Implementing Transactions in Python

The Standard Pattern

import psycopg2
import logging

logger = logging.getLogger(__name__)

def transfer_funds(conn, from_account_id: int, to_account_id: int, amount: float) -> None:
    """
    Transfer amount from one account to another.
    Raises on any failure; the caller's conn.rollback() handles cleanup.
    """
    with conn.cursor() as cur:
        # Lock the source row and check its balance first
        cur.execute(
            "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
            (from_account_id,)
        )
        row = cur.fetchone()
        if row is None:
            raise ValueError(f"Account {from_account_id} not found")

        balance = row[0]
        if balance < amount:
            raise ValueError(
                f"Insufficient funds: account {from_account_id} "
                f"has {balance:.2f}, needs {amount:.2f}"
            )

        # Deduct from source
        cur.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, from_account_id)
        )

        # Add to destination
        cur.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, to_account_id)
        )

        # Record the transfer in audit log
        cur.execute(
            """
            INSERT INTO transfer_log (from_account, to_account, amount, created_at)
            VALUES (%s, %s, %s, NOW())
            """,
            (from_account_id, to_account_id, amount)
        )


def execute_transfer(from_id: int, to_id: int, amount: float) -> None:
    """
    Top-level function that manages the connection and transaction lifecycle.
    """
    conn = psycopg2.connect(DSN)
    try:
        transfer_funds(conn, from_id, to_id, amount)
        conn.commit()
        logger.info("Transfer of %.2f from %d to %d committed", amount, from_id, to_id)
    except Exception:
        conn.rollback()
        logger.exception("Transfer failed, rolled back")
        raise
    finally:
        conn.close()

Using a Context Manager Wrapper

For cleaner code, define a transaction context manager:

from contextlib import contextmanager
import psycopg2

@contextmanager
def transaction(conn):
    """
    Context manager that commits on success and rolls back on exception.
    Leaves the connection open for reuse (e.g., from a pool).
    """
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise


# Usage
with psycopg2.connect(DSN) as conn:
    with transaction(conn):
        with conn.cursor() as cur:
            cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
            cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
    # transaction commits here - or rolls back if an exception escaped the block

Part 5 - Savepoints: Partial Rollback Within a Transaction

A savepoint is a named marker within a transaction. You can roll back to a savepoint without abandoning the entire transaction. This is useful when part of an operation is optional or speculative.

SQL Syntax

BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;

SAVEPOINT before_bonus;

UPDATE bonuses SET amount = amount + 10 WHERE account_id = 1;
-- if the bonus update fails, we can roll back to before_bonus
-- without losing the main deduction

ROLLBACK TO SAVEPOINT before_bonus;
-- or: RELEASE SAVEPOINT before_bonus; (to confirm and discard the marker)

COMMIT;

Savepoints in Python

def process_order_with_loyalty(conn, order_id: int, customer_id: int, product_id: int) -> dict:
    """
    Process an order. Attempt to apply loyalty points, but if the loyalty
    update fails, still commit the core order without it.
    """
    result = {"order_committed": False, "loyalty_applied": False}

    with conn.cursor() as cur:
        # Core order processing - must succeed
        cur.execute(
            "INSERT INTO orders (id, customer_id, status) VALUES (%s, %s, 'confirmed')",
            (order_id, customer_id)
        )
        # Inventory is keyed by product, not by order
        cur.execute(
            "UPDATE inventory SET quantity = quantity - 1 WHERE product_id = %s",
            (product_id,)
        )

        # Set a savepoint before the optional loyalty operation
        cur.execute("SAVEPOINT before_loyalty")

        try:
            cur.execute(
                "UPDATE loyalty_points SET points = points + 100 WHERE customer_id = %s",
                (customer_id,)
            )
            cur.execute("RELEASE SAVEPOINT before_loyalty")
            result["loyalty_applied"] = True
        except psycopg2.Error:
            # Roll back only the loyalty part - the order survives
            cur.execute("ROLLBACK TO SAVEPOINT before_loyalty")
            cur.execute("RELEASE SAVEPOINT before_loyalty")

    conn.commit()
    result["order_committed"] = True
    return result

:::note Savepoint Nesting
Savepoints can be nested. Each SAVEPOINT name creates a marker; ROLLBACK TO SAVEPOINT name rewinds to that marker; RELEASE SAVEPOINT name discards the marker without rolling back. If you release a savepoint, all savepoints created after it are also released.
:::
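Nested savepoints can be tried without a server. The sketch below assumes sqlite3, which supports the same SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT statements; the connection is opened with isolation_level=None so that BEGIN and COMMIT are issued explicitly. The table and savepoint names are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # we issue BEGIN/COMMIT ourselves
conn.execute("CREATE TABLE events (label TEXT)")

conn.execute("BEGIN")
conn.execute("INSERT INTO events VALUES ('order')")      # core work

conn.execute("SAVEPOINT outer_sp")
conn.execute("INSERT INTO events VALUES ('loyalty')")    # optional work, kept

conn.execute("SAVEPOINT inner_sp")
conn.execute("INSERT INTO events VALUES ('coupon')")     # optional work, undone below

conn.execute("ROLLBACK TO SAVEPOINT inner_sp")  # rewinds only the 'coupon' insert
conn.execute("RELEASE SAVEPOINT outer_sp")      # keeps 'loyalty'; inner_sp is released too
conn.execute("COMMIT")

labels = [row[0] for row in conn.execute("SELECT label FROM events ORDER BY rowid")]
```

Only the work done after inner_sp is lost; everything before it commits with the rest of the transaction.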

Part 6 - Deadlocks: Detection, Prevention, and Handling

What Is a Deadlock?

A deadlock occurs when two transactions are each waiting for the other to release a lock they need.

Transaction A holds a lock on Row 1
Transaction B holds a lock on Row 2

Transaction A wants a lock on Row 2 → waits for B
Transaction B wants a lock on Row 1 → waits for A

Both wait forever. Neither can proceed.

The database detects this cycle and kills one transaction, returning an error to the application.

Deadlock Sequence Diagram

The Cause: Lock Ordering

The deadlock above happened because:

  • Transaction A locked accounts in order: id=1, then id=2
  • Transaction B locked accounts in order: id=2, then id=1

The fix is always the same: lock resources in a consistent, canonical order.

def transfer_funds_safe(conn, account_a: int, account_b: int, amount: float) -> None:
    """
    Transfer amount from account_a to account_b.
    Always lock accounts in ascending ID order to prevent deadlocks.
    """
    # Sort IDs so both callers lock in the same order regardless of direction
    first_id, second_id = sorted([account_a, account_b])

    with conn.cursor() as cur:
        # Lock in sorted order so two opposing transfers cannot wait on each other
        cur.execute(
            "SELECT id, balance FROM accounts WHERE id IN %s ORDER BY id FOR UPDATE",
            ((first_id, second_id),)
        )
        rows = {row[0]: row[1] for row in cur.fetchall()}

        from_balance = rows[account_a]
        if from_balance < amount:
            raise ValueError("Insufficient funds")

        cur.execute(
            "UPDATE accounts SET balance = balance - %s WHERE id = %s",
            (amount, account_a)
        )
        cur.execute(
            "UPDATE accounts SET balance = balance + %s WHERE id = %s",
            (amount, account_b)
        )
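The ordering rule is not specific to databases. As an analogy only, the sketch below replaces row locks with threading.Lock objects and runs two threads that transfer in opposite directions; Account, transfer, and run_opposing_transfers are hypothetical names. Because both threads acquire the locks in ascending id order, neither can hold one lock while waiting for the other.

```python
import threading


class Account:
    """In-memory stand-in for a row protected by a row lock."""

    def __init__(self, account_id: int, balance: int):
        self.id = account_id
        self.balance = balance
        self.lock = threading.Lock()


def transfer(src: Account, dst: Account, amount: int) -> None:
    """Move amount from src to dst, taking locks in ascending id order."""
    if src.id == dst.id:
        raise ValueError("Source and destination must differ")
    first, second = sorted((src, dst), key=lambda account: account.id)
    with first.lock:
        with second.lock:
            if src.balance < amount:
                raise ValueError("Insufficient funds")
            src.balance -= amount
            dst.balance += amount


def run_opposing_transfers(rounds: int = 1000):
    """Run A->B and B->A transfers concurrently and return the final balances."""
    a, b = Account(1, 10_000), Account(2, 10_000)

    def worker(src: Account, dst: Account) -> None:
        for _ in range(rounds):
            transfer(src, dst, 1)

    threads = [
        threading.Thread(target=worker, args=(a, b)),
        threading.Thread(target=worker, args=(b, a)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return a.balance, b.balance
```

If transfer locked src first and dst second instead, the two workers could each grab one lock and wait on the other, which is the cycle described above.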

Handling Deadlocks with Retry Logic

Even with good lock ordering, deadlocks can occur in complex systems. The database will always tell you - psycopg2 raises psycopg2.errors.DeadlockDetected. The correct response is to retry the transaction.

import psycopg2
import psycopg2.errors
import time
import random
import logging

logger = logging.getLogger(__name__)

def with_retry(fn, conn, max_attempts: int = 3, base_delay: float = 0.1):
    """
    Execute fn(conn) with automatic retry on deadlock.
    Uses exponential backoff with jitter to reduce contention storms.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            result = fn(conn)
            conn.commit()
            return result
        except psycopg2.errors.DeadlockDetected:
            conn.rollback()
            if attempt == max_attempts:
                logger.error("Deadlock persisted after %d attempts", max_attempts)
                raise
            # Exponential backoff: 0.1s, 0.2s, 0.4s... plus random jitter
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.05)
            logger.warning(
                "Deadlock on attempt %d/%d, retrying in %.3fs",
                attempt, max_attempts, delay
            )
            time.sleep(delay)
        except Exception:
            conn.rollback()
            raise


# Usage
def do_transfer(conn):
    # Keyword names must match transfer_funds_safe's parameters
    transfer_funds_safe(conn, account_a=1, account_b=2, amount=500)

conn = psycopg2.connect(DSN)
try:
    with_retry(do_transfer, conn)
finally:
    conn.close()

:::warning Deadlock vs Serialization Failure
PostgreSQL raises two similar errors:

  • DeadlockDetected (40P01): two or more transactions are waiting in a cycle
  • SerializationFailure (40001): two transactions conflict under SERIALIZABLE isolation

Both should be handled by rolling back and retrying. Neither indicates a bug - they are expected outcomes of concurrent access. Handle both with the same retry helper.

RETRYABLE_ERRORS = (
    psycopg2.errors.DeadlockDetected,
    psycopg2.errors.SerializationFailure,
)

:::
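The retry flow can be exercised without a database. In the sketch below, FakeDeadlock and FakeSerializationFailure are hypothetical stand-ins for the two psycopg2 exception classes, FakeConn only counts commit and rollback calls, and run_with_retry is an illustrative variant of the helper above that takes the retryable exception tuple as a parameter.

```python
import random
import time


class FakeDeadlock(Exception):
    """Hypothetical stand-in for psycopg2.errors.DeadlockDetected."""


class FakeSerializationFailure(Exception):
    """Hypothetical stand-in for psycopg2.errors.SerializationFailure."""


RETRYABLE = (FakeDeadlock, FakeSerializationFailure)


class FakeConn:
    """Records commit/rollback calls so the retry flow can be observed."""

    def __init__(self):
        self.commits = 0
        self.rollbacks = 0

    def commit(self):
        self.commits += 1

    def rollback(self):
        self.rollbacks += 1


def run_with_retry(fn, conn, retryable=RETRYABLE, max_attempts=3, base_delay=0.0):
    """Run fn(conn); roll back and retry on retryable errors, re-raise anything else."""
    for attempt in range(1, max_attempts + 1):
        try:
            result = fn(conn)
            conn.commit()
            return result
        except retryable:
            conn.rollback()
            if attempt == max_attempts:
                raise
            # Exponential backoff plus jitter; zero when base_delay is 0.
            time.sleep(base_delay * (2 ** (attempt - 1)) + random.uniform(0, base_delay))
        except Exception:
            conn.rollback()
            raise


def make_flaky(failures):
    """Build a callable that raises the given exceptions in order, then succeeds."""
    pending = list(failures)

    def fn(conn):
        if pending:
            raise pending.pop(0)
        return "ok"

    return fn
```

With real psycopg2 code, the retryable argument would be the RETRYABLE_ERRORS tuple from the warning above, and base_delay would be non-zero.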

Part 7 - Optimistic vs Pessimistic Locking

Pessimistic Locking: SELECT FOR UPDATE

Pessimistic locking assumes conflicts are likely. It locks rows at read time, preventing others from modifying them until the transaction commits.

from decimal import Decimal


def withdraw_pessimistic(conn, account_id: int, amount: float) -> Decimal:
    with conn.cursor() as cur:
        # Lock the row immediately - no one else can UPDATE it until we COMMIT
        cur.execute(
            "SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
            (account_id,)
        )
        row = cur.fetchone()
        if row is None:
            raise ValueError("Account not found")

        # Assuming balance is a NUMERIC column, psycopg2 returns a Decimal;
        # Decimal and float cannot be subtracted directly, so convert first.
        balance = row[0]
        amount = Decimal(str(amount))
        if balance < amount:
            raise ValueError("Insufficient funds")

        new_balance = balance - amount
        cur.execute(
            "UPDATE accounts SET balance = %s WHERE id = %s",
            (new_balance, account_id)
        )
        return new_balance

SELECT FOR UPDATE tells PostgreSQL: "I'm going to update this row - lock it now so no one else can change it before I do."

When to use pessimistic locking:

  • High contention: many transactions compete for the same rows
  • Short transactions: the lock hold time is small
  • When correctness is more important than throughput

SELECT FOR UPDATE SKIP LOCKED - a variant useful for job queues:

def claim_next_job(conn) -> dict | None:
    """
    Claim the next available job, skipping any already locked by another worker.
    Well suited to worker pools processing a jobs table.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT id, payload
            FROM jobs
            WHERE status = 'pending'
            ORDER BY created_at
            LIMIT 1
            FOR UPDATE SKIP LOCKED
            """
        )
        row = cur.fetchone()
        if row is None:
            return None

        job_id, payload = row
        cur.execute(
            "UPDATE jobs SET status = 'processing' WHERE id = %s",
            (job_id,)
        )
        return {"id": job_id, "payload": payload}

Optimistic Locking: Version Columns

Optimistic locking assumes conflicts are rare. Instead of locking rows upfront, it detects conflicts at write time using a version column.

-- Schema with a version column
CREATE TABLE accounts (
id SERIAL PRIMARY KEY,
balance NUMERIC(12, 2),
version INTEGER DEFAULT 1
);

from decimal import Decimal


def withdraw_optimistic(conn, account_id: int, amount: float) -> Decimal:
    """
    Optimistic locking: read without locking, detect conflict at update time.
    Raises if someone else modified the row since we read it.
    """
    with conn.cursor() as cur:
        # Read without locking
        cur.execute(
            "SELECT balance, version FROM accounts WHERE id = %s",
            (account_id,)
        )
        row = cur.fetchone()
        if row is None:
            raise ValueError("Account not found")

        # NUMERIC balances arrive as Decimal; convert amount before subtracting
        balance, version = row
        amount = Decimal(str(amount))
        if balance < amount:
            raise ValueError("Insufficient funds")

        new_balance = balance - amount

        # Update only if version hasn't changed
        cur.execute(
            """
            UPDATE accounts
            SET balance = %s, version = version + 1
            WHERE id = %s AND version = %s
            """,
            (new_balance, account_id, version)
        )

        if cur.rowcount == 0:
            # Someone else modified this row between our SELECT and UPDATE
            raise RuntimeError("Concurrent modification detected - retry the operation")

        return new_balance

When to use optimistic locking:

  • Low contention: conflicts are rare
  • Long-running operations: you do not want to hold locks for seconds
  • Read-heavy workloads: most reads succeed without any conflict

| Approach | Lock held | Best for | Risk |
| --- | --- | --- | --- |
| Pessimistic (FOR UPDATE) | From SELECT to COMMIT | High contention, short transactions | Deadlocks, reduced throughput |
| Optimistic (version column) | Not held | Low contention, long reads | Retry overhead on conflict |
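The version check can be watched in action with two writers that read the same row before either one writes. This is a sketch assuming sqlite3 in place of PostgreSQL and integer balances for simplicity; read_account and write_if_unchanged are hypothetical helper names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    " id INTEGER PRIMARY KEY, balance INTEGER, version INTEGER DEFAULT 1)"
)
conn.execute("INSERT INTO accounts (id, balance) VALUES (1, 1000)")
conn.commit()


def read_account(conn, account_id: int):
    """Return (balance, version) without taking any lock."""
    return conn.execute(
        "SELECT balance, version FROM accounts WHERE id = ?", (account_id,)
    ).fetchone()


def write_if_unchanged(conn, account_id: int, new_balance: int, seen_version: int) -> bool:
    """Apply the write only if the row still has the version we read."""
    cur = conn.execute(
        "UPDATE accounts SET balance = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_balance, account_id, seen_version),
    )
    conn.commit()
    return cur.rowcount == 1


# Two writers read the same snapshot of the row...
balance_a, version_a = read_account(conn, 1)
balance_b, version_b = read_account(conn, 1)

# ...the first write wins and bumps the version; the second matches no row.
first_ok = write_if_unchanged(conn, 1, balance_a - 100, version_a)
second_ok = write_if_unchanged(conn, 1, balance_b - 200, version_b)
final_state = read_account(conn, 1)
```

The losing writer has to re-read the row and try again, which is the retry overhead listed in the table.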

Full Example - Transaction-Safe Transfer with Retry

Here is a complete, production-quality implementation combining everything from this lesson:

import psycopg2
import psycopg2.errors
import psycopg2.extras
import time
import random
import logging
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)

DSN = "postgresql://user:password@localhost:5432/bankdb"


@dataclass
class TransferResult:
    success: bool
    from_new_balance: Optional[float] = None
    to_new_balance: Optional[float] = None
    error: Optional[str] = None
    attempts: int = 1


@contextmanager
def get_connection():
    """Connection context manager - always closes the connection."""
    conn = psycopg2.connect(DSN, cursor_factory=psycopg2.extras.RealDictCursor)
    try:
        yield conn
    finally:
        conn.close()


def _do_transfer(cur, from_id: int, to_id: int, amount: float) -> tuple:
    """
    Inner transfer logic - runs inside an already-open transaction.
    Returns (from_new_balance, to_new_balance).
    Lock in sorted ID order to prevent deadlocks.
    """
    # Always lock in ascending ID order
    lock_ids = sorted([from_id, to_id])
    cur.execute(
        "SELECT id, balance FROM accounts WHERE id = ANY(%s) ORDER BY id FOR UPDATE",
        (lock_ids,)
    )
    rows = {row["id"]: row["balance"] for row in cur.fetchall()}

    if from_id not in rows:
        raise ValueError(f"Source account {from_id} not found")
    if to_id not in rows:
        raise ValueError(f"Destination account {to_id} not found")

    from_balance = rows[from_id]
    if from_balance < amount:
        raise ValueError(
            f"Insufficient funds: balance={from_balance:.2f}, requested={amount:.2f}"
        )

    # Let the database do the arithmetic and return the result: NUMERIC
    # balances arrive as decimal.Decimal, which cannot be added to or
    # subtracted from a float in Python.
    cur.execute(
        "UPDATE accounts SET balance = balance - %s WHERE id = %s RETURNING balance",
        (amount, from_id)
    )
    from_new_balance = cur.fetchone()["balance"]
    cur.execute(
        "UPDATE accounts SET balance = balance + %s WHERE id = %s RETURNING balance",
        (amount, to_id)
    )
    to_new_balance = cur.fetchone()["balance"]
    cur.execute(
        """
        INSERT INTO transfer_log (from_account, to_account, amount, created_at)
        VALUES (%s, %s, %s, NOW())
        """,
        (from_id, to_id, amount)
    )

    return from_new_balance, to_new_balance


RETRYABLE = (
    psycopg2.errors.DeadlockDetected,
    psycopg2.errors.SerializationFailure,
)


def transfer_funds(
    from_id: int,
    to_id: int,
    amount: float,
    max_attempts: int = 3,
    base_delay: float = 0.05,
) -> TransferResult:
    """
    Transfer funds between accounts with full transaction safety.
    Retries automatically on deadlock or serialization failure.
    """
    if amount <= 0:
        return TransferResult(success=False, error="Amount must be positive")

    for attempt in range(1, max_attempts + 1):
        try:
            # A fresh connection per attempt; closing it without a commit
            # discards any uncommitted work from a failed attempt.
            with get_connection() as conn:
                with conn.cursor() as cur:
                    from_bal, to_bal = _do_transfer(cur, from_id, to_id, amount)
                conn.commit()

            logger.info(
                "Transfer %.2f from account %d to %d committed (attempt %d)",
                amount, from_id, to_id, attempt
            )
            return TransferResult(
                success=True,
                from_new_balance=from_bal,
                to_new_balance=to_bal,
                attempts=attempt,
            )

        except RETRYABLE as exc:
            logger.warning(
                "Retryable error on attempt %d/%d: %s",
                attempt, max_attempts, type(exc).__name__
            )
            if attempt == max_attempts:
                return TransferResult(
                    success=False,
                    error=f"Failed after {max_attempts} attempts: {exc}",
                    attempts=attempt,
                )
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, base_delay)
            time.sleep(delay)

        except ValueError as exc:
            # Business logic error - do not retry
            logger.warning("Transfer rejected: %s", exc)
            return TransferResult(success=False, error=str(exc), attempts=attempt)

        except Exception as exc:
            logger.exception("Unexpected error during transfer")
            return TransferResult(success=False, error=str(exc), attempts=attempt)

    # Should not reach here
    return TransferResult(success=False, error="Unknown failure", attempts=max_attempts)


# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    result = transfer_funds(from_id=1, to_id=2, amount=250.00)
    if result.success:
        print(f"Transfer complete. "
              f"From balance: {result.from_new_balance:.2f}, "
              f"To balance: {result.to_new_balance:.2f}")
    else:
        print(f"Transfer failed: {result.error}")

Common Mistakes

Mistake 1 - Forgetting to Commit

# Wrong - changes are never persisted
conn = psycopg2.connect(DSN)
cur = conn.cursor()
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
conn.close() # session ends, implicit ROLLBACK - row never saved!

# Right
conn = psycopg2.connect(DSN)
cur = conn.cursor()
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
conn.commit() # explicitly persist
conn.close()

Mistake 2 - Catching Exceptions Without Rolling Back

# Wrong - the transaction is now in an error state
try:
    cur.execute("UPDATE ...")
except Exception as e:
    print(f"Error: {e}")
    # No rollback! The connection is now in a broken state.
cur.execute("SELECT ...")  # raises InFailedSqlTransaction!

# Right
try:
    cur.execute("UPDATE ...")
except Exception as e:
    conn.rollback()  # reset the transaction state first
    print(f"Error: {e}")
# Now safe to issue new statements

Mistake 3 - Long-Running Transactions

# Wrong - this holds locks for the entire loop
conn = psycopg2.connect(DSN)
cur = conn.cursor()
for record in huge_list:
    cur.execute("INSERT INTO ...")
conn.commit()  # one commit at the very end - holds locks for minutes

# Better - batch commits
conn = psycopg2.connect(DSN)
cur = conn.cursor()
for i, record in enumerate(huge_list, start=1):
    cur.execute("INSERT INTO ...")
    if i % 1000 == 0:
        conn.commit()  # commit after every 1000th row
conn.commit()  # final commit for the remaining rows
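A runnable version of the batching idea, sketched with sqlite3 as a stand-in and a hypothetical items table; insert_in_batches is an illustrative helper, not part of any library. Keep in mind the trade-off: once commits are batched, the load as a whole is no longer all-or-nothing, so a failure part-way through leaves the earlier batches committed.

```python
import sqlite3
from itertools import islice


def insert_in_batches(conn, rows, batch_size: int = 1000) -> int:
    """Insert rows, committing once per batch. Returns the number of commits."""
    iterator = iter(rows)
    commits = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return commits
        conn.executemany("INSERT INTO items (value) VALUES (?)", batch)
        conn.commit()  # locks are released after each batch, not after the whole load
        commits += 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (value INTEGER)")
commit_count = insert_in_batches(conn, ((i,) for i in range(2500)), batch_size=1000)
row_count = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
```

Here 2,500 rows with a batch size of 1,000 produce three commits: two full batches and one partial batch.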

Graded Practice Challenges

Beginner - Predict the Behaviour

What happens in each scenario? Explain whether data is lost, corrupted, or safe:

Scenario A:

conn = psycopg2.connect(DSN)
cur = conn.cursor()
cur.execute("UPDATE accounts SET balance = 1000 WHERE id = 1")
cur.execute("UPDATE accounts SET balance = 2000 WHERE id = 2")
# Power cut here - server crashes before commit

Scenario B:

conn = psycopg2.connect(DSN)
conn.autocommit = True
cur = conn.cursor()
cur.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
# Exception raised here - division by zero in application code
cur.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")

Answer

Scenario A: The data is safe. Neither update was committed before the crash, so crash recovery finds no commit record for the transaction in the WAL (Write-Ahead Log) and discards its changes. Both accounts keep their pre-crash values. This is atomicity at work; the same log is what makes committed transactions durable.

Scenario B: The data is corrupted. With autocommit on, the first UPDATE commits immediately and permanently. When the exception fires, the second UPDATE never runs. Alice loses $500 and Bob gains nothing. There is no way to recover this without a manual correction. This is exactly the scenario autocommit makes dangerous.

Intermediate - Implement a Savepoint Pattern

You are building an e-commerce order system. Implement process_order(conn, order_id, customer_id, use_coupon) that:

  1. Inserts the order row (must succeed)
  2. Deducts inventory (must succeed)
  3. Applies a coupon discount if use_coupon=True (optional - if the coupon table raises an error, the order still commits without the discount)

Use savepoints to make step 3 optional without abandoning steps 1 and 2.

Reference Solution

import psycopg2
import logging

logger = logging.getLogger(__name__)

def process_order(
    conn,
    order_id: int,
    customer_id: int,
    product_id: int,
    price: float,
    use_coupon: bool = False,
) -> dict:
    """
    Process an order atomically. Coupon application is optional via savepoint.
    """
    result = {
        "order_id": order_id,
        "price_charged": price,
        "coupon_applied": False,
    }

    with conn.cursor() as cur:
        # Step 1: Insert order (mandatory)
        cur.execute(
            """
            INSERT INTO orders (id, customer_id, product_id, price, status)
            VALUES (%s, %s, %s, %s, 'confirmed')
            """,
            (order_id, customer_id, product_id, price)
        )

        # Step 2: Deduct inventory (mandatory)
        cur.execute(
            "UPDATE inventory SET quantity = quantity - 1 WHERE product_id = %s",
            (product_id,)
        )
        if cur.rowcount == 0:
            raise ValueError(f"Product {product_id} not found in inventory")

        # Step 3: Apply coupon (optional)
        if use_coupon:
            cur.execute("SAVEPOINT before_coupon")
            try:
                cur.execute(
                    """
                    UPDATE coupons
                    SET used = TRUE
                    WHERE customer_id = %s AND used = FALSE
                    RETURNING discount_amount
                    """,
                    (customer_id,)
                )
                row = cur.fetchone()
                if row:
                    discount = row[0]
                    final_price = price - discount
                    cur.execute(
                        "UPDATE orders SET price = %s WHERE id = %s",
                        (final_price, order_id)
                    )
                    result["price_charged"] = final_price
                    result["coupon_applied"] = True
                    cur.execute("RELEASE SAVEPOINT before_coupon")
                else:
                    # No valid coupon found - release savepoint, continue without discount
                    cur.execute("RELEASE SAVEPOINT before_coupon")
            except psycopg2.Error as exc:
                logger.warning("Coupon application failed: %s - continuing without", exc)
                cur.execute("ROLLBACK TO SAVEPOINT before_coupon")
                cur.execute("RELEASE SAVEPOINT before_coupon")

    conn.commit()
    return result

Advanced - Design a Deadlock-Free Job Queue

Design a JobQueue class backed by PostgreSQL that:

  1. enqueue(payload: dict) -> int - inserts a job, returns job ID
  2. claim_job() -> dict | None - atomically claims the next pending job for a worker (no two workers can claim the same job)
  3. complete_job(job_id: int, result: dict) - marks a job done with its result
  4. fail_job(job_id: int, error: str) - marks a job failed
  5. Handles the case where a worker crashes mid-job (jobs stuck in 'processing' for >5 minutes are automatically reclaimed)

Reference Solution

import psycopg2
import psycopg2.extras
import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
id SERIAL PRIMARY KEY,
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
result JSONB,
error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
claimed_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_jobs_status_created ON jobs (status, created_at)
WHERE status = 'pending';
"""

class JobQueue:
    def __init__(self, conn):
        self.conn = conn
        self._ensure_schema()

    def _ensure_schema(self):
        with self.conn.cursor() as cur:
            cur.execute(SCHEMA)
        self.conn.commit()

    def enqueue(self, payload: dict) -> int:
        with self.conn.cursor() as cur:
            cur.execute(
                "INSERT INTO jobs (payload) VALUES (%s) RETURNING id",
                (json.dumps(payload),)
            )
            job_id = cur.fetchone()[0]
        self.conn.commit()
        return job_id

    def claim_job(self) -> dict | None:
        """
        Atomically claim one pending job.
        SKIP LOCKED ensures two workers never claim the same job.
        Also reclaims jobs stuck in 'processing' for more than 5 minutes.
        """
        with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                """
                WITH candidate AS (
                    SELECT id FROM jobs
                    WHERE
                        status = 'pending'
                        OR (
                            status = 'processing'
                            AND claimed_at < NOW() - INTERVAL '5 minutes'
                        )
                    ORDER BY created_at
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                )
                UPDATE jobs
                SET status = 'processing', claimed_at = NOW()
                FROM candidate
                WHERE jobs.id = candidate.id
                RETURNING jobs.id, jobs.payload
                """
            )
            row = cur.fetchone()
            if row is None:
                self.conn.commit()
                return None

        self.conn.commit()
        return {"id": row["id"], "payload": row["payload"]}

    def complete_job(self, job_id: int, result: dict) -> None:
        with self.conn.cursor() as cur:
            cur.execute(
                """
                UPDATE jobs
                SET status = 'done',
                    result = %s,
                    finished_at = NOW()
                WHERE id = %s AND status = 'processing'
                """,
                (json.dumps(result), job_id)
            )
            if cur.rowcount == 0:
                logger.warning("complete_job: job %d not found or not in processing", job_id)
        self.conn.commit()

    def fail_job(self, job_id: int, error: str) -> None:
        with self.conn.cursor() as cur:
            cur.execute(
                """
                UPDATE jobs
                SET status = 'failed',
                    error = %s,
                    finished_at = NOW()
                WHERE id = %s
                """,
                (error, job_id)
            )
        self.conn.commit()


# ── Worker loop example ───────────────────────────────────────────────────────
def worker_loop(conn):
queue = JobQueue(conn)
while True:
job = queue.claim_job()
if job is None:
break # no more jobs

try:
# Do the actual work
result = {"processed": True, "input_keys": list(job["payload"].keys())}
queue.complete_job(job["id"], result)
except Exception as exc:
queue.fail_job(job["id"], str(exc))

Key design decisions:

  • SKIP LOCKED makes claim_job non-blocking - workers skip rows already locked by another worker
  • The WITH candidate CTE puts the SELECT ... FOR UPDATE and the UPDATE in one statement, so a job is locked and marked 'processing' in a single atomic step
  • Stuck job reclamation uses claimed_at < NOW() - INTERVAL '5 minutes' inline - no separate cleanup job needed
  • All state transitions go through single UPDATE statements - no multi-step state machine that could be interrupted
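
The last point, together with the rowcount check in complete_job, can be tried in isolation. The sketch below is an assumption-laden stand-in: it uses the standard-library sqlite3 module instead of psycopg2 so it runs without a server, and mark_done is a hypothetical helper, not a method of the JobQueue class above. It shows how a status guard in the WHERE clause turns a late or duplicate call into a detectable no-op.

```python
import sqlite3


def mark_done(conn, job_id, result):
    """Move a job from 'processing' to 'done'; report whether it happened."""
    cur = conn.execute(
        "UPDATE jobs SET status = 'done', result = ? "
        "WHERE id = ? AND status = 'processing'",
        (result, job_id),
    )
    conn.commit()
    # rowcount is 1 only if the row existed and was still 'processing'.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT NOT NULL, result TEXT)"
)
conn.execute("INSERT INTO jobs (id, status) VALUES (1, 'processing')")
conn.commit()

first = mark_done(conn, 1, "ok")
second = mark_done(conn, 1, "late duplicate")
stored = conn.execute("SELECT status, result FROM jobs WHERE id = 1").fetchone()
print(first, second, stored)  # prints: True False ('done', 'ok')
```

Because the check and the write happen in the same UPDATE, there is no window in which another session can change the status between reading it and writing it.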

Key Takeaways

  • ACID guarantees four things: Atomicity (all-or-nothing), Consistency (rules enforced), Isolation (concurrent transactions do not interfere), Durability (committed data survives crashes)
  • psycopg2, following Python's DB-API, opens a transaction implicitly before the first statement; nothing is permanent until you call conn.commit(), and conn.rollback() discards the pending changes
  • Autocommit mode removes transaction protection - never use it for multi-step write operations
  • Isolation levels trade correctness for performance: READ COMMITTED is PostgreSQL's default; SERIALIZABLE is the strongest guarantee
  • Use a context manager (with transaction(conn)) to ensure commit/rollback always happens even if an exception escapes
  • Savepoints let you partially roll back within a transaction - useful for optional operations that should not abort the main transaction on failure
  • Deadlocks are caused by lock ordering conflicts - always acquire locks in a consistent, canonical order (e.g., sorted by primary key)
  • Handle DeadlockDetected and SerializationFailure with retry logic and exponential backoff
  • Pessimistic locking (SELECT FOR UPDATE) prevents conflicts upfront; optimistic locking (version columns) detects them at write time
  • SELECT FOR UPDATE SKIP LOCKED is the correct pattern for building deadlock-free job queues
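
As a compact reminder of the lock-ordering and retry takeaways, here is a minimal sketch in plain Python. TransientTxError is a hypothetical stand-in for psycopg2's DeadlockDetected and SerializationFailure, and lock_order and run_with_retry are illustrative names, not functions from this lesson or from any library. With a real connection, the retried function would also need to roll back the failed transaction before trying again.

```python
import random
import time


class TransientTxError(Exception):
    """Hypothetical stand-in for DeadlockDetected / SerializationFailure."""


def lock_order(*row_ids):
    """Return ids in the canonical order in which to lock them."""
    return sorted(set(row_ids))


def run_with_retry(fn, attempts=4, base_delay=0.05, sleep=time.sleep):
    """Call fn(); on a transient error, back off exponentially and retry."""
    for attempt in range(attempts):
        try:
            return fn()
        except TransientTxError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # Delay doubles each attempt; jitter keeps workers from
            # retrying in lockstep.
            sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.0))


calls = []
delays = []


def flaky_transfer():
    """Simulated transaction that fails twice before succeeding."""
    calls.append(lock_order(7, 3))
    if len(calls) < 3:
        raise TransientTxError("simulated deadlock")
    return "committed"


# Record the delays instead of sleeping so the demo finishes instantly.
outcome = run_with_retry(flaky_transfer, sleep=delays.append)
print(outcome, len(calls), calls[0])  # prints: committed 3 [3, 7]
```

Passing sleep as a parameter is a design choice that keeps the helper testable; production code would leave the default time.sleep in place.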

What's Next

Lesson 05 - Indexing and Query Optimization covers why some queries take milliseconds and others take minutes on the same hardware.

You will learn how B-tree indexes work internally, how to read EXPLAIN ANALYZE output from Python, the leftmost prefix rule for composite indexes, and when adding an index actually makes performance worse. Understanding indexes is what separates developers who write slow queries from those who write fast ones.

© 2026 EngineersOfAI. All rights reserved.