Skip to main content

ORM with SQLAlchemy

Reading time: ~35 minutes | Level: Intermediate → Engineering

Before reading further, consider this puzzle:

from flask import Flask
from myapp.models import User, db

app = Flask(__name__)

@app.get("/dashboard")
def dashboard():
users = db.session.query(User).all() # 1 SQL query
results = []
for user in users: # 1000 users
results.append({
"name": user.name,
"order_count": len(user.orders), # user.orders accessed here
})
return {"users": results}

With 1000 users in the database, how many SQL queries does this endpoint execute?

The answer is 1001. The first query fetches all users. Then, because user.orders is a relationship with default lazy loading, SQLAlchemy fires a separate SELECT query for every single user the first time user.orders is accessed. 1000 users = 1000 extra queries.

This is the N+1 problem - and it is one of the most common performance killers hidden inside ORM code. It does not surface in development (5 users in your local database) and only shows up in production under real load. By the end of this lesson you will know exactly why it happens, how to detect it, and how to eliminate it entirely with SQLAlchemy's eager loading strategies.

What You Will Learn

  • SQLAlchemy's architecture: Core vs ORM, the Engine, and the connection pool
  • Declaring models with DeclarativeBase, mapped_column, and Python type annotations
  • Relationships: relationship(), ForeignKey, one-to-many, and many-to-many with an association table
  • The Session - SQLAlchemy's unit-of-work pattern and object lifecycle
  • Querying with SQLAlchemy 2.0 style: select(), where(), join(), scalars()
  • Lazy vs eager loading - the root cause of N+1 and how to avoid it
  • selectinload and joinedload - eliminating N+1 with explicit eager loading
  • Bulk insert and update operations for high-throughput scenarios
  • SQLAlchemy with FastAPI - the session dependency injection pattern

Prerequisites

  • Lesson 01 (SQL Fundamentals) - understanding raw SQL is essential for reading what SQLAlchemy generates
  • Lesson 03 (PostgreSQL) - connection strings, transactions, and driver behaviour
  • Lesson 04 (Transactions) - SQLAlchemy's session is built on top of database transactions

Part 1 - Architecture: Core vs ORM, Engine, and Connection Pool

SQLAlchemy is two libraries in one package:

Your Application Code

┌───────────────────────────────────┐
│ SQLAlchemy ORM │ ← Declarative models, Session, relationships
│ (Unit of Work, Identity Map) │
└────────────────┬──────────────────┘

┌───────────────────────────────────┐
│ SQLAlchemy Core │ ← SQL expression language, Table, Column
│ (select(), insert(), join()) │
└────────────────┬──────────────────┘

┌───────────────────────────────────┐
│ Engine + Connection Pool │ ← Manages raw DBAPI connections
└────────────────┬──────────────────┘

┌───────────────────────────────────┐
│ DBAPI Driver (psycopg2, etc.) │ ← Speaks the wire protocol to the DB
└───────────────────────────────────┘

SQLAlchemy Core is a Python-level SQL expression language. You build SQL statements as Python objects - select(users).where(users.c.id == 1) - and SQLAlchemy compiles them to dialect-specific SQL. Core is ideal for complex queries, bulk operations, and places where you want full control over the SQL generated.

SQLAlchemy ORM adds the object-relational mapping layer on top of Core. You define Python classes, map them to tables, and SQLAlchemy translates object operations (add, delete, attribute access) into Core expressions. The ORM also maintains an identity map (a session-level cache mapping primary keys to Python objects) and implements the unit of work pattern (tracking changes and flushing them together).

The Engine is the entry point to SQLAlchemy. It holds the connection pool and the dialect (the component that knows how to speak PostgreSQL, SQLite, MySQL). You create it once at application startup:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
"postgresql+psycopg2://user:password@localhost:5432/mydb",
pool_size=10, # number of persistent connections kept open
max_overflow=20, # connections beyond pool_size allowed under load
pool_timeout=30, # seconds to wait for a connection before raising
pool_pre_ping=True, # test connections before use (detects stale connections)
echo=False, # set True to log all SQL - invaluable for debugging
)
tip

Set echo=True during development to see every SQL statement SQLAlchemy generates. In production, use a logging configuration that captures SQLAlchemy's sqlalchemy.engine logger at WARNING level - this logs only slow queries and errors, not every statement.

