Project 01 - Task Management REST API
Estimated time: 5–7 hours | Level: Intermediate
Before reading the requirements, answer this: your API returns {"detail": "Not found"} for every missing resource. A client developer asks: "Is this a missing task, a missing user, or a missing workspace?" Your response is the same in all three cases. What RFC fixes this?
RFC 7807 - Problem Details for HTTP APIs (since obsoleted by RFC 9457, which keeps the same core members). It defines a standard JSON error body with type, title, status, detail, and instance fields, so clients can programmatically distinguish error categories. Every 4xx/5xx in this project returns an RFC 7807 response. This project is about building the API right, not just building an API.
Learning Objectives
By completing this project you will have practiced:
- Designing REST endpoints with proper HTTP semantics (method, status code, headers)
- Writing Pydantic v2 models for create, update, and response - with correct field constraints
- Managing SQLAlchemy sessions with FastAPI dependency injection
- Implementing pagination, filtering, and sorting correctly
- Writing middleware for request ID injection and request timing
- Returning RFC 7807 error responses from FastAPI exception handlers
- Writing a complete pytest test suite using TestClient with fixture-based DB setup
- Containerizing a FastAPI app with a minimal Dockerfile
Project Structure
task-api/
├── app/
│   ├── __init__.py
│   ├── main.py            # FastAPI app, middleware, exception handlers, lifespan
│   ├── config.py          # Settings (database URL, etc.)
│   ├── database.py        # SQLAlchemy engine, session factory, Base
│   ├── models/
│   │   ├── __init__.py
│   │   └── task.py        # SQLAlchemy ORM model
│   ├── schemas/
│   │   ├── __init__.py
│   │   └── task.py        # Pydantic request/response models
│   ├── routers/
│   │   ├── __init__.py
│   │   └── tasks.py       # All /tasks endpoints
│   └── dependencies.py    # get_db dependency
├── tests/
│   ├── __init__.py
│   ├── conftest.py        # Fixtures: test client, test DB, sample tasks
│   └── test_tasks.py      # Full test suite
├── Dockerfile
├── docker-compose.yml
├── pyproject.toml
└── README.md
Requirements
R1 - Full CRUD for Tasks
Implement these endpoints with correct HTTP semantics:
| Method | Path | Description | Success Status |
|---|---|---|---|
| POST | /tasks | Create a task | 201 Created |
| GET | /tasks | List tasks (paginated, filtered) | 200 OK |
| GET | /tasks/{task_id} | Get one task | 200 OK |
| PATCH | /tasks/{task_id} | Partial update | 200 OK |
| DELETE | /tasks/{task_id} | Delete a task | 204 No Content |
R2 - SQLite Backend with SQLAlchemy
Use SQLite for simplicity - in-memory for tests, file-based for development. The schema must persist between restarts in development.
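One detail of the in-memory option is easy to miss: every new connection to SQLite's :memory: database opens its own empty database. The sketch below uses only the standard-library sqlite3 module (not the project's SQLAlchemy setup) to show the effect; the table and helper names are illustrative. With SQLAlchemy, the usual remedy is to make all sessions share one connection, for example with poolclass=StaticPool on the test engine.

```python
import sqlite3

# Two independent connections to ":memory:" are two separate, empty databases.
conn_a = sqlite3.connect(":memory:")
conn_b = sqlite3.connect(":memory:")

conn_a.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")
conn_a.execute("INSERT INTO tasks (title) VALUES ('only visible on conn_a')")


def table_names(conn: sqlite3.Connection) -> list[str]:
    """Return the names of all tables visible on this connection."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [name for (name,) in rows]


tables_a = table_names(conn_a)  # sees the table created above
tables_b = table_names(conn_b)  # sees nothing: conn_b is a different database
```

If tables are created on one connection and requests are served on another, the request handlers fail with "no such table" even though the setup code ran successfully.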
R3 - Pagination, Filtering, Sorting
GET /tasks must support:
- limit (default 20, max 100)
- offset (default 0)
- status filter (pending / in_progress / done / cancelled)
- priority filter (1–5)
- sort_by (created_at, priority, title) with sort_order (asc / desc)
R4 - Request ID and Timing Middleware
Every response must include:
- X-Request-ID: a UUID generated per request (or echoed from the client if provided)
- X-Process-Time: request processing time in milliseconds
R5 - RFC 7807 Error Responses
All 4xx and 5xx responses must use this JSON body:
{
  "type": "https://example.com/errors/not-found",
  "title": "Resource Not Found",
  "status": 404,
  "detail": "Task with id '550e8400...' was not found.",
  "instance": "/tasks/550e8400..."
}
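Because the same five members appear in every error, it can help to build the body in one place. The following is a minimal sketch, not part of the project files: problem_detail, ERROR_BASE_URL and PROBLEM_JSON are hypothetical names. RFC 7807 also registers the application/problem+json media type; whether to send it as the response Content-Type is left as a design choice here.

```python
from typing import Any, Optional

# Media type registered by RFC 7807 for problem-detail bodies.
PROBLEM_JSON = "application/problem+json"
# Hypothetical constant: the placeholder base URL used in the example above.
ERROR_BASE_URL = "https://example.com/errors"


def problem_detail(
    slug: str,
    title: str,
    status: int,
    detail: str,
    instance: Optional[str] = None,
) -> dict[str, Any]:
    """Build an RFC 7807 body; `slug` selects the error category URL."""
    body: dict[str, Any] = {
        "type": f"{ERROR_BASE_URL}/{slug}",
        "title": title,
        "status": status,
        "detail": detail,
    }
    if instance is not None:
        # `instance` is optional in the RFC, so it is only added when known.
        body["instance"] = instance
    return body


not_found = problem_detail(
    "not-found",
    "Resource Not Found",
    404,
    "Task with id '42' was not found.",
    instance="/tasks/42",
)
```

A client can then branch on the type URL alone, which answers the "missing task, user, or workspace?" question as long as each category gets its own slug.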
R6 - Full Pytest Test Suite
Tests must cover:
- Happy path for all 5 endpoints
- Validation errors (invalid priority, missing title, etc.)
- 404 for non-existent tasks
- Pagination boundary cases (offset > total, limit > max)
- Filtering combinations
- Middleware headers present on all responses
R7 - OpenAPI Documentation
Every endpoint must have:
- summary and description
- tags=["Tasks"]
- response_model pointing to the correct Pydantic model
- responses={404: ..., 422: ...} declared in the route decorator
R8 - Dockerfile and docker-compose.yml
A working Dockerfile and docker-compose.yml that start the API locally with docker compose up.
Complete File Contents
pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "task-api"
version = "0.1.0"
description = "Production-quality Task Management REST API"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.111.0",
"uvicorn[standard]>=0.29.0",
"sqlalchemy>=2.0.0",
"pydantic[email]>=2.7.0",
"pydantic-settings>=2.0.0",
"orjson>=3.10.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-cov>=5.0.0",
"httpx>=0.27.0",
"anyio>=4.0.0",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = ["--tb=short", "-q", "--strict-markers"]
markers = [
"slow: marks tests as slow",
"integration: marks tests as integration tests",
]
[tool.coverage.run]
source = ["app"]
branch = true
[tool.coverage.report]
fail_under = 80
show_missing = true
app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Pydantic v2 style configuration; the nested `class Config` is deprecated
    model_config = SettingsConfigDict(env_file=".env")

    app_name: str = "Task Management API"
    app_version: str = "0.1.0"
    database_url: str = "sqlite:///./tasks.db"
    debug: bool = False


settings = Settings()
app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

from app.config import settings

engine = create_engine(
    settings.database_url,
    # check_same_thread=False: required for SQLite + FastAPI thread pool
    connect_args={"check_same_thread": False} if "sqlite" in settings.database_url else {},
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


class Base(DeclarativeBase):
    pass


def create_all_tables() -> None:
    """Create all tables - called at application startup via lifespan."""
    Base.metadata.create_all(bind=engine)


def drop_all_tables() -> None:
    """Drop all tables - used in tests to reset state between test runs."""
    Base.metadata.drop_all(bind=engine)
app/dependencies.py
from collections.abc import Generator

from sqlalchemy.orm import Session

from app.database import SessionLocal


def get_db() -> Generator[Session, None, None]:
    """
    FastAPI dependency: yields a SQLAlchemy session per request.
    The session is always closed in the finally block - even if an exception occurs.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
app/models/task.py
import enum
from datetime import datetime, timezone

from sqlalchemy import Column, Integer, String, DateTime, Enum as SQLEnum
from sqlalchemy.dialects.sqlite import TEXT

from app.database import Base


class TaskStatus(str, enum.Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    DONE = "done"
    CANCELLED = "cancelled"


class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    title = Column(String(200), nullable=False, index=True)
    description = Column(TEXT, nullable=True)
    status = Column(
        SQLEnum(TaskStatus, name="task_status"),
        nullable=False,
        default=TaskStatus.PENDING,
        index=True,
    )
    priority = Column(Integer, nullable=False, default=2, index=True)
    assignee = Column(String(100), nullable=True)
    created_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=lambda: datetime.now(tz=timezone.utc),
    )
    updated_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=lambda: datetime.now(tz=timezone.utc),
        onupdate=lambda: datetime.now(tz=timezone.utc),
    )

    def __repr__(self) -> str:
        return f"<Task id={self.id} title={self.title!r} status={self.status}>"
app/schemas/task.py
import enum
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field, ConfigDict


class TaskStatus(str, enum.Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    DONE = "done"
    CANCELLED = "cancelled"


class TaskCreate(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True)

    title: str = Field(
        min_length=1,
        max_length=200,
        description="Task title",
        examples=["Write unit tests", "Review PR #42"],
    )
    description: Optional[str] = Field(
        default=None,
        max_length=2000,
        description="Optional detailed description",
    )
    priority: int = Field(
        default=2,
        ge=1,
        le=5,
        description="Priority 1 (lowest) to 5 (highest)",
    )
    assignee: Optional[str] = Field(
        default=None,
        max_length=100,
        description="Name or identifier of the assignee",
    )


class TaskUpdate(BaseModel):
    """All fields optional - PATCH semantics: only provided fields are updated."""

    model_config = ConfigDict(str_strip_whitespace=True)

    title: Optional[str] = Field(default=None, min_length=1, max_length=200)
    description: Optional[str] = Field(default=None, max_length=2000)
    status: Optional[TaskStatus] = None
    priority: Optional[int] = Field(default=None, ge=1, le=5)
    assignee: Optional[str] = Field(default=None, max_length=100)


class TaskResponse(BaseModel):
    """Response schema - reads SQLAlchemy ORM attributes via from_attributes."""

    model_config = ConfigDict(from_attributes=True, use_enum_values=True)

    id: int
    title: str
    description: Optional[str]
    status: TaskStatus
    priority: int
    assignee: Optional[str]
    created_at: datetime
    updated_at: datetime


class TaskListResponse(BaseModel):
    """Paginated list response wrapper."""

    items: list[TaskResponse]
    total: int
    limit: int
    offset: int


class ProblemDetail(BaseModel):
    """RFC 7807 Problem Details response body."""

    type: str
    title: str
    status: int
    detail: str
    instance: Optional[str] = None
app/routers/tasks.py
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select, func, asc, desc
from sqlalchemy.orm import Session

from app.dependencies import get_db
from app.models.task import Task as TaskModel, TaskStatus as TaskStatusModel
from app.schemas.task import (
    TaskCreate, TaskUpdate, TaskResponse, TaskListResponse, ProblemDetail,
)

router = APIRouter(prefix="/tasks", tags=["Tasks"])

TASK_NOT_FOUND_RESPONSES = {
    404: {"model": ProblemDetail, "description": "Task not found"},
}


def get_task_or_404(task_id: int, db: Session) -> TaskModel:
    """Fetch a task by ID or raise a pre-structured RFC 7807 404 HTTPException."""
    task = db.get(TaskModel, task_id)
    if task is None:
        raise HTTPException(
            status_code=404,
            detail={
                "type": "https://example.com/errors/not-found",
                "title": "Task Not Found",
                "status": 404,
                "detail": f"Task with id '{task_id}' was not found.",
                "instance": f"/tasks/{task_id}",
            },
        )
    return task
@router.post(
    "/",
    response_model=TaskResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create a new task",
    description="Create a task with a title, optional description, priority (1-5), and optional assignee.",
    responses={422: {"description": "Validation error - invalid field values"}},
)
def create_task(body: TaskCreate, db: Session = Depends(get_db)) -> TaskResponse:
    task = TaskModel(
        title=body.title,
        description=body.description,
        priority=body.priority,
        assignee=body.assignee,
    )
    db.add(task)
    db.commit()
    db.refresh(task)
    return TaskResponse.model_validate(task)


@router.get(
    "/",
    response_model=TaskListResponse,
    summary="List tasks",
    description=(
        "Returns a paginated list of tasks. "
        "Supports filtering by status and priority, and sorting by created_at, priority, or title."
    ),
)
def list_tasks(
    limit: int = Query(default=20, ge=1, le=100, description="Number of tasks to return"),
    offset: int = Query(default=0, ge=0, description="Number of tasks to skip"),
    status: Optional[str] = Query(default=None, description="Filter by status"),
    priority: Optional[int] = Query(default=None, ge=1, le=5, description="Filter by priority"),
    sort_by: str = Query(default="created_at", pattern=r"^(created_at|priority|title)$"),
    sort_order: str = Query(default="desc", pattern=r"^(asc|desc)$"),
    db: Session = Depends(get_db),
) -> TaskListResponse:
    stmt = select(TaskModel)

    if status is not None:
        try:
            status_enum = TaskStatusModel(status)
        except ValueError:
            raise HTTPException(
                status_code=422,
                detail={
                    "type": "https://example.com/errors/validation",
                    "title": "Invalid Status",
                    "status": 422,
                    "detail": (
                        f"'{status}' is not a valid status. "
                        "Choose from: pending, in_progress, done, cancelled"
                    ),
                    "instance": "/tasks",
                },
            )
        stmt = stmt.where(TaskModel.status == status_enum)

    if priority is not None:
        stmt = stmt.where(TaskModel.priority == priority)

    # Count total matching records before applying pagination
    count_stmt = select(func.count()).select_from(stmt.subquery())
    total = db.scalar(count_stmt)

    sort_column = getattr(TaskModel, sort_by)
    order_fn = desc if sort_order == "desc" else asc
    stmt = stmt.order_by(order_fn(sort_column)).offset(offset).limit(limit)
    tasks = db.scalars(stmt).all()

    return TaskListResponse(
        items=[TaskResponse.model_validate(t) for t in tasks],
        total=total or 0,
        limit=limit,
        offset=offset,
    )
@router.get(
    "/{task_id}",
    response_model=TaskResponse,
    summary="Get a task by ID",
    responses=TASK_NOT_FOUND_RESPONSES,
)
def get_task(task_id: int, db: Session = Depends(get_db)) -> TaskResponse:
    task = get_task_or_404(task_id, db)
    return TaskResponse.model_validate(task)


@router.patch(
    "/{task_id}",
    response_model=TaskResponse,
    summary="Partially update a task",
    description="Updates only the fields provided in the request body. Omitted fields are not changed.",
    responses={**TASK_NOT_FOUND_RESPONSES, 422: {"description": "Validation error"}},
)
def update_task(
    task_id: int, body: TaskUpdate, db: Session = Depends(get_db),
) -> TaskResponse:
    task = get_task_or_404(task_id, db)
    # exclude_unset=True: only apply fields the client explicitly sent
    # Without this, every Optional field would be set to None, destroying data
    update_data = body.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(task, field, value)
    db.commit()
    db.refresh(task)
    return TaskResponse.model_validate(task)


@router.delete(
    "/{task_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete a task",
    description="Permanently deletes a task. Returns 204 No Content on success.",
    responses=TASK_NOT_FOUND_RESPONSES,
)
def delete_task(task_id: int, db: Session = Depends(get_db)) -> None:
    task = get_task_or_404(task_id, db)
    db.delete(task)
    db.commit()
    # Return None: FastAPI converts this to a 204 response with no body
app/main.py
import time
import uuid
from contextlib import asynccontextmanager
from typing import AsyncIterator

from fastapi import FastAPI, Request, Response
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from starlette.exceptions import HTTPException as StarletteHTTPException

from app.config import settings
from app.database import create_all_tables
from app.routers import tasks


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """
    FastAPI lifespan: replaces the deprecated @app.on_event("startup").
    Code before yield runs at startup; code after yield runs at shutdown.
    """
    create_all_tables()
    print(f"[startup] {settings.app_name} v{settings.app_version} ready")
    yield
    print("[shutdown] Graceful shutdown complete")


app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description=(
        "A production-quality Task Management REST API. "
        "All error responses follow RFC 7807 Problem Details format."
    ),
    lifespan=lifespan,
)


# ── Middleware: Request ID ─────────────────────────────────────────────────────
@app.middleware("http")
async def request_id_middleware(request: Request, call_next) -> Response:
    """
    Inject X-Request-ID into every request and echo it in the response.
    Clients can provide their own X-Request-ID for distributed tracing correlation.
    """
    request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    request.state.request_id = request_id
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response


# ── Middleware: Timing ─────────────────────────────────────────────────────────
@app.middleware("http")
async def timing_middleware(request: Request, call_next) -> Response:
    """Report total request processing time in X-Process-Time (milliseconds, 2dp)."""
    start = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - start) * 1000
    response.headers["X-Process-Time"] = f"{elapsed_ms:.2f}ms"
    return response


# ── Exception Handlers: RFC 7807 ──────────────────────────────────────────────
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(
    request: Request, exc: RequestValidationError
) -> JSONResponse:
    """
    Convert FastAPI's default 422 response to RFC 7807 Problem Details format.
    Collects all field errors into a single readable detail string.
    """
    errors = exc.errors()
    detail_parts = [
        f"{'→'.join(str(x) for x in e['loc'] if x != 'body')}: {e['msg']}"
        for e in errors
    ]
    return JSONResponse(
        status_code=422,
        content={
            "type": "https://example.com/errors/validation",
            "title": "Validation Error",
            "status": 422,
            "detail": "; ".join(detail_parts),
            "instance": str(request.url.path),
        },
    )


# Register on Starlette's HTTPException (the base class of FastAPI's) so that
# framework-raised errors, such as 404 for an unknown route or 405, are also
# returned in RFC 7807 format.
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(
    request: Request, exc: StarletteHTTPException
) -> JSONResponse:
    """
    Forward pre-structured RFC 7807 dicts from router HTTPException raises directly.
    Wrap plain string details in an RFC 7807 envelope.
    """
    if isinstance(exc.detail, dict) and "type" in exc.detail:
        return JSONResponse(status_code=exc.status_code, content=exc.detail)
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "type": f"https://example.com/errors/http-{exc.status_code}",
            "title": "HTTP Error",
            "status": exc.status_code,
            "detail": str(exc.detail),
            "instance": str(request.url.path),
        },
    )


@app.exception_handler(500)
async def server_error_handler(request: Request, exc) -> JSONResponse:
    """Never expose internal error details to API clients."""
    return JSONResponse(
        status_code=500,
        content={
            "type": "https://example.com/errors/internal-error",
            "title": "Internal Server Error",
            "status": 500,
            "detail": "An unexpected error occurred. Please try again or contact support.",
            "instance": str(request.url.path),
        },
    )


# ── Health check ───────────────────────────────────────────────────────────────
@app.get("/health", tags=["Health"], summary="Service health check")
async def health_check():
    """Returns 200 if the service is running. Used by Docker healthcheck and load balancers."""
    return {"status": "ok", "version": settings.app_version}


# ── Mount routers ──────────────────────────────────────────────────────────────
app.include_router(tasks.router)
tests/conftest.py
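import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.database import Base
from app.dependencies import get_db
from app.main import app

# In-memory SQLite: fast, no file cleanup needed, isolated per session
TEST_DATABASE_URL = "sqlite:///:memory:"

test_engine = create_engine(
    TEST_DATABASE_URL,
    connect_args={"check_same_thread": False},
    # StaticPool: all sessions share one connection. Without it, each thread
    # opens its own empty in-memory database, so tables created by the
    # fixtures would be invisible to the request handlers ("no such table").
    poolclass=StaticPool,
)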
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=test_engine)


def override_get_db():
    """Replace the production DB dependency with the test in-memory DB."""
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()


@pytest.fixture(scope="session", autouse=True)
def create_test_tables():
    """Create all tables once for the entire test session - not per test."""
    Base.metadata.create_all(bind=test_engine)
    yield
    Base.metadata.drop_all(bind=test_engine)


@pytest.fixture(autouse=True)
def reset_tables():
    """
    Truncate all rows between every test.
    Ensures each test starts with an empty database - no shared state.
    """
    yield
    with TestingSessionLocal() as db:
        from app.models.task import Task
        db.query(Task).delete()
        db.commit()


@pytest.fixture(scope="session")
def client():
    """FastAPI TestClient with the in-memory DB dependency override."""
    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()


@pytest.fixture
def sample_task(client) -> dict:
    """Create one task and return its response JSON."""
    response = client.post("/tasks/", json={
        "title": "Sample Task",
        "description": "A test task",
        "priority": 3,
    })
    assert response.status_code == 201
    return response.json()


@pytest.fixture
def multiple_tasks(client) -> list[dict]:
    """Create 5 tasks with different priorities for pagination and filter tests."""
    tasks_data = [
        {"title": "Task Alpha", "priority": 1},
        {"title": "Task Beta", "priority": 2},
        {"title": "Task Gamma", "priority": 3},
        {"title": "Task Delta", "priority": 4},
        {"title": "Task Epsilon", "priority": 5},
    ]
    created = []
    for data in tasks_data:
        resp = client.post("/tasks/", json=data)
        assert resp.status_code == 201
        created.append(resp.json())
    return created
tests/test_tasks.py
class TestCreateTask:
    def test_create_task_success(self, client):
        resp = client.post("/tasks/", json={"title": "Write tests", "priority": 3})
        assert resp.status_code == 201
        body = resp.json()
        assert body["title"] == "Write tests"
        assert body["priority"] == 3
        assert body["status"] == "pending"
        assert "id" in body
        assert "created_at" in body

    def test_create_task_minimal_fields(self, client):
        """Only title is required - priority defaults to 2."""
        resp = client.post("/tasks/", json={"title": "Minimal"})
        assert resp.status_code == 201
        assert resp.json()["priority"] == 2

    def test_create_task_strips_whitespace(self, client):
        resp = client.post("/tasks/", json={"title": " Spaces Around "})
        assert resp.status_code == 201
        assert resp.json()["title"] == "Spaces Around"

    def test_create_task_missing_title_is_422(self, client):
        resp = client.post("/tasks/", json={"priority": 3})
        assert resp.status_code == 422
        body = resp.json()
        assert body["type"] == "https://example.com/errors/validation"
        assert body["status"] == 422

    def test_create_task_empty_title_is_422(self, client):
        assert client.post("/tasks/", json={"title": ""}).status_code == 422

    def test_create_task_priority_6_is_422(self, client):
        assert client.post("/tasks/", json={"title": "t", "priority": 6}).status_code == 422

    def test_create_task_priority_0_is_422(self, client):
        assert client.post("/tasks/", json={"title": "t", "priority": 0}).status_code == 422

    def test_every_response_has_request_id_header(self, client):
        resp = client.post("/tasks/", json={"title": "Header test"})
        assert "X-Request-ID" in resp.headers

    def test_every_response_has_process_time_header(self, client):
        resp = client.post("/tasks/", json={"title": "Timing test"})
        assert "X-Process-Time" in resp.headers
        assert resp.headers["X-Process-Time"].endswith("ms")

    def test_client_request_id_is_echoed(self, client):
        custom_id = "my-custom-request-id-abc123"
        resp = client.post(
            "/tasks/",
            json={"title": "Echo test"},
            headers={"X-Request-ID": custom_id},
        )
        assert resp.headers["X-Request-ID"] == custom_id
class TestListTasks:
    def test_list_tasks_empty_db(self, client):
        resp = client.get("/tasks/")
        assert resp.status_code == 200
        body = resp.json()
        assert body["items"] == []
        assert body["total"] == 0

    def test_list_tasks_returns_all(self, client, multiple_tasks):
        resp = client.get("/tasks/")
        assert resp.status_code == 200
        assert resp.json()["total"] == 5

    def test_pagination_limit(self, client, multiple_tasks):
        resp = client.get("/tasks/?limit=2&offset=0")
        assert resp.status_code == 200
        body = resp.json()
        assert len(body["items"]) == 2
        assert body["total"] == 5  # total is the full count

    def test_pagination_offset_beyond_total(self, client, multiple_tasks):
        resp = client.get("/tasks/?offset=100")
        assert resp.status_code == 200
        body = resp.json()
        assert body["items"] == []
        assert body["total"] == 5  # total still reflects full count

    def test_filter_by_priority(self, client, multiple_tasks):
        resp = client.get("/tasks/?priority=3")
        assert resp.status_code == 200
        assert all(t["priority"] == 3 for t in resp.json()["items"])

    def test_invalid_status_is_422(self, client):
        resp = client.get("/tasks/?status=invalid_status")
        assert resp.status_code == 422
        assert resp.json()["type"] == "https://example.com/errors/validation"

    def test_sort_by_priority_asc(self, client, multiple_tasks):
        resp = client.get("/tasks/?sort_by=priority&sort_order=asc")
        assert resp.status_code == 200
        priorities = [t["priority"] for t in resp.json()["items"]]
        assert priorities == sorted(priorities)

    def test_limit_above_100_is_422(self, client):
        assert client.get("/tasks/?limit=101").status_code == 422
class TestGetTask:
    def test_get_task_success(self, client, sample_task):
        resp = client.get(f"/tasks/{sample_task['id']}")
        assert resp.status_code == 200
        assert resp.json()["id"] == sample_task["id"]
        assert resp.json()["title"] == sample_task["title"]

    def test_get_nonexistent_task_is_404(self, client):
        resp = client.get("/tasks/99999")
        assert resp.status_code == 404
        body = resp.json()
        assert body["type"] == "https://example.com/errors/not-found"
        assert "99999" in body["detail"]

    def test_get_task_invalid_id_type_is_422(self, client):
        assert client.get("/tasks/not-an-id").status_code == 422
class TestUpdateTask:
    def test_update_title_only(self, client, sample_task):
        resp = client.patch(f"/tasks/{sample_task['id']}", json={"title": "Updated Title"})
        assert resp.status_code == 200
        assert resp.json()["title"] == "Updated Title"

    def test_update_status(self, client, sample_task):
        resp = client.patch(f"/tasks/{sample_task['id']}", json={"status": "done"})
        assert resp.status_code == 200
        assert resp.json()["status"] == "done"

    def test_patch_only_changes_provided_fields(self, client, sample_task):
        """PATCH must not null out fields not included in the request body."""
        original_priority = sample_task["priority"]
        resp = client.patch(f"/tasks/{sample_task['id']}", json={"title": "Only title changed"})
        assert resp.status_code == 200
        assert resp.json()["title"] == "Only title changed"
        assert resp.json()["priority"] == original_priority  # must be unchanged

    def test_update_nonexistent_task_is_404(self, client):
        resp = client.patch("/tasks/99999", json={"title": "Ghost"})
        assert resp.status_code == 404

    def test_update_invalid_status_is_422(self, client, sample_task):
        resp = client.patch(f"/tasks/{sample_task['id']}", json={"status": "flying"})
        assert resp.status_code == 422
class TestDeleteTask:
    def test_delete_task_returns_204_no_body(self, client, sample_task):
        task_id = sample_task["id"]
        resp = client.delete(f"/tasks/{task_id}")
        assert resp.status_code == 204
        assert resp.content == b""  # 204 must have no response body

    def test_deleted_task_is_gone(self, client, sample_task):
        task_id = sample_task["id"]
        client.delete(f"/tasks/{task_id}")
        assert client.get(f"/tasks/{task_id}").status_code == 404

    def test_delete_nonexistent_task_is_404(self, client):
        assert client.delete("/tasks/99999").status_code == 404

    def test_delete_twice_second_is_404(self, client, sample_task):
        task_id = sample_task["id"]
        assert client.delete(f"/tasks/{task_id}").status_code == 204
        assert client.delete(f"/tasks/{task_id}").status_code == 404


class TestHealth:
    def test_health_check_returns_ok(self, client):
        resp = client.get("/health")
        assert resp.status_code == 200
        assert resp.json()["status"] == "ok"
Dockerfile
# ── Stage 1: builder ──────────────────────────────────────────────────────────
# Install all dependencies here. The runtime stage copies only installed packages,
# not pip, setuptools, or any build tooling.
FROM python:3.12-slim AS builder
WORKDIR /build
RUN pip install --upgrade pip
COPY pyproject.toml .
# Install to /install so the runtime stage can COPY --from=builder /install /usr/local
RUN pip install --no-cache-dir --prefix=/install \
fastapi \
"uvicorn[standard]" \
sqlalchemy \
"pydantic[email]" \
pydantic-settings \
orjson
# ── Stage 2: runtime ──────────────────────────────────────────────────────────
# Minimal image: no build artifacts and no package manager cache.
# The final size stays close to the python:3.12-slim base plus the installed
# packages, well below an image built on the full python:3.12 base.
FROM python:3.12-slim AS runtime
# Security: run as non-root user - never run application processes as root in production
RUN groupadd --gid 1001 appgroup && \
useradd --uid 1001 --gid 1001 --no-create-home appuser
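WORKDIR /app
# WORKDIR creates /app owned by root; give the non-root user a writable
# location, otherwise SQLite cannot create its database file.
RUN mkdir -p /app/data && chown -R appuser:appgroup /app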
# Copy installed packages from builder - not pip or build tools
COPY --from=builder /install /usr/local
# Copy application source only - no tests, no dev configs, no .env files
COPY app/ ./app/
USER appuser
EXPOSE 8000
# --workers 1: SQLite allows only one writer at a time, so a single worker avoids lock contention
# Switch to multiple workers when using PostgreSQL in production
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
docker-compose.yml
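version: "3.9"
services:
  api:
    build:
      context: .
      target: runtime  # build the runtime stage, not the builder
    ports:
      - "8000:8000"
    environment:
      # Keep the database inside the mounted directory so it survives restarts
      DATABASE_URL: "sqlite:///./data/tasks.db"
      DEBUG: "false"
    volumes:
      # Persist the SQLite database file between container restarts.
      # The host ./data directory must be writable by UID 1001 (appuser).
      - ./data:/app/data
    healthcheck:
      # python:3.12-slim does not ship curl, so probe with the standard library
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=3)"]
      interval: 30s
      timeout: 5s
      retries: 3
      start_period: 10s
    restart: unless-stopped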
Verification Checklist
Before submitting this project, verify each item:
- pytest tests/ -v - all tests pass, zero failures
- POST /tasks/ with valid body → 201, response has id, created_at, status: "pending"
- POST /tasks/ with missing title → 422, body has "type": "https://example.com/errors/validation"
- GET /tasks/?limit=2&offset=0 → 200, items has 2 items, total reflects full count
- GET /tasks/99999 → 404, body has "type": "https://example.com/errors/not-found" and "99999" in detail
- PATCH /tasks/{id} with only {"title": "new"} → priority and status unchanged
- DELETE /tasks/{id} → 204 with empty body; second DELETE on same ID → 404
- Every response has X-Request-ID and X-Process-Time headers
- Sending X-Request-ID: my-id → X-Request-ID: my-id echoed in response
- GET /docs renders Swagger UI with task endpoints tagged "Tasks"
- docker compose up starts without errors
- curl http://localhost:8000/health from the host → {"status": "ok", ...}
Extension Challenges
Extension 1 - Add soft delete
Add a deleted_at timestamp column to the Task model. DELETE /tasks/{id} sets deleted_at instead of removing the row. GET /tasks and GET /tasks/{id} exclude soft-deleted tasks by default. Add GET /tasks?include_deleted=true to show all records.
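As a starting point, the core of soft delete can be tried out without the web layer. This is a rough sketch using the standard-library sqlite3 module rather than the project's SQLAlchemy model; the functions soft_delete and list_titles are hypothetical helpers, and the table is reduced to the columns that matter here.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL, deleted_at TEXT)"
)
conn.executemany("INSERT INTO tasks (title) VALUES (?)", [("Alpha",), ("Beta",)])


def soft_delete(task_id: int) -> bool:
    """Mark a live task as deleted; False means missing or already deleted."""
    now = datetime.now(tz=timezone.utc).isoformat()
    cur = conn.execute(
        "UPDATE tasks SET deleted_at = ? WHERE id = ? AND deleted_at IS NULL",
        (now, task_id),
    )
    return cur.rowcount == 1


def list_titles(include_deleted: bool = False) -> list[str]:
    """List task titles, hiding soft-deleted rows unless asked otherwise."""
    sql = "SELECT title FROM tasks"
    if not include_deleted:
        sql += " WHERE deleted_at IS NULL"
    return [title for (title,) in conn.execute(sql + " ORDER BY id")]


first_delete = soft_delete(1)    # row exists and is live
second_delete = soft_delete(1)   # already deleted, so nothing is updated
visible = list_titles()
everything = list_titles(include_deleted=True)
```

The `deleted_at IS NULL` condition in the UPDATE is what lets the endpoint keep returning 404 on a second DELETE, so the existing delete-twice test can stay unchanged.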
Extension 2 - Full-text search
Add GET /tasks?q=search+term that filters by LIKE match on title and description. Implement in the list_tasks router function and add tests for matching and non-matching cases.
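One thing to watch with LIKE is that % and _ in the user's search term act as wildcards unless they are escaped. The sketch below shows the idea with the standard-library sqlite3 module instead of the project's SQLAlchemy query; like_pattern and search are hypothetical helpers, and the backslash escape character is an arbitrary choice. In SQLAlchemy, the column like() operator accepts an escape argument for the same purpose.

```python
import sqlite3


def like_pattern(term: str, escape: str = "\\") -> str:
    """Wrap a term in % wildcards, escaping %, _ and the escape character."""
    escaped = (
        term.replace(escape, escape + escape)
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )
    return f"%{escaped}%"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT, description TEXT)")
conn.executemany(
    "INSERT INTO tasks (title, description) VALUES (?, ?)",
    [
        ("Write unit tests", None),
        ("Review PR", "Check the unit of work pattern"),
        ("100% coverage", "Stretch goal"),
    ],
)


def search(q: str) -> list[str]:
    """Return titles of tasks whose title or description contains q."""
    pattern = like_pattern(q)
    rows = conn.execute(
        "SELECT title FROM tasks "
        "WHERE title LIKE ? ESCAPE '\\' OR description LIKE ? ESCAPE '\\' "
        "ORDER BY id",
        (pattern, pattern),
    )
    return [title for (title,) in rows]


by_word = search("unit")      # matches one title and one description
by_percent = search("%")      # literal percent sign, not "match everything"
no_match = search("deploy")
```

Without the escaping step, a query of "%" would match every row, which is worth covering in the non-matching test cases.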
Extension 3 - Rate limiting middleware
Write middleware that tracks request counts per client IP using an in-process dict. Return 429 Too Many Requests with an RFC 7807 body and a Retry-After header if an IP exceeds 100 requests per minute.
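The counting logic can be developed and tested separately from the middleware. The following sketch is one possible approach (a sliding window of timestamps per IP), not the only valid one; SlidingWindowLimiter and its injectable clock parameter are hypothetical names introduced so the example can run without waiting a real minute.

```python
import math
import time
from collections import defaultdict, deque
from typing import Callable, Optional


class SlidingWindowLimiter:
    """Track request timestamps per client IP in an in-process dict."""

    def __init__(
        self,
        max_requests: int = 100,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._hits: dict[str, deque[float]] = defaultdict(deque)

    def check(self, client_ip: str) -> Optional[int]:
        """Record a request. Return None if allowed, else Retry-After seconds."""
        now = self._clock()
        hits = self._hits[client_ip]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            # The oldest hit leaving the window frees the next slot.
            return max(1, math.ceil(self.window_seconds - (now - hits[0])))
        hits.append(now)
        return None


# Demonstration with a fake clock and a limit of 2 requests per 60 seconds.
fake_now = [0.0]
limiter = SlidingWindowLimiter(max_requests=2, window_seconds=60.0, clock=lambda: fake_now[0])
results = [limiter.check("203.0.113.7") for _ in range(3)]
fake_now[0] = 61.0
after_window = limiter.check("203.0.113.7")
```

In the middleware, a non-None result would translate into a 429 response with the RFC 7807 body and the returned value in the Retry-After header. Because the dict lives in one process, the limit applies per worker and is reset on restart.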
