
Project 02 - Transaction-Safe Payment Service

Objective

Build a payment processing service that performs fund transfers between accounts with full ACID guarantees. The service must handle concurrent requests safely, reject duplicate transfer requests via idempotency keys, recover from deadlocks automatically, and maintain a complete audit trail of every transfer attempt - successful or failed.

This project is the hardest in the module. It demands that you think about what goes wrong when two operations happen at the same time, when a network call is retried, and when the database refuses to cooperate. Correctness under concurrency is the primary success criterion.

Primary database: PostgreSQL via psycopg2. Fallback: SQLite with sqlite3 - if you do not have PostgreSQL available, use SQLite and note where behavior would differ in a comment.

What to Build

Entities

Account

| Column | Type | Constraints |
|---|---|---|
| id | UUID / TEXT | PRIMARY KEY |
| owner_name | TEXT | NOT NULL |
| balance | NUMERIC(19,4) | NOT NULL, DEFAULT 0, CHECK (balance >= 0) |
| currency | CHAR(3) | NOT NULL, DEFAULT 'USD' |
| created_at | TIMESTAMPTZ | NOT NULL, DEFAULT NOW() |
| is_active | BOOLEAN | NOT NULL, DEFAULT TRUE |

Transaction (the payment record)

| Column | Type | Constraints |
|---|---|---|
| id | UUID / TEXT | PRIMARY KEY |
| from_account_id | UUID / TEXT (must match accounts.id) | NOT NULL, FOREIGN KEY → accounts(id) |
| to_account_id | UUID / TEXT (must match accounts.id) | NOT NULL, FOREIGN KEY → accounts(id) |
| amount | NUMERIC(19,4) | NOT NULL, CHECK (amount > 0) |
| currency | CHAR(3) | NOT NULL |
| status | TEXT | NOT NULL, CHECK (status IN ('pending', 'completed', 'failed', 'duplicate')) |
| idempotency_key | TEXT | UNIQUE, NOT NULL |
| failure_reason | TEXT | nullable |
| created_at | TIMESTAMPTZ | NOT NULL, DEFAULT NOW() |

AuditLog

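| Column | Type | Constraints |
|---|---|---|
| id | BIGSERIAL / INTEGER | PRIMARY KEY (SQLite: INTEGER PRIMARY KEY AUTOINCREMENT) |
| event_type | TEXT | NOT NULL |
| account_id | TEXT | nullable |
| transaction_id | TEXT | nullable |
| details | TEXT | nullable (JSON string) |
| occurred_at | TIMESTAMPTZ | NOT NULL, DEFAULT NOW() |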
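Below is one way to express these three tables for the SQLite fallback, for example as the contents of db/schema.sql. It is a sketch under these assumptions:

- UUIDs and timestamps are stored as TEXT.
- CURRENT_TIMESTAMP stands in for NOW().
- TRUE is written as 1.
- The helper name init_schema is hypothetical.

Two places where SQLite behaves differently from PostgreSQL are worth noting. First, NUMERIC(19,4) only gives the column NUMERIC affinity, so values are stored as INTEGER or REAL rather than as exact four-place decimals. Second, foreign keys are enforced only after PRAGMA foreign_keys = ON.

```python
import sqlite3

# SQLite dialect of the schema. PostgreSQL would use UUID, TIMESTAMPTZ,
# BIGSERIAL and NOW() instead of TEXT, INTEGER AUTOINCREMENT and CURRENT_TIMESTAMP.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS accounts (
    id          TEXT PRIMARY KEY,
    owner_name  TEXT NOT NULL,
    balance     NUMERIC(19,4) NOT NULL DEFAULT 0 CHECK (balance >= 0),
    currency    CHAR(3) NOT NULL DEFAULT 'USD',
    created_at  TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    is_active   BOOLEAN NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS transactions (
    id               TEXT PRIMARY KEY,
    from_account_id  TEXT NOT NULL REFERENCES accounts(id),
    to_account_id    TEXT NOT NULL REFERENCES accounts(id),
    amount           NUMERIC(19,4) NOT NULL CHECK (amount > 0),
    currency         CHAR(3) NOT NULL,
    status           TEXT NOT NULL
                     CHECK (status IN ('pending', 'completed', 'failed', 'duplicate')),
    idempotency_key  TEXT NOT NULL UNIQUE,
    failure_reason   TEXT,
    created_at       TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS audit_log (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    event_type      TEXT NOT NULL,
    account_id      TEXT,
    transaction_id  TEXT,
    details         TEXT,  -- JSON string
    occurred_at     TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"""


def init_schema(conn: sqlite3.Connection) -> None:
    """Create all tables. SQLite enforces foreign keys only when this pragma is on."""
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(SCHEMA_SQL)
```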

Entity Relationship Diagram

Operations to Implement

Account Operations

| Function | Description |
|---|---|
| create_account(owner_name, initial_balance, currency) -> str | Create a new account. Generate a UUID for the ID. Return the new account ID. Emit an ACCOUNT_CREATED audit event. |
| get_account(account_id) -> dict | Return account details. Raise AccountNotFoundError if not found. |
| get_balance(account_id) -> Decimal | Return the current balance. Raise AccountNotFoundError if not found. |
| deactivate_account(account_id) | Set is_active = FALSE. Reject if the account has a non-zero balance. |
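Here is a minimal sketch of three of these account operations against the SQLite fallback. It assumes that the accounts and audit_log tables described under Entities already exist and that amounts arrive as Decimal.

- AccountNotFoundError is redefined locally only so the snippet runs on its own.
- The _audit helper is hypothetical.
- The spec names no exception for deactivating a funded account, so ValueError is an assumption.

```python
import json
import sqlite3
import uuid
from decimal import Decimal


class AccountNotFoundError(Exception):
    """Stand-in for the class defined in exceptions.py."""


def _audit(conn: sqlite3.Connection, event_type: str, account_id: str, details: dict) -> None:
    # Hypothetical helper: writes one audit row inside the caller's transaction.
    conn.execute(
        "INSERT INTO audit_log (event_type, account_id, details) VALUES (?, ?, ?)",
        (event_type, account_id, json.dumps(details)),
    )


def create_account(conn: sqlite3.Connection, owner_name: str,
                   initial_balance: Decimal = Decimal("0"), currency: str = "USD") -> str:
    initial_balance = Decimal(initial_balance)
    if initial_balance < 0:  # rejected before any SQL runs
        raise ValueError(f"initial_balance must be >= 0, got {initial_balance}")
    account_id = str(uuid.uuid4())
    with conn:  # one transaction: the account and its audit row commit together
        conn.execute(
            "INSERT INTO accounts (id, owner_name, balance, currency) VALUES (?, ?, ?, ?)",
            (account_id, owner_name, str(initial_balance), currency),
        )
        _audit(conn, "ACCOUNT_CREATED", account_id,
               {"owner_name": owner_name, "initial_balance": str(initial_balance)})
    return account_id


def get_balance(conn: sqlite3.Connection, account_id: str) -> Decimal:
    row = conn.execute("SELECT balance FROM accounts WHERE id = ?", (account_id,)).fetchone()
    if row is None:
        raise AccountNotFoundError(account_id)
    # SQLite returns INTEGER or REAL here; going through str() avoids binary-float digits.
    return Decimal(str(row[0]))


def deactivate_account(conn: sqlite3.Connection, account_id: str) -> None:
    with conn:
        # Conditional UPDATE: the zero-balance check and the write are one statement,
        # so a concurrent credit cannot slip in between them.
        cur = conn.execute(
            "UPDATE accounts SET is_active = 0 WHERE id = ? AND balance = 0", (account_id,)
        )
        if cur.rowcount == 0:
            get_balance(conn, account_id)  # raises AccountNotFoundError if missing
            raise ValueError(f"account {account_id} has a non-zero balance")
        _audit(conn, "ACCOUNT_DEACTIVATED", account_id, {})
```

This sketch writes the ACCOUNT_CREATED row in the same transaction as the account itself, so a rolled-back insert leaves no audit row claiming the account exists. Transfer audit events work differently, because Requirement 4 requires them to survive a rollback.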

Transfer Operations

| Function | Description |
|---|---|
| transfer(from_id, to_id, amount, currency, idempotency_key) -> dict | The core operation - see transfer requirements below. |
| get_transaction(transaction_id) -> dict | Return a transaction record by ID. |
| get_account_history(account_id, limit, offset) -> list | Return paginated transaction history for an account (both sent and received). |
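The "both sent and received" rule for get_account_history comes down to an OR across the two account columns. Below is a sketch for SQLite. It assumes newest-first ordering and a default page size of 50; the spec specifies neither.

```python
import sqlite3


def get_account_history(conn: sqlite3.Connection, account_id: str,
                        limit: int = 50, offset: int = 0) -> list:
    """Page through transactions the account sent or received, newest first."""
    if limit <= 0 or offset < 0:
        raise ValueError("limit must be > 0 and offset >= 0")
    cur = conn.execute(
        """
        SELECT id, from_account_id, to_account_id, amount, currency, status, created_at
        FROM transactions
        WHERE from_account_id = ? OR to_account_id = ?
        ORDER BY created_at DESC, id DESC   -- id breaks ties so pages never overlap
        LIMIT ? OFFSET ?
        """,
        (account_id, account_id, limit, offset),
    )
    columns = [col[0] for col in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]
```

On large tables, separate indexes on from_account_id and to_account_id usually let the database answer each side of the OR from an index.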

Technical Requirements

Requirement 1 - ACID Transfer

The transfer function must perform the following steps within a single database transaction:

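  1. Prevent concurrent modifications: lock both account rows with SELECT ... FOR UPDATE (PostgreSQL), or start the transaction with BEGIN IMMEDIATE (SQLite). SQLite has no row locks, so BEGIN IMMEDIATE takes the database-wide write lock instead.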
  2. Verify both accounts exist and are active.
  3. Verify the sender has sufficient balance.
  4. Verify the currencies match (or handle conversion - see Extension B).
  5. Debit the sender's account.
  6. Credit the receiver's account.
  7. Insert the Transaction record with status = 'completed'.
  8. COMMIT.

If any step fails, the entire transaction must be rolled back so that no money moves.

The database CHECK (balance >= 0) constraint is your last line of defense, not your primary check. Perform the balance check in application code first so you can return a meaningful error message. The DB constraint catches races you might miss.
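The eight steps map fairly directly onto code. The sketch below covers the SQLite fallback only, under these assumptions:

- The connection was opened with isolation_level=None, so BEGIN IMMEDIATE, COMMIT and ROLLBACK are issued explicitly.
- The exception classes are local stand-ins for exceptions.py.
- transfer_once is a hypothetical name for the single-attempt function that the deadlock retry wrapper in the Hints would call.

Idempotency (Requirement 2) and audit logging are left out so the transaction boundaries stay visible.

```python
import sqlite3
import uuid
from decimal import Decimal


# Stand-ins for the classes in exceptions.py.
class PaymentServiceError(Exception): pass
class AccountNotFoundError(PaymentServiceError): pass
class AccountInactiveError(PaymentServiceError): pass
class CurrencyMismatchError(PaymentServiceError): pass


class InsufficientFundsError(PaymentServiceError):
    def __init__(self, available, required):
        self.available, self.required = available, required
        super().__init__(f"Insufficient funds: have {available}, need {required}")


def _load_account(conn, account_id):
    row = conn.execute(
        "SELECT balance, currency, is_active FROM accounts WHERE id = ?", (account_id,)
    ).fetchone()
    if row is None:
        raise AccountNotFoundError(account_id)
    return Decimal(str(row[0])), row[1], bool(row[2])


def transfer_once(conn: sqlite3.Connection, from_id, to_id, amount, currency, idempotency_key):
    # Assumes conn was opened with isolation_level=None (explicit transactions).
    amount = Decimal(amount)
    # Step 1: take SQLite's write lock up front (PostgreSQL: SELECT ... FOR UPDATE).
    conn.execute("BEGIN IMMEDIATE")
    try:
        from_balance, from_cur, from_active = _load_account(conn, from_id)  # step 2
        _, to_cur, to_active = _load_account(conn, to_id)
        if not (from_active and to_active):
            raise AccountInactiveError(f"{from_id} or {to_id} is deactivated")
        if from_balance < amount:                                             # step 3
            raise InsufficientFundsError(from_balance, amount)
        if not (from_cur == to_cur == currency):                              # step 4
            raise CurrencyMismatchError(f"{from_cur} -> {to_cur}, requested {currency}")
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
                     (str(amount), from_id))                                  # step 5
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
                     (str(amount), to_id))                                    # step 6
        tx_id = str(uuid.uuid4())
        conn.execute(                                                         # step 7
            "INSERT INTO transactions (id, from_account_id, to_account_id, amount,"
            " currency, status, idempotency_key)"
            " VALUES (?, ?, ?, ?, ?, 'completed', ?)",
            (tx_id, from_id, to_id, str(amount), currency, idempotency_key),
        )
        conn.execute("COMMIT")                                                # step 8
        return {"id": tx_id, "status": "completed", "amount": amount}
    except BaseException:
        # Any failure, even KeyboardInterrupt, undoes every step above.
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
```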

Requirement 2 - Idempotency

Before starting a transfer, check whether the idempotency_key already exists in the transactions table:

  • If found with status = 'completed': return the existing transaction record immediately. Do not charge again.
  • If found with status = 'failed': return the existing record with the failure reason. Do not retry automatically.
  • If found with status = 'pending': another request with this key is in flight. Raise TransferInProgressError.
  • If not found: proceed with the transfer.

The idempotency check and the transfer must be in the same database transaction, protected by a lock on the idempotency_key (use INSERT ... ON CONFLICT DO NOTHING with a subsequent SELECT, or an advisory lock).
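Once the existing row (if any) has been read inside the transfer's transaction, the idempotency rules reduce to a small decision function. This is a sketch with these assumptions:

- resolve_existing is a hypothetical helper.
- The exception classes are local stand-ins.
- The DuplicateIdempotencyKeyError check (same key, different from/to/amount) is drawn from that exception's docstring in exceptions.py, not from Requirement 2 itself.

```python
from decimal import Decimal
from typing import Optional


class PaymentServiceError(Exception): pass
class TransferInProgressError(PaymentServiceError): pass
class DuplicateIdempotencyKeyError(PaymentServiceError): pass


def resolve_existing(existing: Optional[dict], from_id: str, to_id: str,
                     amount: Decimal) -> Optional[dict]:
    """Apply the idempotency rules to the row found for a key.

    Returns None when the key is new (proceed with the transfer); otherwise
    returns the stored record unchanged, or raises.
    """
    if existing is None:
        return None
    # Same key but a different request: treat as a client bug, not a replay.
    stored = (existing["from_account_id"], existing["to_account_id"],
              Decimal(str(existing["amount"])))
    if stored != (from_id, to_id, Decimal(amount)):
        raise DuplicateIdempotencyKeyError(existing["idempotency_key"])
    if existing["status"] == "pending":
        raise TransferInProgressError(existing["idempotency_key"])
    if existing["status"] in ("completed", "failed"):
        return existing  # replay: never charge again, never retry automatically
    raise ValueError(f"status {existing['status']!r} is not covered by these rules")
```

A non-None return is the natural point to emit the DUPLICATE_TRANSFER audit event.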

Requirement 3 - Deadlock Detection and Retry

When two transfers happen concurrently in opposite directions (A→B and B→A), the database may raise a deadlock error. Your code must catch it and retry with exponential backoff:

  • Max attempts: 3 (the first try plus up to two retries)
  • Backoff: base_delay * (2 ** attempt) where base_delay = 0.1 seconds
  • Add jitter: multiply the backoff by a random factor between 0.5 and 1.5 to prevent thundering herd
  • After 3 failed attempts, raise MaxRetriesExceededError
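The backoff formula is easy to check in isolation. With base_delay = 0.1 and a jitter factor in [0.5, 1.5], the delay falls in these ranges:

- after attempt 0: [0.05, 0.15] seconds
- after attempt 1: [0.1, 0.3] seconds
- after attempt 2: [0.2, 0.6] seconds

Below is a sketch of just the delay calculation. The optional rng argument is an assumption, added so that tests can pass a seeded random.Random.

```python
import random

BASE_DELAY = 0.1  # seconds
MAX_RETRIES = 3


def backoff_delay(attempt: int, rng=random) -> float:
    """Seconds to sleep after failed attempt number `attempt` (0-based).

    rng: anything with a uniform(a, b) method, e.g. random.Random(seed) in tests.
    """
    jitter = rng.uniform(0.5, 1.5)  # spreads retries out to avoid a thundering herd
    return BASE_DELAY * (2 ** attempt) * jitter
```

The retry loop in the Hints raises MaxRetriesExceededError on the third failure instead of sleeping. With that loop, a caller waits at most 0.15 + 0.3 = 0.45 seconds in total before giving up.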

PostgreSQL deadlock error code: psycopg2.errors.DeadlockDetected (SQLSTATE 40P01). SQLite does not support true deadlocks, but simulate the retry logic anyway using sqlite3.OperationalError with message "database is locked".

To prevent most deadlocks before they happen: always lock accounts in a consistent order (e.g., lower UUID first). This eliminates the circular wait condition for the most common case.

Requirement 4 - Audit Logging

Every transfer attempt - successful or not - must produce at least one audit log entry. The audit log is written in a separate transaction from the transfer so that a transfer rollback does not erase the audit record.

Minimum audit events:

| Event Type | When |
|---|---|
| TRANSFER_INITIATED | At the start of every transfer call |
| TRANSFER_COMPLETED | When the transfer commits successfully |
| TRANSFER_FAILED | When the transfer rolls back for any reason |
| DUPLICATE_TRANSFER | When an idempotency key is reused |
| ACCOUNT_CREATED | When a new account is created |
| ACCOUNT_DEACTIVATED | When an account is deactivated |

The details column holds a JSON string with relevant context (amounts, error messages, retry counts).
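json.dumps raises TypeError on Decimal values, so amounts have to be converted before they go into details. The hypothetical helper below does this in one place: it uses str() for anything JSON cannot serialize natively and sorts keys so that entries are easy to compare.

```python
import json
from decimal import Decimal


def audit_details(**context) -> str:
    """Serialize audit context to the JSON string stored in audit_log.details.

    Decimal amounts become strings, so no precision is lost.
    """
    return json.dumps(context, default=str, sort_keys=True)
```

For example, audit_details(amount=Decimal("10.50"), retries=2) returns '{"amount": "10.50", "retries": 2}'.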

Requirement 5 - Balance Constraint

Balance must never go negative. Enforce this at both levels:

  1. Application level: check balance >= amount before issuing the UPDATE. Raise InsufficientFundsError with a clear message including the shortfall.
  2. Database level: CHECK (balance >= 0) constraint on the accounts table. Catch psycopg2.errors.CheckViolation (PostgreSQL) or sqlite3.IntegrityError (SQLite) and wrap it in InsufficientFundsError.

Requirement 6 - Thread-Safe Connection Handling

The payment service must be safe to call from multiple threads simultaneously:

Option A (recommended for PostgreSQL): Use a connection pool. psycopg2 ships with psycopg2.pool.ThreadedConnectionPool. Acquire a connection at the start of each operation, release it when done (use try/finally).

Option B (SQLite fallback): Use threading.local() to give each thread its own connection. SQLite connections must not be shared across threads.

Do not use a single module-level connection object that is shared across threads.
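Option A's acquire/try/finally pattern fits naturally into a context manager, so that no code path can forget putconn. Below is a sketch. It assumes only that the pool exposes getconn() and putconn(), as psycopg2.pool.ThreadedConnectionPool does.

- pooled_connection is a hypothetical name.
- Committing on a clean exit is a design choice.
- psycopg2's own "with conn:" block only ends the transaction; it does not return the connection to a pool.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection, commit or roll back, and always give it back."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()      # clean exit: make the work durable
    except BaseException:
        conn.rollback()    # any error: leave no half-finished transaction behind
        raise
    finally:
        pool.putconn(conn)  # runs on success, error, and rollback alike
```

Usage: "with pooled_connection(pool) as conn: ..." inside each service method, so each thread borrows its own connection for exactly one operation.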

Transfer Flow

Deadlock Retry Logic

Acceptance Criteria

Your implementation passes when all of the following are true:

  • transfer(A, B, 100) debits A by exactly 100 and credits B by exactly 100 - verified by querying both balances before and after.
  • transfer(A, B, 100) called twice with the same idempotency_key returns the same transaction record both times and A's balance reflects only one debit.
  • transfer(A, B, amount) where amount > A.balance raises InsufficientFundsError and neither balance changes.
  • After a failed transfer, an audit log entry with event_type = 'TRANSFER_FAILED' exists.
  • After a successful transfer, an audit log entry with event_type = 'TRANSFER_COMPLETED' exists.
  • A TRANSFER_FAILED audit entry exists even when the transfer transaction was rolled back.
  • get_account_history(account_id) returns transactions where the account is either sender or receiver.
  • Calling transfer with a deactivated account raises AccountInactiveError.
  • Attempting to set balance to a negative value via direct SQL raises a constraint error.
  • Two concurrent transfers in opposite directions (A→B and B→A) both complete correctly (run with threading.Thread to verify). Total money in the system must be conserved.
  • The deadlock retry function sleeps between retries (mock time.sleep and assert it was called on retry).
  • create_account with initial_balance < 0 raises ValueError before hitting the database.
  • All SQL uses parameterized queries - no f-strings or string concatenation in SQL.
  • Connections are released back to the pool (or closed) even when exceptions occur.
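For the criterion that the retry function sleeps between retries, here is a unittest sketch that patches time.sleep with unittest.mock.patch.

- It targets a SQLite-flavoured copy of the retry helper, so it runs without PostgreSQL; with_lock_retry is a hypothetical name.
- The same two tests should carry over to with_deadlock_retry with psycopg2.errors.DeadlockDetected as the side effect.
- Run it with pytest or python -m unittest.

```python
import random
import sqlite3
import time
import unittest
from unittest import mock

MAX_RETRIES = 3
BASE_DELAY = 0.1


class MaxRetriesExceededError(Exception):
    def __init__(self, attempts):
        super().__init__(f"Transfer failed after {attempts} attempts due to deadlock.")


def with_lock_retry(fn, *args, **kwargs):
    """Retry fn when SQLite reports 'database is locked' (treated as a deadlock)."""
    for attempt in range(MAX_RETRIES):
        try:
            return fn(*args, **kwargs)
        except sqlite3.OperationalError as exc:
            if "database is locked" not in str(exc):
                raise
            if attempt == MAX_RETRIES - 1:
                raise MaxRetriesExceededError(MAX_RETRIES) from exc
            # Looked up on the time module at call time, so mock.patch can replace it.
            time.sleep(BASE_DELAY * (2 ** attempt) * random.uniform(0.5, 1.5))


class RetryTests(unittest.TestCase):
    @mock.patch("time.sleep")
    def test_sleeps_then_succeeds(self, sleep):
        fn = mock.Mock(side_effect=[sqlite3.OperationalError("database is locked"), "ok"])
        self.assertEqual(with_lock_retry(fn), "ok")
        self.assertEqual(fn.call_count, 2)
        sleep.assert_called_once()

    @mock.patch("time.sleep")
    def test_gives_up_after_max_attempts(self, sleep):
        fn = mock.Mock(side_effect=sqlite3.OperationalError("database is locked"))
        with self.assertRaises(MaxRetriesExceededError):
            with_lock_retry(fn)
        self.assertEqual(fn.call_count, MAX_RETRIES)
        self.assertEqual(sleep.call_count, MAX_RETRIES - 1)
```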

Custom Exception Hierarchy

Define all exceptions in a single exceptions.py file:

from decimal import Decimal


class PaymentServiceError(Exception):
    """Base exception for all payment service errors."""

class AccountNotFoundError(PaymentServiceError):
    """Raised when a referenced account does not exist."""

class AccountInactiveError(PaymentServiceError):
    """Raised when a transfer involves a deactivated account."""

class InsufficientFundsError(PaymentServiceError):
    """Raised when the sender's balance is too low."""
    def __init__(self, available: Decimal, required: Decimal):
        self.available = available
        self.required = required
        super().__init__(
            f"Insufficient funds: have {available:.2f}, need {required:.2f} "
            f"(shortfall: {required - available:.2f})"
        )

class DuplicateIdempotencyKeyError(PaymentServiceError):
    """Raised when an idempotency key is reused for a different transfer."""

class TransferInProgressError(PaymentServiceError):
    """Raised when a transfer with the same idempotency key is currently pending."""

class CurrencyMismatchError(PaymentServiceError):
    """Raised when sender and receiver accounts use different currencies."""

class MaxRetriesExceededError(PaymentServiceError):
    """Raised when deadlock retry limit is exhausted."""
    def __init__(self, attempts: int):
        self.attempts = attempts
        super().__init__(f"Transfer failed after {attempts} attempts due to deadlock.")
Suggested Project Structure

payment_service/
├── db/
│ ├── schema.sql
│ └── migrations/
├── exceptions.py
├── models.py # dataclasses or namedtuples for Account, Transaction
├── connection.py # connection pool / thread-local setup
├── dal/
│ ├── __init__.py
│ ├── accounts.py
│ ├── transactions.py
│ └── audit.py
├── service.py # PaymentService class - orchestrates DAL calls
├── tests/
│ ├── test_transfer.py
│ ├── test_idempotency.py
│ └── test_concurrency.py
└── demo.py # runnable demo script

Hints

How do I acquire and release connections safely from a pool?
import psycopg2.pool

pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn="postgresql://user:password@localhost/payments",
)

def get_connection():
    return pool.getconn()

def release_connection(conn):
    pool.putconn(conn)

# In your service functions:
def transfer(from_id, to_id, amount, currency, idempotency_key):
    conn = get_connection()
    try:
        # ... do work on conn, then:
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        release_connection(conn)  # runs on success and on error

For SQLite with threading.local():

import sqlite3
import threading

_local = threading.local()

def get_connection(db_path: str) -> sqlite3.Connection:
    # One connection per thread. The default check_same_thread=True is kept
    # so that accidentally sharing a connection across threads raises an error.
    if getattr(_local, "conn", None) is None:
        _local.conn = sqlite3.connect(db_path)
        _local.conn.row_factory = sqlite3.Row
        _local.conn.execute("PRAGMA foreign_keys = ON")
        _local.conn.execute("PRAGMA journal_mode = WAL")
    return _local.conn

How do I implement the idempotency check atomically?

The key insight: the idempotency check and the transfer must happen in the same transaction, and you must lock the idempotency key row to prevent two concurrent requests with the same key from both passing the check simultaneously.

PostgreSQL approach - use INSERT ... ON CONFLICT:

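import uuid

import psycopg2.extras

def transfer(conn, from_id, to_id, amount, currency, idempotency_key):
    # psycopg2 opens a transaction implicitly on the first execute(), so no
    # explicit BEGIN is needed. RealDictCursor returns rows as dicts.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        # Try to insert a 'pending' record for this idempotency key. If another
        # in-flight transaction already inserted the same key, this INSERT
        # waits until that transaction commits or rolls back.
        cur.execute("""
            INSERT INTO transactions
                (id, from_account_id, to_account_id, amount, currency,
                 status, idempotency_key)
            VALUES (%s, %s, %s, %s, %s, 'pending', %s)
            ON CONFLICT (idempotency_key) DO NOTHING
            RETURNING id
        """, (str(uuid.uuid4()), from_id, to_id, amount, currency, idempotency_key))

        result = cur.fetchone()
        if result is None:
            # Key already existed - fetch and lock the existing record
            cur.execute(
                "SELECT * FROM transactions WHERE idempotency_key = %s FOR UPDATE",
                (idempotency_key,),
            )
            existing = cur.fetchone()
            conn.commit()
            # Apply the Requirement 2 rules (completed / failed / pending) here
            return dict(existing)  # return original result

        tx_id = result["id"]
        # ... proceed with the actual transfer using tx_id, then UPDATE the
        # row to status = 'completed' before COMMIT
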
How do I implement deadlock retry with exponential backoff?
import random
import time

import psycopg2.errors

# Adjust the import path to your package layout.
from payment_service.exceptions import MaxRetriesExceededError

MAX_RETRIES = 3
BASE_DELAY = 0.1  # seconds

def with_deadlock_retry(fn, *args, **kwargs):
    """
    Call fn(*args, **kwargs), making at most MAX_RETRIES attempts in total
    if a deadlock is detected. fn must roll back its own transaction on
    error so that each attempt starts clean.
    """
    for attempt in range(MAX_RETRIES):
        try:
            return fn(*args, **kwargs)
        except psycopg2.errors.DeadlockDetected as exc:
            if attempt == MAX_RETRIES - 1:
                raise MaxRetriesExceededError(MAX_RETRIES) from exc
            jitter = random.uniform(0.5, 1.5)
            delay = BASE_DELAY * (2 ** attempt) * jitter
            time.sleep(delay)

# Usage:
result = with_deadlock_retry(transfer_inner, conn, from_id, to_id, amount, currency, key)

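For SQLite, catch sqlite3.OperationalError in the same loop and retry only when "database is locked" appears in the message:

        except sqlite3.OperationalError as e:
            if "database is locked" not in str(e):
                raise  # a different operational error: do not retry
            # treat as deadlock and retry with the same backoff
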
How do I lock rows in the correct order to prevent deadlocks?

The most common cause of deadlocks in payment systems is two concurrent transfers locking the same two rows in opposite order:

  • Transfer 1 (A→B): locks row A, then tries to lock row B.
  • Transfer 2 (B→A): locks row B, then tries to lock row A.

Both are now waiting for each other - deadlock.

Solution: always lock in a consistent, deterministic order. Sort the account IDs before locking:

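# Adjust the import path to your package layout.
from payment_service.exceptions import AccountNotFoundError

def lock_accounts_in_order(cur, id_1: str, id_2: str):
    """Lock both accounts in a consistent order to prevent deadlocks.

    Assumes a dict-style cursor (e.g. RealDictCursor). Rows are locked one at
    a time in sorted order, because a single "WHERE id = ANY(%s) FOR UPDATE"
    locks rows in whatever order the query plan visits them, not array order.
    """
    rows = {}
    for account_id in sorted({id_1, id_2}):
        cur.execute(
            "SELECT id, balance, currency, is_active FROM accounts WHERE id = %s FOR UPDATE",
            (account_id,),
        )
        row = cur.fetchone()
        if row is None:
            raise AccountNotFoundError(account_id)
        rows[account_id] = row
    return rows[id_1], rows[id_2]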

This does not eliminate deadlocks entirely (other processes may lock in different ways), but it eliminates the most common circular-wait pattern. Retry logic handles the rest.

How do I write the audit log in a separate transaction so it survives a rollback?

The trick is to use a separate connection for the audit log, so that rolling back the main transfer transaction does not affect the audit write.

import json

def log_audit_event(audit_conn, event_type, account_id=None,
                    transaction_id=None, details=None):
    # sqlite3 version ("?" placeholders, Connection.execute). With psycopg2,
    # use a cursor and "%s" placeholders instead.
    with audit_conn:  # commits this insert on its own connection
        audit_conn.execute(
            """
            INSERT INTO audit_log (event_type, account_id, transaction_id, details)
            VALUES (?, ?, ?, ?)
            """,
            (event_type, account_id, transaction_id,
             json.dumps(details) if details else None),
        )

def transfer(transfer_conn, audit_conn, from_id, to_id, amount, key):
    log_audit_event(audit_conn, "TRANSFER_INITIATED",
                    account_id=from_id,
                    details={"to": to_id, "amount": str(amount), "key": key})
    try:
        # _do_transfer runs the ACID steps on transfer_conn and must roll back
        # before re-raising. In SQLite, the audit write below would otherwise
        # wait on the write lock that the failed transfer still holds.
        result = _do_transfer(transfer_conn, from_id, to_id, amount, key)
        log_audit_event(audit_conn, "TRANSFER_COMPLETED",
                        transaction_id=result["id"],
                        details={"amount": str(amount)})
        return result
    except Exception as e:
        log_audit_event(audit_conn, "TRANSFER_FAILED",
                        account_id=from_id,
                        details={"error": str(e), "amount": str(amount)})
        raise

How do I write a concurrency test to verify money is conserved?
import threading
from decimal import Decimal

def test_concurrent_transfers_conserve_money(service):
    # Create two accounts with known balances
    a_id = service.create_account("Alice", Decimal("500.00"), "USD")
    b_id = service.create_account("Bob", Decimal("500.00"), "USD")
    total_before = Decimal("1000.00")

    errors = []
    def transfer_ab(key):
        try:
            service.transfer(a_id, b_id, Decimal("10.00"), "USD", key)
        except Exception as e:
            errors.append(e)

    def transfer_ba(key):
        try:
            service.transfer(b_id, a_id, Decimal("10.00"), "USD", key)
        except Exception as e:
            errors.append(e)

    threads = []
    for i in range(20):
        threads.append(threading.Thread(target=transfer_ab, args=(f"ab-{i}",)))
        threads.append(threading.Thread(target=transfer_ba, args=(f"ba-{i}",)))

    for t in threads:
        t.start()
    for t in threads:
        t.join()

    balance_a = service.get_balance(a_id)
    balance_b = service.get_balance(b_id)
    total_after = balance_a + balance_b

    assert total_after == total_before, (
        f"Money not conserved! Before: {total_before}, After: {total_after}"
    )
    print(f"All transfers completed. Errors: {len(errors)}")
    print(f"Final balances - Alice: {balance_a}, Bob: {balance_b}")

Extension Challenges

These are optional. Attempt them only after all acceptance criteria pass.

Extension A - Batch Transfers

Add a batch_transfer function that accepts a list of transfer dicts (from_account_id, to_account_id, amount, currency, idempotency_key) and processes them as a single database transaction:

  • All transfers in the batch succeed, or none do.
  • The function returns a list of transaction records in the same order as the input.
  • Partial failures (e.g., one sender has insufficient funds) cause the entire batch to roll back.
  • Each individual transfer in the batch still generates its own audit log entry.

Signature:

def batch_transfer(transfers: list[dict]) -> list[dict]:
    """
    transfers: list of dicts, each with keys:
        from_account_id, to_account_id, amount, currency, idempotency_key
    Returns: list of transaction dicts in the same order.
    Raises: BatchTransferError if any transfer fails (includes index of first failure).
    """

Extension B - Currency Conversion

Allow transfers between accounts with different currencies:

  • Add an exchange_rates table: (from_currency CHAR(3), to_currency CHAR(3), rate NUMERIC(19,8), updated_at TIMESTAMPTZ).
  • A transfer from a USD account to a EUR account converts the amount at the stored rate and records both amount_sent (in sender's currency) and amount_received (in receiver's currency) on the transaction.
  • Rate updates must not affect in-flight transactions (take a snapshot of the rate at the start of the transaction).
  • Add a get_rate(from_currency, to_currency) function that raises ExchangeRateNotFoundError if the pair is unknown.
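Below is a sketch of get_rate and the conversion arithmetic for the SQLite fallback, under these assumptions:

- In SQLite the rate column is stored as TEXT, so all eight decimal places survive; NUMERIC affinity would turn it into a REAL.
- Same-currency pairs return a rate of 1.
- Received amounts are rounded half-to-even to four places to fit NUMERIC(19,4).
- ExchangeRateNotFoundError is a local stand-in.

```python
import sqlite3
from decimal import ROUND_HALF_EVEN, Decimal


class ExchangeRateNotFoundError(Exception):
    """Stand-in; would live in exceptions.py under PaymentServiceError."""


def get_rate(conn: sqlite3.Connection, from_currency: str, to_currency: str) -> Decimal:
    if from_currency == to_currency:
        return Decimal(1)
    row = conn.execute(
        "SELECT rate FROM exchange_rates WHERE from_currency = ? AND to_currency = ?",
        (from_currency, to_currency),
    ).fetchone()
    if row is None:
        raise ExchangeRateNotFoundError(f"no rate for {from_currency} -> {to_currency}")
    return Decimal(str(row[0]))


def convert(amount_sent: Decimal, rate: Decimal) -> Decimal:
    """amount_received, rounded to the 4 decimal places of NUMERIC(19,4)."""
    return (amount_sent * rate).quantize(Decimal("0.0001"), rounding=ROUND_HALF_EVEN)
```

Here, "taking a snapshot" amounts to three steps:

1. Call get_rate once, inside the transfer's transaction.
2. Reuse that Decimal for the conversion.
3. Reuse the same Decimal for the stored record.

A rate update committed afterwards cannot change the value already held in the variable.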

Extension C - Scheduled Transfers

Add a scheduled_transfers table:

| Column | Type |
|---|---|
| id | UUID PRIMARY KEY |
| from_account_id | UUID / TEXT REFERENCES accounts(id) |
| to_account_id | UUID / TEXT REFERENCES accounts(id) |
| amount | NUMERIC(19,4) |
| currency | CHAR(3) |
| schedule_at | TIMESTAMPTZ |
| status | TEXT CHECK (status IN ('scheduled', 'executed', 'cancelled', 'failed')) |
| created_at | TIMESTAMPTZ DEFAULT NOW() |

Write a process_due_transfers() function that:

  1. Selects all scheduled transfers where schedule_at <= NOW() and status = 'scheduled'.
  2. Locks each selected row with SELECT ... FOR UPDATE SKIP LOCKED so concurrent workers don't double-process.
  3. Calls transfer() for each (generating a unique idempotency key from the scheduled transfer's ID).
  4. Updates the status to 'executed' or 'failed' accordingly.
  5. Can be called from a cron job or a background thread every 60 seconds.
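SQLite has no FOR UPDATE SKIP LOCKED, so the fallback sketch below leans on idempotency instead. Each scheduled row gets a deterministic key, so two workers that pick up the same row produce at most one transfer. The status update applies only while the row is still 'scheduled'.

The sketch assumes:

- transfer is injected as a callable with the transfer() signature.
- now is an ISO-8601 string compared against a TEXT schedule_at column.
- The "scheduled-" key prefix is hypothetical.
- A TransferInProgressError (local stand-in) leaves the row for the next run; any other exception marks it failed.

In PostgreSQL, the SELECT would add FOR UPDATE SKIP LOCKED and the loop would run inside that transaction.

```python
import sqlite3
from decimal import Decimal
from typing import Callable


class TransferInProgressError(Exception):
    """Stand-in for the class in exceptions.py."""


def scheduled_idempotency_key(scheduled_id: str) -> str:
    # Deterministic: every run derives the same key for the same scheduled row.
    return f"scheduled-{scheduled_id}"


def process_due_transfers(conn: sqlite3.Connection,
                          transfer: Callable[..., dict], now: str) -> dict:
    """Execute due scheduled transfers; returns {scheduled_id: new_status}."""
    due = conn.execute(
        "SELECT id, from_account_id, to_account_id, amount, currency"
        " FROM scheduled_transfers"
        " WHERE schedule_at <= ? AND status = 'scheduled'"
        " ORDER BY schedule_at, id",
        (now,),
    ).fetchall()
    outcome = {}
    for sched_id, from_id, to_id, amount, currency in due:
        try:
            record = transfer(from_id, to_id, Decimal(str(amount)), currency,
                              scheduled_idempotency_key(sched_id))
            new_status = "executed" if record["status"] == "completed" else "failed"
        except TransferInProgressError:
            continue  # another worker is running it; leave it 'scheduled'
        except Exception:
            new_status = "failed"  # sketch: any other error marks the row failed
        with conn:
            # Guarded update: a row another worker already settled is left alone.
            cur = conn.execute(
                "UPDATE scheduled_transfers SET status = ?"
                " WHERE id = ? AND status = 'scheduled'",
                (new_status, sched_id),
            )
        if cur.rowcount == 1:
            outcome[sched_id] = new_status
    return outcome
```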
© 2026 EngineersOfAI. All rights reserved.