The connection pool (QueuePool by default) keeps a set of open database connections and hands them out to sessions. When a session finishes, its connection is returned to the pool - not closed. This avoids the overhead of opening a TCP connection and authenticating for every request. For a web application handling 100 req/s, a pool of 10–20 connections is typically sufficient because most requests spend most of their time doing application logic, not waiting on the database.

Part 2 - Declarative Models

SQLAlchemy 2.0 introduced the fully type-annotated declarative API using DeclarativeBase and mapped_column. This is the current recommended style:

from datetime import datetime, timezone
from decimal import Decimal
from typing import Optional

from sqlalchemy import String, Numeric, DateTime, Text, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


# All models inherit from this base class
class Base(DeclarativeBase):
pass


class User(Base):
__tablename__ = "users"

# Mapped[T] declares the Python type; mapped_column() provides column options
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
username: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
is_active: Mapped[bool] = mapped_column(default=True, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(), # DB-level default: NOW()
nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(), # DB updates this column on every UPDATE
nullable=False,
)

# Optional field - Mapped[Optional[str]] allows NULL
bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

# Relationships declared here - covered in Part 3
orders: Mapped[list["Order"]] = relationship(back_populates="user")

def __repr__(self) -> str:
return f"<User id={self.id} email={self.email!r}>"

Type Annotation to Column Mapping

The Mapped[T] annotation and mapped_column() cooperate:

Python annotationSQL type produced
Mapped[int]INTEGER NOT NULL
Mapped[Optional[int]]INTEGER NULL
Mapped[str]VARCHAR NOT NULL
Mapped[bool]BOOLEAN NOT NULL
Mapped[datetime]DATETIME NOT NULL
Mapped[Decimal]NUMERIC NOT NULL
Mapped[float]FLOAT NOT NULL
note

Mapped[int] implies NOT NULL. Mapped[Optional[int]] (equivalent to Mapped[int | None]) implies NULL. This is inferred directly from the Python type annotation - you do not need to repeat nullable=True or nullable=False unless you want to be explicit.

Creating Tables

# Create all tables that don't exist yet - safe to call at startup
Base.metadata.create_all(engine)

# Drop and recreate all tables - use only in tests
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

In production, never use create_all to manage schema changes. Use Alembic migrations (covered in Lesson 07) instead. Reserve create_all for test fixtures and local development.

Part 3 - Relationships

One-to-Many: User → Orders

class Order(Base):
__tablename__ = "orders"

id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
# ForeignKey references the table name and column name (not the class)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False, index=True)
status: Mapped[str] = mapped_column(String(50), default="pending", nullable=False)
total_amount: Mapped[Decimal] = mapped_column(Numeric(10, 2), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)

# Many side: each Order has one User
user: Mapped["User"] = relationship(back_populates="orders")

# One side: each Order has many items
items: Mapped[list["OrderItem"]] = relationship(back_populates="order", cascade="all, delete-orphan")

def __repr__(self) -> str:
return f"<Order id={self.id} user_id={self.user_id} status={self.status!r}>"

The back_populates parameter wires both sides of the relationship together. When you set order.user = some_user, SQLAlchemy automatically adds order to some_user.orders. Both sides stay in sync in memory.

Many-to-Many: Orders ↔ Products via Association Table

# Association table - no class needed for simple many-to-many
from sqlalchemy import Table, Column

order_products = Table(
"order_products",
Base.metadata,
Column("order_id", ForeignKey("orders.id"), primary_key=True),
Column("product_id", ForeignKey("products.id"), primary_key=True),
Column("quantity", default=1, nullable=False),
)


class Product(Base):
__tablename__ = "products"

id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(255), nullable=False)
sku: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
price: Mapped[Decimal] = mapped_column(Numeric(10, 2), nullable=False)
stock_quantity: Mapped[int] = mapped_column(default=0, nullable=False)

# Many-to-many: a product can appear in many orders
orders: Mapped[list["Order"]] = relationship(
secondary=order_products, back_populates="products"
)

def __repr__(self) -> str:
return f"<Product id={self.id} sku={self.sku!r}>"


class OrderItem(Base):
__tablename__ = "order_items"

