Python ORM with SQLAlchemy Practice Problems & Exercises
Practice: ORM with SQLAlchemy
← Back to lesson:::note Environment
These problems use SQLAlchemy with an in-memory SQLite backend. Install with: pip install sqlalchemy
:::
Define a SQLAlchemy model, create the table, insert rows, and query them back.
Solution:
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
class Base(DeclarativeBase):
pass
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
price = Column(Float, nullable=False)
in_stock = Column(Integer, default=0)
def create_and_insert():
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
with Session(engine) as session:
session.add_all([
Product(name='Widget', price=9.99),
Product(name='Gadget', price=49.99),
Product(name='Doohickey', price=24.99),
])
session.commit()
with Session(engine) as session:
products = session.query(Product).order_by(Product.id).all()
return [(p.id, p.name, p.price) for p in products]
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
class Base(DeclarativeBase):
pass
# TODO: Define a Product model with columns:
# id (Integer, primary key), name (String), price (Float), in_stock (Integer, default=0)
# Create the table, insert 3 products, and return all of them.
class Product(Base):
__tablename__ = 'products'
# Add your columns here
def create_and_insert():
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
with Session(engine) as session:
# Insert 3 products
pass
with Session(engine) as session:
# Return all products as list of (id, name, price)
pass
print(create_and_insert())
Expected Output
[(1, 'Widget', 9.99), (2, 'Gadget', 49.99), (3, 'Doohickey', 24.99)]Hints
Hint 1: class Product(Base): __tablename__ = 'products' defines the table.
Hint 2: session.add(obj) stages; session.commit() saves to DB.
Hint 3: session.query(Product).all() returns all Product objects.
Use filter() with multiple conditions to query employees by department and salary.
Solution:
from sqlalchemy import desc
def high_paid_engineers(session):
employees = (
session.query(Employee)
.filter(Employee.department == 'Engineering', Employee.salary > 90000)
.order_by(desc(Employee.salary))
.all()
)
return [e.name for e in employees]
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
class Base(DeclarativeBase):
pass
class Employee(Base):
__tablename__ = 'employees'
id = Column(Integer, primary_key=True)
name = Column(String)
department = Column(String)
salary = Column(Float)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
with Session(engine) as s:
s.add_all([
Employee(name='Alice', department='Engineering', salary=95000),
Employee(name='Bob', department='Marketing', salary=72000),
Employee(name='Carol', department='Engineering', salary=105000),
Employee(name='Dave', department='HR', salary=68000),
Employee(name='Eve', department='Engineering', salary=88000),
])
s.commit()
# TODO: Return names of engineers earning more than 90000,
# sorted by salary descending.
def high_paid_engineers(session):
pass
with Session(engine) as session:
print(high_paid_engineers(session))
Expected Output
['Carol', 'Alice']Hints
Hint 1: session.query(Employee).filter(Employee.department == 'Engineering', Employee.salary > 90000)
Hint 2: .order_by(Employee.salary.desc()) sorts descending.
Hint 3: Alternatively use .filter_by(department='Engineering') for simple equality checks.
Perform INSERT, UPDATE, and DELETE operations through the SQLAlchemy session.
Solution:
def manage_tags(engine):
with Session(engine) as session:
session.add_all([
Tag(name='python', color='blue'),
Tag(name='sql', color='green'),
Tag(name='devops', color='orange'),
])
session.commit()
sql_tag = session.query(Tag).filter_by(name='sql').one()
sql_tag.color = 'cyan'
devops_tag = session.query(Tag).filter_by(name='devops').one()
session.delete(devops_tag)
session.commit()
tags = session.query(Tag).order_by(Tag.name).all()
return [(t.name, t.color) for t in tags]
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import DeclarativeBase, Session
class Base(DeclarativeBase):
pass
class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
color = Column(String, default='grey')
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
# TODO:
# 1. Insert tags: 'python' (blue), 'sql' (green), 'devops' (orange)
# 2. Update 'sql' tag color to 'cyan'
# 3. Delete 'devops' tag
# Return remaining tags as list of (name, color) sorted by name.
def manage_tags(engine):
pass
print(manage_tags(engine))
Expected Output
[('python', 'blue'), ('sql', 'cyan')]Hints
Hint 1: session.add_all([...]) inserts multiple objects.
Hint 2: Fetch the object, modify its attribute, then session.commit() — SQLAlchemy detects the change.
Hint 3: session.delete(obj) marks for deletion; session.commit() executes it.
Use SQLAlchemy's func module to compute COUNT, SUM, and AVG aggregates in a single query.
Solution:
def shipped_stats(session):
count, total, avg = session.query(
func.count(Order.id),
func.sum(Order.total),
func.avg(Order.total)
).filter(Order.status == 'shipped').one()
return {'count': count, 'total': total, 'avg': round(avg, 2)}
from sqlalchemy import create_engine, Column, Integer, String, Float, func
from sqlalchemy.orm import DeclarativeBase, Session
class Base(DeclarativeBase):
pass
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
customer = Column(String)
total = Column(Float)
status = Column(String)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
with Session(engine) as s:
s.add_all([
Order(customer='Alice', total=250.0, status='shipped'),
Order(customer='Bob', total=125.5, status='pending'),
Order(customer='Alice', total=75.0, status='shipped'),
Order(customer='Carol', total=400.0, status='shipped'),
Order(customer='Bob', total=320.0, status='shipped'),
])
s.commit()
# TODO: Return a dict with:
# - 'count': total number of shipped orders
# - 'total': sum of shipped order totals
# - 'avg': average shipped order total (rounded to 2 decimals)
def shipped_stats(session):
pass
with Session(engine) as session:
print(shipped_stats(session))
Expected Output
{'count': 4, 'total': 1045.0, 'avg': 261.25}Hints
Hint 1: session.query(func.count(Order.id), func.sum(Order.total), func.avg(Order.total)).filter(...).one()
Hint 2: filter(Order.status == 'shipped') restricts to shipped orders.
Hint 3: round(avg, 2) rounds to 2 decimal places.
Define a one-to-many relationship between Author and Book, then navigate it to list each author's books.
Solution:
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, relationship
class Base(DeclarativeBase):
pass
class Author(Base):
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
name = Column(String)
books = relationship('Book', back_populates='author', order_by='Book.title')
class Book(Base):
__tablename__ = 'books'
id = Column(Integer, primary_key=True)
title = Column(String)
price = Column(Float)
author_id = Column(Integer, ForeignKey('authors.id'))
author = relationship('Author', back_populates='books')
def authors_with_books(engine):
Base.metadata.create_all(engine)
with Session(engine) as session:
alice = Author(name='Alice Author', books=[
Book(title='Python Cookbook', price=39.99),
Book(title='Data Science Handbook', price=49.99),
])
bob = Author(name='Bob Writer', books=[
Book(title='SQL Mastery', price=29.99),
])
session.add_all([alice, bob])
session.commit()
with Session(engine) as session:
authors = session.query(Author).order_by(Author.name).all()
return [(a.name, [b.title for b in a.books]) for a in authors]
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, relationship
class Base(DeclarativeBase):
pass
class Author(Base):
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
name = Column(String)
# TODO: add relationship to books
class Book(Base):
__tablename__ = 'books'
id = Column(Integer, primary_key=True)
title = Column(String)
price = Column(Float)
author_id = Column(Integer, ForeignKey('authors.id'))
# TODO: add back-reference to author
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
# TODO: Insert authors and their books, then return each author's name
# and a list of their book titles, sorted by author name.
def authors_with_books(engine):
pass
print(authors_with_books(engine))
Expected Output
[('Alice Author', ['Data Science Handbook', 'Python Cookbook']), ('Bob Writer', ['SQL Mastery'])]Hints
Hint 1: Author.books = relationship('Book', back_populates='author') on Author.
Hint 2: Book.author = relationship('Author', back_populates='books') on Book.
Hint 3: author.books returns the list of Book objects for that author.
Use joinedload to eagerly fetch related records and avoid the N+1 query problem.
Solution:
def department_headcount(session):
departments = (
session.query(Department)
.options(joinedload(Department.employees))
.order_by(Department.name)
.all()
)
return [(d.name, len(d.employees)) for d in departments]
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, relationship, joinedload
class Base(DeclarativeBase):
pass
class Department(Base):
__tablename__ = 'departments'
id = Column(Integer, primary_key=True)
name = Column(String)
employees = relationship('Emp', back_populates='dept', lazy='select')
class Emp(Base):
__tablename__ = 'employees'
id = Column(Integer, primary_key=True)
name = Column(String)
dept_id = Column(Integer, ForeignKey('departments.id'))
dept = relationship('Department', back_populates='employees')
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
with Session(engine) as s:
eng = Department(name='Engineering', employees=[
Emp(name='Alice'), Emp(name='Carol'), Emp(name='Eve')
])
mkt = Department(name='Marketing', employees=[
Emp(name='Bob'), Emp(name='Dave')
])
s.add_all([eng, mkt])
s.commit()
# TODO: Query all departments with their employees using joinedload
# to avoid the N+1 query problem.
# Return (dept_name, employee_count) sorted by dept_name.
def department_headcount(session):
pass
with Session(engine) as session:
print(department_headcount(session))
Expected Output
[('Engineering', 3), ('Marketing', 2)]Hints
Hint 1: session.query(Department).options(joinedload(Department.employees)).all()
Hint 2: joinedload issues a single JOIN query instead of N+1 SELECT queries.
Hint 3: After the query, len(dept.employees) works without additional DB hits.
Define a many-to-many relationship with a secondary association table and query through it.
Solution:
class Student(Base):
__tablename__ = 'students'
id = Column(Integer, primary_key=True)
name = Column(String)
courses = relationship('Course', secondary=enrollments, back_populates='students')
class Course(Base):
__tablename__ = 'courses'
id = Column(Integer, primary_key=True)
title = Column(String)
students = relationship('Student', secondary=enrollments, back_populates='courses')
def enrollment_report(engine):
Base.metadata.create_all(engine)
with Session(engine) as session:
python = Course(title='Python')
databases = Course(title='Databases')
networking = Course(title='Networking')
alice = Student(name='Alice', courses=[python, databases])
bob = Student(name='Bob', courses=[python])
carol = Student(name='Carol', courses=[python, databases, networking])
session.add_all([alice, bob, carol])
session.commit()
with Session(engine) as session:
students = session.query(Student).order_by(Student.name).all()
return [
(s.name, sorted(c.title for c in s.courses))
for s in students
]
from sqlalchemy import create_engine, Column, Integer, String, Table, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Session, relationship
class Base(DeclarativeBase):
pass
# TODO: Define a many-to-many relationship between Student and Course
# using an association table 'enrollments'.
enrollments = Table(
'enrollments', Base.metadata,
Column('student_id', Integer, ForeignKey('students.id')),
Column('course_id', Integer, ForeignKey('courses.id'))
)
class Student(Base):
__tablename__ = 'students'
id = Column(Integer, primary_key=True)
name = Column(String)
# TODO: add courses relationship
class Course(Base):
__tablename__ = 'courses'
id = Column(Integer, primary_key=True)
title = Column(String)
# TODO: add students relationship
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
# Enroll students, then return (student_name, [course_titles]) sorted by name.
def enrollment_report(engine):
pass
print(enrollment_report(engine))
Expected Output
[('Alice', ['Databases', 'Python']), ('Bob', ['Python']), ('Carol', ['Databases', 'Networking', 'Python'])]Hints
Hint 1: Student.courses = relationship('Course', secondary=enrollments, back_populates='students')
Hint 2: To enroll: student.courses.append(course) then session.commit().
Hint 3: secondary= points to the association Table object.
Compare session.add() in a loop versus bulk_insert_mappings for bulk data loading performance.
Solution:
import time
def benchmark_bulk(engine, n=5000):
entries = [
{'level': 'INFO', 'message': f'msg {i}', 'ts': float(i)}
for i in range(n)
]
# Method 1: loop add
with Session(engine) as session:
t0 = time.perf_counter()
for e in entries:
session.add(LogEntry(**e))
session.commit()
slow_ms = (time.perf_counter() - t0) * 1000
# Clear
with Session(engine) as session:
session.query(LogEntry).delete()
session.commit()
# Method 2: bulk insert
with Session(engine) as session:
t0 = time.perf_counter()
session.bulk_insert_mappings(LogEntry, entries)
session.commit()
fast_ms = (time.perf_counter() - t0) * 1000
return slow_ms, fast_ms
from sqlalchemy import create_engine, Column, Integer, String, Float, text
from sqlalchemy.orm import DeclarativeBase, Session
import time
class Base(DeclarativeBase):
pass
class LogEntry(Base):
__tablename__ = 'log_entries'
id = Column(Integer, primary_key=True)
level = Column(String)
message = Column(String)
ts = Column(Float)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
# TODO: Compare insertion speed:
# Method 1: session.add() in a loop (slow — one INSERT per object)
# Method 2: session.bulk_insert_mappings() (fast — batch insert)
# Insert 5000 rows each way and return (slow_ms, fast_ms).
def benchmark_bulk(engine, n=5000):
pass
slow, fast = benchmark_bulk(engine)
print(f'Loop add: {slow:.1f}ms')
print(f'Bulk insert: {fast:.1f}ms')
print(f'Speedup: {slow/fast:.1f}x')
Expected Output
Loop add: X.Xms
Bulk insert: X.Xms
Speedup: X.XxHints
Hint 1: session.bulk_insert_mappings(LogEntry, list_of_dicts) inserts without creating ORM objects.
Hint 2: Wrap both inserts in separate sessions so they don't interfere.
Hint 3: time.perf_counter() gives microsecond-precision timing.
Implement the Repository pattern with a full CRUD data access layer wrapping SQLAlchemy sessions.
Solution:
from typing import Optional, List
class ProductRepository:
def __init__(self, session: Session):
self.session = session
def create(self, name, category, price, stock=0):
product = Product(name=name, category=category, price=price, stock=stock)
self.session.add(product)
self.session.commit()
self.session.refresh(product)
return product
def get_by_id(self, product_id) -> Optional[Product]:
return self.session.query(Product).filter(Product.id == product_id).first()
def list_by_category(self, category) -> List[Product]:
return self.session.query(Product).filter(Product.category == category).all()
def update_price(self, product_id, new_price) -> bool:
product = self.get_by_id(product_id)
if not product:
return False
product.price = new_price
self.session.commit()
return True
def delete(self, product_id) -> bool:
product = self.get_by_id(product_id)
if not product:
return False
self.session.delete(product)
self.session.commit()
return True
def search(self, name_contains) -> List[Product]:
return (
self.session.query(Product)
.filter(Product.name.ilike(f'%{name_contains}%'))
.all()
)
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
from typing import Optional, List
class Base(DeclarativeBase):
pass
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String, nullable=False)
category = Column(String)
price = Column(Float)
stock = Column(Integer, default=0)
# TODO: Implement ProductRepository with these methods:
# - create(name, category, price, stock) -> Product
# - get_by_id(id) -> Optional[Product]
# - list_by_category(category) -> List[Product]
# - update_price(id, new_price) -> bool
# - delete(id) -> bool
# - search(name_contains) -> List[Product]
class ProductRepository:
def __init__(self, session: Session):
self.session = session
def create(self, name, category, price, stock=0):
pass
def get_by_id(self, product_id):
pass
def list_by_category(self, category):
pass
def update_price(self, product_id, new_price):
pass
def delete(self, product_id):
pass
def search(self, name_contains):
pass
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
with Session(engine) as session:
repo = ProductRepository(session)
p1 = repo.create('Widget', 'Hardware', 9.99, 100)
p2 = repo.create('Gadget', 'Electronics', 49.99, 50)
p3 = repo.create('Super Widget', 'Hardware', 14.99, 75)
print('Category Hardware:', [p.name for p in repo.list_by_category('Hardware')])
print('Search "widget":', [p.name for p in repo.search('widget')])
print('Update price:', repo.update_price(p1.id, 12.99))
print('After update:', repo.get_by_id(p1.id).price)
print('Delete:', repo.delete(p2.id))
print('After delete:', repo.get_by_id(p2.id))
Expected Output
Category Hardware: ['Widget', 'Super Widget']
Search "widget": ['Widget', 'Super Widget']
Update price: True
After update: 12.99
Delete: True
After delete: NoneHints
Hint 1: session.add(product); session.commit(); session.refresh(product) gives the auto-generated id.
Hint 2: session.query(Product).filter(Product.id == id).first() returns None if not found.
Hint 3: For search: filter(Product.name.ilike(f'%{name_contains}%')) is case-insensitive.
Add a hybrid_property for computed order totals and use it to filter items at the SQL level.
Solution:
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
from sqlalchemy.ext.hybrid import hybrid_property
class Base(DeclarativeBase):
pass
class OrderItem(Base):
__tablename__ = 'order_items'
id = Column(Integer, primary_key=True)
product = Column(String)
unit_price = Column(Float)
quantity = Column(Integer)
discount_pct = Column(Float, default=0.0)
@hybrid_property
def total(self):
return self.unit_price * self.quantity * (1 - self.discount_pct / 100)
@total.expression
def total(cls):
return cls.unit_price * cls.quantity * (1 - cls.discount_pct / 100)
def expensive_items(session):
items = (
session.query(OrderItem)
.filter(OrderItem.total > 50)
.order_by(OrderItem.total.desc())
.all()
)
return [(i.product, i.total) for i in items]
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import DeclarativeBase, Session
from sqlalchemy.ext.hybrid import hybrid_property
class Base(DeclarativeBase):
pass
class OrderItem(Base):
__tablename__ = 'order_items'
id = Column(Integer, primary_key=True)
product = Column(String)
unit_price = Column(Float)
quantity = Column(Integer)
discount_pct = Column(Float, default=0.0) # e.g., 10.0 means 10%
# TODO: Add a hybrid property 'total' that computes:
# unit_price * quantity * (1 - discount_pct / 100)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
with Session(engine) as s:
s.add_all([
OrderItem(product='Widget', unit_price=10.0, quantity=5, discount_pct=10.0),
OrderItem(product='Gadget', unit_price=50.0, quantity=2, discount_pct=0.0),
OrderItem(product='Doohickey', unit_price=25.0, quantity=4, discount_pct=20.0),
])
s.commit()
# TODO: Query all order items where total > 50 using the hybrid property.
# Return list of (product, total) sorted by total desc.
def expensive_items(session):
pass
with Session(engine) as session:
print(expensive_items(session))
Expected Output
[('Gadget', 100.0), ('Doohickey', 80.0)]Hints
Hint 1: @hybrid_property def total(self): return self.unit_price * self.quantity * (1 - self.discount_pct / 100)
Hint 2: For SQL-level filtering, add @total.expression returning a SQLAlchemy expression.
Hint 3: Without the expression, use Python-side filtering: [i for i in items if i.total > 50].
Implement a Unit of Work context manager that guarantees atomic multi-table writes with audit logging.
Solution:
from contextlib import contextmanager
@contextmanager
def unit_of_work():
session = SessionFactory()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def transfer(from_id, to_id, amount):
with unit_of_work() as session:
sender = session.query(Account).filter_by(id=from_id).one()
receiver = session.query(Account).filter_by(id=to_id).one()
if sender.balance < amount:
raise ValueError(f"Insufficient balance: {sender.balance} < {amount}")
sender.balance -= amount
receiver.balance += amount
session.add(AuditLog(
action='transfer',
details=f'{sender.owner} -> {receiver.owner}: {amount}'
))
from sqlalchemy import create_engine, Column, Integer, String, Float, event
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
from contextlib import contextmanager
class Base(DeclarativeBase):
pass
class Account(Base):
__tablename__ = 'accounts'
id = Column(Integer, primary_key=True)
owner = Column(String)
balance = Column(Float)
class AuditLog(Base):
__tablename__ = 'audit_log'
id = Column(Integer, primary_key=True)
action = Column(String)
details = Column(String)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
SessionFactory = sessionmaker(bind=engine)
# TODO: Implement a unit_of_work context manager that:
# - Yields a session
# - Commits on success
# - Rolls back on exception (and re-raises)
# Then implement transfer(from_id, to_id, amount) that:
# - Uses unit_of_work
# - Deducts from sender and credits receiver
# - Writes an audit log entry
# - Is fully atomic
@contextmanager
def unit_of_work():
pass
def transfer(from_id, to_id, amount):
pass
# Setup
with unit_of_work() as session:
session.add_all([
Account(owner='Alice', balance=1000.0),
Account(owner='Bob', balance=500.0),
])
transfer(1, 2, 300)
with unit_of_work() as session:
accounts = session.query(Account).order_by(Account.id).all()
logs = session.query(AuditLog).all()
print('Alice:', accounts[0].balance)
print('Bob:', accounts[1].balance)
print('Audit entries:', len(logs))
Expected Output
Alice: 700.0
Bob: 800.0
Audit entries: 1Hints
Hint 1: The context manager creates a session, yields it, commits or rolls back in the finally/except block.
Hint 2: Within transfer, read both balances, update them, and insert an AuditLog row.
Hint 3: If Alice's balance goes negative, raise ValueError — the context manager will roll back.
