ORM with SQLAlchemy
Reading time: ~35 minutes | Level: Intermediate → Engineering
Before reading further, consider this puzzle:
from flask import Flask
from myapp.models import User, db
app = Flask(__name__)
@app.get("/dashboard")
def dashboard():
users = db.session.query(User).all() # 1 SQL query
results = []
for user in users: # 1000 users
results.append({
"name": user.name,
"order_count": len(user.orders), # user.orders accessed here
})
return {"users": results}
With 1000 users in the database, how many SQL queries does this endpoint execute?
The answer is 1001. The first query fetches all users. Then, because user.orders is a relationship with default lazy loading, SQLAlchemy fires a separate SELECT query for every single user the first time user.orders is accessed. 1000 users = 1000 extra queries.
This is the N+1 problem - and it is one of the most common performance killers hidden inside ORM code. It does not surface in development (5 users in your local database) and only shows up in production under real load. By the end of this lesson you will know exactly why it happens, how to detect it, and how to eliminate it entirely with SQLAlchemy's eager loading strategies.
What You Will Learn
- SQLAlchemy's architecture: Core vs ORM, the Engine, and the connection pool
- Declaring models with DeclarativeBase, mapped_column, and Python type annotations
- Relationships: relationship(), ForeignKey, one-to-many, and many-to-many with an association table
- The Session - SQLAlchemy's unit-of-work pattern and object lifecycle
- Querying with SQLAlchemy 2.0 style: select(), where(), join(), scalars()
- Lazy vs eager loading - the root cause of N+1 and how to avoid it
- selectinload and joinedload - eliminating N+1 with explicit eager loading
- Bulk insert and update operations for high-throughput scenarios
- SQLAlchemy with FastAPI - the session dependency injection pattern
Prerequisites
- Lesson 01 (SQL Fundamentals) - understanding raw SQL is essential for reading what SQLAlchemy generates
- Lesson 03 (PostgreSQL) - connection strings, transactions, and driver behaviour
- Lesson 04 (Transactions) - SQLAlchemy's session is built on top of database transactions
Part 1 - Architecture: Core vs ORM, Engine, and Connection Pool
SQLAlchemy is two libraries in one package:
Your Application Code
↓
┌───────────────────────────────────┐
│ SQLAlchemy ORM │ ← Declarative models, Session, relationships
│ (Unit of Work, Identity Map) │
└────────────────┬──────────────────┘
↓
┌───────────────────────────────────┐
│ SQLAlchemy Core │ ← SQL expression language, Table, Column
│ (select(), insert(), join()) │
└────────────────┬──────────────────┘
↓
┌───────────────────────────────────┐
│ Engine + Connection Pool │ ← Manages raw DBAPI connections
└────────────────┬──────────────────┘
↓
┌───────────────────────────────────┐
│ DBAPI Driver (psycopg2, etc.) │ ← Speaks the wire protocol to the DB
└───────────────────────────────────┘
SQLAlchemy Core is a Python-level SQL expression language. You build SQL statements as Python objects - select(users).where(users.c.id == 1) - and SQLAlchemy compiles them to dialect-specific SQL. Core is ideal for complex queries, bulk operations, and places where you want full control over the SQL generated.
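To make the compile step concrete, here is a minimal sketch of a Core statement being built and rendered without any database connection. The users table and its two columns are assumptions for illustration, not the schema used later in this lesson.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select

metadata = MetaData()

# A Core Table object: no mapped class is involved.
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)

# Build the statement as a Python object; nothing is sent to a database here.
stmt = select(users).where(users.c.id == 1)

# str() compiles with the generic default dialect and named bind parameters.
generic_sql = str(stmt)

# literal_binds inlines parameter values; useful for debugging output only.
inlined_sql = str(stmt.compile(compile_kwargs={"literal_binds": True}))

print(generic_sql)
print(inlined_sql)
```

Passing a dialect to compile() would render the same statement in that database's flavour of SQL, which is what the Engine does for you at execution time.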
SQLAlchemy ORM adds the object-relational mapping layer on top of Core. You define Python classes, map them to tables, and SQLAlchemy translates object operations (add, delete, attribute access) into Core expressions. The ORM also maintains an identity map (a session-level cache mapping primary keys to Python objects) and implements the unit of work pattern (tracking changes and flushing them together).
The Engine is the entry point to SQLAlchemy. It holds the connection pool and the dialect (the component that knows how to speak PostgreSQL, SQLite, MySQL). You create it once at application startup:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
"postgresql+psycopg2://user:password@localhost:5432/mydb",
pool_size=10, # number of persistent connections kept open
max_overflow=20, # connections beyond pool_size allowed under load
pool_timeout=30, # seconds to wait for a connection before raising
pool_pre_ping=True, # test connections before use (detects stale connections)
echo=False, # set True to log all SQL - invaluable for debugging
)
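Set echo=True during development to see every SQL statement SQLAlchemy generates. In production, leave echo off and keep the sqlalchemy.engine logger at WARNING level (the library default), which suppresses per-statement logging. SQLAlchemy does not log slow queries on its own; capture those with an event hook that times each statement or with the database's own slow-query log (for PostgreSQL, log_min_duration_statement).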
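Statement logging can also be driven through the standard logging module instead of echo. The sketch below raises the sqlalchemy.engine logger to INFO and collects its output in memory. ListHandler is a hypothetical helper written for this example, and the in-memory SQLite URL is an assumption so the code runs without a server.

```python
import logging

from sqlalchemy import create_engine, text


class ListHandler(logging.Handler):
    """Collects log messages in memory (demonstration helper, not a SQLAlchemy class)."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


handler = ListHandler()
engine_logger = logging.getLogger("sqlalchemy.engine")
engine_logger.addHandler(handler)
# INFO logs every statement; at WARNING no statements are logged.
engine_logger.setLevel(logging.INFO)

# Configure logging before creating the engine so the level is picked up.
engine = create_engine("sqlite://")  # in-memory SQLite, assumed for the demo

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

logged_statements = [m for m in handler.messages if "SELECT 1" in m]
print(logged_statements)
```

In a real service you would attach your usual handlers rather than an in-memory list, and switch the level per environment.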
The connection pool (QueuePool by default) keeps a set of open database connections and hands them out to sessions. When a session finishes, its connection is returned to the pool - not closed. This avoids the overhead of opening a TCP connection and authenticating for every request. For a web application handling 100 req/s, a pool of 10–20 connections is typically sufficient because each request holds a connection only while its queries run: at, say, 50 ms of database time per request, 100 req/s keeps about 5 connections busy on average.
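The checkout and return cycle can be observed directly on the pool. This is a small sketch under two assumptions: SQLite stands in for PostgreSQL so the code runs anywhere, and QueuePool is requested explicitly because an in-memory SQLite engine would otherwise pick a different pool class.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=2,      # persistent connections kept open
    max_overflow=0,   # no extra connections beyond pool_size
)

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
    checked_out_during = engine.pool.checkedout()  # connections in use right now

# Leaving the block returns the connection to the pool instead of closing it.
checked_out_after = engine.pool.checkedout()
idle_after = engine.pool.checkedin()  # connections sitting idle in the pool

print(engine.pool.status())
```

The idle connection is reused by the next engine.connect() or Session, which is where the saving over connect-per-request comes from.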
Part 2 - Declarative Models
SQLAlchemy 2.0 introduced the fully type-annotated declarative API using DeclarativeBase and mapped_column. This is the current recommended style:
from datetime import datetime, timezone
from decimal import Decimal
from typing import Optional
from sqlalchemy import String, Numeric, DateTime, Text, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# All models inherit from this base class
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
# Mapped[T] declares the Python type; mapped_column() provides column options
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
username: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
is_active: Mapped[bool] = mapped_column(default=True, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(), # DB-level default: NOW()
nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(), # SQLAlchemy adds this expression to UPDATE statements it emits for the row (not a DB trigger)
nullable=False,
)
# Optional field - Mapped[Optional[str]] allows NULL
bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
# Relationships declared here - covered in Part 3
orders: Mapped[list["Order"]] = relationship(back_populates="user")
def __repr__(self) -> str:
return f"<User id={self.id} email={self.email!r}>"
Type Annotation to Column Mapping
The Mapped[T] annotation and mapped_column() cooperate:
| Python annotation | SQL type produced |
|---|---|
| Mapped[int] | INTEGER NOT NULL |
| Mapped[Optional[int]] | INTEGER NULL |
| Mapped[str] | VARCHAR NOT NULL |
| Mapped[bool] | BOOLEAN NOT NULL |
| Mapped[datetime] | DATETIME NOT NULL |
| Mapped[Decimal] | NUMERIC NOT NULL |
| Mapped[float] | FLOAT NOT NULL |
Mapped[int] implies NOT NULL. Mapped[Optional[int]] (equivalent to Mapped[int | None]) implies NULL. This is inferred directly from the Python type annotation - you do not need to repeat nullable=True or nullable=False unless you want to be explicit.
Creating Tables
# Create all tables that don't exist yet - safe to call at startup
Base.metadata.create_all(engine)
# Drop and recreate all tables - use only in tests
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
In production, never use create_all to manage schema changes. Use Alembic migrations (covered in Lesson 07) instead. Reserve create_all for test fixtures and local development.
Part 3 - Relationships
One-to-Many: User → Orders
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
# ForeignKey references the table name and column name (not the class)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)
status: Mapped[str] = mapped_column(String(50), default="pending", nullable=False)
total_amount: Mapped[Decimal] = mapped_column(Numeric(10, 2), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
# Many side: each Order has one User
user: Mapped["User"] = relationship(back_populates="orders")
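# One side: each Order has many items
items: Mapped[list["OrderItem"]] = relationship(back_populates="order", cascade="all, delete-orphan")
# Many-to-many side paired with Product.orders; the order_products table is defined in the next section
products: Mapped[list["Product"]] = relationship(secondary="order_products", back_populates="orders")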
def __repr__(self) -> str:
return f"<Order id={self.id} user_id={self.user_id} status={self.status!r}>"
The back_populates parameter wires both sides of the relationship together. When you set order.user = some_user, SQLAlchemy automatically adds order to some_user.orders. Both sides stay in sync in memory.
Many-to-Many: Orders ↔ Products via Association Table
# Association table - no class needed for simple many-to-many
from sqlalchemy import Table, Column, Integer
order_products = Table(
"order_products",
Base.metadata,
Column("order_id", ForeignKey("orders.id"), primary_key=True),
Column("product_id", ForeignKey("products.id"), primary_key=True),
Column("quantity", Integer, default=1, nullable=False), # a Column needs an explicit type unless a ForeignKey supplies one
)
class Product(Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(255), nullable=False)
sku: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
price: Mapped[Decimal] = mapped_column(Numeric(10, 2), nullable=False)
stock_quantity: Mapped[int] = mapped_column(default=0, nullable=False)
# Many-to-many: a product can appear in many orders
orders: Mapped[list["Order"]] = relationship(
secondary=order_products, back_populates="products"
)
def __repr__(self) -> str:
return f"<Product id={self.id} sku={self.sku!r}>"
class OrderItem(Base):
__tablename__ = "order_items"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), nullable=False, index=True)
product_id: Mapped[int] = mapped_column(ForeignKey("products.id"), nullable=False)
quantity: Mapped[int] = mapped_column(nullable=False)
unit_price: Mapped[Decimal] = mapped_column(Numeric(10, 2), nullable=False)
order: Mapped["Order"] = relationship(back_populates="items")
product: Mapped["Product"] = relationship()
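Model Relationship Diagram
- User → Order: one-to-many (orders.user_id references users.id)
- Order → OrderItem: one-to-many (order_items.order_id references orders.id)
- OrderItem → Product: many-to-one (order_items.product_id references products.id)
- Order ↔ Product: many-to-many through the order_products association table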
Part 4 - The Session: Unit of Work Pattern
The Session is the primary interface between your application and the database. It implements two important patterns:
- Unit of Work: tracks all changes made to Python objects within a session and flushes them together as a single batch of SQL statements
- Identity Map: maintains a dictionary mapping (class, primary_key) to Python objects, ensuring that within one session there are never two different Python objects representing the same database row
Session Lifecycle
Creating and Using a Session
from sqlalchemy.orm import Session, sessionmaker
# SessionLocal is a factory for creating Session objects
# Create once at application startup
SessionLocal = sessionmaker(
bind=engine,
autocommit=False, # never commit automatically
autoflush=True, # flush before queries so the session state is consistent
expire_on_commit=True, # expire attributes after commit (forces reload on next access)
)
# Pattern 1: context manager - closes the session on exit; it does not commit for you (SessionLocal.begin() gives commit-or-rollback)
def create_user(email: str, username: str, password_hash: str) -> User:
with SessionLocal() as session:
user = User(email=email, username=username, password_hash=password_hash)
session.add(user)
session.commit()
session.refresh(user) # reload from DB to get server-generated defaults
return user
# Pattern 2: explicit session management
def transfer_funds(from_account_id: int, to_account_id: int, amount: Decimal) -> None:
with SessionLocal() as session:
try:
from_acc = session.get(Account, from_account_id)
to_acc = session.get(Account, to_account_id)
if from_acc.balance < amount:
raise ValueError("Insufficient funds")
from_acc.balance -= amount
to_acc.balance += amount
session.commit() # both changes committed atomically
except Exception:
session.rollback() # undo all changes in this session
raise
Key Session Methods
| Method | What It Does |
|---|---|
| session.add(obj) | Move object from Transient/Detached to Pending/Persistent state |
| session.add_all([obj1, obj2]) | Add multiple objects at once |
| session.get(Model, pk) | Fetch by primary key; checks identity map first |
| session.flush() | Send pending SQL to DB within the current transaction (no commit) |
| session.commit() | Commit the current transaction; expires all objects |
| session.rollback() | Discard all changes since last commit |
| session.close() | Return connection to pool; expunge all objects |
| session.expunge(obj) | Remove object from session (becomes Detached) |
| session.refresh(obj) | Reload object's attributes from DB |
| session.delete(obj) | Mark object for deletion on next flush |
| session.merge(obj) | Merge a detached object's state into the session |
expire_on_commit=True (the default) means that after session.commit(), all objects tracked by that session have their attributes expired. The next time you access an attribute on an expired object, SQLAlchemy fires a SELECT to reload it. If you close the session before accessing the object, this raises DetachedInstanceError. Always call session.refresh(obj) or load the data you need before closing the session.
Part 5 - Querying: SQLAlchemy 2.0 Style
SQLAlchemy has two query APIs. Always use the 2.0 style:
from sqlalchemy import select, update, delete, and_, or_, func
from sqlalchemy.orm import Session
# ❌ Legacy 1.x style - avoid in new code
users = session.query(User).filter(User.is_active == True).all()
# ✅ SQLAlchemy 2.0 style - use this
stmt = select(User).where(User.is_active == True)
users = session.execute(stmt).scalars().all()
Common Query Patterns
def get_active_users(session: Session) -> list[User]:
stmt = select(User).where(User.is_active == True)
return session.execute(stmt).scalars().all()
def get_user_by_email(session: Session, email: str) -> User | None:
stmt = select(User).where(User.email == email)
return session.execute(stmt).scalar_one_or_none()
def get_users_paginated(
session: Session, skip: int = 0, limit: int = 20
) -> list[User]:
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
return session.execute(stmt).scalars().all()
def count_user_orders(session: Session, user_id: int) -> int:
stmt = select(func.count(Order.id)).where(Order.user_id == user_id)
return session.execute(stmt).scalar_one()
def get_orders_with_filter(
session: Session,
user_id: int,
status: str | None = None,
min_amount: Decimal | None = None,
) -> list[Order]:
stmt = select(Order).where(Order.user_id == user_id)
if status:
stmt = stmt.where(Order.status == status)
if min_amount is not None: # a plain truthiness test would silently skip Decimal("0")
stmt = stmt.where(Order.total_amount >= min_amount)
stmt = stmt.order_by(Order.created_at.desc())
return session.execute(stmt).scalars().all()
def get_user_order_summary(session: Session) -> list[dict]:
# JOIN and aggregate - still Python, still safe
stmt = (
select(
User.id,
User.email,
func.count(Order.id).label("order_count"),
func.sum(Order.total_amount).label("total_spent"),
)
.join(Order, Order.user_id == User.id, isouter=True)
.group_by(User.id, User.email)
.order_by(func.sum(Order.total_amount).desc())
)
rows = session.execute(stmt).all()
return [
{
"user_id": row.id,
"email": row.email,
"order_count": row.order_count,
"total_spent": row.total_spent,
}
for row in rows
]
scalar_one() vs scalar_one_or_none() vs scalars().all()
| Method | When to Use | Raises if |
|---|---|---|
| scalar_one() | Exactly one row expected | 0 rows or >1 row |
| scalar_one_or_none() | Zero or one row expected | >1 row |
| scalars().first() | First of potentially many | Never (returns None if empty) |
| scalars().all() | Multiple rows expected | Never |
Part 6 - Lazy vs Eager Loading
This is the most critical concept for SQLAlchemy performance in production.
Default Lazy Loading - The N+1 Trap
class User(Base):
__tablename__ = "users"
# ...
# Default: lazy="select" - fires a SELECT when orders is first accessed
orders: Mapped[list["Order"]] = relationship(back_populates="user")
# This is the N+1 problem in code form:
def get_dashboard_data(session: Session) -> list[dict]:
users = session.execute(select(User)).scalars().all() # Query 1: SELECT * FROM users
results = []
for user in users:
results.append({
"name": user.username,
"order_count": len(user.orders), # Query 2...N+1: SELECT * FROM orders WHERE user_id = ?
})
return results
With 1000 users, this generates 1001 queries. With 10,000 users, it generates 10,001 queries. The problem scales linearly with your data.
SQLAlchemy's default lazy="select" behavior is a performance trap for any list query that touches relationships. It is designed for convenience on individual object lookups, not for loading collections of related objects. Always explicitly specify the loading strategy when writing queries that access relationships.
Loading Strategies Compared
| Strategy | How It Works | Best For |
|---|---|---|
| lazy="select" (default) | Fires a new SELECT when attribute is first accessed | Single object lookups only |
| lazy="joined" | Always uses a JOIN - relationship loaded with parent | Small related collections |
| lazy="subquery" | Second query that re-runs the original query as a subquery and joins to it | Legacy - prefer selectin loading |
| lazy="raise" | Raises an error on access | Detecting accidental lazy loads |
| selectinload() | Second SELECT with IN clause, applied per-query | Collections - best default |
| joinedload() | JOIN in the same query, applied per-query | Single related objects |
| subqueryload() | Subquery-based, applied per-query | Avoid - use selectinload |
lazy="raise" - The Safety Net
The most defensive configuration for production code is to mark all relationships as lazy="raise" at the model level:
class User(Base):
__tablename__ = "users"
# ...
orders: Mapped[list["Order"]] = relationship(
back_populates="user",
lazy="raise", # accessing user.orders without explicit loading raises an error
)
With lazy="raise", any code that accidentally accesses user.orders without explicitly eager-loading it will raise sqlalchemy.exc.InvalidRequestError immediately - at the point of the bug, not 1000 queries later. You are forced to be explicit about every relationship load.
Part 7 - Solving N+1 with Explicit Eager Loading
selectinload - The Recommended Strategy for Collections
selectinload emits a second SELECT with an IN clause to load the related objects for all parent objects at once:
from sqlalchemy.orm import selectinload
def get_dashboard_data(session: Session) -> list[dict]:
stmt = (
select(User)
.options(selectinload(User.orders)) # tells SQLAlchemy to load orders with a second SELECT
.where(User.is_active == True)
)
users = session.execute(stmt).scalars().all()
# users is already loaded, user.orders is already populated for all users
# SQL executed:
# SELECT * FROM users WHERE is_active = TRUE
# SELECT * FROM orders WHERE user_id IN (1, 2, 3, ..., 1000)
return [
{"name": user.username, "order_count": len(user.orders)}
for user in users
]
Two queries total, regardless of how many users exist. This is the correct solution to N+1.
joinedload - For Single Related Objects
joinedload adds a JOIN to the parent query and populates the relationship in one round trip. Use it for many-to-one or one-to-one relationships (loading a single related object), not for one-to-many collections:
from sqlalchemy.orm import joinedload
def get_orders_with_users(session: Session) -> list[Order]:
stmt = (
select(Order)
.options(joinedload(Order.user)) # JOIN users ON orders.user_id = users.id
.where(Order.status == "pending")
)
orders = session.execute(stmt).unique().scalars().all()
# order.user is available without extra queries
# SQL: SELECT orders.*, users.* FROM orders
# JOIN users ON orders.user_id = users.id
# WHERE orders.status = 'pending'
return orders
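Note: when joinedload targets a one-to-many collection, the JOIN returns one row per child, so the same parent appears in several rows. Call .unique() on the result to deduplicate; SQLAlchemy 2.0 raises InvalidRequestError if you fetch such a result without it. For the many-to-one load shown here, .unique() is harmless but not required.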
Chained Eager Loading - Multiple Levels Deep
from sqlalchemy.orm import selectinload, joinedload
def get_orders_full(session: Session, user_id: int) -> list[Order]:
stmt = (
select(Order)
.where(Order.user_id == user_id)
.options(
joinedload(Order.user), # load user with JOIN
selectinload(Order.items).joinedload(OrderItem.product), # load items, then products
)
)
orders = session.execute(stmt).unique().scalars().all()
# Two total queries:
# 1. SELECT orders.*, users.* FROM orders JOIN users ... WHERE user_id = ?
# 2. SELECT order_items.*, products.* FROM order_items
# JOIN products ON order_items.product_id = products.id
# WHERE order_items.order_id IN (...)
return orders
A useful rule: use joinedload for many-to-one (child → parent) and use selectinload for one-to-many (parent → children). joinedload on a one-to-many relationship produces a JOIN that returns one row per child, creating duplicate parent rows in the result - you must call .unique(). selectinload avoids this with a separate IN query.
Part 8 - Bulk Operations
When you need to insert or update thousands of rows, the standard ORM approach (creating Python objects, adding them to the session, committing) carries noticeable overhead: every object passes through the identity map and change tracking before any SQL is emitted.
Bulk Insert with Core
from sqlalchemy import insert
def bulk_insert_products(session: Session, products_data: list[dict]) -> None:
"""Insert 10,000+ products efficiently using Core INSERT."""
if not products_data:
return
stmt = insert(Product)
session.execute(stmt, products_data)
session.commit()
# Usage:
bulk_insert_products(session, [
{"name": "Widget A", "sku": "WGT-001", "price": Decimal("9.99"), "stock_quantity": 100},
{"name": "Widget B", "sku": "WGT-002", "price": Decimal("14.99"), "stock_quantity": 50},
# ... thousands more
])
# SQL: INSERT INTO products (name, sku, price, stock_quantity) VALUES (...), (...), ...
Bulk Update with Core
from datetime import datetime, timedelta, timezone
from sqlalchemy import update
def deactivate_inactive_users(session: Session, days_inactive: int) -> int:
"""Bulk update - much faster than loading and updating individual objects."""
cutoff = datetime.now(timezone.utc) - timedelta(days=days_inactive)
stmt = (
update(User)
.where(User.last_login_at < cutoff) # assumes a last_login_at column, which the User model above does not define
.where(User.is_active == True)
.values(is_active=False)
.returning(User.id) # PostgreSQL: get the IDs of affected rows
)
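updated_ids = session.execute(stmt).scalars().all() # fetch the RETURNING rows before committing
session.commit()
return len(updated_ids) # number of rows updated (rowcount is not reliable with RETURNING)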
Performance Comparison
| Approach | 10,000 inserts | Notes |
|---|---|---|
| session.add() in a loop | ~30 seconds | 10,000 round trips |
| session.add_all() + commit | ~5 seconds | Batched flush but still ORM overhead |
| session.execute(insert(), data) | ~0.5 seconds | Single bulk INSERT |
| cursor.copy_from() (psycopg2) | ~0.1 seconds | Direct COPY protocol - fastest possible |
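Use the ORM for CRUD operations on individual records where maintainability matters. Use SQLAlchemy Core bulk operations when inserting or updating thousands of rows. For true bulk loading (millions of rows), use PostgreSQL's COPY command via the psycopg2 cursor's copy_expert() method, which is often an order of magnitude faster again than a Core bulk insert. Treat the timings in the table as rough orders of magnitude; actual figures depend on hardware, network latency, row width, and SQLAlchemy version.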
Part 9 - SQLAlchemy with FastAPI
The standard pattern for FastAPI is a session dependency that opens a session, yields it to the handler, and closes it - committing on success or rolling back on exception.
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator
import os
DATABASE_URL = os.environ["DATABASE_URL"]
engine = create_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
)
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=True,
)
def get_db() -> Generator[Session, None, None]:
"""FastAPI dependency: provides a database session per request."""
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from typing import Annotated
from myapp.database import get_db
from myapp.models import User, Order
from myapp.schemas import UserCreate, UserResponse, UserWithOrders
router = APIRouter(prefix="/users", tags=["users"])
DbSession = Annotated[Session, Depends(get_db)]
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user_data: UserCreate, db: DbSession) -> UserResponse:
# Check for duplicate email
existing = db.execute(
select(User).where(User.email == user_data.email)
).scalar_one_or_none()
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="A user with this email already exists",
)
user = User(
email=user_data.email,
username=user_data.username,
password_hash=hash_password(user_data.password),
)
db.add(user)
db.flush() # assigns user.id without committing
db.refresh(user) # reload to get server defaults (created_at, etc.)
return UserResponse.model_validate(user)
@router.get("/{user_id}", response_model=UserWithOrders)
def get_user_with_orders(user_id: int, db: DbSession) -> UserWithOrders:
stmt = (
select(User)
.where(User.id == user_id)
.options(selectinload(User.orders)) # eager load - no N+1
)
user = db.execute(stmt).scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserWithOrders.model_validate(user)
@router.get("/", response_model=list[UserResponse])
def list_users(
db: DbSession,
skip: int = 0,
limit: int = 20,
) -> list[UserResponse]:
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
users = db.execute(stmt).scalars().all()
return [UserResponse.model_validate(u) for u in users]
# schemas.py - Pydantic response models
from pydantic import BaseModel, EmailStr, ConfigDict
from datetime import datetime
from decimal import Decimal
class UserResponse(BaseModel):
model_config = ConfigDict(from_attributes=True) # allows model_validate(orm_obj)
id: int
email: EmailStr
username: str
is_active: bool
created_at: datetime
class OrderSummary(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
status: str
total_amount: Decimal
created_at: datetime
class UserWithOrders(UserResponse):
orders: list[OrderSummary] = []
Full Example - E-Commerce Data Access Layer
Complete repository layer with User, Order, and Product
# repositories/user_repository.py
from __future__ import annotations
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Optional
from sqlalchemy import select, func, update
from sqlalchemy.orm import Session, selectinload, joinedload
from myapp.models import User, Order, OrderItem, Product
class UserRepository:
"""Data access layer for User entities."""
def __init__(self, session: Session) -> None:
self._session = session
def create(self, email: str, username: str, password_hash: str) -> User:
user = User(email=email, username=username, password_hash=password_hash)
self._session.add(user)
self._session.flush()
self._session.refresh(user)
return user
def get_by_id(self, user_id: int) -> Optional[User]:
return self._session.get(User, user_id)
def get_by_email(self, email: str) -> Optional[User]:
stmt = select(User).where(User.email == email)
return self._session.execute(stmt).scalar_one_or_none()
def get_with_orders(self, user_id: int) -> Optional[User]:
"""Fetch user with all orders eager-loaded - no N+1."""
stmt = (
select(User)
.where(User.id == user_id)
.options(
selectinload(User.orders).selectinload(Order.items).joinedload(OrderItem.product)
)
)
return self._session.execute(stmt).scalar_one_or_none()
def list_active(self, skip: int = 0, limit: int = 20) -> list[User]:
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
return self._session.execute(stmt).scalars().all()
def deactivate(self, user_id: int) -> bool:
user = self.get_by_id(user_id)
if not user:
return False
user.is_active = False
return True
def get_spending_summary(self) -> list[dict]:
"""Aggregate query - better expressed as Core SQL than ORM."""
stmt = (
select(
User.id.label("user_id"),
User.email,
func.count(Order.id).label("order_count"),
func.coalesce(func.sum(Order.total_amount), 0).label("total_spent"),
)
.join(Order, Order.user_id == User.id, isouter=True)
.where(User.is_active == True)
.group_by(User.id, User.email)
.order_by(func.sum(Order.total_amount).desc().nullslast())
)
rows = self._session.execute(stmt).all()
return [row._asdict() for row in rows]
class OrderRepository:
"""Data access layer for Order entities."""
def __init__(self, session: Session) -> None:
self._session = session
def create(self, user_id: int, items: list[dict]) -> Order:
"""
Create an order with items in a single transaction.
items: list of {"product_id": int, "quantity": int, "unit_price": Decimal}
"""
total = sum(
item["quantity"] * item["unit_price"] for item in items
)
order = Order(user_id=user_id, total_amount=total, status="pending")
self._session.add(order)
self._session.flush() # get order.id before adding items
for item_data in items:
order_item = OrderItem(
order_id=order.id,
product_id=item_data["product_id"],
quantity=item_data["quantity"],
unit_price=item_data["unit_price"],
)
self._session.add(order_item)
return order
def get_by_id_with_items(self, order_id: int) -> Optional[Order]:
stmt = (
select(Order)
.where(Order.id == order_id)
.options(
joinedload(Order.user),
selectinload(Order.items).joinedload(OrderItem.product),
)
)
return self._session.execute(stmt).unique().scalar_one_or_none()
def list_by_user(
self,
user_id: int,
status: Optional[str] = None,
) -> list[Order]:
stmt = (
select(Order)
.where(Order.user_id == user_id)
.options(selectinload(Order.items))
)
if status:
stmt = stmt.where(Order.status == status)
stmt = stmt.order_by(Order.created_at.desc())
return self._session.execute(stmt).scalars().all()
def update_status(self, order_id: int, new_status: str) -> bool:
stmt = (
update(Order)
.where(Order.id == order_id)
.values(status=new_status)
)
result = self._session.execute(stmt)
return result.rowcount > 0
Graded Practice
Level 1 - Predict the Query Count
Given this code, how many SQL queries are executed?
with SessionLocal() as session:
users = session.execute(select(User).limit(5)).scalars().all()
for user in users:
print(user.username)
for order in user.orders:
print(order.total_amount)
Assume User.orders uses the default lazy="select".
Answer
6 queries total.
1 query to fetch the 5 users (SELECT * FROM users LIMIT 5), then 5 more queries - one per user - to fetch their orders (SELECT * FROM orders WHERE user_id = ? for each user). This is the classic N+1 pattern.
The fix:
with SessionLocal() as session:
stmt = (
select(User)
.limit(5)
.options(selectinload(User.orders)) # 1 additional SELECT with IN (1,2,3,4,5)
)
users = session.execute(stmt).scalars().all()
for user in users:
print(user.username)
for order in user.orders: # already loaded - zero extra queries
print(order.total_amount)
Total: 2 queries, regardless of how many users you load.
Level 2 - Debug This Code
A developer writes this FastAPI endpoint:
@router.get("/{user_id}/orders")
def get_user_orders(user_id: int, db: Session = Depends(get_db)):
user = db.execute(select(User).where(User.id == user_id)).scalar_one_or_none()
if not user:
raise HTTPException(status_code=404)
return {"user": user.email, "orders": [o.total_amount for o in user.orders]}
This works in testing but intermittently fails in production with DetachedInstanceError. Identify the bug and provide a corrected version.
Answer
The bug: user.orders accesses the orders relationship after the session dependency has potentially closed. The get_db dependency closes the session in the finally block. If expire_on_commit=True is set and a commit happened, or if the session was closed before the response was serialized, the ORM object is detached and lazy-loading raises DetachedInstanceError.
Additionally, returning o.total_amount directly from the route function, without a Pydantic response model, leaves FastAPI to serialize raw Decimal values with no declared schema, which is fragile.
Corrected version:
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from myapp.schemas import OrderSummary

@router.get("/{user_id}/orders", response_model=dict)
def get_user_orders(user_id: int, db: DbSession) -> dict:
    # Load the user and their orders up front (two queries) - no lazy loading needed
    stmt = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.orders))  # eager load while session is open
    )
    user = db.execute(stmt).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # All data is already loaded - safe to access after session work
    return {
        "user": user.email,
        "orders": [
            {"id": o.id, "total_amount": str(o.total_amount), "status": o.status}
            for o in user.orders  # no lazy load - already populated
        ],
    }
Level 3 - Design Challenge
You are building an analytics endpoint that returns the top 10 customers by revenue for the last 30 days, including their name, email, order count, and total revenue. The users table has 500,000 rows and orders has 5,000,000 rows.
Design the most efficient SQLAlchemy query. Consider: should you use ORM or Core? What index should exist on orders? Should you use a relationship at all?
Answer
Use Core, not ORM. This is a pure aggregate query. The ORM adds no value here - you are not loading objects to modify them. You are computing a summary. Use select() with aggregate functions directly.
Query:
from sqlalchemy import select, func, and_
from sqlalchemy.orm import Session
from datetime import datetime, timezone, timedelta

def get_top_customers_by_revenue(
    session: Session,
    days: int = 30,
    limit: int = 10,
) -> list[dict]:
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    stmt = (
        select(
            User.id.label("user_id"),
            User.username,
            User.email,
            func.count(Order.id).label("order_count"),
            func.sum(Order.total_amount).label("total_revenue"),
        )
        .join(Order, and_(
            Order.user_id == User.id,
            Order.created_at >= cutoff,
            Order.status == "completed",  # only count completed orders
        ))
        .group_by(User.id, User.username, User.email)
        .order_by(func.sum(Order.total_amount).desc())
        .limit(limit)
    )
    rows = session.execute(stmt).all()
    return [
        {
            "user_id": row.user_id,
            "username": row.username,
            "email": row.email,
            "order_count": row.order_count,
            # float is lossy for money; use str() if exact decimals matter
            "total_revenue": float(row.total_revenue),
        }
        for row in rows
    ]
Required indexes on orders:
-- Composite index to support the WHERE clause efficiently
CREATE INDEX ix_orders_user_status_created
    ON orders (user_id, status, created_at DESC);

-- Alternatively, a partial index if "completed" is frequent:
CREATE INDEX ix_orders_completed_created
    ON orders (created_at DESC, user_id)
    WHERE status = 'completed';
Why not use ORM relationships here?
Using the User.orders relationship would mean loading every order for every user into Python memory and aggregating there. With 5M orders, that is catastrophically slow. The database can filter, group, and sum the rows itself and return only the 10 result rows, and with the right index it reads only the recent completed orders rather than the whole table. Always push aggregation into the database.
Rule: ORM for CRUD on individual records. SQL expressions (via Core) for analytics, reporting, and bulk operations.
Key Takeaways
- SQLAlchemy is two layers: Core (SQL expression language) and ORM (unit of work, identity map). The ORM is built on Core - you can mix both in the same application, and understanding Core is necessary for writing efficient ORM queries.
- The Engine holds the connection pool. Create one engine per application, not per request. Configure pool_size, max_overflow, and pool_pre_ping=True for production.
- The Session is the unit of work. It tracks object changes, maintains the identity map, and flushes changes in a batch. Always use context managers or dependency injection to manage session lifecycle - never leave sessions open across requests.
- Default lazy loading (lazy="select") causes N+1. For every N parent objects you load, SQLAlchemy fires N additional queries to load a relationship. This is invisible in development and catastrophic at scale.
- Use selectinload for one-to-many relationships. It emits a second SELECT ... WHERE id IN (...) query, loading all related objects in one round trip. Total: 2 queries regardless of N.
- Use joinedload for many-to-one (child → parent) relationships. It adds a JOIN to the parent query. Remember to call .unique() when joinedload is used on a one-to-many to deduplicate JOIN rows.
- Consider lazy="raise" as the default at the model level and explicit loading options per query. This makes accidental lazy loads fail fast instead of silently degrading performance.
- Use SQLAlchemy 2.0 style (select(), where(), session.execute()) exclusively. The legacy session.query() API is maintained for compatibility but receives no new features.
- Use Core bulk operations for large inserts/updates. session.execute(insert(Model), list_of_dicts) is 10–50x faster than adding individual ORM objects because it bypasses the identity map and change tracking.
- ORM for CRUD, Core for analytics. When you need aggregates, GROUP BY, or complex JOINs for reporting, write the query in Core. The ORM adds overhead and complexity for queries where you never plan to modify the loaded objects.
What's Next
In Lesson 07 - Database Migrations with Alembic, you will learn how to manage schema changes safely as your application evolves. You have now defined SQLAlchemy models - but how do you add a column to a production database with 10 million rows without downtime? How do you roll back a bad migration at 2 AM? Alembic gives you version-controlled, reproducible, reversible schema changes.
| Topic | Lesson |
|---|---|
| Schema version control | Lesson 07 - Migrations with Alembic |
| Module projects | Projects - CRUD App, Transaction-Safe Service |