id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), nullable=False, index=True)
product_id: Mapped[int] = mapped_column(ForeignKey("products.id"), nullable=False)
quantity: Mapped[int] = mapped_column(nullable=False)
unit_price: Mapped[Decimal] = mapped_column(Numeric(10, 2), nullable=False)

order: Mapped["Order"] = relationship(back_populates="items")
product: Mapped["Product"] = relationship()

Model Relationship Diagram

Part 4 - The Session: Unit of Work Pattern

The Session is the primary interface between your application and the database. It implements two important patterns:

  • Unit of Work: tracks all changes made to Python objects within a session and flushes them together as a single batch of SQL statements
  • Identity Map: maintains a dictionary mapping (class, primary_key) to Python objects, ensuring that within one session there is never two different Python objects representing the same database row

Session Lifecycle

Creating and Using a Session

from sqlalchemy.orm import Session, sessionmaker

# SessionLocal is a factory for creating Session objects
# Create once at application startup
SessionLocal = sessionmaker(
bind=engine,
autocommit=False, # never commit automatically
autoflush=True, # flush before queries so the session state is consistent
expire_on_commit=True, # expire attributes after commit (forces reload on next access)
)

# Pattern 1: context manager - commits on success, rolls back on exception
def create_user(email: str, username: str, password_hash: str) -> User:
with SessionLocal() as session:
user = User(email=email, username=username, password_hash=password_hash)
session.add(user)
session.commit()
session.refresh(user) # reload from DB to get server-generated defaults
return user

# Pattern 2: explicit session management
def transfer_funds(from_account_id: int, to_account_id: int, amount: Decimal) -> None:
with SessionLocal() as session:
try:
from_acc = session.get(Account, from_account_id)
to_acc = session.get(Account, to_account_id)

if from_acc.balance < amount:
raise ValueError("Insufficient funds")

from_acc.balance -= amount
to_acc.balance += amount

session.commit() # both changes committed atomically
except Exception:
session.rollback() # undo all changes in this session
raise

Key Session Methods

MethodWhat It Does
session.add(obj)Move object from Transient/Detached to Pending/Persistent state
session.add_all([obj1, obj2])Add multiple objects at once
session.get(Model, pk)Fetch by primary key; checks identity map first
session.flush()Send pending SQL to DB within the current transaction (no commit)
session.commit()Commit the current transaction; expires all objects
session.rollback()Discard all changes since last commit
session.close()Return connection to pool; expunge all objects
session.expunge(obj)Remove object from session (becomes Detached)
session.refresh(obj)Reload object's attributes from DB
session.delete(obj)Mark object for deletion on next flush
session.merge(obj)Merge a detached object's state into the session
warning

expire_on_commit=True (the default) means that after session.commit(), all objects tracked by that session have their attributes expired. The next time you access an attribute on an expired object, SQLAlchemy fires a SELECT to reload it. If you close the session before accessing the object, this raises DetachedInstanceError. Always call session.refresh(obj) or load the data you need before closing the session.

Part 5 - Querying: SQLAlchemy 2.0 Style

SQLAlchemy has two query APIs. Always use the 2.0 style:

from sqlalchemy import select, update, delete, and_, or_, func
from sqlalchemy.orm import Session

# ❌ Legacy 1.x style - avoid in new code
users = session.query(User).filter(User.is_active == True).all()

# ✅ SQLAlchemy 2.0 style - use this
stmt = select(User).where(User.is_active == True)
users = session.execute(stmt).scalars().all()

Common Query Patterns

def get_active_users(session: Session) -> list[User]:
stmt = select(User).where(User.is_active == True)
return session.execute(stmt).scalars().all()


def get_user_by_email(session: Session, email: str) -> User | None:
stmt = select(User).where(User.email == email)
return session.execute(stmt).scalar_one_or_none()


def get_users_paginated(
session: Session, skip: int = 0, limit: int = 20
) -> list[User]:
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
return session.execute(stmt).scalars().all()


def count_user_orders(session: Session, user_id: int) -> int:
stmt = select(func.count(Order.id)).where(Order.user_id == user_id)
return session.execute(stmt).scalar_one()


def get_orders_with_filter(
session: Session,
user_id: int,
status: str | None = None,
min_amount: Decimal | None = None,
) -> list[Order]:
stmt = select(Order).where(Order.user_id == user_id)
if status:
stmt = stmt.where(Order.status == status)
if min_amount:
stmt = stmt.where(Order.total_amount >= min_amount)
stmt = stmt.order_by(Order.created_at.desc())
return session.execute(stmt).scalars().all()


