Caching Strategies - Trading Memory for Speed

Before reading further, predict what this code prints:

from functools import lru_cache

@lru_cache(maxsize=2)
def expensive(x):
    print(f" computing {x}")
    return x * x

print(expensive(1))
print(expensive(2))
print(expensive(3))
print(expensive(1))
print(expensive.cache_info())

Think carefully. The cache has maxsize=2. We call with three distinct arguments, then revisit the first.

computing 1
1
computing 2
4
computing 3
9
computing 1
1
CacheInfo(hits=0, misses=4, maxsize=2, currsize=2)

Every call is a miss. Zero hits. The call to expensive(1) at the end recomputes because inserting 3 evicted 1 from the LRU cache (since 1 was the least recently used at that point). The maxsize=2 cache can only hold two entries, and our working set is three. This is called cache thrashing, and it is worse than no cache at all - you pay the overhead of cache lookup and insertion with zero benefit.
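To make the sizing effect concrete, here is a small sketch that replays the same 1, 2, 3, 1 call sequence against two cache sizes. The helper name run_sequence is invented for this illustration.

```python
from functools import lru_cache

def run_sequence(maxsize):
    """Replay the 1, 2, 3, 1 call sequence against a fresh cache of the given size."""
    @lru_cache(maxsize=maxsize)
    def expensive(x):
        return x * x

    for arg in (1, 2, 3, 1):
        expensive(arg)
    return expensive.cache_info()

thrashing = run_sequence(2)  # working set (3 keys) exceeds capacity (2)
fitting = run_sequence(3)    # working set fits in the cache

print(thrashing.hits, thrashing.misses)  # 0 4
print(fitting.hits, fitting.misses)      # 1 3
```

One extra slot is the difference between zero hits and a hit on the repeated call, which is why maxsize should be chosen from the observed working set rather than guessed.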

The lesson: caching is not "add a decorator and win." It requires understanding access patterns, eviction policies, and failure modes.

What You Will Learn

  • How functools.lru_cache and functools.cache work internally
  • The cache invalidation problem and why it is fundamentally hard
  • How to implement TTL (time-to-live) caches for time-sensitive data
  • Memoization patterns beyond simple function caching
  • The cachetools library for specialized cache types (LRU, TTL, LFU)
  • Using Redis as an external distributed cache
  • Cache stampede prevention with locking and probabilistic early expiration
  • Real-world patterns for caching database queries and API responses

Prerequisites

  • Completed Lessons 1-3 (profiling strategy, cProfile, line_profiler)
  • Understanding of hash tables and O(1) lookups
  • Familiarity with decorators and closures

Part 1 - functools.lru_cache: The Standard Library Cache

lru_cache is Python's built-in memoization decorator. It caches function results keyed by the arguments, using a Least Recently Used eviction policy.

How It Works Internally

Internally, lru_cache uses a doubly-linked list for LRU ordering and a dict for O(1) lookups. The implementation is in C for CPython, making it fast.
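The idea can be sketched in pure Python with collections.OrderedDict, which pairs a hash table with an ordering of its keys. This is an illustration of the data structure only, not CPython's actual implementation, and the class name TinyLRU is made up for the example.

```python
from collections import OrderedDict

class TinyLRU:
    """Illustrative LRU cache; not the CPython implementation."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self._data = OrderedDict()  # O(1) lookup plus a recency ordering

    def get(self, key, default=None):
        if key not in self._data:
            return default
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # drop the least recently used entry

lru = TinyLRU(maxsize=2)
lru.put("a", 1)
lru.put("b", 2)
lru.get("a")       # "a" becomes most recently used
lru.put("c", 3)    # evicts "b", the least recently used
print(list(lru._data))  # ['a', 'c']
```

Every operation is O(1): the dict finds the entry, and the ordering makes both "move to most recent" and "evict least recent" constant-time.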

Basic Usage

from functools import lru_cache

@lru_cache(maxsize=128)
def fibonacci(n):
    """Without caching: O(2^n). With caching: O(n)."""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# Without cache: fibonacci(35) takes ~4.5 seconds
# With cache: fibonacci(35) takes ~0.00003 seconds
print(fibonacci(35)) # 9227465

# Inspect cache statistics
print(fibonacci.cache_info())
# CacheInfo(hits=33, misses=36, maxsize=128, currsize=36)

maxsize and typed Parameters

from functools import lru_cache

# maxsize=None: unbounded cache (equivalent to functools.cache in 3.9+)
@lru_cache(maxsize=None)
def compute(x):
    return x ** 2

# maxsize=0: no caching (every call computes fresh) - useful for testing
@lru_cache(maxsize=0)
def always_fresh(x):
    return x ** 2

# typed=True: treats 1 (int) and 1.0 (float) as different keys
@lru_cache(maxsize=128, typed=True)
def typed_compute(x):
    print(f" computing {x!r} (type={type(x).__name__})")
    return x * 2

typed_compute(1) # computing 1 (type=int)
typed_compute(1.0) # computing 1.0 (type=float) - separate cache entry!

# Without typed=True, 1 and 1.0 would share a cache entry
# because hash(1) == hash(1.0) and 1 == 1.0

Cache Management

from functools import lru_cache

@lru_cache(maxsize=256)
def get_user(user_id: int):
    """Simulate a database lookup."""
    print(f" DB query for user {user_id}")
    return {"id": user_id, "name": f"User {user_id}"}

# Populate cache
get_user(1)
get_user(2)
get_user(3)
print(get_user.cache_info())
# CacheInfo(hits=0, misses=3, maxsize=256, currsize=3)

# Clear the entire cache
get_user.cache_clear()
print(get_user.cache_info())
# CacheInfo(hits=0, misses=0, maxsize=256, currsize=0)

# Access the underlying function without caching
result = get_user.__wrapped__(42) # Bypasses cache entirely

:::danger lru_cache Requires Hashable Arguments
All arguments to a cached function must be hashable. Passing a list, dict, or set raises TypeError. This is a common source of bugs:

from functools import lru_cache

@lru_cache(maxsize=128)
def process(data):
    return sum(data)

process([1, 2, 3]) # TypeError: unhashable type: 'list'
process((1, 2, 3)) # Works - tuples are hashable

If you need to cache functions that accept lists, convert to tuples in a wrapper. :::
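A minimal sketch of such a wrapper follows. The decorator name freeze_lists is hypothetical, and it only converts top-level list arguments; nested lists or dicts would need a recursive conversion.

```python
from functools import lru_cache, wraps

def freeze_lists(func):
    """Cache func, converting list arguments to tuples first (hypothetical helper)."""
    cached = lru_cache(maxsize=128)(func)

    @wraps(func)
    def wrapper(*args):
        # Lists are unhashable; tuples with the same items are hashable
        frozen = tuple(tuple(a) if isinstance(a, list) else a for a in args)
        return cached(*frozen)

    wrapper.cache_info = cached.cache_info
    return wrapper

@freeze_lists
def process(data):
    return sum(data)  # receives a tuple, even when the caller passed a list

print(process([1, 2, 3]))         # 6
print(process([1, 2, 3]))         # 6, served from the cache
print(process.cache_info().hits)  # 1
```

Note that the wrapped function sees a tuple rather than the caller's list, so it must not rely on list-only methods such as append.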

functools.cache (Python 3.9+)

functools.cache is a simpler unbounded cache - equivalent to lru_cache(maxsize=None):

from functools import cache

@cache
def factorial(n):
    return n * factorial(n - 1) if n else 1

factorial(100)
print(factorial.cache_info())
# CacheInfo(hits=0, misses=101, maxsize=None, currsize=101)

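:::note cache vs lru_cache(maxsize=None)
They are identical: functools.cache is defined as lru_cache(maxsize=None), so there is no speed difference between the two. Both are smaller and faster than a size-limited lru_cache, because an unbounded cache never needs to track recency or evict entries. Use cache when you know the number of unique inputs is bounded and you want to keep everything for the life of the process.
:::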

Part 2 - The Cache Invalidation Problem

"There are only two hard things in Computer Science: cache invalidation and naming things." - Phil Karlton

Cache invalidation is hard because the cache must reflect the current state of the underlying data. When the data changes, the cache must be updated or cleared. Getting this wrong means serving stale data.

When Caches Go Wrong

from functools import lru_cache

# Simulated "database"
_database = {"course_1": {"title": "Python Foundation", "price": 0}}

@lru_cache(maxsize=128)
def get_course(course_id: str):
    """Cache the result of a database lookup."""
    return _database[course_id].copy()

# First call: cache miss, fetches from DB
course = get_course("course_1")
print(course) # {'title': 'Python Foundation', 'price': 0}

# Admin updates the price in the database
_database["course_1"]["price"] = 49

# Second call: cache HIT - returns STALE data!
course = get_course("course_1")
print(course) # {'title': 'Python Foundation', 'price': 0} ← WRONG!

# The cache has no way to know the database changed.
# You must explicitly invalidate:
get_course.cache_clear()
course = get_course("course_1")
print(course) # {'title': 'Python Foundation', 'price': 49} ← correct

Invalidation Strategies

Part 3 - TTL Caches: Time-Based Expiration

lru_cache has no concept of time. Once a result is cached, it stays forever (until evicted by LRU or manually cleared). For data that changes, you need TTL (Time To Live) expiration.

Building a Simple TTL Cache

import time
from functools import wraps

def ttl_cache(maxsize=128, ttl_seconds=300):
    """
    Bounded cache with time-based expiration.

    Entries expire after ttl_seconds, regardless of access pattern.
    When the cache is full, the entry inserted longest ago is evicted.
    """
    def decorator(func):
        cache = {}
        timestamps = {}
        call_count = {'hits': 0, 'misses': 0}

        @wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()

            # Check if cached and not expired
            if key in cache and (now - timestamps[key]) < ttl_seconds:
                call_count['hits'] += 1
                return cache[key]

            # Cache miss or expired
            call_count['misses'] += 1
            result = func(*args, **kwargs)

            # Evict only when adding a new key to a full cache;
            # refreshing an expired key reuses its existing slot
            if key not in cache and len(cache) >= maxsize:
                # Remove oldest entry
                oldest_key = min(timestamps, key=timestamps.get)
                del cache[oldest_key]
                del timestamps[oldest_key]

            cache[key] = result
            timestamps[key] = now
            return result

        def cache_info():
            return {
                'hits': call_count['hits'],
                'misses': call_count['misses'],
                'size': len(cache),
                'maxsize': maxsize,
                'ttl_seconds': ttl_seconds,
            }

        def cache_clear():
            cache.clear()
            timestamps.clear()
            call_count['hits'] = 0
            call_count['misses'] = 0

        wrapper.cache_info = cache_info
        wrapper.cache_clear = cache_clear
        return wrapper
    return decorator

# Usage
@ttl_cache(maxsize=100, ttl_seconds=60)
def get_exchange_rate(currency_pair: str) -> float:
    """Fetch exchange rate - cached for 60 seconds."""
    print(f" Fetching rate for {currency_pair}")
    # Simulate API call
    import random
    return round(random.uniform(0.8, 1.2), 4)

print(get_exchange_rate("USD/EUR")) # Fetching...
print(get_exchange_rate("USD/EUR")) # Cache hit - no fetch
print(get_exchange_rate.cache_info())
# {'hits': 1, 'misses': 1, 'size': 1, 'maxsize': 100, 'ttl_seconds': 60}

Using cachetools for Production TTL Caches

The cachetools library provides well-tested cache implementations:

pip install cachetools

from cachetools import TTLCache, LRUCache, LFUCache, cached
import time

# TTL cache: entries expire after 30 seconds
# (named config_cache so it does not shadow the ttl_cache decorator above)
config_cache = TTLCache(maxsize=1000, ttl=30)

@cached(config_cache)
def get_config(key: str) -> dict:
    """Fetch configuration - refreshed every 30 seconds."""
    print(f" Loading config: {key}")
    return {"key": key, "value": "loaded", "timestamp": time.time()}

print(get_config("database")) # Loading config: database
print(get_config("database")) # Cache hit

# LFU cache: evicts Least Frequently Used
feature_cache = LFUCache(maxsize=100)

@cached(feature_cache)
def compute_feature(feature_id: int) -> float:
    """Expensive feature computation - keep the most popular."""
    print(f" Computing feature {feature_id}")
    return feature_id ** 0.5

# LRU cache (same policy as functools.lru_cache, but within the cachetools ecosystem)
# (not named lru_cache, which would shadow functools.lru_cache)
lru_store = LRUCache(maxsize=256)

Thread-Safe Caching

from cachetools import TTLCache, cached
import threading

# cachetools caches are NOT thread-safe by default!
# Use a lock for concurrent access:

_cache = TTLCache(maxsize=1000, ttl=60)
_cache_lock = threading.Lock()

@cached(_cache, lock=_cache_lock)
def get_user_profile(user_id: int) -> dict:
    """Thread-safe cached user profile lookup."""
    # ... database query ...
    return {"id": user_id, "name": f"User {user_id}"}

Part 4 - Memoization Patterns

Memoization is the general technique of caching function results. lru_cache covers the simple case, but real applications require more sophisticated patterns.

Pattern 1: Memoizing Methods

Applying lru_cache directly to a method works, because self becomes part of the cache key, but it has a subtle trap:

from functools import lru_cache

class Analyzer:
    def __init__(self, data):
        self.data = data

    @lru_cache(maxsize=128) # WARNING: this caches on (self, threshold)
    def analyze(self, threshold):
        print(f" Analyzing with threshold={threshold}")
        return [x for x in self.data if x > threshold]

a1 = Analyzer([1, 2, 3, 4, 5])
a2 = Analyzer([10, 20, 30, 40, 50])

a1.analyze(3) # Computes
a2.analyze(3) # Computes (different self)
a1.analyze(3) # Cache hit

# Problem: the cache is on the CLASS, not the instance.
# a1 and a2 share the same cache dict.
# And 'self' is part of the key, so instances are never garbage collected
# while the cache holds a reference to them!

:::danger lru_cache on Methods Prevents Garbage Collection
When lru_cache is applied to a method, self becomes part of the cache key. The cache holds a strong reference to the instance, so the instance cannot be garbage collected until its entries are evicted or the cache is cleared, even after all other references are dropped. With a large or unbounded maxsize in a long-running application, this is effectively a memory leak.
:::

Fix: Use a per-instance cache with __init__:

from functools import lru_cache

class Analyzer:
    def __init__(self, data):
        self.data = data
        # Create a per-instance cached method
        self.analyze = lru_cache(maxsize=128)(self._analyze)

    def _analyze(self, threshold):
        print(f" Analyzing with threshold={threshold}")
        return [x for x in self.data if x > threshold]

# Now each instance has its own cache, and the cache goes away with the instance.
# Note: the instance refers to the cache and the cache refers back to the
# instance through the bound method, so this is a reference cycle. It is
# reclaimed by the cyclic garbage collector, not by reference counting alone.
a1 = Analyzer([1, 2, 3, 4, 5])
a1.analyze(3) # Computes
a1.analyze(3) # Cache hit
del a1 # Instance and its cache become unreachable; freed on the next GC pass
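An alternative to the per-instance fix is to key the cache on weak references to the instances. The sketch below uses weakref.WeakKeyDictionary from the standard library; the decorator name weak_method_cache is hypothetical. It assumes instances are hashable and weak-referenceable (true for ordinary classes without __slots__) and that cached results do not themselves refer back to the instance.

```python
import gc
import weakref
from functools import wraps

def weak_method_cache(method):
    """Per-instance memoization that does not keep instances alive (hypothetical helper)."""
    caches = weakref.WeakKeyDictionary()  # instance -> {args: result}

    @wraps(method)
    def wrapper(self, *args):
        per_instance = caches.setdefault(self, {})
        if args not in per_instance:
            per_instance[args] = method(self, *args)
        return per_instance[args]

    return wrapper

class Analyzer:
    def __init__(self, data):
        self.data = data

    @weak_method_cache
    def analyze(self, threshold):
        return [x for x in self.data if x > threshold]

a = Analyzer([1, 2, 3, 4, 5])
print(a.analyze(3))     # [4, 5]
probe = weakref.ref(a)  # lets us observe whether the instance is still alive
del a
gc.collect()
print(probe() is None)  # True: the cache did not keep the instance alive
```

When an instance is collected, its entry disappears from the WeakKeyDictionary automatically, so no explicit cleanup is needed. This sketch is unbounded per instance; add a size limit if the argument domain is large.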

Pattern 2: Cache Key Normalization

import inspect
from functools import lru_cache, wraps

def normalize_args(func):
    """Decorator to normalize arguments before caching."""
    signature = inspect.signature(func)

    @lru_cache(maxsize=256)
    def cached_func(*args, **kwargs):
        return func(*args, **kwargs)

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Normalize: bind the call to the function's signature and fill in
        # defaults, so positional, keyword, and defaulted forms of the same
        # call produce the same cache key
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        return cached_func(*bound.args, **bound.kwargs)

    wrapper.cache_info = cached_func.cache_info
    wrapper.cache_clear = cached_func.cache_clear
    return wrapper

@normalize_args
def search(query, limit, offset=0):
    print(f" Searching: {query}, limit={limit}, offset={offset}")
    return [f"result_{i}" for i in range(offset, offset + limit)]

# These all hit the same cache entry:
search("python", 10, offset=0) # Miss
search("python", 10, 0) # Hit (normalized to same key)
search("python", 10) # Hit (default offset=0 is filled in)

Pattern 3: Computed Property Cache

class Dataset:
    """Cache expensive computed properties."""

    def __init__(self, raw_data: list[float]):
        self._raw_data = raw_data
        self._cache = {}

    def _invalidate(self):
        """Call when raw data changes."""
        self._cache.clear()

    @property
    def mean(self) -> float:
        if 'mean' not in self._cache:
            self._cache['mean'] = sum(self._raw_data) / len(self._raw_data)
        return self._cache['mean']

    @property
    def sorted_data(self) -> list[float]:
        if 'sorted' not in self._cache:
            self._cache['sorted'] = sorted(self._raw_data)
        return self._cache['sorted']

    @property
    def median(self) -> float:
        if 'median' not in self._cache:
            s = self.sorted_data # Uses cached sorted_data
            n = len(s)
            if n % 2 == 0:
                self._cache['median'] = (s[n//2 - 1] + s[n//2]) / 2
            else:
                self._cache['median'] = s[n//2]
        return self._cache['median']

    def append(self, value: float):
        self._raw_data.append(value)
        self._invalidate() # Properties will recompute on next access

ds = Dataset([3.0, 1.0, 4.0, 1.0, 5.0])
print(ds.mean) # Computes and caches
print(ds.mean) # Cache hit
print(ds.median) # Computes sorted_data, then median
ds.append(9.0) # Invalidates all caches
print(ds.mean) # Recomputes
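The standard library offers a shorter route to the same pattern: functools.cached_property (Python 3.8+) stores the computed value in the instance's __dict__ under the property's name. The sketch below is a reduced variant of the Dataset class, with only one property, to show how invalidation looks with that approach.

```python
from functools import cached_property

class Dataset:
    def __init__(self, raw_data):
        self._raw_data = list(raw_data)

    @cached_property
    def mean(self):
        print(" computing mean")
        return sum(self._raw_data) / len(self._raw_data)

    def append(self, value):
        self._raw_data.append(value)
        # cached_property keeps the value in the instance __dict__;
        # removing it forces a recompute on the next access
        self.__dict__.pop("mean", None)

ds = Dataset([2.0, 4.0])
print(ds.mean)  # computes: 3.0
print(ds.mean)  # cached: 3.0
ds.append(6.0)
print(ds.mean)  # recomputes: 4.0
```

The trade-off is that each cached property must be invalidated by name, whereas the explicit _cache dict above clears everything with one call.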

Part 5 - Cache Stampede Prevention

A cache stampede (also called "thundering herd") occurs when a cached entry expires and many concurrent requests simultaneously try to recompute it. Instead of one computation, you get N identical computations hammering your database or API.
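The effect is easy to reproduce. In this sketch, five threads are released at the same moment against an unprotected check-then-compute cache; all names are made up for the demonstration, and the sleep stands in for a slow query.

```python
import threading
import time

cache = {}
calls = 0
calls_lock = threading.Lock()
start = threading.Barrier(5)

def slow_query():
    global calls
    with calls_lock:
        calls += 1
    time.sleep(0.1)  # simulate a slow database query
    return "catalog"

def naive_get(key):
    if key not in cache:           # every thread sees the miss...
        cache[key] = slow_query()  # ...and every thread recomputes
    return cache[key]

def worker():
    start.wait()  # release all threads at the same moment
    naive_get("catalog")

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(calls)  # typically 5: one computation per concurrent request
```

The exact count depends on thread scheduling, but with a computation this slow, every thread normally passes the membership check before any of them has stored a result.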

Solution 1: Lock-Based Prevention

import threading
import time
from functools import wraps

def stampede_protected_cache(maxsize=128, ttl_seconds=60):
    """
    Cache with lock-based stampede prevention.
    Only one thread recomputes on cache miss; others wait.
    """
    def decorator(func):
        cache = {}  # key -> (value, stored_at)
        locks = {}  # key -> per-key lock
        global_lock = threading.Lock()

        def lookup(key):
            """Return (found, value) for an entry that has not expired."""
            entry = cache.get(key)
            if entry is not None and (time.monotonic() - entry[1]) < ttl_seconds:
                return True, entry[0]
            return False, None

        @wraps(func)
        def wrapper(*args):
            key = args

            # Fast path: cache hit, not expired
            found, value = lookup(key)
            if found:
                return value

            # Get or create a per-key lock
            with global_lock:
                key_lock = locks.setdefault(key, threading.Lock())

            # Only one thread recomputes; others wait
            with key_lock:
                # Double-check: another thread may have populated it
                found, value = lookup(key)
                if found:
                    return value

                # Recompute
                result = func(*args)

                # Store and evict under the global lock so that writers for
                # different keys never resize the dict while another scans it
                with global_lock:
                    cache[key] = (result, time.monotonic())
                    if len(cache) > maxsize:
                        oldest = min(cache, key=lambda k: cache[k][1])
                        del cache[oldest]
                        locks.pop(oldest, None)  # keep the lock table bounded too

                return result

        def cache_clear():
            with global_lock:
                cache.clear()
                locks.clear()

        wrapper.cache_clear = cache_clear
        return wrapper
    return decorator

@stampede_protected_cache(maxsize=100, ttl_seconds=30)
def get_course_catalog():
    """Expensive operation - only one thread should compute at a time."""
    print(" Computing catalog...")
    time.sleep(0.5) # Simulate slow DB query
    return [{"id": i, "title": f"Course {i}"} for i in range(100)]

Solution 2: Probabilistic Early Expiration (XFetch)

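Instead of letting every request discover the expiry at the same instant, let each request recompute the value early with a small probability that rises as the expiry approaches. One request usually refreshes the entry shortly before it expires, so the others never see a miss: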

import time
import random
import math
from functools import wraps

def xfetch_cache(maxsize=128, ttl_seconds=60, beta=1.0):
    """
    Cache with probabilistic early recomputation (XFetch algorithm).

    As an entry approaches expiration, there is an increasing probability
    that a request will proactively recompute it. This spreads recomputation
    over time, preventing stampedes.

    beta controls eagerness: higher beta = earlier recomputation.
    """
    def decorator(func):
        cache = {}
        expiry = {}
        compute_times = {} # Track how long computation takes

        @wraps(func)
        def wrapper(*args):
            key = args
            now = time.monotonic()

            if key in cache:
                # XFetch: probabilistic early expiration
                ttl_remaining = expiry[key] - now
                delta = compute_times.get(key, 0.1)

                # Probability of recomputing increases as expiry approaches
                # P(recompute) = exp(-ttl_remaining / (beta * delta))
                if ttl_remaining > 0:
                    prob = math.exp(-ttl_remaining / (beta * max(delta, 0.001)))
                    if random.random() >= prob:
                        return cache[key] # Not yet - use cached value

            # Recompute
            start = time.monotonic()
            result = func(*args)
            elapsed = time.monotonic() - start

            cache[key] = result
            expiry[key] = now + ttl_seconds
            compute_times[key] = elapsed

            # Evict if over maxsize
            if len(cache) > maxsize:
                oldest = min(expiry, key=expiry.get)
                del cache[oldest]
                del expiry[oldest]
                compute_times.pop(oldest, None) # Drop its timing record as well

            return result

        def cache_clear():
            cache.clear()
            expiry.clear()
            compute_times.clear()

        wrapper.cache_clear = cache_clear
        return wrapper
    return decorator
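To get a feel for the formula in the wrapper above, the sketch below evaluates P(recompute) at a few points for a value that takes 0.5 seconds to compute. The function name recompute_probability is made up for this illustration.

```python
import math

def recompute_probability(ttl_remaining, delta, beta=1.0):
    """P(recompute) = exp(-ttl_remaining / (beta * delta))."""
    if ttl_remaining <= 0:
        return 1.0  # already expired: always recompute
    return math.exp(-ttl_remaining / (beta * delta))

for remaining in (5.0, 1.0, 0.1):
    p = recompute_probability(remaining, delta=0.5)
    print(f"{remaining:>4}s left -> P(recompute) = {p:.5f}")
```

With these inputs the probability is roughly 0.00005 at 5 seconds before expiry, 0.135 at 1 second, and 0.819 at 0.1 seconds. Early refreshes are therefore rare until the remaining TTL is within a few multiples of the compute time, and slower computations start refreshing earlier.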

Part 6 - Redis as an External Cache

For multi-process or multi-server deployments, in-process caches (like lru_cache) are insufficient. Each process has its own cache, leading to redundant computation and inconsistent data. Redis provides a shared, external cache.

import json
import time
import uuid
import hashlib
from functools import wraps
from typing import Optional

# pip install redis
import redis

class RedisCache:
    """
    Production-quality Redis caching layer.

    Features:
    - JSON serialization
    - TTL-based expiration
    - Cache key namespacing
    - Stampede prevention via SET NX locking
    """

    # Lua script: delete the lock only if it still holds our token
    _RELEASE_LOCK = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    end
    return 0
    """

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 namespace: str = "app", default_ttl: int = 300):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.namespace = namespace
        self.default_ttl = default_ttl

    def _make_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Generate a deterministic cache key (arguments must be JSON-serializable)."""
        key_data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        key_hash = hashlib.md5(key_data.encode()).hexdigest()[:12]
        return f"{self.namespace}:{func_name}:{key_hash}"

    def get(self, key: str) -> Optional[dict]:
        """Get a cached value."""
        raw = self.client.get(key)
        if raw is None:
            return None
        return json.loads(raw)

    def set(self, key: str, value, ttl: Optional[int] = None):
        """Set a cached value with TTL."""
        ttl = ttl or self.default_ttl
        self.client.setex(key, ttl, json.dumps(value))

    def delete(self, key: str):
        """Delete a cached entry."""
        self.client.delete(key)

    def cached(self, ttl: Optional[int] = None):
        """Decorator for caching function results in Redis."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                key = self._make_key(func.__name__, args, kwargs)

                # Try cache first
                # (a result of None is indistinguishable from a miss,
                # so functions returning None are recomputed every time)
                cached_value = self.get(key)
                if cached_value is not None:
                    return cached_value

                # Cache miss - compute and store
                result = func(*args, **kwargs)
                self.set(key, result, ttl)
                return result

            wrapper.cache_invalidate = lambda *args, **kwargs: (
                self.delete(self._make_key(func.__name__, args, kwargs))
            )
            return wrapper
        return decorator

    def cached_with_lock(self, ttl: Optional[int] = None,
                         lock_timeout: int = 10):
        """Decorator with Redis-based stampede prevention."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                key = self._make_key(func.__name__, args, kwargs)
                lock_key = f"{key}:lock"

                # Try cache first
                cached_value = self.get(key)
                if cached_value is not None:
                    return cached_value

                # Try to acquire lock; the random token identifies the owner
                token = uuid.uuid4().hex
                acquired = self.client.set(
                    lock_key, token,
                    nx=True, # Only set if not exists
                    ex=lock_timeout,
                )

                if acquired:
                    try:
                        # We got the lock - compute and cache
                        result = func(*args, **kwargs)
                        self.set(key, result, ttl)
                        return result
                    finally:
                        # Release only if we still own the lock: if the
                        # computation outlived lock_timeout, another process
                        # may hold it now and a plain DEL would release theirs
                        self.client.eval(self._RELEASE_LOCK, 1, lock_key, token)
                else:
                    # Another process is computing - wait and retry
                    for _ in range(lock_timeout * 10):
                        time.sleep(0.1)
                        cached_value = self.get(key)
                        if cached_value is not None:
                            return cached_value
                    # Lock expired without result - compute ourselves
                    result = func(*args, **kwargs)
                    self.set(key, result, ttl)
                    return result

            return wrapper
        return decorator


# Usage
redis_cache = RedisCache(
    redis_url="redis://localhost:6379",
    namespace="engineersofai",
    default_ttl=300,
)

@redis_cache.cached(ttl=60)
def get_course_catalog():
    """Cached in Redis for 60 seconds, shared across all workers."""
    # ... expensive database query ...
    return [{"id": 1, "title": "Python Foundation"}]

@redis_cache.cached_with_lock(ttl=120, lock_timeout=10)
def generate_report(report_type: str):
    """Only one worker generates the report; others wait."""
    # ... expensive computation ...
    return {"type": report_type, "data": [1, 2, 3]}

Part 7 - Real-World Caching Patterns

Caching Database Queries

from typing import Optional
import json
import time

class CourseRepository:
    """Repository with multi-layer caching."""

    def __init__(self, db_session, redis_client=None):
        self.db = db_session
        self.redis = redis_client
        self._local_cache = {}
        self._local_timestamps = {}
        self._local_ttl = 10 # 10 second local cache

    def get_course(self, course_id: int) -> Optional[dict]:
        """
        Three-layer cache:
        1. In-process dict (10s TTL) - fastest, per-worker
        2. Redis (60s TTL) - shared across workers
        3. Database - source of truth
        """
        # Layer 1: Local cache
        now = time.monotonic()
        if course_id in self._local_cache:
            if (now - self._local_timestamps[course_id]) < self._local_ttl:
                return self._local_cache[course_id]

        # Layer 2: Redis
        if self.redis:
            key = f"course:{course_id}"
            cached = self.redis.get(key)
            if cached:
                course = json.loads(cached)
                self._local_cache[course_id] = course
                self._local_timestamps[course_id] = now
                return course

        # Layer 3: Database
        course = self.db.query(
            "SELECT * FROM courses WHERE id = %s", (course_id,)
        )
        if course:
            course_dict = dict(course)
            # Populate both caches
            self._local_cache[course_id] = course_dict
            self._local_timestamps[course_id] = now
            if self.redis:
                self.redis.setex(f"course:{course_id}", 60,
                                 json.dumps(course_dict))
            return course_dict

        return None

    def update_course(self, course_id: int, data: dict):
        """Update with cache invalidation."""
        self.db.execute(
            "UPDATE courses SET title=%s, price=%s WHERE id=%s",
            (data['title'], data['price'], course_id),
        )
        # Invalidate this worker's local cache and the shared Redis entry.
        # Other workers' local caches are not reached from here; they can
        # serve the old value until their 10-second local TTL runs out.
        self._local_cache.pop(course_id, None)
        self._local_timestamps.pop(course_id, None)
        if self.redis:
            self.redis.delete(f"course:{course_id}")

Caching API Responses in FastAPI

from fastapi import FastAPI, Response
import time
import json
import hashlib

app = FastAPI()

class ResponseCache:
    """Simple in-process response cache with ETags."""

    def __init__(self, ttl: int = 30):
        self.cache = {}
        self.ttl = ttl

    def get_or_compute(self, key: str, compute_func, response: Response):
        now = time.monotonic()

        if key in self.cache:
            entry = self.cache[key]
            if (now - entry['timestamp']) < self.ttl:
                # Add cache headers
                response.headers["X-Cache"] = "HIT"
                response.headers["ETag"] = entry['etag']
                response.headers["Cache-Control"] = f"max-age={self.ttl}"
                return entry['data']

        # Compute fresh result
        data = compute_func()
        digest = hashlib.md5(json.dumps(data, default=str).encode()).hexdigest()
        etag = f'"{digest}"' # HTTP requires ETag values to be quoted strings

        self.cache[key] = {
            'data': data,
            'etag': etag,
            'timestamp': now,
        }

        response.headers["X-Cache"] = "MISS"
        response.headers["ETag"] = etag
        response.headers["Cache-Control"] = f"max-age={self.ttl}"

        return data

_response_cache = ResponseCache(ttl=30)

@app.get("/api/courses")
async def list_courses(response: Response):
    def compute():
        # Simulate DB query
        return [
            {"id": 1, "title": "Python Foundation", "price": 0},
            {"id": 2, "title": "Python Intermediate", "price": 49},
        ]

    return _response_cache.get_or_compute("courses:all", compute, response)

Part 8 - Choosing the Right Cache Strategy

| Scenario | Recommended Cache | Why |
|---|---|---|
| Pure function, bounded inputs | functools.cache | Simplest, no eviction needed |
| Pure function, large domain | functools.lru_cache | Bounded memory with LRU eviction |
| Database queries (single server) | cachetools.TTLCache | TTL ensures freshness |
| Database queries (multi-server) | Redis + local L1 | Consistency across workers |
| API responses | Response cache with ETags | HTTP-standard, client-side caching |
| Configuration values | cachetools.TTLCache | Refresh periodically |
| User sessions | Redis | Shared state across workers |

Key Takeaways

  • lru_cache is not magic: it requires hashable arguments, it can thrash with undersized maxsize, and it holds strong references that prevent garbage collection when used on methods.
  • Cache invalidation is the hard part: TTL-based expiration is the simplest approach; event-based invalidation is more accurate but requires infrastructure.
  • functools.cache is lru_cache(maxsize=None): use it when you know the domain is bounded and want the simplest possible memoization.
  • Cache stampede is a real production problem: use lock-based protection or probabilistic early expiration (XFetch) to prevent thundering herds.
  • Multi-layer caching (local + Redis) is the production pattern: local cache for speed, Redis for consistency across workers, database as source of truth.
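  • Always measure cache effectiveness: monitor hit rates. Whether a cache pays off depends on the hit rate together with the cost of a miss: a low hit rate on a cheap function wastes memory and lookup time, while even a modest hit rate can be worthwhile when each miss is expensive. If the measured savings are negligible, remove the cache or tune its parameters.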
  • Caching on class methods requires care: use per-instance caches or weakref-based approaches to avoid memory leaks.

Graded Practice Challenges

Level 1 - Predict the Output

Question 1: What does this print?

from functools import lru_cache

@lru_cache(maxsize=3)
def f(x):
    print(f" computing {x}")
    return x

f(1); f(2); f(3)
f(2)
f(4)
f(1)
print(f.cache_info())
Answer
computing 1
computing 2
computing 3
computing 4
computing 1
CacheInfo(hits=1, misses=5, maxsize=3, currsize=3)

After f(1); f(2); f(3): cache = [1, 2, 3], LRU order: 1, 2, 3 (3 is MRU).

f(2): cache hit. LRU order shifts: 1, 3, 2 (2 is now MRU). Hits=1.

f(4): miss. Evicts LRU which is 1. Cache = [3, 2, 4]. Miss count=4.

f(1): miss. 1 was evicted. Evicts LRU which is 3. Cache = [2, 4, 1]. Miss count=5.

Total: 1 hit, 5 misses.

Question 2: Why does this lru_cache usage cause a memory leak?

from functools import lru_cache

class DataProcessor:
    @lru_cache(maxsize=None)
    def process(self, data_id):
        return data_id * 2
Answer

lru_cache is a class-level decorator, so the cache is shared across all instances. The cache key includes self, which means the cache holds a strong reference to every DataProcessor instance that calls process. Even after you del a DataProcessor instance, the lru_cache still references it, preventing garbage collection. With maxsize=None, this grows without bound.

Fix: create the cache in __init__ as a per-instance attribute, or use weakref to allow instances to be collected.

Question 3: You have a cache with 95% hit rate and 5ms average compute time for misses. The cache lookup itself takes 0.1ms. What is the expected average response time?

Answer

Expected time = (hit_rate * lookup_time) + (miss_rate * (lookup_time + compute_time))

= (0.95 * 0.1ms) + (0.05 * (0.1ms + 5ms))

= 0.095ms + 0.255ms

= 0.35ms

Without caching, every request takes 5ms. The cache reduces average response time by 14.3x.
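The same arithmetic can be wrapped in a small helper for trying other numbers. Both function names are made up for this sketch; the break-even hit rate follows from setting the cached average equal to the uncached compute time.

```python
def expected_latency_ms(hit_rate, lookup_ms, compute_ms):
    """Average latency of a cached call: every call pays the lookup, misses also compute."""
    return hit_rate * lookup_ms + (1 - hit_rate) * (lookup_ms + compute_ms)

def break_even_hit_rate(lookup_ms, compute_ms):
    """Hit rate at which the cached path costs the same as always computing."""
    return lookup_ms / compute_ms

avg = expected_latency_ms(0.95, 0.1, 5.0)
print(f"{avg:.2f} ms")                         # 0.35 ms
print(f"{5.0 / avg:.1f}x faster")              # 14.3x faster
print(f"{break_even_hit_rate(0.1, 5.0):.0%}")  # 2%
```

With a 0.1ms lookup and a 5ms computation, the cache breaks even on latency at a 2% hit rate. Memory cost is a separate consideration that this calculation does not capture.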

Level 2 - Debug Challenge

This TTL cache implementation has a subtle bug that causes it to serve stale data indefinitely under certain conditions. Find and fix it:

import time

class BrokenTTLCache:
    def __init__(self, ttl=60):
        self.cache = {}
        self.ttl = ttl
        self.created_at = {}

    def get(self, key):
        if key in self.cache:
            age = time.time() - self.created_at[key]
            if age < self.ttl:
                return self.cache[key]
        return None

    def set(self, key, value):
        self.cache[key] = value
        self.created_at[key] = time.time()
Answer

The bug: when get detects an expired entry (age >= self.ttl), it returns None but does not delete the expired entry from cache and created_at. This means:

  1. The entry consumes memory forever
  2. If the system clock jumps backward (NTP correction, VM migration), age could become negative, and the expired entry would suddenly become "valid" again, serving stale data

Fix:

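def get(self, key):
    if key in self.cache:
        age = time.monotonic() - self.created_at[key] # Use monotonic clock
        if age < self.ttl:
            return self.cache[key]
        # Clean up expired entry
        del self.cache[key]
        del self.created_at[key]
    return None

def set(self, key, value):
    self.cache[key] = value
    self.created_at[key] = time.monotonic() # Must be the same clock that get() reads

Two changes: (1) delete expired entries, (2) use time.monotonic() instead of time.time() in both get and set to avoid clock-jump bugs. The two clocks have unrelated reference points, so a timestamp taken with one cannot be compared with a reading from the other.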

Level 3 - Design Challenge

Design a caching system for a course platform API with these requirements:

  1. Course catalog (changes rarely) - cached aggressively
  2. User enrollment status (changes on purchase) - must invalidate on write
  3. Certificate verification (immutable once issued) - cache forever
  4. Live class schedule (changes hourly) - short TTL
  5. Must work across 4 Gunicorn workers behind a load balancer

Specify which cache type and TTL for each endpoint, how invalidation works, and how you handle cache warmup after deployment.

Solution Sketch

Architecture: Two-layer caching - local cachetools.TTLCache per worker + shared Redis.

| Endpoint | Local TTL | Redis TTL | Invalidation |
|---|---|---|---|
| Course catalog | 60s | 300s | Event: clear on admin update |
| Enrollment status | 5s | 30s | Event: clear on purchase webhook |
| Certificate verification | 3600s | Forever | Never (immutable data) |
| Live class schedule | 30s | 60s | TTL-only (no explicit invalidation) |

Invalidation flow:

# On course update (admin action):
async def update_course(course_id, data):
    await db.update(course_id, data)
    # Invalidate Redis
    await redis.delete(f"course:{course_id}")
    await redis.delete("catalog:all")
    # Publish invalidation events so every worker drops its local copies
    await redis.publish("cache_invalidation", f"course:{course_id}")
    await redis.publish("cache_invalidation", "catalog:all")

# Each worker subscribes to invalidation events:
async def invalidation_listener():
    pubsub = redis.pubsub()
    await pubsub.subscribe("cache_invalidation")
    async for message in pubsub.listen():
        if message['type'] != 'message':
            continue # Skip subscribe confirmations
        key = message['data']
        if isinstance(key, bytes):
            key = key.decode() # Clients without decode_responses return bytes
        local_cache.pop(key, None) # Clear local cache

Cache warmup after deployment:

@app.on_event("startup")
async def warmup_caches():
    """Pre-populate critical caches on worker startup."""
    # Course catalog - always needed
    catalog = await db.fetch_all_courses()
    local_cache["catalog:all"] = catalog
    await redis.setex("catalog:all", 300, json.dumps(catalog))

    # Top 100 most-accessed courses
    popular = await db.fetch_popular_courses(limit=100)
    for course in popular:
        key = f"course:{course['id']}"
        local_cache[key] = course
        await redis.setex(key, 300, json.dumps(course))

What's Next

Caching trades memory for speed, but what happens when you run out of memory? In Memory Optimization, you will learn to reduce Python's memory footprint using __slots__, weakref, compact data structures, and memory-mapped files.

© 2026 EngineersOfAI. All rights reserved.