Skip to main content

Python ORM with SQLAlchemy Practice Problems & Exercises

Practice: ORM with SQLAlchemy

11 problems · 4 Easy · 4 Medium · 3 Hard · 60–80 min
← Back to lesson

:::note Environment
These problems use SQLAlchemy 2.0 or later (they import `DeclarativeBase`) with an in-memory SQLite backend. Install with: `pip install "sqlalchemy>=2.0"`
:::


#1 Declare a Model and Create Tables (Easy)
Tags: DeclarativeBase, Column, create_all

Define a SQLAlchemy model, create the table, insert rows, and query them back.

Solution:

from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
    """Declarative base; models that subclass it are collected on ``Base.metadata``."""

class Product(Base):
    """ORM model mapped to the ``products`` table."""

    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    price = Column(Float, nullable=False)
    # Used when an insert does not supply a value for in_stock.
    in_stock = Column(Integer, default=0)

def create_and_insert():
    """Create the schema, insert three products, and read them back.

    Returns:
        A list of ``(id, name, price)`` tuples ordered by ``id``.
    """
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)

    # Write in one session ...
    with Session(engine) as session:
        session.add_all([
            Product(name='Widget', price=9.99),
            Product(name='Gadget', price=49.99),
            Product(name='Doohickey', price=24.99),
        ])
        session.commit()

    # ... and read in a fresh one, so the rows are loaded from the database.
    with Session(engine) as session:
        products = session.query(Product).order_by(Product.id).all()
        return [(p.id, p.name, p.price) for p in products]
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
  pass

# TODO: Define a Product model with columns:
# id (Integer, primary key), name (String), price (Float), in_stock (Integer, default=0)
# Create the table, insert 3 products, and return all of them.

class Product(Base):
  __tablename__ = 'products'
  # Add your columns here

def create_and_insert():
  engine = create_engine('sqlite:///:memory:')
  Base.metadata.create_all(engine)

  with Session(engine) as session:
      # Insert 3 products
      pass

  with Session(engine) as session:
      # Return all products as list of (id, name, price)
      pass

print(create_and_insert())
Expected Output
[(1, 'Widget', 9.99), (2, 'Gadget', 49.99), (3, 'Doohickey', 24.99)]
Hints

Hint 1: class Product(Base): __tablename__ = 'products' defines the table.

Hint 2: session.add(obj) stages; session.commit() saves to DB.

Hint 3: session.query(Product).all() returns all Product objects.


#2 Query with filter() and filter_by() (Easy)
Tags: filter, filter_by, query, where

Use filter() with multiple conditions to query employees by department and salary.

Solution:

from sqlalchemy import desc

def high_paid_engineers(session):
    """Return the names of Engineering employees earning more than 90000.

    Args:
        session: An open SQLAlchemy session bound to the employees database.

    Returns:
        A list of names ordered by salary, highest first.
    """
    employees = (
        session.query(Employee)
        # Several criteria passed to one filter() call are combined with AND.
        .filter(Employee.department == 'Engineering', Employee.salary > 90000)
        .order_by(desc(Employee.salary))
        .all()
    )
    return [e.name for e in employees]
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
  pass

class Employee(Base):
  """ORM model mapped to the ``employees`` table."""
  __tablename__ = 'employees'
  id = Column(Integer, primary_key=True)
  name = Column(String)
  department = Column(String)  # the exercise filters on this column
  salary = Column(Float)  # the exercise filters and sorts on this column

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as s:
  s.add_all([
      Employee(name='Alice', department='Engineering', salary=95000),
      Employee(name='Bob', department='Marketing', salary=72000),
      Employee(name='Carol', department='Engineering', salary=105000),
      Employee(name='Dave', department='HR', salary=68000),
      Employee(name='Eve', department='Engineering', salary=88000),
  ])
  s.commit()

# TODO: Return names of engineers earning more than 90000,
# sorted by salary descending.

def high_paid_engineers(session):
  pass

with Session(engine) as session:
  print(high_paid_engineers(session))
Expected Output
['Carol', 'Alice']
Hints

Hint 1: session.query(Employee).filter(Employee.department == 'Engineering', Employee.salary > 90000)

Hint 2: .order_by(Employee.salary.desc()) sorts descending.

Hint 3: Alternatively use .filter_by(department='Engineering') for simple equality checks.


#3 Session Add, Update, Delete (Easy)
Tags: add, update, delete, session

Perform INSERT, UPDATE, and DELETE operations through the SQLAlchemy session.

Solution:

def manage_tags(engine):
    """Insert three tags, recolor one, delete one, and list what remains.

    Args:
        engine: SQLAlchemy engine whose database already has the ``tags`` table.

    Returns:
        A list of ``(name, color)`` tuples for the remaining tags, sorted by name.
    """
    with Session(engine) as session:
        # INSERT
        session.add_all([
            Tag(name='python', color='blue'),
            Tag(name='sql', color='green'),
            Tag(name='devops', color='orange'),
        ])
        session.commit()

        # UPDATE: changing an attribute on a loaded object is enough; the
        # session writes the change on the next commit.
        sql_tag = session.query(Tag).filter_by(name='sql').one()
        sql_tag.color = 'cyan'

        # DELETE
        devops_tag = session.query(Tag).filter_by(name='devops').one()
        session.delete(devops_tag)

        session.commit()

        tags = session.query(Tag).order_by(Tag.name).all()
        return [(t.name, t.color) for t in tags]
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
  pass

class Tag(Base):
  """ORM model mapped to the ``tags`` table."""
  __tablename__ = 'tags'
  id = Column(Integer, primary_key=True)
  name = Column(String, unique=True)  # no two tags may share a name
  color = Column(String, default='grey')  # used when an insert supplies no color

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

# TODO:
# 1. Insert tags: 'python' (blue), 'sql' (green), 'devops' (orange)
# 2. Update 'sql' tag color to 'cyan'
# 3. Delete 'devops' tag
# Return remaining tags as list of (name, color) sorted by name.

def manage_tags(engine):
  pass

print(manage_tags(engine))
Expected Output
[('python', 'blue'), ('sql', 'cyan')]
Hints

Hint 1: session.add_all([...]) inserts multiple objects.

Hint 2: Fetch the object, modify its attribute, then session.commit() — SQLAlchemy detects the change.

Hint 3: session.delete(obj) marks for deletion; session.commit() executes it.


#4 Count, Sum, Avg with func (Easy)
Tags: func, aggregate, count, sum, avg

Use SQLAlchemy's func module to compute COUNT, SUM, and AVG aggregates in a single query.

Solution:

def shipped_stats(session):
    """Aggregate the orders whose status is ``'shipped'`` in a single query.

    Args:
        session: An open SQLAlchemy session bound to the orders database.

    Returns:
        A dict with ``'count'`` (number of shipped orders), ``'total'`` (sum of
        their totals) and ``'avg'`` (mean total rounded to 2 decimals). When no
        order is shipped, ``'total'`` is ``0.0`` and ``'avg'`` is ``None``.
    """
    count, total, avg = session.query(
        func.count(Order.id),
        func.sum(Order.total),
        func.avg(Order.total),
    ).filter(Order.status == 'shipped').one()

    # SUM and AVG come back as NULL (None) when no row matches; guard so that
    # round() is never called on None.
    return {
        'count': count,
        'total': total if total is not None else 0.0,
        'avg': round(avg, 2) if avg is not None else None,
    }
from sqlalchemy import create_engine, Column, Integer, String, Float, func
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
  pass

class Order(Base):
  """ORM model mapped to the ``orders`` table."""
  __tablename__ = 'orders'
  id = Column(Integer, primary_key=True)
  customer = Column(String)
  total = Column(Float)  # order amount; summed and averaged by the exercise
  status = Column(String)  # the seed data uses 'shipped' and 'pending'

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as s:
  s.add_all([
      Order(customer='Alice', total=250.0, status='shipped'),
      Order(customer='Bob', total=125.5, status='pending'),
      Order(customer='Alice', total=75.0, status='shipped'),
      Order(customer='Carol', total=400.0, status='shipped'),
      Order(customer='Bob', total=320.0, status='shipped'),
  ])
  s.commit()

# TODO: Return a dict with:
# - 'count': total number of shipped orders
# - 'total': sum of shipped order totals
# - 'avg': average shipped order total (rounded to 2 decimals)

def shipped_stats(session):
  pass

with Session(engine) as session:
  print(shipped_stats(session))
Expected Output
{'count': 4, 'total': 1045.0, 'avg': 261.25}
Hints

Hint 1: session.query(func.count(Order.id), func.sum(Order.total), func.avg(Order.total)).filter(...).one()

Hint 2: filter(Order.status == 'shipped') restricts to shipped orders.

Hint 3: round(avg, 2) rounds to 2 decimal places.


#5 One-to-Many Relationship (Medium)
Tags: relationship, ForeignKey, one-to-many, backref

Define a one-to-many relationship between Author and Book, then navigate it to list each author's books.

Solution:

from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, relationship

class Base(DeclarativeBase):
    """Declarative base; models that subclass it are collected on ``Base.metadata``."""

class Author(Base):
    """ORM model mapped to ``authors``; the "one" side of Author -> Book."""

    __tablename__ = 'authors'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # Paired with Book.author via back_populates; order_by makes the loaded
    # collection come back sorted by title.
    books = relationship('Book', back_populates='author', order_by='Book.title')

class Book(Base):
    """ORM model mapped to ``books``; the "many" side of Author -> Book."""

    __tablename__ = 'books'

    id = Column(Integer, primary_key=True)
    title = Column(String)
    price = Column(Float)
    # Foreign key that links each book to its author row.
    author_id = Column(Integer, ForeignKey('authors.id'))
    author = relationship('Author', back_populates='books')

def authors_with_books(engine):
    """Insert two authors with their books and list each author's titles.

    Args:
        engine: SQLAlchemy engine to create the tables on and query.

    Returns:
        A list of ``(author_name, [book_titles])`` tuples ordered by author name.
    """
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        # Books passed through the relationship get their author_id set
        # automatically when the author is saved.
        alice = Author(name='Alice Author', books=[
            Book(title='Python Cookbook', price=39.99),
            Book(title='Data Science Handbook', price=49.99),
        ])
        bob = Author(name='Bob Writer', books=[
            Book(title='SQL Mastery', price=29.99),
        ])
        session.add_all([alice, bob])
        session.commit()

    with Session(engine) as session:
        authors = session.query(Author).order_by(Author.name).all()
        # Build the result while the session is open: a.books is loaded lazily.
        return [(a.name, [b.title for b in a.books]) for a in authors]
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, relationship

class Base(DeclarativeBase):
  pass

class Author(Base):
  __tablename__ = 'authors'
  id = Column(Integer, primary_key=True)
  name = Column(String)
  # TODO: add relationship to books

class Book(Base):
  __tablename__ = 'books'
  id = Column(Integer, primary_key=True)
  title = Column(String)
  price = Column(Float)
  author_id = Column(Integer, ForeignKey('authors.id'))
  # TODO: add back-reference to author

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

# TODO: Insert authors and their books, then return each author's name
# and a list of their book titles, sorted by author name.

def authors_with_books(engine):
  pass

print(authors_with_books(engine))
Expected Output
[('Alice Author', ['Data Science Handbook', 'Python Cookbook']), ('Bob Writer', ['SQL Mastery'])]
Hints

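Hint 1: Author.books = relationship('Book', back_populates='author', order_by='Book.title') on Author — the order_by keeps each author's titles alphabetical, which the expected output requires.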

Hint 2: Book.author = relationship('Author', back_populates='books') on Book.

Hint 3: author.books returns the list of Book objects for that author.


#6 Eager Loading with joinedload (Medium)
Tags: joinedload, lazy loading, N+1, eager loading

Use joinedload to eagerly fetch related records and avoid the N+1 query problem.

Solution:

def department_headcount(session):
    """Return each department's name and employee count, sorted by name.

    Args:
        session: An open SQLAlchemy session bound to the departments database.

    Returns:
        A list of ``(department_name, employee_count)`` tuples.
    """
    departments = (
        session.query(Department)
        # Fetch employees in the same JOINed query instead of one extra
        # SELECT per department.
        .options(joinedload(Department.employees))
        .order_by(Department.name)
        .all()
    )
    return [(d.name, len(d.employees)) for d in departments]
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, relationship, joinedload

class Base(DeclarativeBase):
  pass

class Department(Base):
  """ORM model mapped to the ``departments`` table."""
  __tablename__ = 'departments'
  id = Column(Integer, primary_key=True)
  name = Column(String)
  # lazy='select': employees load with a separate SELECT on first access,
  # unless a query option such as joinedload overrides it.
  employees = relationship('Emp', back_populates='dept', lazy='select')

class Emp(Base):
  """ORM model mapped to the ``employees`` table; belongs to one Department."""
  __tablename__ = 'employees'
  id = Column(Integer, primary_key=True)
  name = Column(String)
  dept_id = Column(Integer, ForeignKey('departments.id'))  # link to the owning department
  dept = relationship('Department', back_populates='employees')

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as s:
  eng = Department(name='Engineering', employees=[
      Emp(name='Alice'), Emp(name='Carol'), Emp(name='Eve')
  ])
  mkt = Department(name='Marketing', employees=[
      Emp(name='Bob'), Emp(name='Dave')
  ])
  s.add_all([eng, mkt])
  s.commit()

# TODO: Query all departments with their employees using joinedload
# to avoid the N+1 query problem.
# Return (dept_name, employee_count) sorted by dept_name.

def department_headcount(session):
  pass

with Session(engine) as session:
  print(department_headcount(session))
Expected Output
[('Engineering', 3), ('Marketing', 2)]
Hints

Hint 1: session.query(Department).options(joinedload(Department.employees)).all()

Hint 2: joinedload issues a single JOIN query instead of N+1 SELECT queries.

Hint 3: After the query, len(dept.employees) works without additional DB hits.


#7 Many-to-Many with Association Table (Medium)
Tags: many-to-many, association table, secondary, relationship

Define a many-to-many relationship with a secondary association table and query through it.

Solution:

class Student(Base):
    """ORM model mapped to ``students``; linked to Course through ``enrollments``."""

    __tablename__ = 'students'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # secondary= names the association table that holds the link rows.
    courses = relationship('Course', secondary=enrollments, back_populates='students')

class Course(Base):
    """ORM model mapped to ``courses``; linked to Student through ``enrollments``."""

    __tablename__ = 'courses'

    id = Column(Integer, primary_key=True)
    title = Column(String)
    # Mirror of Student.courses over the same association table.
    students = relationship('Student', secondary=enrollments, back_populates='courses')

def enrollment_report(engine):
    """Enroll three students in courses and report each student's courses.

    Args:
        engine: SQLAlchemy engine to create the tables on and query.

    Returns:
        A list of ``(student_name, [course_titles])`` tuples ordered by student
        name, with each list of titles sorted alphabetically.
    """
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        python = Course(title='Python')
        databases = Course(title='Databases')
        networking = Course(title='Networking')

        alice = Student(name='Alice', courses=[python, databases])
        bob = Student(name='Bob', courses=[python])
        carol = Student(name='Carol', courses=[python, databases, networking])

        # Only the students are added explicitly; the courses they reference
        # are saved along with them.
        session.add_all([alice, bob, carol])
        session.commit()

    with Session(engine) as session:
        students = session.query(Student).order_by(Student.name).all()
        # Build the result while the session is open: s.courses is loaded lazily.
        return [
            (s.name, sorted(c.title for c in s.courses))
            for s in students
        ]
from sqlalchemy import create_engine, Column, Integer, String, Table, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, relationship

class Base(DeclarativeBase):
  pass

# TODO: Define a many-to-many relationship between Student and Course
# using an association table 'enrollments'.

enrollments = Table(
  'enrollments', Base.metadata,
  # Composite primary key: each (student, course) pair can be stored only once.
  Column('student_id', Integer, ForeignKey('students.id'), primary_key=True),
  Column('course_id', Integer, ForeignKey('courses.id'), primary_key=True),
)

class Student(Base):
  __tablename__ = 'students'
  id = Column(Integer, primary_key=True)
  name = Column(String)
  # TODO: add courses relationship

class Course(Base):
  __tablename__ = 'courses'
  id = Column(Integer, primary_key=True)
  title = Column(String)
  # TODO: add students relationship

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

# Enroll students, then return (student_name, [course_titles]) sorted by name.

def enrollment_report(engine):
  pass

print(enrollment_report(engine))
Expected Output
[('Alice', ['Databases', 'Python']), ('Bob', ['Python']), ('Carol', ['Databases', 'Networking', 'Python'])]
Hints

Hint 1: Student.courses = relationship('Course', secondary=enrollments, back_populates='students')

Hint 2: To enroll: student.courses.append(course) then session.commit().

Hint 3: secondary= points to the association Table object.


#8 Bulk Operations with bulk_insert_mappings (Medium)
Tags: bulk_insert_mappings, bulk operations, performance

Compare session.add() in a loop versus bulk_insert_mappings for bulk data loading performance.

Solution:

import time

def benchmark_bulk(engine, n=5000):
    """Time inserting ``n`` log rows with per-object adds versus a bulk insert.

    Args:
        engine: SQLAlchemy engine whose database already has ``log_entries``.
        n: Number of rows inserted by each method.

    Returns:
        A ``(slow_ms, fast_ms)`` tuple: elapsed milliseconds for the
        ``session.add()`` loop and for ``bulk_insert_mappings()``. Each timing
        includes the commit.
    """
    entries = [
        {'level': 'INFO', 'message': f'msg {i}', 'ts': float(i)}
        for i in range(n)
    ]

    # Method 1: one ORM object per row, added in a loop.
    with Session(engine) as session:
        t0 = time.perf_counter()
        for e in entries:
            session.add(LogEntry(**e))
        session.commit()
        slow_ms = (time.perf_counter() - t0) * 1000

    # Empty the table so both methods insert into the same starting state.
    with Session(engine) as session:
        session.query(LogEntry).delete()
        session.commit()

    # Method 2: plain dicts handed to the bulk API, no ORM objects built.
    with Session(engine) as session:
        t0 = time.perf_counter()
        session.bulk_insert_mappings(LogEntry, entries)
        session.commit()
        fast_ms = (time.perf_counter() - t0) * 1000

    return slow_ms, fast_ms
from sqlalchemy import create_engine, Column, Integer, String, Float, text
from sqlalchemy.orm import DeclarativeBase, Session
import time

class Base(DeclarativeBase):
  pass

class LogEntry(Base):
  """ORM model mapped to the ``log_entries`` table."""
  __tablename__ = 'log_entries'
  id = Column(Integer, primary_key=True)
  level = Column(String)
  message = Column(String)
  ts = Column(Float)  # timestamp stored as a float

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

# TODO: Compare insertion speed:
# Method 1: session.add() in a loop (slow — one INSERT per object)
# Method 2: session.bulk_insert_mappings() (fast — batch insert)
# Insert 5000 rows each way and return (slow_ms, fast_ms).

def benchmark_bulk(engine, n=5000):
  pass

slow, fast = benchmark_bulk(engine)
print(f'Loop add: {slow:.1f}ms')
print(f'Bulk insert: {fast:.1f}ms')
print(f'Speedup: {slow/fast:.1f}x')
Expected Output
Loop add: X.Xms
Bulk insert: X.Xms
Speedup: X.Xx
Hints

Hint 1: session.bulk_insert_mappings(LogEntry, list_of_dicts) inserts without creating ORM objects.

Hint 2: Wrap both inserts in separate sessions so they don't interfere.

Hint 3: time.perf_counter() returns fractional seconds from the highest-resolution clock available, which makes it the right tool for timing short durations.


#9 Repository Pattern — Data Access Layer (Hard)
Tags: repository pattern, data access layer, abstraction

Implement the Repository pattern with a full CRUD data access layer wrapping SQLAlchemy sessions.

Solution:

from typing import Optional, List

class ProductRepository:
    """Data-access layer that wraps a SQLAlchemy session for ``Product`` rows.

    Every mutating method commits immediately on the wrapped session.
    """

    def __init__(self, session: Session):
        self.session = session

    def create(self, name, category, price, stock=0):
        """Insert a new product and return it with its generated ``id`` loaded."""
        product = Product(name=name, category=category, price=price, stock=stock)
        self.session.add(product)
        self.session.commit()
        # By default commit() expires loaded attributes; refresh() reloads
        # them now so the caller can read product.id straight away.
        self.session.refresh(product)
        return product

    def get_by_id(self, product_id) -> Optional[Product]:
        """Return the product with the given id, or ``None`` if there is none."""
        return self.session.query(Product).filter(Product.id == product_id).first()

    def list_by_category(self, category) -> List[Product]:
        """Return every product in ``category``, ordered by id."""
        return (
            self.session.query(Product)
            .filter(Product.category == category)
            # Without ORDER BY the database may return rows in any order.
            .order_by(Product.id)
            .all()
        )

    def update_price(self, product_id, new_price) -> bool:
        """Set a product's price; return ``False`` if the id does not exist."""
        product = self.get_by_id(product_id)
        if product is None:
            return False
        product.price = new_price
        self.session.commit()
        return True

    def delete(self, product_id) -> bool:
        """Delete a product; return ``False`` if the id does not exist."""
        product = self.get_by_id(product_id)
        if product is None:
            return False
        self.session.delete(product)
        self.session.commit()
        return True

    def search(self, name_contains) -> List[Product]:
        """Return products whose name contains ``name_contains``, ignoring case.

        Results are ordered by id. ``%`` and ``_`` inside ``name_contains`` are
        passed through to the LIKE pattern and act as wildcards.
        """
        return (
            self.session.query(Product)
            .filter(Product.name.ilike(f'%{name_contains}%'))
            .order_by(Product.id)
            .all()
        )
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
from typing import Optional, List

class Base(DeclarativeBase):
  pass

class Product(Base):
  """ORM model mapped to the ``products`` table."""
  __tablename__ = 'products'
  id = Column(Integer, primary_key=True, autoincrement=True)
  name = Column(String, nullable=False)
  category = Column(String)
  price = Column(Float)
  stock = Column(Integer, default=0)  # used when an insert supplies no stock value

# TODO: Implement ProductRepository with these methods:
# - create(name, category, price, stock) -> Product
# - get_by_id(id) -> Optional[Product]
# - list_by_category(category) -> List[Product]
# - update_price(id, new_price) -> bool
# - delete(id) -> bool
# - search(name_contains) -> List[Product]

class ProductRepository:
  def __init__(self, session: Session):
      self.session = session

  def create(self, name, category, price, stock=0):
      pass

  def get_by_id(self, product_id):
      pass

  def list_by_category(self, category):
      pass

  def update_price(self, product_id, new_price):
      pass

  def delete(self, product_id):
      pass

  def search(self, name_contains):
      pass

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as session:
  repo = ProductRepository(session)
  p1 = repo.create('Widget', 'Hardware', 9.99, 100)
  p2 = repo.create('Gadget', 'Electronics', 49.99, 50)
  p3 = repo.create('Super Widget', 'Hardware', 14.99, 75)

  print('Category Hardware:', [p.name for p in repo.list_by_category('Hardware')])
  print('Search "widget":', [p.name for p in repo.search('widget')])
  print('Update price:', repo.update_price(p1.id, 12.99))
  print('After update:', repo.get_by_id(p1.id).price)
  print('Delete:', repo.delete(p2.id))
  print('After delete:', repo.get_by_id(p2.id))
Expected Output
Category Hardware: ['Widget', 'Super Widget']
Search "widget": ['Widget', 'Super Widget']
Update price: True
After update: 12.99
Delete: True
After delete: None
Hints

Hint 1: session.add(product); session.commit(); session.refresh(product) gives the auto-generated id.

Hint 2: session.query(Product).filter(Product.id == id).first() returns None if not found.

Hint 3: For search: filter(Product.name.ilike(f'%{name_contains}%')) is case-insensitive.


#10 Hybrid Property and Custom Query (Hard)
Tags: hybrid_property, custom expression, column_property

Add a hybrid_property for computed order totals and use it to filter items at the SQL level.

Solution:

from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
from sqlalchemy.ext.hybrid import hybrid_property

class Base(DeclarativeBase):
    """Declarative base; models that subclass it are collected on ``Base.metadata``."""

class OrderItem(Base):
    """ORM model mapped to ``order_items`` with a computed ``total``."""

    __tablename__ = 'order_items'

    id = Column(Integer, primary_key=True)
    product = Column(String)
    unit_price = Column(Float)
    quantity = Column(Integer)
    discount_pct = Column(Float, default=0.0)  # percentage, e.g. 10.0 means 10%

    @hybrid_property
    def total(self):
        """Line total after discount, computed in Python on an instance."""
        return self.unit_price * self.quantity * (1 - self.discount_pct / 100)

    @total.expression
    def total(cls):
        """SQL expression for the same formula, used when accessed on the class."""
        return cls.unit_price * cls.quantity * (1 - cls.discount_pct / 100)

def expensive_items(session):
    """Return the order items whose discounted total exceeds 50.

    Args:
        session: An open SQLAlchemy session bound to the order-items database.

    Returns:
        A list of ``(product, total)`` tuples, largest total first.
    """
    items = (
        session.query(OrderItem)
        # On the class, OrderItem.total is a SQL expression, so both the
        # filter and the sort run in the database.
        .filter(OrderItem.total > 50)
        .order_by(OrderItem.total.desc())
        .all()
    )
    # On an instance, .total is computed in Python from the loaded columns.
    return [(i.product, i.total) for i in items]
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
from sqlalchemy.ext.hybrid import hybrid_property

class Base(DeclarativeBase):
  pass

class OrderItem(Base):
  __tablename__ = 'order_items'
  id = Column(Integer, primary_key=True)
  product = Column(String)
  unit_price = Column(Float)
  quantity = Column(Integer)
  discount_pct = Column(Float, default=0.0)  # e.g., 10.0 means 10%

  # TODO: Add a hybrid property 'total' that computes:
  # unit_price * quantity * (1 - discount_pct / 100)

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

with Session(engine) as s:
  s.add_all([
      OrderItem(product='Widget', unit_price=10.0, quantity=5, discount_pct=10.0),
      OrderItem(product='Gadget', unit_price=50.0, quantity=2, discount_pct=0.0),
      OrderItem(product='Doohickey', unit_price=25.0, quantity=4, discount_pct=20.0),
  ])
  s.commit()

# TODO: Query all order items where total > 50 using the hybrid property.
# Return list of (product, total) sorted by total desc.

def expensive_items(session):
  pass

with Session(engine) as session:
  print(expensive_items(session))
Expected Output
[('Gadget', 100.0), ('Doohickey', 80.0)]
Hints

Hint 1: @hybrid_property def total(self): return self.unit_price * self.quantity * (1 - self.discount_pct / 100)

Hint 2: For SQL-level filtering, add @total.expression returning a SQLAlchemy expression.

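Hint 3: For plain arithmetic like this, the @hybrid_property getter also produces a SQL expression when accessed on the class, so @total.expression is only strictly required when the Python and SQL forms differ. If no SQL form is available, fall back to Python-side filtering: [i for i in items if i.total > 50].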


#11 Unit of Work — Transaction Boundary (Hard)
Tags: unit of work, session scope, context manager, transaction

Implement a Unit of Work context manager that guarantees atomic multi-table writes with audit logging.

Solution:

from contextlib import contextmanager

@contextmanager
def unit_of_work():
    """Yield a session whose work is committed or rolled back as one unit.

    The session is committed when the ``with`` body finishes normally. If the
    body (or the commit) raises, the session is rolled back and the exception
    is re-raised. The session is closed in either case.
    """
    session = SessionFactory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

def transfer(from_id, to_id, amount):
    """Move ``amount`` between two accounts and record it in the audit log.

    The balance updates and the audit entry are written in one unit of work,
    so either all of them are committed or none are.

    Args:
        from_id: Id of the account to debit.
        to_id: Id of the account to credit.
        amount: Amount to move; must be positive.

    Raises:
        ValueError: If ``amount`` is not positive, or if the sender's balance
            is smaller than ``amount``.
    """
    # A zero or negative amount would pass the balance check below and move
    # money in the wrong direction, so reject it before touching the database.
    if amount <= 0:
        raise ValueError(f"Transfer amount must be positive: {amount}")

    with unit_of_work() as session:
        sender = session.query(Account).filter_by(id=from_id).one()
        receiver = session.query(Account).filter_by(id=to_id).one()
        if sender.balance < amount:
            # Raising inside the block makes unit_of_work roll back.
            raise ValueError(f"Insufficient balance: {sender.balance} < {amount}")
        sender.balance -= amount
        receiver.balance += amount
        session.add(AuditLog(
            action='transfer',
            details=f'{sender.owner} -> {receiver.owner}: {amount}'
        ))
from sqlalchemy import create_engine, Column, Integer, String, Float, event
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from contextlib import contextmanager

class Base(DeclarativeBase):
  pass

class Account(Base):
  """ORM model mapped to the ``accounts`` table."""
  __tablename__ = 'accounts'
  id = Column(Integer, primary_key=True)
  owner = Column(String)
  balance = Column(Float)  # current balance; changed by transfers

class AuditLog(Base):
  """ORM model mapped to the ``audit_log`` table."""
  __tablename__ = 'audit_log'
  id = Column(Integer, primary_key=True)
  action = Column(String)  # kind of operation recorded, e.g. 'transfer'
  details = Column(String)  # free-text description of the operation

engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)

SessionFactory = sessionmaker(bind=engine)

# TODO: Implement a unit_of_work context manager that:
# - Yields a session
# - Commits on success
# - Rolls back on exception (and re-raises)
# Then implement transfer(from_id, to_id, amount) that:
# - Uses unit_of_work
# - Deducts from sender and credits receiver
# - Writes an audit log entry
# - Is fully atomic

@contextmanager
def unit_of_work():
  pass

def transfer(from_id, to_id, amount):
  pass

# Setup
with unit_of_work() as session:
  session.add_all([
      Account(owner='Alice', balance=1000.0),
      Account(owner='Bob', balance=500.0),
  ])

transfer(1, 2, 300)

with unit_of_work() as session:
  accounts = session.query(Account).order_by(Account.id).all()
  logs = session.query(AuditLog).all()
  print('Alice:', accounts[0].balance)
  print('Bob:', accounts[1].balance)
  print('Audit entries:', len(logs))
Expected Output
Alice: 700.0
Bob: 800.0
Audit entries: 1
Hints

Hint 1: The context manager creates a session and yields it; it commits after the block succeeds, rolls back and re-raises in the except block, and closes the session in the finally block.

Hint 2: Within transfer, read both balances, update them, and insert an AuditLog row.

Hint 3: If the sender's balance is less than the transfer amount, raise ValueError — the context manager will roll back.

© 2026 EngineersOfAI. All rights reserved.