def get_user_order_summary(session: Session) -> list[dict]:
# JOIN and aggregate - still Python, still safe
stmt = (
select(
User.id,
User.email,
func.count(Order.id).label("order_count"),
func.sum(Order.total_amount).label("total_spent"),
)
.join(Order, Order.user_id == User.id, isouter=True)
.group_by(User.id, User.email)
.order_by(func.sum(Order.total_amount).desc())
)
rows = session.execute(stmt).all()
return [
{
"user_id": row.id,
"email": row.email,
"order_count": row.order_count,
"total_spent": row.total_spent,
}
for row in rows
]

scalar_one() vs scalar_one_or_none() vs scalars().all()

MethodWhen to UseRaises if
scalar_one()Exactly one row expected0 rows or >1 row
scalar_one_or_none()Zero or one row expected>1 row
scalars().first()First of potentially manyNever (returns None if empty)
scalars().all()Multiple rows expectedNever

Part 6 - Lazy vs Eager Loading

This is the most critical concept for SQLAlchemy performance in production.

Default Lazy Loading - The N+1 Trap

class User(Base):
__tablename__ = "users"
# ...
# Default: lazy="select" - fires a SELECT when orders is first accessed
orders: Mapped[list["Order"]] = relationship(back_populates="user")
# This is the N+1 problem in code form:
def get_dashboard_data(session: Session) -> list[dict]:
users = session.execute(select(User)).scalars().all() # Query 1: SELECT * FROM users

results = []
for user in users:
results.append({
"name": user.username,
"order_count": len(user.orders), # Query 2...N+1: SELECT * FROM orders WHERE user_id = ?
})
return results

With 1000 users, this generates 1001 queries. With 10,000 users, it generates 10,001 queries. The problem scales linearly with your data.

warning

SQLAlchemy's default lazy="select" behavior is a performance trap for any list query that touches relationships. It is designed for convenience on individual object lookups, not for loading collections of related objects. Always explicitly specify the loading strategy when writing queries that access relationships.

Loading Strategies Compared

StrategyHow It WorksBest For
lazy="select" (default)Fires a new SELECT when attribute is first accessedSingle object lookups only
lazy="joined"Always uses a JOIN - relationship loaded with parentSmall related collections
lazy="subquery"Second query using a subquery with INDeprecated in 2.0, avoid
lazy="raise"Raises an error on accessDetecting accidental lazy loads
selectinload()Second SELECT with IN clause, applied per-queryCollections - best default
joinedload()JOIN in the same query, applied per-querySingle related objects
subqueryload()Subquery-based, applied per-queryAvoid - use selectinload

lazy="raise" - The Safety Net

The most defensive configuration for production code is to mark all relationships as lazy="raise" at the model level:

class User(Base):
__tablename__ = "users"
# ...
orders: Mapped[list["Order"]] = relationship(
back_populates="user",
lazy="raise", # accessing user.orders without explicit loading raises an error
)

With lazy="raise", any code that accidentally accesses user.orders without explicitly eager-loading it will raise sqlalchemy.exc.InvalidRequestError immediately - at the point of the bug, not 1000 queries later. You are forced to be explicit about every relationship load.

Part 7 - Solving N+1 with Explicit Eager Loading

selectinload emits a second SELECT with an IN clause to load the related objects for all parent objects at once:

from sqlalchemy.orm import selectinload

def get_dashboard_data(session: Session) -> list[dict]:
stmt = (
select(User)
.options(selectinload(User.orders)) # tells SQLAlchemy to load orders with a second SELECT
.where(User.is_active == True)
)
users = session.execute(stmt).scalars().all()
# users is already loaded, user.orders is already populated for all users

# SQL executed:
# SELECT * FROM users WHERE is_active = TRUE
# SELECT * FROM orders WHERE user_id IN (1, 2, 3, ..., 1000)

return [
{"name": user.username, "order_count": len(user.orders)}
for user in users
]

Two queries total, regardless of how many users exist. This is the correct solution to N+1.

