
Python Transactions Practice Problems & Exercises

Practice: Transactions and Data Integrity

11 problems: 4 Easy, 4 Medium, 3 Hard (50–70 min)

#1 Basic Commit and Rollback (Easy)
commit, rollback, autocommit

Demonstrate the effect of commit versus rollback on database state.

Solution:

def increment(conn):
    conn.execute("UPDATE counter SET val = val + 1")
    conn.commit()

def failed_increment(conn):
    conn.execute("UPDATE counter SET val = val + 1")
    conn.rollback()

Starter code:

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE counter (val INTEGER)")
conn.execute("INSERT INTO counter VALUES (0)")
conn.commit()

# TODO: Implement two functions:
# - increment(conn): adds 1 to val and commits
# - failed_increment(conn): adds 1 to val then rolls back

def increment(conn):
  pass

def failed_increment(conn):
  pass

increment(conn)
increment(conn)
failed_increment(conn)  # this change should NOT persist
increment(conn)

val = conn.execute("SELECT val FROM counter").fetchone()[0]
print('Final value:', val)  # should be 3
Expected Output
Final value: 3
Hints

Hint 1: conn.execute('UPDATE counter SET val = val + 1') modifies the row.

Hint 2: conn.commit() makes the change permanent; conn.rollback() undoes it.

Hint 3: Until commit() is called, sqlite3 changes are visible only inside the current transaction, and rollback() discards them.
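
To make the transaction boundaries in these hints visible, the sketch below inspects the connection's in_transaction attribute around an UPDATE. It assumes the default sqlite3 connection settings, where the first data-modifying statement opens a transaction implicitly. The names states, val_after_rollback and val_after_commit are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE counter (val INTEGER)")
conn.execute("INSERT INTO counter VALUES (0)")
conn.commit()

states = []

# The UPDATE implicitly opens a transaction (default connection settings assumed).
conn.execute("UPDATE counter SET val = val + 1")
states.append(conn.in_transaction)

# rollback() discards the pending change and closes the transaction.
conn.rollback()
states.append(conn.in_transaction)
val_after_rollback = conn.execute("SELECT val FROM counter").fetchone()[0]

# commit() makes the change permanent and also closes the transaction.
conn.execute("UPDATE counter SET val = val + 1")
conn.commit()
states.append(conn.in_transaction)
val_after_commit = conn.execute("SELECT val FROM counter").fetchone()[0]

print(states, val_after_rollback, val_after_commit)
```

Checking in_transaction this way can help when debugging code that appears to "lose" writes because a commit was never issued.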


#2 Atomic Multi-Statement Transfer (Easy)
atomicity, BEGIN, COMMIT, multi-statement

Implement an atomic fund transfer that rolls back both updates if any condition is violated.

Solution:

def atomic_transfer(conn, sender_id, receiver_id, amount):
    # 'with conn:' commits on success and rolls back if an exception escapes.
    with conn:
        row = conn.execute(
            "SELECT balance FROM wallets WHERE id = ?", (sender_id,)
        ).fetchone()
        if row[0] < amount:
            raise ValueError("Insufficient balance")
        conn.execute(
            "UPDATE wallets SET balance = balance - ? WHERE id = ?",
            (amount, sender_id)
        )
        conn.execute(
            "UPDATE wallets SET balance = balance + ? WHERE id = ?",
            (amount, receiver_id)
        )
        # Key balances by id so the result does not depend on which id sorts first.
        balances = dict(conn.execute(
            "SELECT id, balance FROM wallets WHERE id IN (?,?)",
            (sender_id, receiver_id)
        ).fetchall())
    return (balances[sender_id], balances[receiver_id])

Starter code:

import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE wallets (id INTEGER PRIMARY KEY, owner TEXT, balance REAL);
INSERT INTO wallets VALUES (1,'Alice',500.0),(2,'Bob',300.0);
""")

# TODO: Transfer amount from sender to receiver atomically.
# If sender balance drops below 0, raise ValueError and roll back.
# Return (sender_new_balance, receiver_new_balance).

def atomic_transfer(conn, sender_id, receiver_id, amount):
  pass

print(atomic_transfer(conn, 1, 2, 200))
try:
  print(atomic_transfer(conn, 2, 1, 1000))
except ValueError as e:
  print('Error:', e)

# Verify state unchanged after failed transfer
balances = conn.execute("SELECT balance FROM wallets ORDER BY id").fetchall()
print('Final balances:', [r[0] for r in balances])
Expected Output
(300.0, 500.0)
Error: Insufficient balance
Final balances: [300.0, 500.0]
Hints

Hint 1: Use 'with conn:' to automatically commit on success or rollback on exception.

Hint 2: Read the sender balance first, check it, then perform both UPDATE statements.

Hint 3: Raise ValueError INSIDE the 'with conn:' block to trigger rollback.
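
As a complement to the explicit balance check, the invariant can also be enforced by the database itself. The sketch below is one possible variant, not the exercise's reference solution: it assumes a CHECK (balance >= 0) constraint added to the wallets table, and transfer_with_check is a hypothetical helper name. The credit is applied first on purpose, to show that it is undone when the debit violates the constraint.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE wallets (
  id INTEGER PRIMARY KEY,
  owner TEXT,
  balance REAL CHECK (balance >= 0)  -- assumption: constraint added for this sketch
);
INSERT INTO wallets VALUES (1,'Alice',500.0),(2,'Bob',300.0);
""")

def transfer_with_check(conn, sender_id, receiver_id, amount):
    # Hypothetical helper: no manual balance check, the CHECK constraint decides.
    with conn:
        conn.execute(
            "UPDATE wallets SET balance = balance + ? WHERE id = ?",
            (amount, receiver_id),
        )
        # Raises sqlite3.IntegrityError if the sender would go negative.
        conn.execute(
            "UPDATE wallets SET balance = balance - ? WHERE id = ?",
            (amount, sender_id),
        )

try:
    transfer_with_check(conn, 2, 1, 1000)
    outcome = "committed"
except sqlite3.IntegrityError:
    outcome = "rolled back"

balances = [r[0] for r in conn.execute("SELECT balance FROM wallets ORDER BY id")]
print(outcome, balances)
```

Because the exception leaves the 'with conn:' block, the earlier credit to the receiver is rolled back together with the failed debit.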


#3 Idempotent INSERT with ON CONFLICT (Easy)
idempotent, ON CONFLICT, upsert, IGNORE

Implement an idempotent event-processed marker using INSERT OR IGNORE, returning whether the insert was new.

Solution:

import time

def mark_processed(conn, event_id):
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_events (event_id, processed_at) VALUES (?, ?)",
        (event_id, time.time())
    )
    conn.commit()
    return cur.rowcount == 1

Starter code:

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE processed_events (
  event_id TEXT PRIMARY KEY,
  processed_at REAL
)
""")
conn.commit()

import time

# TODO: Implement a function that marks an event as processed.
# If already processed, it should NOT raise an error (idempotent).
# Return True if the event was newly inserted, False if it already existed.

def mark_processed(conn, event_id):
  pass

print(mark_processed(conn, 'evt-001'))  # True — first time
print(mark_processed(conn, 'evt-002'))  # True
print(mark_processed(conn, 'evt-001'))  # False — already processed
Expected Output
True
True
False
Hints

Hint 1: INSERT OR IGNORE silently skips on conflict; check cursor.rowcount to see if a row was inserted.

Hint 2: rowcount == 1 means a new row was inserted; rowcount == 0 means it was ignored.

Hint 3: This pattern deduplicates event handling. It gives effectively-once processing only if the marker and the event's side effects are committed in the same transaction.
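
The deduplication marker is most useful when it shares a transaction with the work it guards. The following is a minimal sketch under that assumption; process_once, count_order, broken_handler and the totals table are hypothetical names introduced for illustration and are not part of the exercise.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY, processed_at REAL)")
conn.execute("CREATE TABLE totals (name TEXT PRIMARY KEY, value INTEGER)")  # hypothetical side-effect table
conn.execute("INSERT INTO totals VALUES ('orders', 0)")
conn.commit()

def process_once(conn, event_id, handler):
    # Marker and side effects commit together, or not at all.
    with conn:
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id, processed_at) VALUES (?, ?)",
            (event_id, time.time()),
        )
        if cur.rowcount == 0:
            return False  # already processed: skip the handler
        handler(conn)
        return True

def count_order(conn):
    conn.execute("UPDATE totals SET value = value + 1 WHERE name = 'orders'")

def broken_handler(conn):
    raise RuntimeError("handler failed")

results = [
    process_once(conn, "evt-001", count_order),
    process_once(conn, "evt-001", count_order),  # duplicate delivery
]

try:
    process_once(conn, "evt-002", broken_handler)
except RuntimeError:
    pass  # the marker for evt-002 was rolled back with the failed handler

retried = process_once(conn, "evt-002", count_order)
total = conn.execute("SELECT value FROM totals WHERE name = 'orders'").fetchone()[0]
print(results, retried, total)
```

Since a failed handler rolls the marker back, the event can be retried later without being mistaken for a duplicate.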


#4 Transaction Within a Function: Decorator Pattern (Easy)
decorator, transaction, functools

Build a @transactional decorator that automatically commits or rolls back around a database function.

Solution:

def transactional(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        conn = args[0]  # by convention, the connection is the first positional argument
        try:
            result = func(*args, **kwargs)
            conn.commit()
            return result
        except Exception:
            conn.rollback()
            raise
    return wrapper

Starter code:

import sqlite3
import functools

# TODO: Write a @transactional decorator that:
# - Begins a transaction before calling the function
# - Commits if no exception is raised
# - Rolls back if an exception is raised (and re-raises it)
# The first argument of the decorated function is always conn.

def transactional(func):
  pass

@transactional
def create_user(conn, name, email):
  conn.execute("INSERT INTO users (name, email) VALUES (?,?)", (name, email))

@transactional
def bad_create(conn, name, email):
  conn.execute("INSERT INTO users (name, email) VALUES (?,?)", (name, email))
  raise RuntimeError("Something went wrong")

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.commit()

create_user(conn, 'Alice', '[email protected]')

try:
  bad_create(conn, 'Bob', '[email protected]')
except RuntimeError:
  pass

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print('Users:', count)  # Should be 1 — Alice only
Expected Output
Users: 1
Hints

Hint 1: Use functools.wraps(func) to preserve metadata.

Hint 2: Wrap the function call in try/except: commit on success, rollback on exception.

Hint 3: Re-raise the exception after rollback so callers can handle it.
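
To see what Hint 1 means by "preserve metadata", the sketch below compares a pass-through decorator with and without functools.wraps. The decorators without_wraps and with_wraps and the two create_user_* functions are illustrative names only; they do no database work.

```python
import functools

def without_wraps(func):
    # The wrapper replaces the original function, including its name and docstring.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def with_wraps(func):
    # functools.wraps copies __name__, __doc__ and related attributes onto the wrapper.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@without_wraps
def create_user_a(conn, name):
    """Insert a user."""

@with_wraps
def create_user_b(conn, name):
    """Insert a user."""

names = (create_user_a.__name__, create_user_b.__name__)
docs = (create_user_a.__doc__, create_user_b.__doc__)
print(names)
print(docs)
```

Preserved names matter in practice because log messages and tracebacks would otherwise report every decorated function as "wrapper".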


#5 Savepoints for Partial Rollback (Medium)
SAVEPOINT, ROLLBACK TO, nested transaction

Use SQL SAVEPOINT to implement partial rollback within a batch transaction, preserving successful sub-operations.

Solution:

def adjust_inventory(conn, adjustments):
    results = {}
    # Open the outer transaction explicitly. Otherwise each SAVEPOINT would be
    # the outermost one and its RELEASE would commit that item on its own.
    if not conn.in_transaction:
        conn.execute("BEGIN")
    for item_id, delta in adjustments:
        conn.execute("SAVEPOINT adj")
        try:
            conn.execute(
                "UPDATE items SET qty = qty + ? WHERE id = ?",
                (delta, item_id)
            )
            new_qty = conn.execute(
                "SELECT qty FROM items WHERE id = ?", (item_id,)
            ).fetchone()[0]
            if new_qty < 0:
                raise ValueError("Negative qty")
            conn.execute("RELEASE adj")
            results[item_id] = (True, new_qty)
        except Exception:
            # Undo only this item's change, then discard the savepoint.
            conn.execute("ROLLBACK TO adj")
            conn.execute("RELEASE adj")
            qty = conn.execute(
                "SELECT qty FROM items WHERE id = ?", (item_id,)
            ).fetchone()[0]
            results[item_id] = (False, qty)
    conn.commit()
    return results

Starter code:

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, qty INTEGER)")
conn.executemany("INSERT INTO items VALUES (?,?,?)", [
  (1, 'Widget', 100), (2, 'Gadget', 50), (3, 'Doohickey', 75)
])
conn.commit()

# TODO: Adjust inventory for multiple items inside a single transaction.
# Use a savepoint so that if any individual adjustment fails (qty would go negative),
# only THAT item's change is rolled back, not the entire batch.
# Return a dict of {item_id: (success, final_qty)}.

def adjust_inventory(conn, adjustments):
  # adjustments: list of (item_id, delta) where delta can be negative
  pass

result = adjust_inventory(conn, [
  (1, -30),   # OK: 100 - 30 = 70
  (2, -60),   # Fail: 50 - 60 = -10 (not allowed)
  (3, -25),   # OK: 75 - 25 = 50
])
for item_id, (ok, qty) in sorted(result.items()):
  print(f'Item {item_id}: {"ok" if ok else "failed"}, qty={qty}')
Expected Output
Item 1: ok, qty=70
Item 2: failed, qty=50
Item 3: ok, qty=50
Hints

Hint 1: SAVEPOINT sp1 creates a named savepoint within an open transaction.

Hint 2: ROLLBACK TO sp1 rolls back to the savepoint without affecting earlier work.

Hint 3: RELEASE sp1 removes the savepoint and keeps its changes as part of the outer transaction; they become durable only when that transaction commits.
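
The three statements in these hints can be tried in isolation. The sketch below assumes a connection opened with isolation_level=None so that BEGIN and COMMIT are issued manually; the log table and the variable names are illustrative.

```python
import sqlite3

# isolation_level=None: the sqlite3 module issues no implicit BEGIN,
# so every transaction statement below is explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE log (msg TEXT)")

conn.execute("BEGIN")
conn.execute("INSERT INTO log VALUES ('before savepoint')")

conn.execute("SAVEPOINT sp1")
conn.execute("INSERT INTO log VALUES ('inside savepoint')")
conn.execute("ROLLBACK TO sp1")   # undoes only the work done after sp1
conn.execute("RELEASE sp1")       # removes the savepoint; the outer transaction stays open

still_open = conn.in_transaction
conn.execute("COMMIT")

rows = [r[0] for r in conn.execute("SELECT msg FROM log")]
print(still_open, rows)
```

Only the row inserted before the savepoint survives, which is the behaviour the inventory exercise relies on for its failed item.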


#6 Optimistic Locking with Version Column (Medium)
optimistic locking, version, CAS, concurrent update

Implement optimistic locking using a version column to detect and reject concurrent conflicting updates.

Solution:

def update_document(conn, doc_id, new_content, expected_version):
    # The WHERE clause acts as a compare-and-swap on the version column.
    cur = conn.execute("""
        UPDATE documents
        SET content = ?, version = version + 1
        WHERE id = ? AND version = ?
    """, (new_content, doc_id, expected_version))
    conn.commit()
    return cur.rowcount == 1

Starter code:

import sqlite3
import threading

conn = sqlite3.connect(':memory:', check_same_thread=False)
conn.execute("""
CREATE TABLE documents (
  id INTEGER PRIMARY KEY,
  content TEXT,
  version INTEGER DEFAULT 1
)
""")
conn.execute("INSERT INTO documents VALUES (1, 'Original content', 1)")
conn.commit()

# TODO: Implement optimistic locking.
# update_document reads the current version, makes a change,
# then updates ONLY IF the version hasn't changed.
# Return True if update succeeded, False if there was a conflict.

def update_document(conn, doc_id, new_content, expected_version):
  pass

# Simulate concurrent updates
lock = threading.Lock()
results = []

def worker(content, version):
  with lock:
      ok = update_document(conn, 1, content, version)
      results.append(ok)

# Both start with version 1 — only one should succeed
t1 = threading.Thread(target=worker, args=('Update A', 1))
t2 = threading.Thread(target=worker, args=('Update B', 1))
t1.start(); t1.join()
t2.start(); t2.join()

print('Results:', results)  # [True, False] — one succeeds, one fails
final = conn.execute("SELECT content, version FROM documents").fetchone()
print('Final:', final)
Expected Output
Results: [True, False]
Final: ('Update A', 2)
Hints

Hint 1: UPDATE ... WHERE id = ? AND version = ? only updates if the version matches.

Hint 2: cursor.rowcount == 0 means no row matched: either another transaction already incremented the version, or the document does not exist.

Hint 3: Increment version in the SET clause: SET content = ?, version = version + 1.
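
A caller that loses the version race usually re-reads the row and tries again. The sketch below shows one way to do that; append_with_retry is a hypothetical helper, and update_document is repeated here only so the example runs on its own.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE documents (id INTEGER PRIMARY KEY, content TEXT, version INTEGER DEFAULT 1)"
)
conn.execute("INSERT INTO documents VALUES (1, 'Original content', 1)")
conn.commit()

def update_document(conn, doc_id, new_content, expected_version):
    cur = conn.execute(
        "UPDATE documents SET content = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_content, doc_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1

def append_with_retry(conn, doc_id, suffix, max_attempts=3):
    # Hypothetical helper: re-read the current version before every attempt.
    for _ in range(max_attempts):
        content, version = conn.execute(
            "SELECT content, version FROM documents WHERE id = ?", (doc_id,)
        ).fetchone()
        if update_document(conn, doc_id, content + suffix, version):
            return True
    return False

first = update_document(conn, 1, "Update A", 1)   # succeeds, version becomes 2
stale = update_document(conn, 1, "Update B", 1)   # conflict: version 1 is stale
retried = append_with_retry(conn, 1, " + B")      # re-reads version 2, then succeeds
final = conn.execute("SELECT content, version FROM documents").fetchone()
print(first, stale, retried, final)
```

The retry recomputes the new content from the freshly read row, so the second writer builds on the first writer's change instead of overwriting it.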


#7 Deadlock Detection and Retry (Medium)
deadlock, retry, OperationalError, lock timeout

Implement an execute-with-retry wrapper that handles lock contention with exponential backoff.

Solution:

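import time

def execute_with_retry(conn, fn, max_retries=5):
    last_err = None
    for attempt in range(max_retries):
        try:
            return fn(conn)
        except sqlite3.OperationalError as e:
            last_err = e
            # Back off before the next attempt; no need to sleep after the last one.
            if attempt < max_retries - 1:
                time.sleep(0.01 * (2 ** attempt))
    raise last_err

Starter code:
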
import sqlite3
import threading
import time

# Deadlocks happen when two transactions each wait for the other's lock.
# PostgreSQL raises psycopg2.errors.DeadlockDetected.
# SQLite raises OperationalError: database is locked.

# TODO: Write a function execute_with_retry(conn, fn, max_retries=5)
# that calls fn(conn) and retries on OperationalError up to max_retries times.
# Use exponential backoff. Return the result or raise after exhausting retries.

def execute_with_retry(conn, fn, max_retries=5):
  pass

# Simulate a lock contention scenario
attempt = [0]

def flaky_operation(conn):
  attempt[0] += 1
  if attempt[0] < 4:
      raise sqlite3.OperationalError("database is locked")
  return conn.execute("SELECT 42").fetchone()[0]

conn = sqlite3.connect(':memory:')
result = execute_with_retry(conn, flaky_operation)
print(f'Result: {result} after {attempt[0]} attempts')
Expected Output
Result: 42 after 4 attempts
Hints

Hint 1: Use a for loop: for attempt in range(max_retries).

Hint 2: time.sleep(0.01 * (2 ** attempt)) provides exponential backoff.

Hint 3: Re-raise after the loop if all retries failed.
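
The backoff formula from Hint 2 doubles the delay on each attempt. The sketch below prints that schedule and adds two common refinements, a cap and random jitter. Both refinements, and the backoff_delays helper itself, are assumptions for illustration and are not required by the exercise.

```python
import random

def backoff_delays(max_retries=5, base=0.01, cap=1.0, jitter=False, rng=random):
    # Hypothetical helper: returns the sleep time for each attempt.
    delays = []
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))   # exponential growth, bounded by cap
        if jitter:
            # Spread retries out so competing clients do not wake up in lockstep.
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays

plain = backoff_delays()
jittered = backoff_delays(jitter=True, rng=random.Random(0))
print(plain)
print(jittered)
```

With the defaults, the un-jittered delays start at 0.01 seconds and double on every attempt, so five attempts wait about 0.31 seconds in total.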


#8 Read-Your-Own-Writes Consistency Check (Medium)
read-your-writes, isolation, session consistency

Demonstrate read-your-own-writes: a SELECT after an UPDATE within the same transaction reflects the uncommitted change.

Solution:

def reserve_stock(conn, product, qty):
    with conn:
        row = conn.execute(
            "SELECT stock FROM inventory WHERE product = ?", (product,)
        ).fetchone()
        before = row[0]
        if before < qty:
            raise ValueError(f"Insufficient stock: need {qty}, have {before}")
        conn.execute(
            "UPDATE inventory SET stock = stock - ? WHERE product = ?",
            (qty, product)
        )
        # Read back inside the same transaction: this sees the uncommitted write.
        after = conn.execute(
            "SELECT stock FROM inventory WHERE product = ?", (product,)
        ).fetchone()[0]
    return before, after

Starter code:

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE inventory (product TEXT PRIMARY KEY, stock INTEGER)")
conn.executemany("INSERT INTO inventory VALUES (?,?)", [
  ('Widget', 50), ('Gadget', 30)
])
conn.commit()

# TODO: Implement reserve_stock(conn, product, qty):
# 1. Check current stock in the SAME transaction
# 2. Deduct qty
# 3. Read back the new stock (read-your-own-writes)
# 4. Return (before, after) stock values
# If stock is insufficient raise ValueError.
# Ensure the read-back reflects the uncommitted write.

def reserve_stock(conn, product, qty):
  pass

before, after = reserve_stock(conn, 'Widget', 20)
print(f'Widget: {before} -> {after}')

try:
  reserve_stock(conn, 'Gadget', 50)
except ValueError as e:
  print(f'Gadget: {e}')
Expected Output
Widget: 50 -> 30
Gadget: Insufficient stock: need 50, have 30
Hints

Hint 1: Within a single connection/transaction, a SELECT after UPDATE reads the uncommitted write.

Hint 2: Python's sqlite3 module opens transactions in DEFERRED mode by default; within one connection you always see your own uncommitted writes.

Hint 3: Raise ValueError before the UPDATE if stock is insufficient; inside 'with conn:' the exception triggers a rollback.
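
Read-your-own-writes is easier to appreciate next to a second connection that does not see the pending change. The sketch below assumes a file-backed database in a temporary directory (separate ':memory:' connections do not share data) and SQLite's default rollback-journal mode. The names writer, reader and stock are illustrative.

```python
import os
import sqlite3
import tempfile

QUERY = "SELECT stock FROM inventory WHERE product = 'Widget'"

def stock(conn):
    # fetchall() exhausts the cursor so no read lock lingers on the file.
    return conn.execute(QUERY).fetchall()[0][0]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "inventory.db")
    writer = sqlite3.connect(path)
    reader = sqlite3.connect(path)

    writer.execute("CREATE TABLE inventory (product TEXT PRIMARY KEY, stock INTEGER)")
    writer.execute("INSERT INTO inventory VALUES ('Widget', 50)")
    writer.commit()

    # Uncommitted write on the writer connection.
    writer.execute("UPDATE inventory SET stock = stock - 20 WHERE product = 'Widget'")
    writer_sees = stock(writer)      # the writer reads its own pending change
    reader_before = stock(reader)    # the other connection still sees committed data

    writer.commit()
    reader_after = stock(reader)     # visible to everyone once committed

    writer.close()
    reader.close()

print(writer_sees, reader_before, reader_after)
```

The same connection sees 30 immediately, while the second connection keeps seeing 50 until the commit.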


#9 Two-Phase Commit Simulation (Hard)
two-phase commit, 2PC, distributed transaction

Simulate two-phase commit (2PC) across two database connections — prepare all, then commit or abort all.

Solution:

class TwoPhaseCoordinator:
    def __init__(self, connections):
        self.connections = connections
        for conn in connections:
            conn.execute("CREATE TABLE IF NOT EXISTS txn_log (state TEXT)")
            conn.commit()

    def execute_on_all(self, sql, params=()):
        for conn in self.connections:
            conn.execute(sql, params)

    def prepare_all(self):
        # Phase 1: every participant records 'prepared'; any failure vetoes the commit.
        try:
            for conn in self.connections:
                conn.execute("INSERT INTO txn_log VALUES ('prepared')")
            return True
        except Exception:
            return False

    def commit_all(self):
        # Phase 2 (commit): make every participant's work permanent.
        for conn in self.connections:
            conn.commit()

    def abort_all(self):
        # Phase 2 (abort): discard every participant's pending work.
        for conn in self.connections:
            conn.rollback()

def run_distributed_txn(coord, operations, simulate_failure=False):
    for conn, sql in operations:
        conn.execute(sql)
    if simulate_failure or not coord.prepare_all():
        coord.abort_all()
        return False
    coord.commit_all()
    return True

Starter code:

import sqlite3

# Two-phase commit (2PC) coordinates transactions across multiple databases.
# Phase 1 (Prepare): all participants say "ready"
# Phase 2 (Commit/Abort): coordinator commits or aborts all

# TODO: Simulate 2PC across two SQLite in-memory databases.
# - prepare_all() writes a 'prepared' record to each participant's log table and returns True/False
# - commit_all() commits all participants; abort_all() rolls all of them back
# Demonstrate a successful 2PC and a failed one (one participant fails prepare).

class TwoPhaseCoordinator:
  def __init__(self, connections):
      self.connections = connections
      for conn in connections:
          conn.execute("CREATE TABLE IF NOT EXISTS txn_log (state TEXT)")
          conn.commit()

  def execute_on_all(self, sql, params=()):
      for conn in self.connections:
          conn.execute(sql, params)

  def prepare_all(self):
      pass

  def commit_all(self):
      pass

  def abort_all(self):
      pass

def run_distributed_txn(coord, operations, simulate_failure=False):
  pass

# Test success
db1 = sqlite3.connect(':memory:')
db2 = sqlite3.connect(':memory:')
for db in [db1, db2]:
  db.execute("CREATE TABLE data (val INTEGER)")
  db.commit()

coord = TwoPhaseCoordinator([db1, db2])
success = run_distributed_txn(coord, [
  (db1, "INSERT INTO data VALUES (1)"),
  (db2, "INSERT INTO data VALUES (2)"),
])
print('Success txn:', success)
print('DB1 rows:', db1.execute("SELECT COUNT(*) FROM data").fetchone()[0])
print('DB2 rows:', db2.execute("SELECT COUNT(*) FROM data").fetchone()[0])
Expected Output
Success txn: True
DB1 rows: 1
DB2 rows: 1
Hints

Hint 1: Phase 1: INSERT 'prepared' into txn_log on each participant. Return True if all succeed.

Hint 2: Phase 2 commit: call conn.commit() on all. Phase 2 abort: call conn.rollback() on all.

Hint 3: If any prepare fails, call abort_all() and return False.
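
The starter test only exercises the success path. The sketch below illustrates the abort path described in Hint 3 with a compact stand-in for the coordinator. The functions prepare_all and run_txn are simplified, hypothetical versions written for this example, and the failure is provoked by deliberately leaving the txn_log table out of the second database.

```python
import sqlite3

def prepare_all(connections):
    # Phase 1: each participant writes a 'prepared' record; any error is a "no" vote.
    try:
        for conn in connections:
            conn.execute("INSERT INTO txn_log VALUES ('prepared')")
        return True
    except sqlite3.Error:
        return False

def run_txn(operations):
    connections = [conn for conn, _ in operations]
    for conn, sql in operations:
        conn.execute(sql)
    if prepare_all(connections):
        for conn in connections:
            conn.commit()       # Phase 2: commit everywhere
        return True
    for conn in connections:
        conn.rollback()         # Phase 2: abort everywhere
    return False

db1 = sqlite3.connect(":memory:")
db2 = sqlite3.connect(":memory:")
for db in (db1, db2):
    db.execute("CREATE TABLE data (val INTEGER)")
# Only db1 gets a log table, so db2 cannot prepare (assumption made to force a failure).
db1.execute("CREATE TABLE txn_log (state TEXT)")

committed = run_txn([
    (db1, "INSERT INTO data VALUES (1)"),
    (db2, "INSERT INTO data VALUES (2)"),
])
counts = [db.execute("SELECT COUNT(*) FROM data").fetchone()[0] for db in (db1, db2)]
log_rows = db1.execute("SELECT COUNT(*) FROM txn_log").fetchone()[0]
print(committed, counts, log_rows)
```

Even though db1 prepared successfully, its insert and its log record are rolled back along with db2's work, so neither database keeps a partial result.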


#10 Event Sourcing with Immutable Transaction Log (Hard)
event sourcing, append-only, audit log, replay

Build an event-sourced ledger using an append-only transaction log; derive balance by replaying events.

Solution:

import time

def append_event(conn, account_id, event_type, amount):
    conn.execute(
        "INSERT INTO account_events (account_id, event_type, amount, ts) VALUES (?,?,?,?)",
        (account_id, event_type, amount, time.time())
    )
    conn.commit()

def get_balance(conn, account_id):
    # Deposits add to the balance; every other event type subtracts.
    row = conn.execute("""
        SELECT SUM(CASE WHEN event_type = 'deposit' THEN amount ELSE -amount END)
        FROM account_events
        WHERE account_id = ?
    """, (account_id,)).fetchone()
    return row[0] or 0.0

def get_history(conn, account_id):
    cur = conn.execute(
        "SELECT event_type, amount, ts FROM account_events WHERE account_id = ? ORDER BY ts",
        (account_id,)
    )
    return cur.fetchall()

Starter code:

import sqlite3
import json
import time

# Event sourcing: never UPDATE/DELETE — only append events.
# Current state is derived by replaying all events.

# TODO: Implement an event-sourced account system.
# - append_event(conn, account_id, event_type, amount): insert event
# - get_balance(conn, account_id): replay events to compute current balance
# - get_history(conn, account_id): return list of (event_type, amount, ts) tuples

def append_event(conn, account_id, event_type, amount):
  pass

def get_balance(conn, account_id):
  pass

def get_history(conn, account_id):
  pass

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE account_events (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  account_id TEXT,
  event_type TEXT,
  amount REAL,
  ts REAL
)
""")
conn.commit()

append_event(conn, 'ACC1', 'deposit', 1000)
append_event(conn, 'ACC1', 'deposit', 500)
append_event(conn, 'ACC1', 'withdrawal', 200)
append_event(conn, 'ACC2', 'deposit', 750)

print('ACC1 balance:', get_balance(conn, 'ACC1'))
print('ACC2 balance:', get_balance(conn, 'ACC2'))
print('ACC1 events:', len(get_history(conn, 'ACC1')))
Expected Output
ACC1 balance: 1300.0
ACC2 balance: 750.0
ACC1 events: 3
Hints

Hint 1: Only use INSERT — never UPDATE or DELETE on account_events.

Hint 2: For balance: SUM deposits - SUM withdrawals using CASE WHEN.

Hint 3: For history: SELECT event_type, amount, ts ORDER BY ts returns the event log.
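
The SQL SUM in Hint 2 is a compact form of replaying the log. The sketch below spells the replay out in plain Python over rows shaped like the (event_type, amount, ts) tuples from get_history. The replay function and the sample history are illustrative, and rejecting unknown event types is a design choice made for this sketch.

```python
def replay(events):
    # Fold the event log into a balance, one event at a time.
    balance = 0.0
    for event_type, amount, _ts in events:
        if event_type == "deposit":
            balance += amount
        elif event_type == "withdrawal":
            balance -= amount
        else:
            raise ValueError(f"Unknown event type: {event_type}")
    return balance

# Sample rows in the same shape that get_history returns (timestamps are made up).
history = [
    ("deposit", 1000.0, 1.0),
    ("deposit", 500.0, 2.0),
    ("withdrawal", 200.0, 3.0),
]

balance_now = replay(history)
balance_earlier = replay(history[:2])   # state as of the second event
print(balance_now, balance_earlier)
```

Replaying a prefix of the log reconstructs the balance at an earlier point in time, which is one of the main reasons to keep the log append-only.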


#11 Distributed Saga Pattern (Hard)
saga, compensating transaction, distributed, rollback

Implement the Saga pattern with compensating transactions to safely roll back partial distributed operations.

Solution:

def run_saga(conn, steps):
    completed = []
    try:
        for step in steps:
            step.action()
            completed.append(step)
        return True, len(completed), 0
    except Exception:
        # Compensate in reverse order, newest completed step first.
        rolled_back = []
        for step in reversed(completed):
            try:
                step.compensate()
                rolled_back.append(step.name)
            except Exception:
                pass  # a failed compensation is skipped and not counted
        return False, len(completed), len(rolled_back)

Starter code:

import sqlite3

# The Saga pattern decomposes a distributed transaction into local transactions.
# Each step has a compensating transaction to undo it.

# Order placement saga:
# Step 1: Reserve inventory
# Step 2: Charge payment
# Step 3: Create shipment
# If any step fails, run compensating transactions in reverse.

conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE inventory (product TEXT PRIMARY KEY, stock INTEGER);
CREATE TABLE payments (id INTEGER PRIMARY KEY AUTOINCREMENT, order_id TEXT, amount REAL, status TEXT);
CREATE TABLE shipments (id INTEGER PRIMARY KEY AUTOINCREMENT, order_id TEXT, status TEXT);
INSERT INTO inventory VALUES ('Widget', 10);
""")

class SagaStep:
  def __init__(self, name, action, compensate):
      self.name = name
      self.action = action
      self.compensate = compensate

# TODO: Implement run_saga(conn, steps) that:
# - Executes each step in order
# - If any step raises an exception, runs compensating actions in reverse
# - Returns (success, completed_steps, rolled_back_steps)

def run_saga(conn, steps):
  pass

def test_successful_saga():
  order_id = 'ORD-001'
  steps = [
      SagaStep(
          'reserve_inventory',
          lambda: conn.execute("UPDATE inventory SET stock = stock - 1 WHERE product = 'Widget' AND stock > 0"),
          lambda: conn.execute("UPDATE inventory SET stock = stock + 1 WHERE product = 'Widget'")
      ),
      SagaStep(
          'charge_payment',
          lambda: conn.execute("INSERT INTO payments (order_id, amount, status) VALUES (?,?,?)", (order_id, 99.99, 'charged')),
          lambda: conn.execute("UPDATE payments SET status = 'refunded' WHERE order_id = ?", (order_id,))
      ),
      SagaStep(
          'create_shipment',
          lambda: conn.execute("INSERT INTO shipments (order_id, status) VALUES (?,?)", (order_id, 'pending')),
          lambda: conn.execute("UPDATE shipments SET status = 'cancelled' WHERE order_id = ?", (order_id,))
      ),
  ]
  success, done, rolled = run_saga(conn, steps)
  conn.commit()
  return success, done, rolled

ok, done, rolled = test_successful_saga()
print(f'Success: {ok}, Steps: {done}, Rolled back: {rolled}')
stock = conn.execute("SELECT stock FROM inventory").fetchone()[0]
print(f'Remaining stock: {stock}')
Expected Output
Success: True, Steps: 3, Rolled back: 0
Remaining stock: 9
Hints

Hint 1: Track completed steps in a list; append each step after it succeeds so its compensate() can be called later.

Hint 2: On exception, iterate reversed(completed_steps) and call each step's compensate().

Hint 3: Return (True, len(completed), 0) on success or (False, len(completed), len(rolled)) on failure.
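
The starter test covers only a successful saga. The sketch below shows the compensation path with a failing third step. To stay short it keeps state in a plain dict instead of SQLite and drops the unused conn parameter; the step functions and the compensation_order list are hypothetical names introduced for illustration.

```python
class SagaStep:
    def __init__(self, name, action, compensate):
        self.name = name
        self.action = action
        self.compensate = compensate

def run_saga(steps):
    completed = []
    try:
        for step in steps:
            step.action()
            completed.append(step)
        return True, len(completed), 0
    except Exception:
        rolled_back = []
        for step in reversed(completed):   # undo newest first
            step.compensate()
            rolled_back.append(step.name)
        return False, len(completed), len(rolled_back)

# In-memory stand-in for the inventory and payments tables (assumption for brevity).
state = {"stock": 10, "payment": None}
compensation_order = []

def reserve():
    state["stock"] -= 1

def release():
    state["stock"] += 1
    compensation_order.append("reserve_inventory")

def charge():
    state["payment"] = "charged"

def refund():
    state["payment"] = "refunded"
    compensation_order.append("charge_payment")

def ship():
    raise RuntimeError("carrier unavailable")

def cancel_shipment():
    compensation_order.append("create_shipment")

result = run_saga([
    SagaStep("reserve_inventory", reserve, release),
    SagaStep("charge_payment", charge, refund),
    SagaStep("create_shipment", ship, cancel_shipment),
])
print(result, state, compensation_order)
```

The failed step itself is never compensated because it never completed; only the two earlier steps are undone, payment first and inventory second.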

© 2026 EngineersOfAI. All rights reserved.