Project 02 - Transaction-Safe Payment Service
Objective
Build a payment processing service that performs fund transfers between accounts with full ACID guarantees. The service must handle concurrent requests safely, reject duplicate transfer requests via idempotency keys, recover from deadlocks automatically, and maintain a complete audit trail of every transfer attempt - successful or failed.
This project is the hardest in the module. It demands that you think about what goes wrong when two operations happen at the same time, when a network call is retried, and when the database refuses to cooperate. Correctness under concurrency is the primary success criterion.
Primary database: PostgreSQL via psycopg2.
Fallback: SQLite with sqlite3 - if you do not have PostgreSQL available, use SQLite and note where behavior would differ in a comment.
What to Build
Entities
Account
| Column | Type | Constraints |
|---|---|---|
| id | UUID / TEXT | PRIMARY KEY |
| owner_name | TEXT | NOT NULL |
| balance | NUMERIC(19,4) | NOT NULL, DEFAULT 0, CHECK (balance >= 0) |
| currency | CHAR(3) | NOT NULL, DEFAULT 'USD' |
| created_at | TIMESTAMPTZ | NOT NULL, DEFAULT NOW() |
| is_active | BOOLEAN | NOT NULL, DEFAULT TRUE |
Transaction (the payment record)
| Column | Type | Constraints |
|---|---|---|
| id | UUID / TEXT | PRIMARY KEY |
| from_account_id | TEXT | NOT NULL, FOREIGN KEY → accounts(id) |
| to_account_id | TEXT | NOT NULL, FOREIGN KEY → accounts(id) |
| amount | NUMERIC(19,4) | NOT NULL, CHECK (amount > 0) |
| currency | CHAR(3) | NOT NULL |
| status | TEXT | NOT NULL, CHECK (status IN ('pending', 'completed', 'failed', 'duplicate')) |
| idempotency_key | TEXT | UNIQUE, NOT NULL |
| failure_reason | TEXT | nullable |
| created_at | TIMESTAMPTZ | NOT NULL, DEFAULT NOW() |
AuditLog
| Column | Type | Constraints |
|---|---|---|
| id | BIGSERIAL / INTEGER | PRIMARY KEY AUTOINCREMENT |
| event_type | TEXT | NOT NULL |
| account_id | TEXT | nullable |
| transaction_id | TEXT | nullable |
| details | TEXT | nullable (JSON string) |
| occurred_at | TIMESTAMPTZ | NOT NULL, DEFAULT NOW() |
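
As a rough sketch, the three tables above could be created in the SQLite fallback as follows. The SQLite type spellings and the helper name create_schema are assumptions made for illustration; PostgreSQL would use UUID, NUMERIC(19,4), TIMESTAMPTZ and BIGSERIAL directly. The last few lines show the CHECK constraint rejecting a negative balance.

```python
import sqlite3

# SQLite has no native UUID, BOOLEAN or TIMESTAMPTZ types, so TEXT and INTEGER
# stand in for them here. NUMERIC columns are stored as integers or floats.
SCHEMA = """
CREATE TABLE IF NOT EXISTS accounts (
    id         TEXT PRIMARY KEY,
    owner_name TEXT NOT NULL,
    balance    NUMERIC NOT NULL DEFAULT 0 CHECK (balance >= 0),
    currency   TEXT NOT NULL DEFAULT 'USD',
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    is_active  INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS transactions (
    id              TEXT PRIMARY KEY,
    from_account_id TEXT NOT NULL REFERENCES accounts(id),
    to_account_id   TEXT NOT NULL REFERENCES accounts(id),
    amount          NUMERIC NOT NULL CHECK (amount > 0),
    currency        TEXT NOT NULL,
    status          TEXT NOT NULL
        CHECK (status IN ('pending', 'completed', 'failed', 'duplicate')),
    idempotency_key TEXT NOT NULL UNIQUE,
    failure_reason  TEXT,
    created_at      TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS audit_log (
    id             INTEGER PRIMARY KEY AUTOINCREMENT,
    event_type     TEXT NOT NULL,
    account_id     TEXT,
    transaction_id TEXT,
    details        TEXT,
    occurred_at    TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""


def create_schema(conn: sqlite3.Connection) -> None:
    """Create all three tables (hypothetical helper, not part of the spec)."""
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(SCHEMA)


conn = sqlite3.connect(":memory:")
create_schema(conn)
conn.execute(
    "INSERT INTO accounts (id, owner_name, balance) VALUES ('a1', 'Alice', 100)"
)
try:
    conn.execute("UPDATE accounts SET balance = -1 WHERE id = 'a1'")
    constraint_fired = False
except sqlite3.IntegrityError:
    # SQLite reports a failed CHECK constraint as IntegrityError.
    constraint_fired = True
```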
Entity Relationship Diagram
Operations to Implement
Account Operations
| Function | Description |
|---|---|
| create_account(owner_name, initial_balance, currency) -> str | Create a new account. Generate a UUID for the ID. Return the new account ID. Emit an ACCOUNT_CREATED audit event. |
| get_account(account_id) -> dict | Return account details. Raise AccountNotFoundError if not found. |
| get_balance(account_id) -> Decimal | Return the current balance. Raise AccountNotFoundError if not found. |
| deactivate_account(account_id) | Set is_active = FALSE. Reject if account has a non-zero balance. |
Transfer Operations
| Function | Description |
|---|---|
| transfer(from_id, to_id, amount, currency, idempotency_key) -> dict | The core operation - see transfer requirements below. |
| get_transaction(transaction_id) -> dict | Return a transaction record by ID. |
| get_account_history(account_id, limit, offset) -> list | Return paginated transaction history for an account (both sent and received). |
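
A minimal sketch of the paginated history query, using the SQLite fallback. The explicit conn parameter, the default limit and offset, the newest-first ordering, and the trimmed-down demo table are assumptions for illustration only.

```python
import sqlite3


def get_account_history(conn, account_id, limit=20, offset=0):
    """Return transactions where the account is sender or receiver, newest first."""
    cur = conn.execute(
        """
        SELECT id, from_account_id, to_account_id, amount, status, created_at
        FROM transactions
        WHERE from_account_id = ? OR to_account_id = ?
        ORDER BY created_at DESC, id DESC
        LIMIT ? OFFSET ?
        """,
        (account_id, account_id, limit, offset),
    )
    columns = [col[0] for col in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]


# Demo data: a reduced transactions table with three rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transactions (id TEXT PRIMARY KEY, from_account_id TEXT,"
    " to_account_id TEXT, amount NUMERIC, status TEXT, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?, ?, 'completed', ?)",
    [
        ("t1", "A", "B", 10, "2024-01-01T00:00:00"),
        ("t2", "B", "A", 5, "2024-01-02T00:00:00"),
        ("t3", "B", "C", 7, "2024-01-03T00:00:00"),
    ],
)
history = get_account_history(conn, "A", limit=10, offset=0)
```

The secondary sort on id keeps page boundaries stable when two rows share the same created_at value.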
Technical Requirements
Requirement 1 - ACID Transfer
The transfer function must perform the following steps within a single database transaction:
- Lock both account rows using SELECT ... FOR UPDATE (PostgreSQL) or BEGIN IMMEDIATE (SQLite) to prevent concurrent modifications.
- Verify both accounts exist and are active.
- Verify the sender has sufficient balance.
- Verify the currencies match (or handle conversion - see Extension B).
- Debit the sender's account.
- Credit the receiver's account.
- Insert the Transaction record with status = 'completed'.
- COMMIT.
If any step fails, the entire transaction must be rolled back and no money must move.
The database CHECK (balance >= 0) constraint is your last line of defense, not your primary check. Perform the balance check in application code first so you can return a meaningful error message. The DB constraint catches races you might miss.
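
The steps above can be sketched against the SQLite fallback as shown here. This is an illustration under assumptions, not a reference implementation: balances are stored as integer minor units to sidestep SQLite's float storage, the exception class is a stand-in for the one in exceptions.py, LookupError and ValueError stand in for the account and currency exceptions, and the connection is passed in explicitly.

```python
import sqlite3
import uuid


class InsufficientFundsError(Exception):
    """Stand-in for the class defined in exceptions.py."""


def transfer(conn, from_id, to_id, amount, currency, idempotency_key):
    # The connection is opened with isolation_level=None (autocommit), so the
    # transaction boundaries below are explicit.
    conn.execute("BEGIN IMMEDIATE")  # take SQLite's write lock up front
    try:
        rows = {
            r[0]: r
            for r in conn.execute(
                "SELECT id, balance, currency FROM accounts"
                " WHERE id IN (?, ?) AND is_active = 1",
                (from_id, to_id),
            )
        }
        if from_id not in rows or to_id not in rows:
            raise LookupError("account missing or inactive")
        if rows[from_id][2] != currency or rows[to_id][2] != currency:
            raise ValueError("currency mismatch")
        if rows[from_id][1] < amount:
            raise InsufficientFundsError(f"short by {amount - rows[from_id][1]}")
        conn.execute(
            "UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id)
        )
        conn.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id)
        )
        tx_id = str(uuid.uuid4())
        conn.execute(
            "INSERT INTO transactions (id, from_account_id, to_account_id, amount,"
            " currency, status, idempotency_key)"
            " VALUES (?, ?, ?, ?, ?, 'completed', ?)",
            (tx_id, from_id, to_id, amount, currency, idempotency_key),
        )
        conn.execute("COMMIT")
        return tx_id
    except Exception:
        conn.execute("ROLLBACK")  # no money moves on any failure
        raise


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.executescript("""
CREATE TABLE accounts (id TEXT PRIMARY KEY,
                       balance INTEGER NOT NULL CHECK (balance >= 0),
                       currency TEXT NOT NULL,
                       is_active INTEGER NOT NULL DEFAULT 1);
CREATE TABLE transactions (id TEXT PRIMARY KEY, from_account_id TEXT,
                           to_account_id TEXT, amount INTEGER, currency TEXT,
                           status TEXT, idempotency_key TEXT UNIQUE NOT NULL);
INSERT INTO accounts (id, balance, currency) VALUES ('A', 500, 'USD'), ('B', 500, 'USD');
""")
transfer(conn, "A", "B", 100, "USD", "key-1")
```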
Requirement 2 - Idempotency
Before starting a transfer, check whether the idempotency_key already exists in the transactions table:
- If found with status = 'completed': return the existing transaction record immediately. Do not charge again.
- If found with status = 'failed': return the existing record with the failure reason. Do not retry automatically.
- If found with status = 'pending': another request with this key is in flight. Raise TransferInProgressError.
- If not found: proceed with the transfer.
The idempotency check and the transfer must be in the same database transaction, protected by a lock on the idempotency_key (use INSERT ... ON CONFLICT DO NOTHING with a subsequent SELECT, or an advisory lock).
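
For the SQLite fallback, the four cases could be handled roughly as follows. The helper name claim_or_fetch, its parameters, and the reduced demo table are hypothetical; the exception is a stand-in for the one in exceptions.py. The ON CONFLICT clause needs SQLite 3.24 or newer.

```python
import sqlite3


class TransferInProgressError(Exception):
    """Stand-in for the class defined in exceptions.py."""


def claim_or_fetch(conn, tx_id, from_id, to_id, amount, currency, key):
    """Return None if this call claimed the key, otherwise the existing record."""
    cur = conn.execute(
        "INSERT INTO transactions (id, from_account_id, to_account_id, amount,"
        " currency, status, idempotency_key)"
        " VALUES (?, ?, ?, ?, ?, 'pending', ?)"
        " ON CONFLICT (idempotency_key) DO NOTHING",
        (tx_id, from_id, to_id, amount, currency, key),
    )
    if cur.rowcount == 1:
        return None  # key not seen before: the caller proceeds with the transfer
    row = conn.execute(
        "SELECT id, status, failure_reason FROM transactions"
        " WHERE idempotency_key = ?",
        (key,),
    ).fetchone()
    existing = dict(zip(("id", "status", "failure_reason"), row))
    if existing["status"] == "pending":
        raise TransferInProgressError(key)
    return existing  # 'completed' or 'failed': hand back the original outcome


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transactions (id TEXT PRIMARY KEY, from_account_id TEXT,"
    " to_account_id TEXT, amount NUMERIC, currency TEXT, status TEXT,"
    " idempotency_key TEXT UNIQUE NOT NULL, failure_reason TEXT)"
)
first = claim_or_fetch(conn, "t1", "A", "B", 100, "USD", "key-1")
conn.execute("UPDATE transactions SET status = 'completed' WHERE id = 't1'")
second = claim_or_fetch(conn, "t2", "A", "B", 100, "USD", "key-1")
```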
Requirement 3 - Deadlock Detection and Retry
When two transfers happen concurrently in opposite directions (A→B and B→A), the database may raise a deadlock error. Your code must catch it and retry with exponential backoff:
- Max retries: 3
- Backoff: base_delay * (2 ** attempt) where base_delay = 0.1 seconds
- Add jitter: multiply the backoff by a random factor between 0.5 and 1.5 to prevent thundering herd
- After 3 failed attempts, raise MaxRetriesExceededError
PostgreSQL deadlock exception: psycopg2.errors.DeadlockDetected (SQLSTATE 40P01).
SQLite serializes writers with a database-level lock, so it does not produce row-level deadlocks. Simulate the retry logic anyway by catching sqlite3.OperationalError with the message "database is locked".
To prevent most deadlocks before they happen: always lock accounts in a consistent order (e.g., lower UUID first). This eliminates the circular wait condition for the most common case.
Requirement 4 - Audit Logging
Every transfer attempt - successful or not - must produce at least one audit log entry. The audit log is written in a separate transaction from the transfer so that a transfer rollback does not erase the audit record.
Minimum audit events:
| Event Type | When |
|---|---|
| TRANSFER_INITIATED | At the start of every transfer call |
| TRANSFER_COMPLETED | When the transfer commits successfully |
| TRANSFER_FAILED | When the transfer rolls back for any reason |
| DUPLICATE_TRANSFER | When an idempotency key is reused |
| ACCOUNT_CREATED | When a new account is created |
| ACCOUNT_DEACTIVATED | When an account is deactivated |
The details column holds a JSON string with relevant context (amounts, error messages, retry counts).
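
One practical wrinkle: json.dumps cannot serialize Decimal values by default. A small helper along these lines (the name encode_details is hypothetical) converts them to strings so that amounts keep their exact precision in the audit record.

```python
import json
from decimal import Decimal


def encode_details(**context) -> str:
    """Serialize audit context; Decimal and other non-JSON types become strings."""
    return json.dumps(context, default=str, sort_keys=True)


details = encode_details(amount=Decimal("100.0000"), retry_count=2, error=None)
```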
Requirement 5 - Balance Constraint
Balance must never go negative. Enforce this at both levels:
- Application level: check balance >= amount before issuing the UPDATE. Raise InsufficientFundsError with a clear message including the shortfall.
- Database level: CHECK (balance >= 0) constraint on the accounts table. Catch psycopg2.errors.CheckViolation (psycopg2) or sqlite3.IntegrityError (SQLite) and wrap it in InsufficientFundsError.
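
The database-level half might look like this with the SQLite fallback. The debit helper and the simplified exception are assumptions for illustration, and the wrapping is only safe if the balance CHECK is the sole constraint that this UPDATE can violate.

```python
import sqlite3


class InsufficientFundsError(Exception):
    """Simplified stand-in for the class defined in exceptions.py."""


def debit(conn, account_id, amount):
    """Subtract amount, translating a CHECK violation into a domain error."""
    try:
        conn.execute(
            "UPDATE accounts SET balance = balance - ? WHERE id = ?",
            (amount, account_id),
        )
    except sqlite3.IntegrityError as exc:
        # Assumes the only constraint this UPDATE can trip is balance >= 0.
        raise InsufficientFundsError(
            f"debit of {amount} would overdraw account {account_id}"
        ) from exc


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (id TEXT PRIMARY KEY,"
    " balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.execute("INSERT INTO accounts VALUES ('A', 50)")
debit(conn, "A", 20)  # succeeds: balance becomes 30
```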
Requirement 6 - Thread-Safe Connection Handling
The payment service must be safe to call from multiple threads simultaneously:
Option A (recommended for PostgreSQL): Use a connection pool. psycopg2 ships with psycopg2.pool.ThreadedConnectionPool. Acquire a connection at the start of each operation, release it when done (use try/finally).
Option B (SQLite fallback): Use threading.local() to give each thread its own connection. SQLite connections must not be shared across threads.
Do not use a single module-level connection object that is shared across threads.
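
For Option A, one way to guarantee the release is a context manager wrapped around the pool. The name pooled_connection is hypothetical; the sketch only assumes the pool exposes getconn() and putconn(), as psycopg2's ThreadedConnectionPool does, and that connections offer commit() and rollback().

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection, commit on success, roll back on error, always return it."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)  # runs even when the body raised
```

A service function would then wrap its work in a with pooled_connection(pool) as conn: block instead of repeating the try/finally in every operation.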
Transfer Flow
Deadlock Retry Logic
Acceptance Criteria
Your implementation passes when all of the following are true:
- transfer(A, B, 100) debits A by exactly 100 and credits B by exactly 100 - verified by querying both balances before and after.
- transfer(A, B, 100) called twice with the same idempotency_key returns the same transaction record both times and A's balance reflects only one debit.
- transfer(A, B, amount) where amount > A.balance raises InsufficientFundsError and neither balance changes.
- After a failed transfer, an audit log entry with event_type = 'TRANSFER_FAILED' exists.
- After a successful transfer, an audit log entry with event_type = 'TRANSFER_COMPLETED' exists.
- A TRANSFER_FAILED audit entry exists even when the transfer transaction was rolled back.
- get_account_history(account_id) returns transactions where the account is either sender or receiver.
- Calling transfer with a deactivated account raises AccountInactiveError.
- Attempting to set balance to a negative value via direct SQL raises a constraint error.
- Two concurrent transfers in opposite directions (A→B and B→A) both complete correctly (run with threading.Thread to verify). Total money in the system must be conserved.
- The deadlock retry function sleeps between retries (mock time.sleep and assert it was called on retry).
- create_account with initial_balance < 0 raises ValueError before hitting the database.
- All SQL uses parameterized queries - no f-strings or string concatenation in SQL.
- Connections are released back to the pool (or closed) even when exceptions occur.
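
As an illustration of the time.sleep criterion, a test could be shaped like the sketch below. FakeDeadlock and with_retry are stand-ins invented here so the example runs without a database; a real test would exercise your own retry function and psycopg2.errors.DeadlockDetected.

```python
import random
import time
from unittest import mock


class FakeDeadlock(Exception):
    """Stand-in for psycopg2.errors.DeadlockDetected."""


def with_retry(fn, max_retries=3, base_delay=0.1):
    """Make up to max_retries attempts, sleeping with jittered backoff between them."""
    for attempt in range(max_retries):
        try:
            return fn()
        except FakeDeadlock:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))


def test_retry_sleeps_between_attempts():
    calls = []

    def flaky():
        # Fails on the first two calls, succeeds on the third.
        calls.append(1)
        if len(calls) < 3:
            raise FakeDeadlock()
        return "ok"

    with mock.patch("time.sleep") as fake_sleep:
        assert with_retry(flaky) == "ok"

    assert fake_sleep.call_count == 2
    first, second = (call.args[0] for call in fake_sleep.call_args_list)
    assert 0.05 <= first <= 0.15   # 0.1 * 2**0, jittered by 0.5..1.5
    assert 0.10 <= second <= 0.30  # 0.1 * 2**1, jittered by 0.5..1.5
```

Patching time.sleep keeps the test fast and lets it check the delay bounds without depending on the random jitter value.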
Custom Exception Hierarchy
Define all exceptions in a single exceptions.py file:
from decimal import Decimal

class PaymentServiceError(Exception):
    """Base exception for all payment service errors."""

class AccountNotFoundError(PaymentServiceError):
    """Raised when a referenced account does not exist."""

class AccountInactiveError(PaymentServiceError):
    """Raised when a transfer involves a deactivated account."""

class InsufficientFundsError(PaymentServiceError):
    """Raised when the sender's balance is too low."""
    # Decimal rather than float, to match get_balance and avoid rounding errors.
    def __init__(self, available: Decimal, required: Decimal):
        self.available = available
        self.required = required
        super().__init__(
            f"Insufficient funds: have {available:.2f}, need {required:.2f} "
            f"(shortfall: {required - available:.2f})"
        )

class DuplicateIdempotencyKeyError(PaymentServiceError):
    """Raised when an idempotency key is reused for a different transfer."""

class TransferInProgressError(PaymentServiceError):
    """Raised when a transfer with the same idempotency key is currently pending."""

class CurrencyMismatchError(PaymentServiceError):
    """Raised when sender and receiver accounts use different currencies."""

class MaxRetriesExceededError(PaymentServiceError):
    """Raised when deadlock retry limit is exhausted."""
    def __init__(self, attempts: int):
        self.attempts = attempts
        super().__init__(f"Transfer failed after {attempts} attempts due to deadlock.")
Recommended Project Structure
payment_service/
├── db/
│ ├── schema.sql
│ └── migrations/
├── exceptions.py
├── models.py # dataclasses or namedtuples for Account, Transaction
├── connection.py # connection pool / thread-local setup
├── dal/
│ ├── __init__.py
│ ├── accounts.py
│ ├── transactions.py
│ └── audit.py
├── service.py # PaymentService class - orchestrates DAL calls
├── tests/
│ ├── test_transfer.py
│ ├── test_idempotency.py
│ └── test_concurrency.py
└── demo.py # runnable demo script
Hints
How do I acquire and release connections safely from a pool?
import psycopg2.pool

pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn="postgresql://user:password@localhost/payments",
)

def get_connection():
    return pool.getconn()

def release_connection(conn):
    pool.putconn(conn)

# In your service functions:
def transfer(from_id, to_id, amount, currency, idempotency_key):
    conn = get_connection()
    try:
        ...  # do the work using conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        # Always return the connection to the pool, even after an error.
        release_connection(conn)
For SQLite with threading.local():
import threading
import sqlite3

_local = threading.local()

def get_connection(db_path: str) -> sqlite3.Connection:
    # Each thread lazily opens and caches its own connection, so the default
    # check_same_thread=True stays in place as a safety net.
    if getattr(_local, "conn", None) is None:
        _local.conn = sqlite3.connect(db_path)
        _local.conn.row_factory = sqlite3.Row
        _local.conn.execute("PRAGMA foreign_keys = ON")
        _local.conn.execute("PRAGMA journal_mode = WAL")
    return _local.conn
How do I implement the idempotency check atomically?
The key insight: the idempotency check and the transfer must happen in the same transaction, and you must lock the idempotency key row to prevent two concurrent requests with the same key from both passing the check simultaneously.
PostgreSQL approach - use INSERT ... ON CONFLICT:
import uuid

import psycopg2.extras

def transfer(conn, from_id, to_id, amount, currency, idempotency_key):
    # psycopg2 opens a transaction implicitly on the first statement,
    # so no explicit BEGIN is needed (autocommit must be off, the default).
    # RealDictCursor returns rows as dicts, which the lookups below rely on.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        # Try to insert a 'pending' record for this idempotency key
        cur.execute("""
            INSERT INTO transactions
                (id, from_account_id, to_account_id, amount, currency,
                 status, idempotency_key)
            VALUES (%s, %s, %s, %s, %s, 'pending', %s)
            ON CONFLICT (idempotency_key) DO NOTHING
            RETURNING id
        """, (str(uuid.uuid4()), from_id, to_id, amount, currency, idempotency_key))
        result = cur.fetchone()
        if result is None:
            # Key already existed - fetch the existing record
            cur.execute(
                "SELECT * FROM transactions WHERE idempotency_key = %s FOR UPDATE",
                (idempotency_key,),
            )
            existing = cur.fetchone()
            conn.commit()
            return dict(existing)  # return original result
        tx_id = result["id"]
        # ... proceed with the actual transfer using tx_id
How do I implement deadlock retry with exponential backoff?
import time
import random

import psycopg2.errors

# Adjust the import path to match your package layout.
from exceptions import MaxRetriesExceededError

MAX_RETRIES = 3
BASE_DELAY = 0.1  # seconds

def with_deadlock_retry(fn, *args, **kwargs):
    """
    Call fn(*args, **kwargs) and try again if a deadlock is detected,
    making at most MAX_RETRIES attempts in total.
    fn must roll back its transaction before the exception propagates.
    """
    for attempt in range(MAX_RETRIES):
        try:
            return fn(*args, **kwargs)
        except psycopg2.errors.DeadlockDetected:
            if attempt == MAX_RETRIES - 1:
                raise MaxRetriesExceededError(MAX_RETRIES)
            jitter = random.uniform(0.5, 1.5)
            delay = BASE_DELAY * (2 ** attempt) * jitter
            time.sleep(delay)

# Usage:
result = with_deadlock_retry(transfer_inner, conn, from_id, to_id, amount, key)
For SQLite, catch sqlite3.OperationalError with "database is locked" in the message:
except sqlite3.OperationalError as e:
    if "database is locked" not in str(e):
        raise
    # treat as deadlock and retry
How do I lock rows in the correct order to prevent deadlocks?
The most common cause of deadlocks in payment systems is two concurrent transfers locking the same two rows in opposite order:
- Transfer 1 (A→B): locks row A, then tries to lock row B.
- Transfer 2 (B→A): locks row B, then tries to lock row A.
Both are now waiting for each other - deadlock.
Solution: always lock in a consistent, deterministic order. Sort the account IDs before locking:
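def lock_accounts_in_order(cur, id_1: str, id_2: str):
    """Lock both accounts in a consistent order to prevent deadlocks.

    Expects a dict-style cursor (e.g. psycopg2.extras.RealDictCursor).
    """
    # ORDER BY id makes PostgreSQL lock the rows in ascending ID order;
    # the order of the values inside ANY(...) does not control lock order.
    cur.execute(
        "SELECT id, balance, is_active FROM accounts "
        "WHERE id = ANY(%s) ORDER BY id FOR UPDATE",
        ([id_1, id_2],),
    )
    rows = {row["id"]: row for row in cur.fetchall()}
    for account_id in (id_1, id_2):
        if account_id not in rows:
            raise AccountNotFoundError(account_id)
    return rows[id_1], rows[id_2]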
This does not eliminate deadlocks entirely (other processes may lock in different ways), but it eliminates the most common circular-wait pattern. Retry logic handles the rest.
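
The same idea can be demonstrated without a database. In this analogy, in-process threading.Lock objects play the role of row locks and a dict plays the role of the accounts table; all names are invented for the illustration. Because both directions acquire the locks in sorted order, the opposing transfers cannot wait on each other in a cycle.

```python
import threading

balances = {"A": 500, "B": 500}
locks = {"A": threading.Lock(), "B": threading.Lock()}


def move(from_id, to_id, amount):
    # Same acquisition order regardless of transfer direction.
    first, second = sorted((from_id, to_id))
    with locks[first], locks[second]:
        balances[from_id] -= amount
        balances[to_id] += amount


# 50 transfers A->B and 50 transfers B->A, all running concurrently.
threads = [
    threading.Thread(target=move, args=args)
    for args in [("A", "B", 10), ("B", "A", 10)] * 50
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```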
How do I write the audit log in a separate transaction so it survives a rollback?
The trick is to use a separate connection for the audit log, so that rolling back the main transfer transaction does not affect the audit write.
import json

def log_audit_event(audit_conn, event_type, account_id=None,
                    transaction_id=None, details=None):
    # sqlite3 style: the connection context manager commits on success and
    # rolls back on error. With psycopg2, use a cursor and %s placeholders.
    with audit_conn:
        audit_conn.execute(
            """
            INSERT INTO audit_log (event_type, account_id, transaction_id, details)
            VALUES (?, ?, ?, ?)
            """,
            (event_type, account_id, transaction_id,
             json.dumps(details) if details is not None else None),
        )

def transfer(transfer_conn, audit_conn, from_id, to_id, amount, key):
    log_audit_event(audit_conn, "TRANSFER_INITIATED",
                    account_id=from_id,
                    details={"to": to_id, "amount": str(amount), "key": key})
    try:
        result = _do_transfer(transfer_conn, from_id, to_id, amount, key)
        log_audit_event(audit_conn, "TRANSFER_COMPLETED",
                        transaction_id=result["id"],
                        details={"amount": str(amount)})
        return result
    except Exception as e:
        # Written on audit_conn, so it survives the rollback on transfer_conn.
        log_audit_event(audit_conn, "TRANSFER_FAILED",
                        account_id=from_id,
                        details={"error": str(e), "amount": str(amount)})
        raise
How do I write a concurrency test to verify money is conserved?
import threading
from decimal import Decimal

def test_concurrent_transfers_conserve_money(service):
    # Create two accounts with known balances
    a_id = service.create_account("Alice", Decimal("500.00"), "USD")
    b_id = service.create_account("Bob", Decimal("500.00"), "USD")
    total_before = Decimal("1000.00")
    errors = []

    def transfer_ab(key):
        try:
            service.transfer(a_id, b_id, Decimal("10.00"), "USD", key)
        except Exception as e:
            errors.append(e)

    def transfer_ba(key):
        try:
            service.transfer(b_id, a_id, Decimal("10.00"), "USD", key)
        except Exception as e:
            errors.append(e)

    threads = []
    for i in range(20):
        threads.append(threading.Thread(target=transfer_ab, args=(f"ab-{i}",)))
        threads.append(threading.Thread(target=transfer_ba, args=(f"ba-{i}",)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    balance_a = service.get_balance(a_id)
    balance_b = service.get_balance(b_id)
    total_after = balance_a + balance_b
    assert total_after == total_before, (
        f"Money not conserved! Before: {total_before}, After: {total_after}"
    )
    # Neither account can run dry here, so every transfer should succeed.
    assert not errors, f"Unexpected transfer errors: {errors}"
    print(f"All transfers completed. Errors: {len(errors)}")
    print(f"Final balances - Alice: {balance_a}, Bob: {balance_b}")
Extension Challenges
These are optional. Attempt them only after all acceptance criteria pass.
Extension A - Batch Transfers
Add a batch_transfer function that accepts a list of transfer dicts (each with from_account_id, to_account_id, amount, currency, and idempotency_key) and processes them as a single database transaction:
- All transfers in the batch succeed, or none do.
- The function returns a list of transaction records in the same order as the input.
- Partial failures (e.g., one sender has insufficient funds) cause the entire batch to roll back.
- Each individual transfer in the batch still generates its own audit log entry.
Signature:
def batch_transfer(transfers: list[dict]) -> list[dict]:
    """
    transfers: list of dicts, each with keys:
        from_account_id, to_account_id, amount, currency, idempotency_key
    Returns: list of transaction dicts in the same order.
    Raises: BatchTransferError if any transfer fails (includes index of first failure).
    """
Extension B - Currency Conversion
Allow transfers between accounts with different currencies:
- Add an exchange_rates table: (from_currency CHAR(3), to_currency CHAR(3), rate NUMERIC(19,8), updated_at TIMESTAMPTZ).
- A transfer from a USD account to a EUR account converts the amount at the stored rate and records both amount_sent (in sender's currency) and amount_received (in receiver's currency) on the transaction.
- Rate updates must not affect in-flight transactions (take a snapshot of the rate at the start of the transaction).
- Add a get_rate(from_currency, to_currency) function that raises ExchangeRateNotFoundError if the pair is unknown.
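
The conversion arithmetic itself might be sketched as below. The function name convert, the rounding mode, and the sample rate are assumptions chosen for illustration; the four-decimal quantization mirrors the NUMERIC(19,4) amount columns.

```python
from decimal import Decimal, ROUND_HALF_EVEN

FOUR_PLACES = Decimal("0.0001")  # matches NUMERIC(19,4)


def convert(amount_sent: Decimal, rate: Decimal) -> Decimal:
    """Convert using a rate snapshot and round to the column's scale."""
    return (amount_sent * rate).quantize(FOUR_PLACES, rounding=ROUND_HALF_EVEN)


# Illustrative rate only, not real market data.
amount_received = convert(Decimal("100.00"), Decimal("0.92000000"))
```

Doing the multiplication in Decimal and rounding once, at the end, avoids the drift that float arithmetic would introduce into stored amounts.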
Extension C - Scheduled Transfers
Add a scheduled_transfers table:
| Column | Type |
|---|---|
| id | UUID PRIMARY KEY |
| from_account_id | TEXT REFERENCES accounts(id) |
| to_account_id | TEXT REFERENCES accounts(id) |
| amount | NUMERIC(19,4) |
| currency | CHAR(3) |
| schedule_at | TIMESTAMPTZ |
| status | TEXT CHECK (status IN ('scheduled', 'executed', 'cancelled', 'failed')) |
| created_at | TIMESTAMPTZ DEFAULT NOW() |
Write a process_due_transfers() function that:
- Selects all scheduled transfers where schedule_at <= NOW() and status = 'scheduled'.
- Locks each selected row with SELECT ... FOR UPDATE SKIP LOCKED so concurrent workers don't double-process.
- Calls transfer() for each (generating a unique idempotency key from the scheduled transfer's ID).
- Updates the status to 'executed' or 'failed' accordingly.
- Can be called from a cron job or a background thread every 60 seconds.