joinedload adds a JOIN to the parent query and populates the relationship in one round trip. Use it for many-to-one or one-to-one relationships (loading a single related object), not for one-to-many collections:

from sqlalchemy.orm import joinedload

def get_orders_with_users(session: Session) -> list[Order]:
stmt = (
select(Order)
.options(joinedload(Order.user)) # JOIN users ON orders.user_id = users.id
.where(Order.status == "pending")
)
orders = session.execute(stmt).unique().scalars().all()
# order.user is available without extra queries
# SQL: SELECT orders.*, users.* FROM orders
# JOIN users ON orders.user_id = users.id
# WHERE orders.status = 'pending'
return orders

Note: when using joinedload on a one-to-many (which can produce duplicate rows in the result), call .unique() before .scalars() to deduplicate.

Chained Eager Loading - Multiple Levels Deep

from sqlalchemy.orm import selectinload, joinedload

def get_orders_full(session: Session, user_id: int) -> list[Order]:
stmt = (
select(Order)
.where(Order.user_id == user_id)
.options(
joinedload(Order.user), # load user with JOIN
selectinload(Order.items).joinedload(OrderItem.product), # load items, then products
)
)
orders = session.execute(stmt).unique().scalars().all()
# Three total queries:
# 1. SELECT orders.*, users.* FROM orders JOIN users ... WHERE user_id = ?
# 2. SELECT order_items.*, products.* FROM order_items
# JOIN products ON order_items.product_id = products.id
# WHERE order_items.order_id IN (...)
return orders
tip

A useful rule: use joinedload for many-to-one (child → parent) and use selectinload for one-to-many (parent → children). joinedload on a one-to-many relationship produces a JOIN that returns one row per child, creating duplicate parent rows in the result - you must call .unique(). selectinload avoids this with a separate IN query.

Part 8 - Bulk Operations

When you need to insert or update thousands of rows, the standard ORM approach (creating Python objects, adding to session, committing) is too slow. Each object goes through the identity map, change tracking, and individual SQL statements.

Bulk Insert with Core

from sqlalchemy import insert

def bulk_insert_products(session: Session, products_data: list[dict]) -> None:
"""Insert 10,000+ products efficiently using Core INSERT."""
if not products_data:
return

stmt = insert(Product)
session.execute(stmt, products_data)
session.commit()

# Usage:
bulk_insert_products(session, [
{"name": "Widget A", "sku": "WGT-001", "price": Decimal("9.99"), "stock_quantity": 100},
{"name": "Widget B", "sku": "WGT-002", "price": Decimal("14.99"), "stock_quantity": 50},
# ... thousands more
])
# SQL: INSERT INTO products (name, sku, price, stock_quantity) VALUES (...), (...), ...

Bulk Update with Core

from sqlalchemy import update

def deactivate_inactive_users(session: Session, days_inactive: int) -> int:
"""Bulk update - much faster than loading and updating individual objects."""
cutoff = datetime.now(timezone.utc) - timedelta(days=days_inactive)
stmt = (
update(User)
.where(User.last_login_at < cutoff)
.where(User.is_active == True)
.values(is_active=False)
.returning(User.id) # PostgreSQL: get the IDs of affected rows
)
result = session.execute(stmt)
session.commit()
return result.rowcount # number of rows updated

Performance Comparison

Approach10,000 insertsNotes
session.add() in a loop~30 seconds10,000 round trips
session.add_all() + commit~5 secondsBatched flush but still ORM overhead
session.execute(insert(), data)~0.5 secondsSingle bulk INSERT
psycopg2.copy_from()~0.1 secondsDirect COPY protocol - fastest possible
tip

Use the ORM for CRUD operations on individual records where maintainability matters. Use SQLAlchemy Core bulk operations when inserting or updating thousands of rows. For true bulk loading (millions of rows), use PostgreSQL's COPY command via psycopg2.copy_expert() - it is 10–50x faster than even Core bulk insert.

Part 9 - SQLAlchemy with FastAPI

The standard pattern for FastAPI is a session dependency that opens a session, yields it to the handler, and closes it - committing on success or rolling back on exception.

# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator
import os

DATABASE_URL = os.environ["DATABASE_URL"]

engine = create_engine(
DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
)

SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=True,
)


def get_db() -> Generator[Session, None, None]:
"""FastAPI dependency: provides a database session per request."""
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from typing import Annotated

