Python Transactions Practice Problems & Exercises
Practice: Transactions and Data Integrity
← Back to lessonDemonstrate the effect of commit versus rollback on database state.
Solution:
def increment(conn):
conn.execute("UPDATE counter SET val = val + 1")
conn.commit()
def failed_increment(conn):
conn.execute("UPDATE counter SET val = val + 1")
conn.rollback()
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE counter (val INTEGER)")
conn.execute("INSERT INTO counter VALUES (0)")
conn.commit()
# TODO: Implement two functions:
# - increment(conn): adds 1 to val and commits
# - failed_increment(conn): adds 1 to val then rolls back
def increment(conn):
pass
def failed_increment(conn):
pass
increment(conn)
increment(conn)
failed_increment(conn) # this change should NOT persist
increment(conn)
val = conn.execute("SELECT val FROM counter").fetchone()[0]
print('Final value:', val) # should be 3
Expected Output
Final value: 3Hints
Hint 1: conn.execute('UPDATE counter SET val = val + 1') modifies the row.
Hint 2: conn.commit() makes the change permanent; conn.rollback() undoes it.
Hint 3: Without commit(), changes in sqlite3 persist in the current transaction but are lost on rollback.
Implement an atomic fund transfer that rolls back both updates if any condition is violated.
Solution:
def atomic_transfer(conn, sender_id, receiver_id, amount):
with conn:
row = conn.execute(
"SELECT balance FROM wallets WHERE id = ?", (sender_id,)
).fetchone()
if row[0] < amount:
raise ValueError("Insufficient balance")
conn.execute(
"UPDATE wallets SET balance = balance - ? WHERE id = ?",
(amount, sender_id)
)
conn.execute(
"UPDATE wallets SET balance = balance + ? WHERE id = ?",
(amount, receiver_id)
)
rows = conn.execute(
"SELECT balance FROM wallets WHERE id IN (?,?) ORDER BY id",
(sender_id, receiver_id)
).fetchall()
return (rows[0][0], rows[1][0])
import sqlite3
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE wallets (id INTEGER PRIMARY KEY, owner TEXT, balance REAL);
INSERT INTO wallets VALUES (1,'Alice',500.0),(2,'Bob',300.0);
""")
# TODO: Transfer amount from sender to receiver atomically.
# If sender balance drops below 0, raise ValueError and roll back.
# Return (sender_new_balance, receiver_new_balance).
def atomic_transfer(conn, sender_id, receiver_id, amount):
pass
print(atomic_transfer(conn, 1, 2, 200))
try:
print(atomic_transfer(conn, 2, 1, 1000))
except ValueError as e:
print('Error:', e)
# Verify state unchanged after failed transfer
balances = conn.execute("SELECT balance FROM wallets ORDER BY id").fetchall()
print('Final balances:', [r[0] for r in balances])
Expected Output
(300.0, 500.0)
Error: Insufficient balance
Final balances: [300.0, 500.0]Hints
Hint 1: Use 'with conn:' to automatically commit on success or rollback on exception.
Hint 2: Read the sender balance first, check it, then perform both UPDATE statements.
Hint 3: Raise ValueError INSIDE the 'with conn:' block to trigger rollback.
Implement an idempotent event-processed marker using INSERT OR IGNORE, returning whether the insert was new.
Solution:
import time
def mark_processed(conn, event_id):
cur = conn.execute(
"INSERT OR IGNORE INTO processed_events (event_id, processed_at) VALUES (?, ?)",
(event_id, time.time())
)
conn.commit()
return cur.rowcount == 1
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE processed_events (
event_id TEXT PRIMARY KEY,
processed_at REAL
)
""")
conn.commit()
import time
# TODO: Implement a function that marks an event as processed.
# If already processed, it should NOT raise an error (idempotent).
# Return True if the event was newly inserted, False if it already existed.
def mark_processed(conn, event_id):
pass
print(mark_processed(conn, 'evt-001')) # True — first time
print(mark_processed(conn, 'evt-002')) # True
print(mark_processed(conn, 'evt-001')) # False — already processed
Expected Output
True
True
FalseHints
Hint 1: INSERT OR IGNORE silently skips on conflict; check cursor.rowcount to see if a row was inserted.
Hint 2: rowcount == 1 means a new row was inserted; rowcount == 0 means it was ignored.
Hint 3: This pattern ensures exactly-once processing for event-driven systems.
Build a @transactional decorator that automatically commits or rolls back around a database function.
Solution:
def transactional(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
conn = args[0]
try:
result = func(*args, **kwargs)
conn.commit()
return result
except Exception:
conn.rollback()
raise
return wrapper
import sqlite3
import functools
# TODO: Write a @transactional decorator that:
# - Begins a transaction before calling the function
# - Commits if no exception is raised
# - Rolls back if an exception is raised (and re-raises it)
# The first argument of the decorated function is always conn.
def transactional(func):
pass
@transactional
def create_user(conn, name, email):
conn.execute("INSERT INTO users (name, email) VALUES (?,?)", (name, email))
@transactional
def bad_create(conn, name, email):
conn.execute("INSERT INTO users (name, email) VALUES (?,?)", (name, email))
raise RuntimeError("Something went wrong")
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.commit()
create_user(conn, 'Alice', '[email protected]')
try:
bad_create(conn, 'Bob', '[email protected]')
except RuntimeError:
pass
count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print('Users:', count) # Should be 1 — Alice only
Expected Output
Users: 1Hints
Hint 1: Use functools.wraps(func) to preserve metadata.
Hint 2: Wrap the function call in try/except: commit on success, rollback on exception.
Hint 3: Re-raise the exception after rollback so callers can handle it.
Use SQL SAVEPOINT to implement partial rollback within a batch transaction, preserving successful sub-operations.
Solution:
def adjust_inventory(conn, adjustments):
results = {}
for item_id, delta in adjustments:
conn.execute("SAVEPOINT adj")
try:
conn.execute(
"UPDATE items SET qty = qty + ? WHERE id = ?",
(delta, item_id)
)
new_qty = conn.execute(
"SELECT qty FROM items WHERE id = ?", (item_id,)
).fetchone()[0]
if new_qty < 0:
raise ValueError("Negative qty")
conn.execute("RELEASE adj")
results[item_id] = (True, new_qty)
except (ValueError, Exception):
conn.execute("ROLLBACK TO adj")
conn.execute("RELEASE adj")
qty = conn.execute(
"SELECT qty FROM items WHERE id = ?", (item_id,)
).fetchone()[0]
results[item_id] = (False, qty)
conn.commit()
return results
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, qty INTEGER)")
conn.executemany("INSERT INTO items VALUES (?,?,?)", [
(1, 'Widget', 100), (2, 'Gadget', 50), (3, 'Doohickey', 75)
])
conn.commit()
# TODO: Adjust inventory for multiple items inside a single transaction.
# Use a savepoint so that if any individual adjustment fails (qty would go negative),
# only THAT item's change is rolled back, not the entire batch.
# Return a dict of {item_id: (success, final_qty)}.
def adjust_inventory(conn, adjustments):
# adjustments: list of (item_id, delta) where delta can be negative
pass
result = adjust_inventory(conn, [
(1, -30), # OK: 100 - 30 = 70
(2, -60), # Fail: 50 - 60 = -10 (not allowed)
(3, -25), # OK: 75 - 25 = 50
])
for item_id, (ok, qty) in sorted(result.items()):
print(f'Item {item_id}: {"ok" if ok else "failed"}, qty={qty}')
Expected Output
Item 1: ok, qty=70
Item 2: failed, qty=50
Item 3: ok, qty=50Hints
Hint 1: SAVEPOINT sp1 creates a named savepoint within an open transaction.
Hint 2: ROLLBACK TO sp1 rolls back to the savepoint without affecting earlier work.
Hint 3: RELEASE sp1 confirms the savepoint's changes as part of the outer transaction.
Implement optimistic locking using a version column to detect and reject concurrent conflicting updates.
Solution:
def update_document(conn, doc_id, new_content, expected_version):
cur = conn.execute("""
UPDATE documents
SET content = ?, version = version + 1
WHERE id = ? AND version = ?
""", (new_content, doc_id, expected_version))
conn.commit()
return cur.rowcount == 1
import sqlite3
import threading
conn = sqlite3.connect(':memory:', check_same_thread=False)
conn.execute("""
CREATE TABLE documents (
id INTEGER PRIMARY KEY,
content TEXT,
version INTEGER DEFAULT 1
)
""")
conn.execute("INSERT INTO documents VALUES (1, 'Original content', 1)")
conn.commit()
# TODO: Implement optimistic locking.
# update_document reads the current version, makes a change,
# then updates ONLY IF the version hasn't changed.
# Return True if update succeeded, False if there was a conflict.
import threading
def update_document(conn, doc_id, new_content, expected_version):
pass
# Simulate concurrent updates
lock = threading.Lock()
results = []
def worker(content, version):
with lock:
ok = update_document(conn, 1, content, version)
results.append(ok)
# Both start with version 1 — only one should succeed
t1 = threading.Thread(target=worker, args=('Update A', 1))
t2 = threading.Thread(target=worker, args=('Update B', 1))
t1.start(); t1.join()
t2.start(); t2.join()
print('Results:', results) # [True, False] — one succeeds, one fails
final = conn.execute("SELECT content, version FROM documents").fetchone()
print('Final:', final)
Expected Output
Results: [True, False]
Final: ('Update A', 2)Hints
Hint 1: UPDATE ... WHERE id = ? AND version = ? only updates if the version matches.
Hint 2: cursor.rowcount == 0 means another transaction already incremented the version.
Hint 3: Increment version in the SET clause: SET content = ?, version = version + 1.
Implement an execute-with-retry wrapper that handles lock contention with exponential backoff.
Solution:
import time
def execute_with_retry(conn, fn, max_retries=5):
last_err = None
for attempt in range(max_retries):
try:
return fn(conn)
except sqlite3.OperationalError as e:
last_err = e
time.sleep(0.01 * (2 ** attempt))
raise last_err
import sqlite3
import threading
import time
# Deadlocks happen when two transactions each wait for the other's lock.
# PostgreSQL raises psycopg2.errors.DeadlockDetected.
# SQLite raises OperationalError: database is locked.
# TODO: Write a function execute_with_retry(conn, fn, max_retries=5)
# that calls fn(conn) and retries on OperationalError up to max_retries times.
# Use exponential backoff. Return the result or raise after exhausting retries.
def execute_with_retry(conn, fn, max_retries=5):
pass
# Simulate a lock contention scenario
attempt = [0]
def flaky_operation(conn):
attempt[0] += 1
if attempt[0] < 4:
raise sqlite3.OperationalError("database is locked")
return conn.execute("SELECT 42").fetchone()[0]
conn = sqlite3.connect(':memory:')
result = execute_with_retry(conn, flaky_operation)
print(f'Result: {result} after {attempt[0]} attempts')
Expected Output
Result: 42 after 4 attemptsHints
Hint 1: Use a for loop: for attempt in range(max_retries).
Hint 2: time.sleep(0.01 * (2 ** attempt)) provides exponential backoff.
Hint 3: Re-raise after the loop if all retries failed.
Demonstrate read-your-own-writes: a SELECT after an UPDATE within the same transaction reflects the uncommitted change.
Solution:
def reserve_stock(conn, product, qty):
with conn:
row = conn.execute(
"SELECT stock FROM inventory WHERE product = ?", (product,)
).fetchone()
before = row[0]
if before < qty:
raise ValueError(f"Insufficient stock: need {qty}, have {before}")
conn.execute(
"UPDATE inventory SET stock = stock - ? WHERE product = ?",
(qty, product)
)
after = conn.execute(
"SELECT stock FROM inventory WHERE product = ?", (product,)
).fetchone()[0]
return before, after
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE inventory (product TEXT PRIMARY KEY, stock INTEGER)")
conn.executemany("INSERT INTO inventory VALUES (?,?)", [
('Widget', 50), ('Gadget', 30)
])
conn.commit()
# TODO: Implement reserve_stock(conn, product, qty):
# 1. Check current stock in the SAME transaction
# 2. Deduct qty
# 3. Read back the new stock (read-your-own-writes)
# 4. Return (before, after) stock values
# If stock is insufficient raise ValueError.
# Ensure the read-back reflects the uncommitted write.
def reserve_stock(conn, product, qty):
pass
before, after = reserve_stock(conn, 'Widget', 20)
print(f'Widget: {before} -> {after}')
try:
reserve_stock(conn, 'Gadget', 50)
except ValueError as e:
print(f'Gadget: {e}')
Expected Output
Widget: 50 -> 30
Gadget: Insufficient stock: need 50, have 30Hints
Hint 1: Within a single connection/transaction, a SELECT after UPDATE reads the uncommitted write.
Hint 2: SQLite's default isolation is 'deferred' but within one connection you see your own writes.
Hint 3: Use a try/except block: raise ValueError before the UPDATE if stock is insufficient.
Simulate two-phase commit (2PC) across two database connections — prepare all, then commit or abort all.
Solution:
class TwoPhaseCoordinator:
def __init__(self, connections):
self.connections = connections
for conn in connections:
conn.execute("CREATE TABLE IF NOT EXISTS txn_log (state TEXT)")
conn.commit()
def execute_on_all(self, sql, params=()):
for conn in self.connections:
conn.execute(sql, params)
def prepare_all(self):
try:
for conn in self.connections:
conn.execute("INSERT INTO txn_log VALUES ('prepared')")
return True
except Exception:
return False
def commit_all(self):
for conn in self.connections:
conn.commit()
def abort_all(self):
for conn in self.connections:
conn.rollback()
def run_distributed_txn(coord, operations, simulate_failure=False):
for conn, sql in operations:
conn.execute(sql)
if simulate_failure or not coord.prepare_all():
coord.abort_all()
return False
coord.commit_all()
return True
import sqlite3
# Two-phase commit (2PC) coordinates transactions across multiple databases.
# Phase 1 (Prepare): all participants say "ready"
# Phase 2 (Commit/Abort): coordinator commits or aborts all
# TODO: Simulate 2PC across two SQLite in-memory databases.
# - prepare(conn) writes a 'prepared' record to a log table and returns True/False
# - commit_all(conns) commits all; abort_all(conns) rolls all back
# Demonstrate a successful 2PC and a failed one (one participant fails prepare).
class TwoPhaseCoordinator:
def __init__(self, connections):
self.connections = connections
for conn in connections:
conn.execute("CREATE TABLE IF NOT EXISTS txn_log (state TEXT)")
conn.commit()
def execute_on_all(self, sql, params=()):
for conn in self.connections:
conn.execute(sql, params)
def prepare_all(self):
pass
def commit_all(self):
pass
def abort_all(self):
pass
def run_distributed_txn(coord, operations, simulate_failure=False):
pass
# Test success
db1 = sqlite3.connect(':memory:')
db2 = sqlite3.connect(':memory:')
for db in [db1, db2]:
db.execute("CREATE TABLE data (val INTEGER)")
db.commit()
coord = TwoPhaseCoordinator([db1, db2])
success = run_distributed_txn(coord, [
(db1, "INSERT INTO data VALUES (1)"),
(db2, "INSERT INTO data VALUES (2)"),
])
print('Success txn:', success)
print('DB1 rows:', db1.execute("SELECT COUNT(*) FROM data").fetchone()[0])
print('DB2 rows:', db2.execute("SELECT COUNT(*) FROM data").fetchone()[0])
Expected Output
Success txn: True
DB1 rows: 1
DB2 rows: 1Hints
Hint 1: Phase 1: INSERT 'prepared' into txn_log on each participant. Return True if all succeed.
Hint 2: Phase 2 commit: call conn.commit() on all. Phase 2 abort: call conn.rollback() on all.
Hint 3: If any prepare fails, call abort_all() and return False.
Build an event-sourced ledger using an append-only transaction log; derive balance by replaying events.
Solution:
import time
def append_event(conn, account_id, event_type, amount):
conn.execute(
"INSERT INTO account_events (account_id, event_type, amount, ts) VALUES (?,?,?,?)",
(account_id, event_type, amount, time.time())
)
conn.commit()
def get_balance(conn, account_id):
row = conn.execute("""
SELECT SUM(CASE WHEN event_type = 'deposit' THEN amount ELSE -amount END)
FROM account_events
WHERE account_id = ?
""", (account_id,)).fetchone()
return row[0] or 0.0
def get_history(conn, account_id):
cur = conn.execute(
"SELECT event_type, amount, ts FROM account_events WHERE account_id = ? ORDER BY ts",
(account_id,)
)
return cur.fetchall()
import sqlite3
import json
import time
# Event sourcing: never UPDATE/DELETE — only append events.
# Current state is derived by replaying all events.
# TODO: Implement an event-sourced account system.
# - append_event(conn, account_id, event_type, amount): insert event
# - get_balance(conn, account_id): replay events to compute current balance
# - get_history(conn, account_id): return list of (event_type, amount, ts) tuples
def append_event(conn, account_id, event_type, amount):
pass
def get_balance(conn, account_id):
pass
def get_history(conn, account_id):
pass
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE account_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
account_id TEXT,
event_type TEXT,
amount REAL,
ts REAL
)
""")
conn.commit()
append_event(conn, 'ACC1', 'deposit', 1000)
append_event(conn, 'ACC1', 'deposit', 500)
append_event(conn, 'ACC1', 'withdrawal', 200)
append_event(conn, 'ACC2', 'deposit', 750)
print('ACC1 balance:', get_balance(conn, 'ACC1'))
print('ACC2 balance:', get_balance(conn, 'ACC2'))
print('ACC1 events:', len(get_history(conn, 'ACC1')))
Expected Output
ACC1 balance: 1300.0
ACC2 balance: 750.0
ACC1 events: 3Hints
Hint 1: Only use INSERT — never UPDATE or DELETE on account_events.
Hint 2: For balance: SUM deposits - SUM withdrawals using CASE WHEN.
Hint 3: For history: SELECT event_type, amount, ts ORDER BY ts returns the event log.
Implement the Saga pattern with compensating transactions to safely roll back partial distributed operations.
Solution:
def run_saga(conn, steps):
completed = []
try:
for step in steps:
step.action()
completed.append(step)
return True, len(completed), 0
except Exception:
rolled_back = []
for step in reversed(completed):
try:
step.compensate()
rolled_back.append(step.name)
except Exception:
pass
return False, len(completed), len(rolled_back)
import sqlite3
# The Saga pattern decomposes a distributed transaction into local transactions.
# Each step has a compensating transaction to undo it.
# Order placement saga:
# Step 1: Reserve inventory
# Step 2: Charge payment
# Step 3: Create shipment
# If any step fails, run compensating transactions in reverse.
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE inventory (product TEXT PRIMARY KEY, stock INTEGER);
CREATE TABLE payments (id INTEGER PRIMARY KEY AUTOINCREMENT, order_id TEXT, amount REAL, status TEXT);
CREATE TABLE shipments (id INTEGER PRIMARY KEY AUTOINCREMENT, order_id TEXT, status TEXT);
INSERT INTO inventory VALUES ('Widget', 10);
""")
class SagaStep:
def __init__(self, name, action, compensate):
self.name = name
self.action = action
self.compensate = compensate
# TODO: Implement run_saga(conn, steps) that:
# - Executes each step in order
# - If any step raises an exception, runs compensating actions in reverse
# - Returns (success, completed_steps, rolled_back_steps)
def run_saga(conn, steps):
pass
def test_successful_saga():
order_id = 'ORD-001'
steps = [
SagaStep(
'reserve_inventory',
lambda: conn.execute("UPDATE inventory SET stock = stock - 1 WHERE product = 'Widget' AND stock > 0"),
lambda: conn.execute("UPDATE inventory SET stock = stock + 1 WHERE product = 'Widget'")
),
SagaStep(
'charge_payment',
lambda: conn.execute("INSERT INTO payments (order_id, amount, status) VALUES (?,?,?)", (order_id, 99.99, 'charged')),
lambda: conn.execute("UPDATE payments SET status = 'refunded' WHERE order_id = ?", (order_id,))
),
SagaStep(
'create_shipment',
lambda: conn.execute("INSERT INTO shipments (order_id, status) VALUES (?,?)", (order_id, 'pending')),
lambda: conn.execute("UPDATE shipments SET status = 'cancelled' WHERE order_id = ?", (order_id,))
),
]
success, done, rolled = run_saga(conn, steps)
conn.commit()
return success, done, rolled
ok, done, rolled = test_successful_saga()
print(f'Success: {ok}, Steps: {done}, Rolled back: {rolled}')
stock = conn.execute("SELECT stock FROM inventory").fetchone()[0]
print(f'Remaining stock: {stock}')
Expected Output
Success: True, Steps: 3, Rolled back: 0
Remaining stock: 9Hints
Hint 1: Track completed_steps as a list; append each step name after it succeeds.
Hint 2: On exception, iterate reversed(completed_steps) and call each step's compensate().
Hint 3: Return (True, len(completed), 0) on success or (False, len(completed), len(rolled)) on failure.