from myapp.database import get_db
from myapp.models import User, Order
from myapp.schemas import UserCreate, UserResponse, UserWithOrders

router = APIRouter(prefix="/users", tags=["users"])

DbSession = Annotated[Session, Depends(get_db)]


@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user_data: UserCreate, db: DbSession) -> UserResponse:
# Check for duplicate email
existing = db.execute(
select(User).where(User.email == user_data.email)
).scalar_one_or_none()
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="A user with this email already exists",
)

user = User(
email=user_data.email,
username=user_data.username,
password_hash=hash_password(user_data.password),
)
db.add(user)
db.flush() # assigns user.id without committing
db.refresh(user) # reload to get server defaults (created_at, etc.)
return UserResponse.model_validate(user)


@router.get("/{user_id}", response_model=UserWithOrders)
def get_user_with_orders(user_id: int, db: DbSession) -> UserWithOrders:
stmt = (
select(User)
.where(User.id == user_id)
.options(selectinload(User.orders)) # eager load - no N+1
)
user = db.execute(stmt).scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserWithOrders.model_validate(user)


@router.get("/", response_model=list[UserResponse])
def list_users(
db: DbSession,
skip: int = 0,
limit: int = 20,
) -> list[UserResponse]:
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
users = db.execute(stmt).scalars().all()
return [UserResponse.model_validate(u) for u in users]
# schemas.py - Pydantic response models
from pydantic import BaseModel, EmailStr, ConfigDict
from datetime import datetime
from decimal import Decimal


class UserResponse(BaseModel):
model_config = ConfigDict(from_attributes=True) # allows model_validate(orm_obj)

id: int
email: EmailStr
username: str
is_active: bool
created_at: datetime


class OrderSummary(BaseModel):
model_config = ConfigDict(from_attributes=True)

id: int
status: str
total_amount: Decimal
created_at: datetime


class UserWithOrders(UserResponse):
orders: list[OrderSummary] = []

Full Example - E-Commerce Data Access Layer

Complete repository layer with User, Order, and Product
# repositories/user_repository.py
from __future__ import annotations

from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Optional

from sqlalchemy import select, func, update
from sqlalchemy.orm import Session, selectinload, joinedload

from myapp.models import User, Order, OrderItem, Product


class UserRepository:
"""Data access layer for User entities."""

def __init__(self, session: Session) -> None:
self._session = session

def create(self, email: str, username: str, password_hash: str) -> User:
user = User(email=email, username=username, password_hash=password_hash)
self._session.add(user)
self._session.flush()
self._session.refresh(user)
return user

def get_by_id(self, user_id: int) -> Optional[User]:
return self._session.get(User, user_id)

def get_by_email(self, email: str) -> Optional[User]:
stmt = select(User).where(User.email == email)
return self._session.execute(stmt).scalar_one_or_none()

def get_with_orders(self, user_id: int) -> Optional[User]:
"""Fetch user with all orders eager-loaded - no N+1."""
stmt = (
select(User)
.where(User.id == user_id)
.options(
selectinload(User.orders).selectinload(Order.items).joinedload(OrderItem.product)
)
)
return self._session.execute(stmt).scalar_one_or_none()

def list_active(self, skip: int = 0, limit: int = 20) -> list[User]:
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
return self._session.execute(stmt).scalars().all()

def deactivate(self, user_id: int) -> bool:
user = self.get_by_id(user_id)
if not user:
return False
user.is_active = False
return True

def get_spending_summary(self) -> list[dict]:
"""Aggregate query - better expressed as Core SQL than ORM."""
stmt = (
select(
User.id.label("user_id"),
User.email,
func.count(Order.id).label("order_count"),
func.coalesce(func.sum(Order.total_amount), 0).label("total_spent"),
)
.join(Order, Order.user_id == User.id, isouter=True)
.where(User.is_active == True)
.group_by(User.id, User.email)
.order_by(func.sum(Order.total_amount).desc().nullslast())
)
rows = self._session.execute(stmt).all()
return [row._asdict() for row in rows]


class OrderRepository:
"""Data access layer for Order entities."""

def __init__(self, session: Session) -> None:
self._session = session

def create(self, user_id: int, items: list[dict]) -> Order:
"""
Create an order with items in a single transaction.
items: list of {"product_id": int, "quantity": int, "unit_price": Decimal}
"""
total = sum(
item["quantity"] * item["unit_price"] for item in items
)
order = Order(user_id=user_id, total_amount=total, status="pending")
self._session.add(order)
self._session.flush() # get order.id before adding items

for item_data in items:
order_item = OrderItem(
order_id=order.id,
product_id=item_data["product_id"],
quantity=item_data["quantity"],
unit_price=item_data["unit_price"],
)
self._session.add(order_item)

return order

def get_by_id_with_items(self, order_id: int) -> Optional[Order]:
stmt = (
select(Order)
.where(Order.id == order_id)
.options(
joinedload(Order.user),
selectinload(Order.items).joinedload(OrderItem.product),
)
)
return self._session.execute(stmt).unique().scalar_one_or_none()

def list_by_user(
self,
user_id: int,
status: Optional[str] = None,
) -> list[Order]:
stmt = (
select(Order)
.where(Order.user_id == user_id)
.options(selectinload(Order.items))
)
if status:
stmt = stmt.where(Order.status == status)
stmt = stmt.order_by(Order.created_at.desc())
return self._session.execute(stmt).scalars().all()

def update_status(self, order_id: int, new_status: str) -> bool:
stmt = (
update(Order)
.where(Order.id == order_id)
.values(status=new_status)
)
result = self._session.execute(stmt)
return result.rowcount > 0

Graded Practice

Level 1 - Predict the Query Count

Given this code, how many SQL queries are executed?

with SessionLocal() as session:
users = session.execute(select(User).limit(5)).scalars().all()
for user in users:
print(user.username)
for order in user.orders:
print(order.total_amount)

Assume User.orders uses the default lazy="select".

Show Answer

6 queries total.

1 query to fetch the 5 users (SELECT * FROM users LIMIT 5), then 5 more queries - one per user - to fetch their orders (SELECT * FROM orders WHERE user_id = ? for each user). This is the classic N+1 pattern.

The fix:

with SessionLocal() as session:
stmt = (
select(User)
.limit(5)
.options(selectinload(User.orders)) # 1 additional SELECT with IN (1,2,3,4,5)
)
users = session.execute(stmt).scalars().all()
for user in users:
print(user.username)
for order in user.orders: # already loaded - zero extra queries
print(order.total_amount)

Total: 2 queries, regardless of how many users you load.

Level 2 - Debug This Code

A developer writes this FastAPI endpoint:

@router.get("/{user_id}/orders")
def get_user_orders(user_id: int, db: Session = Depends(get_db)):
user = db.execute(select(User).where(User.id == user_id)).scalar_one_or_none()
if not user:
raise HTTPException(status_code=404)
return {"user": user.email, "orders": [o.total_amount for o in user.orders]}

This works in testing but intermittently fails in production with DetachedInstanceError. Identify the bug and provide a corrected version.

Show Answer

The bug: user.orders accesses the orders relationship after the session dependency has potentially closed. The get_db dependency closes the session in the finally block. If expire_on_commit=True is set and a commit happened, or if the session was closed before the response was serialized, the ORM object is detached and lazy-loading raises DetachedInstanceError.

Additionally, returning o.total_amount inside the route function - without a Pydantic response model - means FastAPI is serializing raw ORM objects and Decimal values, which is fragile.

Corrected version:

from sqlalchemy.orm import selectinload
from myapp.schemas import OrderSummary

@router.get("/{user_id}/orders", response_model=dict)
def get_user_orders(user_id: int, db: DbSession) -> dict:
# Load user AND orders in a single query group - no lazy loading needed
stmt = (
select(User)
.where(User.id == user_id)
.options(selectinload(User.orders)) # eager load while session is open
)
user = db.execute(stmt).scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")

# All data is already loaded - safe to access after session work
return {
"user": user.email,
"orders": [
{"id": o.id, "total_amount": str(o.total_amount), "status": o.status}
for o in user.orders # no lazy load - already populated
],
}

Level 3 - Design Challenge

You are building an analytics endpoint that returns the top 10 customers by revenue for the last 30 days, including their name, email, order count, and total revenue. The users table has 500,000 rows and orders has 5,000,000 rows.

Design the most efficient SQLAlchemy query. Consider: should you use ORM or Core? What index should exist on orders? Should you use a relationship at all?

Show Answer

Use Core, not ORM. This is a pure aggregate query. The ORM adds no value here - you are not loading objects to modify them. You are computing a summary. Use select() with aggregate functions directly.

Query:

from sqlalchemy import select, func, and_
from datetime import datetime, timezone, timedelta
from decimal import Decimal

def get_top_customers_by_revenue(
session: Session,
days: int = 30,
limit: int = 10,
) -> list[dict]:
cutoff = datetime.now(timezone.utc) - timedelta(days=days)

stmt = (
select(
User.id.label("user_id"),
User.username,
User.email,
func.count(Order.id).label("order_count"),
func.sum(Order.total_amount).label("total_revenue"),
)
.join(Order, and_(
Order.user_id == User.id,
Order.created_at >= cutoff,
Order.status == "completed", # only count completed orders
))
.group_by(User.id, User.username, User.email)
.order_by(func.sum(Order.total_amount).desc())
.limit(limit)
)

rows = session.execute(stmt).all()
return [
{
"user_id": row.user_id,
"username": row.username,
"email": row.email,
"order_count": row.order_count,
"total_revenue": float(row.total_revenue),
}
for row in rows
]

Required indexes on orders:

-- Composite index to support the WHERE clause efficiently
CREATE INDEX ix_orders_user_status_created
ON orders (user_id, status, created_at DESC);

-- Alternatively, a partial index if "completed" is frequent:
CREATE INDEX ix_orders_completed_created
ON orders (created_at DESC, user_id)
WHERE status = 'completed';

Why not use ORM relationships here?

Using User.orders relationship would require loading all orders per user into Python memory, then aggregating in Python. With 5M orders, that is catastrophically slow. The database can aggregate 5M rows in a table scan + group by in under 1 second with the right index. Always push aggregation into the database.

Rule: ORM for CRUD on individual records. Raw SQL (via Core) for analytics, reporting, and bulk operations.

Key Takeaways

  • SQLAlchemy is two layers: Core (SQL expression language) and ORM (unit of work, identity map). The ORM is built on Core - you can mix both in the same application, and understanding Core is necessary for writing efficient ORM queries.
  • The Engine holds the connection pool. Create one engine per application, not per request. Configure pool_size, max_overflow, and pool_pre_ping=True for production.
  • The Session is the unit of work. It tracks object changes, maintains the identity map, and flushes changes in a batch. Always use context managers or dependency injection to manage session lifecycle - never leave sessions open across requests.
  • Default lazy loading (lazy="select") causes N+1. For every N parent objects you load, SQLAlchemy fires N additional queries to load a relationship. This is invisible in development and catastrophic at scale.
  • Use selectinload for one-to-many relationships. It emits a second SELECT ... WHERE id IN (...) query, loading all related objects in one round trip. Total: 2 queries regardless of N.
  • Use joinedload for many-to-one (child → parent) relationships. It adds a JOIN to the parent query. Remember to call .unique() when joinedload is used on a one-to-many to deduplicate JOIN rows.
  • Consider lazy="raise" as the default at the model level and explicit loading options per query. This makes accidental lazy loads fail fast instead of silently degrading performance.
  • Use SQLAlchemy 2.0 style (select(), where(), session.execute()) exclusively. The legacy session.query() API is maintained for compatibility but receives no new features.
  • Use Core bulk operations for large inserts/updates. session.execute(insert(Model), list_of_dicts) is 10–50x faster than adding individual ORM objects because it bypasses the identity map and change tracking.
  • ORM for CRUD, Core for analytics. When you need aggregates, GROUP BY, or complex JOINs for reporting, write the query in Core. The ORM adds overhead and complexity for queries where you never plan to modify the loaded objects.

What's Next

In Lesson 07 - Database Migrations with Alembic, you will learn how to manage schema changes safely as your application evolves. You have now defined SQLAlchemy models - but how do you add a column to a production database with 10 million rows without downtime? How do you roll back a bad migration at 2 AM? Alembic gives you version-controlled, reproducible, reversible schema changes.

TopicLesson
Schema version controlLesson 07 - Migrations with Alembic
Module projectsProjects - CRUD App, Transaction-Safe Service
© 2026 EngineersOfAI. All rights reserved.