Python Microservices vs Monolith Practice Problems & Exercises
Practice: Microservices vs Monolith
Easy
Classify e-commerce capabilities into bounded contexts. Each capability belongs to exactly one context.
from collections import defaultdict

CAPABILITIES = [
    "Place order", "Cancel order", "Track shipment",
    "Charge card", "Issue refund", "Manage payment methods",
    "Reserve stock", "Restock item", "Check availability",
    "Register", "Login", "Update profile",
]

# Keyword signals per bounded context; the first context whose signal matches wins.
CONTEXT_SIGNALS = {
    "Order": ["order", "shipment"],
    "Payment": ["card", "refund", "payment"],
    "Inventory": ["stock", "restock", "availability"],
    "User": ["register", "login", "profile"],
}


def classify_capability(capability: str) -> str:
    lower = capability.lower()
    for context, signals in CONTEXT_SIGNALS.items():
        if any(s in lower for s in signals):
            return context
    return "Unknown"


grouped = defaultdict(list)
for cap in CAPABILITIES:
    grouped[classify_capability(cap)].append(cap)

# Print contexts in first-seen order (Order, Payment, Inventory, User) to match the
# expected output; sorted() would print them alphabetically instead.
for context, caps in grouped.items():
    print(f"{context}: {caps}")
Solution
The solution is above. Bounded contexts are the foundation of microservice decomposition:
Conway's Law: Systems tend to mirror the communication structure of the organization that builds them, so service boundaries usually follow team boundaries. One team owns one (or a few) bounded contexts.
Signs that two things belong in different contexts:
- They have different data ownership (orders table vs payments table)
- They change independently (payment processing rules change without affecting inventory)
- Different domain experts own them (billing team vs warehouse team)
Signs that two things belong in the same context:
- They share domain objects by value (Order contains OrderItems, not just item IDs)
- They must be consistent within the same transaction
- They are always deployed together
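The keyword classifier above returns the first matching context, so a capability that matches signals from two contexts is silently assigned to whichever context is listed first. A minimal sketch of a stricter check, assuming the same keyword-signal approach (the matching_contexts and find_ambiguous helpers and the extra sample capabilities are hypothetical), reports every capability that matches zero or several contexts so it can be reviewed by hand:

```python
CONTEXT_SIGNALS = {
    "Order": ["order", "shipment"],
    "Payment": ["card", "refund", "payment"],
    "Inventory": ["stock", "restock", "availability"],
    "User": ["register", "login", "profile"],
}


def matching_contexts(capability: str) -> list[str]:
    """Return every context whose keyword signals appear in the capability."""
    lower = capability.lower()
    return [ctx for ctx, signals in CONTEXT_SIGNALS.items()
            if any(s in lower for s in signals)]


def find_ambiguous(capabilities: list[str]) -> dict[str, list[str]]:
    """Map each capability that does not resolve to exactly one context to its matches."""
    report = {}
    for cap in capabilities:
        matches = matching_contexts(cap)
        if len(matches) != 1:
            report[cap] = matches
    return report


# Hypothetical capabilities that straddle or miss the context boundaries
samples = ["Place order", "Refund order", "Send newsletter"]
print(find_ambiguous(samples))
# {'Refund order': ['Order', 'Payment'], 'Send newsletter': []}
```

A non-empty report suggests that either the keyword lists or the context boundaries need another look.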
Expected Output
Order: ['Place order', 'Cancel order', 'Track shipment']
Payment: ['Charge card', 'Issue refund', 'Manage payment methods']
Inventory: ['Reserve stock', 'Restock item', 'Check availability']
User: ['Register', 'Login', 'Update profile']
Hints
Hint 1: A bounded context is a cohesive group of business capabilities that change together and belong to one team.
Hint 2: If two capabilities share many domain objects and business rules, they likely belong to the same context.
Implement both synchronous (HTTP-style) and asynchronous (event-based) service communication patterns.
from typing import Callable


# Synchronous: request-response
class UserServiceClient:
    """Simulates an HTTP client for the user service."""

    _users = {1: {"id": 1, "email": "[email protected]"}}

    def get_user(self, user_id: int) -> dict:
        # The caller blocks until this returns (or raises).
        user = self._users.get(user_id)
        if not user:
            raise ValueError(f"User {user_id} not found")
        return user


# Asynchronous: event bus
class EventBus:
    def __init__(self):
        self._subscribers: dict[str, list[Callable[[dict], None]]] = {}

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event_type: str, payload: dict) -> None:
        # In-memory and delivered inline for simplicity; a real broker delivers
        # asynchronously so the publisher does not wait on subscribers.
        print(f"Async event published: {event_type} {payload}")
        for handler in self._subscribers.get(event_type, []):
            handler(payload)


bus = EventBus()


def on_user_registered(payload: dict) -> None:
    print("Subscriber received: user.registered")


bus.subscribe("user.registered", on_user_registered)

# Sync call
client = UserServiceClient()
result = client.get_user(1)
print(f"Sync GET /users/1: {result}")

# Async event
bus.publish("user.registered", {"user_id": 2, "email": "[email protected]"})
Solution
The solution is above. When to use each pattern:
Synchronous (HTTP/gRPC): When you need an immediate response — checking inventory before placing an order, validating a token before serving a request.
Asynchronous (events): When you do not need an immediate response — sending a welcome email after registration, updating a search index after a product change, notifying analytics of a purchase.
The coupling trade-off:
- Sync calls create temporal coupling: if the target service is down, the caller fails too.
- Async events decouple availability: the publisher succeeds even if subscribers are temporarily down, as long as a message broker holds the events until the subscribers recover (messages queue up).
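The EventBus in the solution calls subscribers inline, so it does not actually show messages queuing up. A minimal sketch, assuming an in-memory collections.deque stands in for a durable broker (the QueuedEventBus class, its drain method, and the send_welcome_email subscriber are hypothetical), shows publishing succeed while a subscriber is down and the event being delivered once the subscriber recovers:

```python
from collections import deque
from typing import Callable


class QueuedEventBus:
    """publish() only enqueues; delivery happens later in drain()."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = {}
        self._queue: deque = deque()  # (event_type, payload) pairs awaiting delivery

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event_type: str, payload: dict) -> None:
        # The publisher never waits for, or fails because of, a subscriber.
        self._queue.append((event_type, payload))

    def drain(self) -> int:
        """Try to deliver each queued event once; requeue the ones that fail."""
        delivered = 0
        for _ in range(len(self._queue)):
            event_type, payload = self._queue.popleft()
            try:
                for handler in self._subscribers.get(event_type, []):
                    handler(payload)
                delivered += 1
            except Exception:
                # At-least-once: a requeued event is redelivered to all handlers.
                self._queue.append((event_type, payload))
        return delivered

    @property
    def pending(self) -> int:
        return len(self._queue)


email_service_up = False
sent: list[int] = []


def send_welcome_email(payload: dict) -> None:
    if not email_service_up:
        raise ConnectionError("email service down")
    sent.append(payload["user_id"])


bus = QueuedEventBus()
bus.subscribe("user.registered", send_welcome_email)
bus.publish("user.registered", {"user_id": 2})  # succeeds while the email service is down
print(bus.drain(), bus.pending)  # 0 1
email_service_up = True
print(bus.drain(), bus.pending)  # 1 0
```

Because a failed delivery is requeued, a handler may see the same event twice, so subscribers in this style are usually written to be idempotent.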
Expected Output
Sync GET /users/1: {'id': 1, 'email': '[email protected]'}
Async event published: user.registered {'user_id': 2, 'email': '[email protected]'}
Subscriber received: user.registered
Hints
Hint 1: Synchronous: request-response over HTTP. One service waits for the other to respond.
Hint 2: Asynchronous: event publishing. Publisher does not wait. Subscribers process events independently.
Build a decision function that recommends monolith, modular monolith, or microservices based on team and domain metrics.
def recommend_architecture(
    team_size: int,
    domain_count: int,
    team_distribution: int = 1,  # number of separate teams
    deployment_independence_needed: bool = False,
) -> str:
    # Each signal adds 0-2 points; the total (0-8) selects the architecture.
    score = 0
    score += 2 if team_size > 20 else (1 if team_size > 8 else 0)
    score += 2 if domain_count > 6 else (1 if domain_count > 3 else 0)
    score += 2 if team_distribution > 3 else (1 if team_distribution > 1 else 0)
    score += 2 if deployment_independence_needed else 0
    if score <= 2:
        return "Monolith"
    elif score <= 4:
        return "Modular Monolith"
    else:
        return "Microservices"


cases = [
    (3, 2, 1, False),   # score 0
    (50, 8, 5, True),   # score 8
    (12, 4, 2, False),  # score 3
]
for team, domains, teams, deploy in cases:
    rec = recommend_architecture(team, domains, teams, deploy)
    print(f"Team size: {team}, Domains: {domains} -> Recommendation: {rec}")
Solution
The solution is above. The key insight: microservices solve organizational problems, not technical ones.
Start with a monolith if:
- Team is fewer than ~10 engineers
- Domain boundaries are unclear (premature decomposition is costly)
- You need to move fast initially
Move to microservices when:
- Different parts of the system need different scaling profiles
- Teams are blocked by each other (merge conflicts, deployment contention)
- You need independent deployability (Service A deploys without touching Service B)
Modular Monolith is underrated: Same codebase, enforced module boundaries, independent database schemas per module. Zero network overhead. Easy to extract to microservices later when boundaries are proven.
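Independent schemas per module inside one process can be sketched with SQLite's ATTACH DATABASE, assuming each module's tables live in their own attached database and no module queries another module's tables directly. The payments_charge and orders_place functions are hypothetical module APIs; only payments_charge touches the payments schema:

```python
import sqlite3

# One process, one connection; each module gets its own schema via ATTACH.
conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS orders")
conn.execute("ATTACH DATABASE ':memory:' AS payments")
conn.execute("CREATE TABLE orders.orders (id INTEGER PRIMARY KEY, user_id INTEGER, status TEXT)")
conn.execute("CREATE TABLE payments.charges (id INTEGER PRIMARY KEY, order_id INTEGER, cents INTEGER)")


# payments module public API: the only code allowed to touch payments.* tables
def payments_charge(order_id: int, cents: int) -> int:
    cur = conn.execute(
        "INSERT INTO payments.charges (order_id, cents) VALUES (?, ?)", (order_id, cents)
    )
    return cur.lastrowid


# orders module: owns orders.* tables and reaches payments only through its API
def orders_place(user_id: int, cents: int) -> int:
    cur = conn.execute(
        "INSERT INTO orders.orders (user_id, status) VALUES (?, 'PENDING')", (user_id,)
    )
    order_id = cur.lastrowid
    payments_charge(order_id, cents)  # in-process call now; an HTTP call after extraction
    conn.execute("UPDATE orders.orders SET status = 'PAID' WHERE id = ?", (order_id,))
    return order_id


order_id = orders_place(user_id=1, cents=5000)
print(conn.execute("SELECT status FROM orders.orders WHERE id = ?", (order_id,)).fetchone())
# ('PAID',)
```

Because the orders code never reads payments.charges, moving the payments schema into its own database later changes only payments_charge.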
Expected Output
Team size: 3, Domains: 2 -> Recommendation: Monolith
Team size: 50, Domains: 8 -> Recommendation: Microservices
Team size: 12, Domains: 4 -> Recommendation: Modular Monolith
Hints
Hint 1: Use heuristics: small teams with few domains should use a monolith. Large teams with many independent domains benefit from microservices.
Hint 2: The modular monolith is the middle ground: internal module separation without network overhead.
Medium
Build a module boundary checker that enforces the rule: cross-module communication must go through api.py files only.
def get_module(path: str) -> str:
    """Top-level module name, e.g. 'payments.api' -> 'payments'."""
    parts = path.replace("/", ".").split(".")
    return parts[0] if parts else ""


def is_public_api(path: str) -> bool:
    """True if the import targets a module's public api file, e.g. 'payments.api'."""
    parts = path.replace("/", ".").split(".")
    return len(parts) >= 2 and parts[1] == "api"


def check_module_boundaries(imports_map: dict[str, list[str]]) -> list[str]:
    violations = []
    for source, imports in imports_map.items():
        src_module = get_module(source)
        for imp in imports:
            dep_module = get_module(imp)
            # Same-module imports and imports from shared/ are always allowed.
            if dep_module and dep_module != src_module and dep_module != "shared":
                if not is_public_api(imp):
                    violations.append(
                        f"VIOLATION: {source} imports {imp} (must use {dep_module}.api)"
                    )
    return violations


# Clean: cross-module via api.py only
clean = {
    "orders.service": ["orders.models", "payments.api", "shared.events"],
    "payments.api": ["payments.repository", "payments.models"],
    "inventory.views": ["inventory.models", "shared.utils"],
}
result = check_module_boundaries(clean)
print("No violations" if not result else "\n".join(result))
print("---")

# Violation: direct internal access
dirty = {
    "orders.service": ["orders.models", "payments.repository"],  # violation
    "inventory.views": ["inventory.models", "users.models"],     # violation
}
result = check_module_boundaries(dirty)
print("\n".join(result))
Solution
The solution is above. This is the modular monolith's key discipline: module internals are private by convention (or enforced by tooling).
Enforce with pytest:
def test_module_boundaries():
    imports = collect_all_imports("src/")  # helper not defined here: parse all .py files with ast
    violations = check_module_boundaries(imports)
    assert violations == [], "\n".join(violations)
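The test above relies on a collect_all_imports helper that the lesson does not define. A minimal sketch using the standard ast module, assuming a src/ layout where each top-level directory is one module (src/orders/service.py becomes orders.service) and that only first-party imports should be checked. Relative imports are skipped for brevity, and "from pkg import name" records only pkg:

```python
import ast
import tempfile
from pathlib import Path


def collect_all_imports(root: str) -> dict[str, list[str]]:
    """Map each file's dotted module name to the first-party modules it imports."""
    root_path = Path(root)
    # Top-level directories under root are treated as the application's modules.
    first_party = {p.name for p in root_path.iterdir() if p.is_dir()}
    imports_map: dict[str, list[str]] = {}
    for path in sorted(root_path.rglob("*.py")):
        source_name = ".".join(path.relative_to(root_path).with_suffix("").parts)
        tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        found: list[str] = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                found.extend(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
                found.append(node.module)
        # Drop stdlib and third-party imports; only first-party modules have boundaries.
        imports_map[source_name] = [m for m in found if m.split(".")[0] in first_party]
    return imports_map


# Usage on a throwaway tree
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "orders").mkdir()
    (Path(tmp) / "payments").mkdir()
    (Path(tmp) / "orders" / "service.py").write_text(
        "import json\nfrom payments.repository import charge\nfrom orders import models\n"
    )
    (Path(tmp) / "payments" / "repository.py").write_text("def charge(): pass\n")
    result = collect_all_imports(tmp)

print(result)
# {'orders.service': ['payments.repository', 'orders'], 'payments.repository': []}
```

Filtering to first-party modules matters: without it, an import such as json would be reported as a cross-module violation by check_module_boundaries.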
When the module boundary is proven stable, extracting the module into a microservice is straightforward:
- Move the module directory to a new repository.
- Replace from payments.api import ... with an HTTP client call.
- Deploy separately.
The modular monolith makes microservice extraction low-risk because boundaries were already enforced.
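The extraction step stays small if callers depend on an interface rather than on the module itself. A sketch under assumptions: PaymentsApi, InProcessPayments, HttpPayments, and place_order are hypothetical names, and the /charges endpoint and its JSON response shape are invented for illustration. The HTTP version uses only the standard library's urllib.request, and the interface is a typing.Protocol:

```python
import json
import urllib.request
from typing import Protocol


class PaymentsApi(Protocol):
    def charge(self, order_id: int, cents: int) -> str: ...


class InProcessPayments:
    """Modular monolith: a plain function call into the payments module."""

    def charge(self, order_id: int, cents: int) -> str:
        return f"txn-{order_id}-{cents}"


class HttpPayments:
    """After extraction: same interface, backed by an HTTP call to the payments service."""

    def __init__(self, base_url: str, timeout_s: float = 2.0) -> None:
        self._base_url = base_url.rstrip("/")
        self._timeout_s = timeout_s

    def charge(self, order_id: int, cents: int) -> str:
        body = json.dumps({"order_id": order_id, "cents": cents}).encode()
        req = urllib.request.Request(
            f"{self._base_url}/charges",  # hypothetical endpoint
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=self._timeout_s) as resp:
            return json.loads(resp.read())["transaction_id"]  # hypothetical response field


def place_order(payments: PaymentsApi, order_id: int, cents: int) -> str:
    # Order logic sees only the PaymentsApi interface, so swapping the
    # implementation at extraction time does not touch this function.
    return payments.charge(order_id, cents)


print(place_order(InProcessPayments(), 42, 5000))  # txn-42-5000
```

Switching to the extracted service then means constructing HttpPayments at startup instead of InProcessPayments; the order code is unchanged.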
# Implement a module boundary enforcement system for a modular monolith.
# Rules:
# 1. Modules can only import from their own internals and shared/
# 2. Module A cannot import directly from Module B's internals
# 3. Cross-module communication goes through public APIs (api.py files)
def check_module_boundaries(imports_map: dict[str, list[str]]) -> list[str]:
    """Return list of boundary violation messages."""
    pass
Expected Output
No violations
---
VIOLATION: orders.service imports payments.repository (must use payments.api)
VIOLATION: inventory.views imports users.models (must use users.api)
Hints
Hint 1: A boundary violation occurs when module X imports from one of module Y's internal files (anything other than Y/api.py).
Hint 2: Parse the module path: if the first segment differs from the importer and the second segment is not "api", it is a violation.
Implement an API Gateway with routing, response aggregation, and graceful degradation.
from typing import Callable


class ApiGateway:
    def __init__(self):
        self._routes: dict[str, Callable[[dict], dict]] = {}
        self._composite_handlers: dict[str, Callable[[dict], dict]] = {}

    def route(self, path_prefix: str, handler: Callable[[dict], dict]) -> None:
        self._routes[path_prefix] = handler

    def composite(self, path: str, handler: Callable[[dict], dict]) -> None:
        self._composite_handlers[path] = handler

    def handle(self, request: dict) -> dict:
        path = request.get("path", "")
        # Exact-match composite endpoints take priority over prefix routes.
        if path in self._composite_handlers:
            return self._composite_handlers[path](request)
        # Prefix routes are checked in registration order.
        for prefix, handler in self._routes.items():
            if path.startswith(prefix):
                try:
                    return handler(request)
                except Exception as e:
                    # Downstream failure: answer 503 instead of crashing the gateway.
                    return {"status": 503, "body": {"error": str(e)}}
        return {"status": 404, "body": {"error": "Not found"}}


# Downstream services
def user_service(req: dict) -> dict:
    uid = req.get("user_id", 1)
    return {"id": uid, "name": "Alice"}


def order_service(req: dict) -> list[dict]:
    # Stub: returns the same order list for every user.
    return [{"id": 1, "item": "Widget"}]


def payment_service(req: dict) -> dict:
    if req.get("fail_payments"):
        raise ConnectionError("Payment service unavailable")
    return {"cents": 5000}


# Build gateway
gateway = ApiGateway()
gateway.route("/users/", lambda r: {"status": 200, "body": user_service(r)})
gateway.route("/orders/", lambda r: {"status": 200, "body": order_service(r)})


def user_profile(req: dict) -> dict:
    uid = req.get("user_id", 1)
    user = user_service({"user_id": uid})
    orders = order_service({"user_id": uid})
    # Graceful degradation: a payment outage yields balance=None, not an error.
    try:
        balance = payment_service(req)
    except Exception:
        balance = None
    return {"status": 200, "body": {"user": user, "orders": orders, "balance": balance}}


gateway.composite("/profile", user_profile)

r1 = gateway.handle({"path": "/profile", "user_id": 1})
print(f"Profile: {r1['body']}")
r2 = gateway.handle({"path": "/profile", "user_id": 1, "fail_payments": True})
print(f"With payment failure: {r2['body']}")
Solution
The solution is above. The API Gateway pattern solves three client problems in microservice architectures:
- Chatty clients: Instead of calling 5 services for one screen, the client calls one composite endpoint.
- Service discovery: The client does not need to know individual service addresses.
- Cross-cutting concerns: Auth, rate limiting, logging, caching — applied once at the gateway.
Graceful degradation is critical. If the payment service is down, the profile endpoint returns partial data (user + orders) rather than failing entirely. The client can render the profile without the balance section.
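The composite handler above degrades when a service raises, but a service that is merely slow would still stall the whole response. A minimal sketch, assuming a per-endpoint deadline is acceptable (gather_with_fallback, slow_payment_service, and the 0.1 s deadline are hypothetical), runs the downstream calls concurrently with concurrent.futures and substitutes None for any call that fails or misses the deadline:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def gather_with_fallback(calls: dict[str, Callable[[], Any]], timeout_s: float) -> dict[str, Any]:
    """Run calls concurrently; a call that raises or misses the deadline yields None."""
    pool = ThreadPoolExecutor(max_workers=max(1, len(calls)))
    futures = {name: pool.submit(fn) for name, fn in calls.items()}
    deadline = time.monotonic() + timeout_s  # one shared deadline for the whole response
    results: dict[str, Any] = {}
    for name, fut in futures.items():
        remaining = max(0.0, deadline - time.monotonic())
        try:
            results[name] = fut.result(timeout=remaining)
        except Exception:
            results[name] = None  # raised, or timed out waiting: degrade this field
    # Return without waiting for stragglers; their threads finish in the background.
    pool.shutdown(wait=False)
    return results


def user_service() -> dict:
    return {"id": 1, "name": "Alice"}


def slow_payment_service() -> dict:
    time.sleep(0.5)  # simulates a slow dependency
    return {"cents": 5000}


body = gather_with_fallback({"user": user_service, "balance": slow_payment_service}, timeout_s=0.1)
print(body)  # {'user': {'id': 1, 'name': 'Alice'}, 'balance': None}
```

A shared deadline bounds the total latency of the composite endpoint; per-call timeouts alone would let several slow services add up.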
from typing import Callable, Optional
from dataclasses import dataclass
# Implement an API Gateway that:
# 1. Routes requests to the correct downstream service
# 2. Aggregates responses from multiple services for a composite endpoint
# 3. Handles service failure gracefully (returns partial data)
Expected Output
Profile: {'user': {'id': 1, 'name': 'Alice'}, 'orders': [{'id': 1, 'item': 'Widget'}], 'balance': {'cents': 5000}}
With payment failure: {'user': {'id': 1, 'name': 'Alice'}, 'orders': [{'id': 1, 'item': 'Widget'}], 'balance': None}
Hints
Hint 1: Route registration: map path prefixes to service handlers. Composite endpoints call multiple services and merge their responses.
Hint 2: On service failure (exception), return None for that service and include the partial response.
Implement the Saga pattern to manage a multi-step distributed transaction with automatic compensation.
from dataclasses import dataclass
from typing import Callable


@dataclass
class SagaStep:
    name: str
    action: Callable[[], bool]      # forward action; return False (or raise) to signal failure
    compensate: Callable[[], None]  # undoes the action after it has committed


class Saga:
    """Orchestrator: runs steps in order and compensates completed steps on failure."""

    def __init__(self, steps: list[SagaStep]):
        self._steps = steps

    def execute(self) -> tuple[bool, str]:
        completed: list[SagaStep] = []
        for step in self._steps:
            try:
                if not step.action():
                    raise RuntimeError(f"{step.name} declined")
                completed.append(step)
            except Exception as e:
                print(f"{step.name} failed: triggering compensation")
                # Undo already-committed steps in reverse order.
                for done in reversed(completed):
                    done.compensate()
                return False, str(e)
        return True, "order placed"


# Forward actions and their compensations
def reserve_inventory() -> bool:
    print("Inventory reserved", end=", ")
    return True


def charge_payment() -> bool:
    print("Payment charged", end=", ")
    return True


def confirm_order() -> bool:
    print("Order confirmed")
    return True


def release_inventory() -> None:
    print("Inventory released (compensation)")


def refund_payment() -> None:
    print("Payment refunded (compensation)")


# Test 1: happy path
saga1 = Saga([
    SagaStep("Inventory", reserve_inventory, release_inventory),
    SagaStep("Payment", charge_payment, refund_payment),
    SagaStep("Order", confirm_order, lambda: None),
])
ok, msg = saga1.execute()
print(f"Saga {'completed' if ok else 'failed'}: {msg}")
print("---")


# Test 2: payment failure
def fail_payment() -> bool:
    return False  # e.g., card declined


saga2 = Saga([
    SagaStep("Inventory", lambda: True, release_inventory),
    SagaStep("Payment", fail_payment, refund_payment),
    SagaStep("Order", confirm_order, lambda: None),
])
ok, msg = saga2.execute()
print(f"Saga {'completed' if ok else 'failed'}: {msg}")
Solution
The solution is above. The Saga pattern is the standard solution for distributed transactions in microservices:
Why not 2-Phase Commit (2PC)? 2PC requires all services to hold locks while coordinating. In a microservice system, this creates tight coupling, performance bottlenecks, and failure cascades.
Saga guarantees eventual consistency: Each step is committed locally. If a later step fails, compensating transactions undo the already-committed steps. The system eventually reaches a consistent state — just not necessarily via the happy path.
Orchestration vs Choreography:
- Orchestration (shown above): A Saga Orchestrator calls services in sequence.
- Choreography: Services publish events and react to each other's events — no central coordinator.
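For contrast with the orchestrated Saga above, a minimal choreography sketch, assuming a simple in-memory publish/subscribe bus with synchronous delivery for simplicity (publish, subscribe, run, the event names, and the decline rule are hypothetical). Each service reacts to the previous event, inventory and orders compensate on their own when they see payment.failed, and no component knows the whole sequence:

```python
from collections import defaultdict
from typing import Callable

handlers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)
log: list[str] = []


def subscribe(event: str, fn: Callable[[dict], None]) -> None:
    handlers[event].append(fn)


def publish(event: str, data: dict) -> None:
    log.append(event)
    for fn in handlers[event]:
        fn(data)


# Inventory service: reserves stock, releases it if payment fails
def inventory_on_order_placed(data: dict) -> None:
    publish("inventory.reserved", data)


def inventory_on_payment_failed(data: dict) -> None:
    publish("inventory.released", data)  # local compensation


# Payment service: charges once stock is reserved
def payment_on_inventory_reserved(data: dict) -> None:
    if data["cents"] > 10_000:  # hypothetical decline rule
        publish("payment.failed", data)
    else:
        publish("payment.charged", data)


# Order service: confirms or cancels based on the payment outcome
def order_on_payment_charged(data: dict) -> None:
    publish("order.confirmed", data)


def order_on_payment_failed(data: dict) -> None:
    publish("order.cancelled", data)


subscribe("order.placed", inventory_on_order_placed)
subscribe("inventory.reserved", payment_on_inventory_reserved)
subscribe("payment.charged", order_on_payment_charged)
subscribe("payment.failed", inventory_on_payment_failed)
subscribe("payment.failed", order_on_payment_failed)


def run(order: dict) -> list[str]:
    log.clear()
    publish("order.placed", order)
    return list(log)


print(" -> ".join(run({"order_id": 1, "cents": 5000})))
# order.placed -> inventory.reserved -> payment.charged -> order.confirmed
print(" -> ".join(run({"order_id": 2, "cents": 50_000})))
# order.placed -> inventory.reserved -> payment.failed -> inventory.released -> order.cancelled
```

The flow now lives in the subscriptions rather than in one orchestrator, which removes the central coordinator but makes the end-to-end sequence harder to see in any single place.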
from typing import Callable
from dataclasses import dataclass, field
# Implement the Saga pattern for a distributed order placement:
# Steps: reserve inventory -> charge payment -> confirm order
# If any step fails, run compensating transactions in reverse
# Step 1 compensation: release inventory
# Step 2 compensation: refund payment
Expected Output
Inventory reserved, Payment charged, Order confirmed
Saga completed: order placed
---
Payment failed: triggering compensation
Inventory released (compensation)
Saga failed: Payment declined
Hints
Hint 1: Each step has a forward action and a compensation action. Run steps in order. On failure, run compensations in reverse order for all steps already completed.
Hint 2: Store completed steps in a list. On failure, iterate in reverse and call each compensation.
Implement a Circuit Breaker that opens after repeated failures and auto-recovers after a cooldown period.
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # failing fast, reject calls
    HALF_OPEN = "half_open"  # probing recovery


class CircuitBreakerOpen(Exception):
    pass


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 1.0):
        self._threshold = failure_threshold
        self._timeout = recovery_timeout  # seconds to stay OPEN before probing
        self._failures = 0
        self._state = CircuitState.CLOSED
        self._opened_at: float = 0.0

    def call(self, fn, *args, **kwargs):
        if self._state == CircuitState.OPEN:
            elapsed = time.monotonic() - self._opened_at
            if elapsed >= self._timeout:
                self._state = CircuitState.HALF_OPEN
                print("After cooldown: HALF_OPEN, testing...")
            else:
                # Fail fast without calling the downstream service.
                raise CircuitBreakerOpen("circuit is OPEN")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed HALF_OPEN probe, or reaching the threshold, (re)opens the circuit.
            if self._state == CircuitState.HALF_OPEN or self._failures >= self._threshold:
                self._state = CircuitState.OPEN
                self._opened_at = time.monotonic()
                print(f"Circuit OPEN after {self._failures} failures")
            raise
        if self._state == CircuitState.HALF_OPEN:
            print("Recovery succeeded, circuit CLOSED")
        self._reset()  # any success clears the consecutive-failure count
        return result

    def _reset(self) -> None:
        self._state = CircuitState.CLOSED
        self._failures = 0


call_count = 0


def service_call():
    """Fake downstream service: calls 6-10 fail, all others succeed."""
    global call_count
    call_count += 1
    if 6 <= call_count <= 10:
        raise ConnectionError("Service down")
    return "ok"


cb = CircuitBreaker(failure_threshold=5, recovery_timeout=0.1)

for _ in range(5):  # calls 1-5 succeed
    cb.call(service_call)
print("Calls 1-5: success")

print("Calls 6-10: failures (circuit opening)")
for _ in range(5):  # calls 6-10 fail; the 5th failure opens the circuit
    try:
        cb.call(service_call)
    except ConnectionError:
        pass

try:  # rejected immediately; service_call is not invoked
    cb.call(service_call)
except CircuitBreakerOpen as e:
    print(f"Rejected: {e}")

time.sleep(0.15)  # wait past recovery_timeout
cb.call(lambda: "recovered")  # HALF_OPEN probe succeeds, circuit closes
Solution
The solution is above. The Circuit Breaker is essential for preventing cascade failures in microservice architectures:
The cascade failure scenario:
- Payment service becomes slow (5s response time).
- Order service waits for payment — its threads are all blocked.
- Order service runs out of threads — it becomes slow/unresponsive.
- Gateway waits for order service — all gateway threads blocked.
- Entire site is down because of one slow service.
The circuit breaker fix: After 5 failures, the circuit opens. Subsequent calls to payment service fail immediately (no waiting). Order service degrades gracefully (returns "payment unavailable" instead of blocking). The site remains mostly functional.
Three states:
- CLOSED: Normal operation. Calls pass through.
- OPEN: Failing fast. All calls rejected immediately.
- HALF_OPEN: Recovery probe. One call is allowed through; if it succeeds, the circuit closes; if it fails, the circuit reopens and the cooldown restarts.
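The cascade-failure fix described above depends on the caller getting a fallback value instead of an exception. A minimal sketch of that wiring, assuming an injectable clock so the cooldown can be tested without sleeping; FallbackBreaker, its clock parameter, and the "payment unavailable" fallback are hypothetical, and this is a simplified variant of the solution's CircuitBreaker (every call made while HALF_OPEN acts as a probe), not a drop-in replacement:

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class FallbackBreaker:
    def __init__(self, threshold: int = 5, cooldown_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._threshold = threshold
        self._cooldown_s = cooldown_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means CLOSED

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "CLOSED"
        if self._clock() - self._opened_at >= self._cooldown_s:
            return "HALF_OPEN"
        return "OPEN"

    def call(self, fn: Callable[[], T], fallback: T) -> T:
        if self.state == "OPEN":
            return fallback  # fail fast: no downstream call, no waiting
        try:
            result = fn()
        except Exception:
            self._failures += 1
            if self.state == "HALF_OPEN" or self._failures >= self._threshold:
                self._opened_at = self._clock()  # (re)open and restart the cooldown
            return fallback
        self._failures = 0
        self._opened_at = None  # success closes the circuit
        return result


now = [0.0]  # fake clock the test controls
breaker = FallbackBreaker(threshold=2, cooldown_s=10.0, clock=lambda: now[0])


def payment_down() -> str:
    raise ConnectionError("payment service timeout")


print(breaker.call(payment_down, "payment unavailable"), breaker.state)  # payment unavailable CLOSED
print(breaker.call(payment_down, "payment unavailable"), breaker.state)  # payment unavailable OPEN
now[0] = 10.0
print(breaker.state)  # HALF_OPEN
print(breaker.call(lambda: "charged", "payment unavailable"), breaker.state)  # charged CLOSED
```

Injecting the clock keeps the state machine deterministic in tests; production code would use the default time.monotonic.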
from dataclasses import dataclass, field
import time
from enum import Enum
class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # failing, reject calls
    HALF_OPEN = "half_open"  # testing recovery
class CircuitBreaker:
    """Opens after threshold failures. Auto-recovers after timeout."""
    pass
Expected Output
Calls 1-5: success
Calls 6-10: failures (circuit opening)
Circuit OPEN after 5 failures
Rejected: circuit is OPEN
After cooldown: HALF_OPEN, testing...
Recovery succeeded, circuit CLOSED
Hints
Hint 1: Track consecutive failures. On threshold, set state to OPEN and record open_at time. In OPEN state, raise immediately without calling the function.
Hint 2: In HALF_OPEN state, allow one call through. If it succeeds, reset to CLOSED. If it fails, go back to OPEN.
Hard
Implement a Strangler Fig proxy that gradually migrates traffic from a monolith to new microservices.
import random
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RouteConfig:
    path_prefix: str
    old_handler: Callable[[dict], dict]
    new_handler: Optional[Callable[[dict], dict]]
    migration_pct: int = 0  # 0 = all old, 100 = all new


class StranglerFigProxy:
    def __init__(self):
        self._routes: list[RouteConfig] = []
        self._stats: dict[str, dict[str, int]] = {}

    def register(self, route: RouteConfig) -> None:
        self._routes.append(route)
        self._stats[route.path_prefix] = {"old": 0, "new": 0}

    def set_migration(self, path_prefix: str, pct: int) -> None:
        if not 0 <= pct <= 100:
            raise ValueError("pct must be between 0 and 100")
        for route in self._routes:
            if route.path_prefix == path_prefix:
                route.migration_pct = pct
                return
        raise KeyError(f"No route registered for {path_prefix}")

    def handle(self, request: dict) -> dict:
        path = request.get("path", "")
        for route in self._routes:
            if path.startswith(route.path_prefix):
                # Send roughly migration_pct% of requests to the new service, if one exists.
                use_new = (
                    route.new_handler is not None
                    and random.randint(1, 100) <= route.migration_pct
                )
                if use_new:
                    self._stats[route.path_prefix]["new"] += 1
                    return route.new_handler(request)
                self._stats[route.path_prefix]["old"] += 1
                return route.old_handler(request)
        return {"status": 404, "body": {}}

    def migrated_share(self, path_prefix: str) -> int:
        """Percentage of observed requests on this path that the new service served."""
        s = self._stats[path_prefix]
        total = s["old"] + s["new"]
        return round(100 * s["new"] / total) if total else 0


def monolith(req: dict) -> dict:
    return {"status": 200, "source": "monolith"}


def new_service(req: dict) -> dict:
    return {"status": 200, "source": "new_service"}


proxy = StranglerFigProxy()
proxy.register(RouteConfig("/users", monolith, new_service, migration_pct=0))
proxy.register(RouteConfig("/orders", monolith, new_service, migration_pct=100))
proxy.register(RouteConfig("/payments", monolith, new_service, migration_pct=0))
# Dial payments up to 50% at runtime (e.g., from a feature flag)
proxy.set_migration("/payments", 50)

r1 = proxy.handle({"path": "/users/1"})
print(f"Routing: /users -> {r1['source']} (0% migrated)")
r2 = proxy.handle({"path": "/orders/42"})
print(f"Routing: /orders -> {r2['source']} (100% migrated)")

# Simulate payments traffic; each request is routed independently at random.
for _ in range(100):
    proxy.handle({"path": "/payments/charge"})
print("Routing: /payments -> monolith or new_service (50% migrated, random sample)")

stats = {p.strip("/"): proxy.migrated_share(p) for p in ("/users", "/orders", "/payments")}
print(f"Migration stats: {stats}")
Solution
The solution is above. The Strangler Fig pattern makes microservice extraction incremental and low-risk:
Migration roadmap:
- Start at 0% — all traffic goes to monolith. Deploy new service without risk.
- Move to 1-5% — canary deployment. Watch for errors and latency.
- Move to 50% — run old and new in parallel. Compare results.
- Move to 100% — new service handles all traffic. Monolith code for this path is dead.
- Delete monolith code for this path.
Observability is critical: Log which handler served each request so you can compare old vs new behavior during migration. Shadow mode (run both, compare responses) is the safest approach.
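Shadow mode can be sketched as a wrapper around the two handlers, assuming the responses can be compared by equality on a chosen set of fields (shadow_handler, its compare_keys parameter, and the pricing handlers are hypothetical). The caller always receives the monolith's response; the new service's response is only compared and logged, and its failures never reach the user. The shadow call runs inline here for simplicity, whereas a production proxy would typically run it off the request path:

```python
import logging
from typing import Callable

logger = logging.getLogger("shadow")


def shadow_handler(
    old: Callable[[dict], dict],
    new: Callable[[dict], dict],
    compare_keys: tuple[str, ...],
    mismatches: list[dict],
) -> Callable[[dict], dict]:
    """Serve from old; also call new and record any difference on compare_keys."""

    def handle(request: dict) -> dict:
        primary = old(request)
        try:
            shadow = new(request)
        except Exception as e:  # shadow failures are recorded, never returned
            logger.warning("shadow call failed for %s: %s", request.get("path"), e)
            mismatches.append({"path": request.get("path"), "error": str(e)})
            return primary
        diff = {k: (primary.get(k), shadow.get(k))
                for k in compare_keys if primary.get(k) != shadow.get(k)}
        if diff:
            logger.warning("shadow mismatch on %s: %s", request.get("path"), diff)
            mismatches.append({"path": request.get("path"), "diff": diff})
        return primary

    return handle


def monolith_total(req: dict) -> dict:
    return {"status": 200, "total_cents": 4998}


def new_total(req: dict) -> dict:
    return {"status": 200, "total_cents": 5000}  # simulated rounding bug in the new service


found: list[dict] = []
handler = shadow_handler(monolith_total, new_total, ("status", "total_cents"), found)
print(handler({"path": "/orders/1/total"}))  # {'status': 200, 'total_cents': 4998}
print(found)  # [{'path': '/orders/1/total', 'diff': {'total_cents': (4998, 5000)}}]
```

Once the mismatch list stays empty under real traffic, the route can move on to a canary percentage.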
from typing import Callable, Optional
# Implement the Strangler Fig pattern:
# - A proxy that routes requests to the old monolith or new service
# - Feature flag controls per-endpoint routing
# - Migration percentage: 0% = all old, 100% = all new
# - Track routing decisions for observability
Expected Output
Routing: /users -> monolith (0% migrated)
Routing: /orders -> new_service (100% migrated)
Routing: /payments -> monolith or new_service (50% migrated, random sample)
Migration stats: {'users': 0, 'orders': 100, 'payments': ~50}
Hints
Hint 1: Store migration_percentage per path. Use random.random() to route a fraction of traffic to the new service.
Hint 2: Track routing counts per service so you can observe the migration progress.
Simulate a service mesh where sidecar proxies add observability and resilience to every inter-service call.
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class RequestSpan:
    service: str
    path: str
    status: int
    latency_ms: float


@dataclass
class ServiceMetrics:
    service_name: str
    spans: list[RequestSpan] = field(default_factory=list)

    def record(self, span: RequestSpan) -> None:
        self.spans.append(span)

    def summary(self) -> dict:
        if not self.spans:
            return {}
        latencies = [s.latency_ms for s in self.spans]
        success = sum(1 for s in self.spans if s.status < 400)
        return {
            "total": len(self.spans),
            "success_rate": round(success / len(self.spans) * 100, 1),
            "avg_latency_ms": round(sum(latencies) / len(latencies), 1),
            "max_latency_ms": round(max(latencies), 1),
        }


class SidecarProxy:
    """Wraps every call to one service: measures latency, maps errors, records spans."""

    def __init__(self, service_name: str, timeout_ms: float = 500.0):
        self.name = service_name
        self._timeout_ms = timeout_ms
        self._metrics = ServiceMetrics(service_name)

    def call(self, fn: Callable[..., dict], path: str, *args, **kwargs) -> dict:
        start = time.perf_counter()
        try:
            result = fn(*args, **kwargs)
        except Exception as e:
            elapsed = (time.perf_counter() - start) * 1000
            self._metrics.record(RequestSpan(self.name, path, 500, elapsed))
            return {"status": 500, "error": str(e)}
        elapsed = (time.perf_counter() - start) * 1000
        # Simplification: the deadline is checked after the call returns; a real
        # sidecar would cancel the in-flight request once the deadline passes.
        if elapsed > self._timeout_ms:
            self._metrics.record(RequestSpan(self.name, path, 504, elapsed))
            return {"status": 504, "error": "timeout"}
        self._metrics.record(RequestSpan(self.name, path, result.get("status", 200), elapsed))
        return result

    @property
    def metrics(self) -> ServiceMetrics:
        return self._metrics


# Simulated services
def user_handler(user_id: int) -> dict:
    return {"status": 200, "body": {"id": user_id, "name": "Alice"}}


def order_handler(user_id: int) -> dict:
    time.sleep(0.001)  # small delay
    return {"status": 200, "body": {"orders": [{"id": 1}]}}


def broken_handler() -> dict:
    raise ConnectionError("DB down")


# One sidecar per service
user_sidecar = SidecarProxy("user-service")
order_sidecar = SidecarProxy("order-service")
payment_sidecar = SidecarProxy("payment-service")

# Simulate traffic
for _ in range(5):
    user_sidecar.call(user_handler, "/users/1", 1)
    order_sidecar.call(order_handler, "/orders?user=1", 1)
    payment_sidecar.call(broken_handler, "/charge")

# Report metrics
for proxy in [user_sidecar, order_sidecar, payment_sidecar]:
    m = proxy.metrics.summary()
    if m:
        print(f"{proxy.name}: {m}")
Solution
The solution is above. In production, service meshes such as Istio and Linkerd provide this functionality (Istio uses Envoy as its sidecar proxy):
What a real sidecar adds:
- mTLS: Mutual TLS between all service calls — no plaintext inter-service traffic.
- Load balancing: Round-robin, least-connections, locality-aware routing.
- Circuit breaking: Connection and request limits, plus ejection of repeatedly failing upstream hosts (outlier detection).
- Request tracing: Distributed trace IDs propagated via headers (Jaeger, Zipkin).
- Metrics: Prometheus metrics for every route.
The key benefit: These concerns are handled by the mesh, not by application code. Services can be written in any language. No shared library needed.
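Trace propagation via headers can be sketched as follows, assuming a single custom header named x-trace-id (a hypothetical name for this sketch; real meshes commonly use standards such as the W3C Trace Context traceparent header or Zipkin's B3 headers). Each sidecar reuses an incoming trace ID or starts a new one, and stamps it on outbound calls so spans from different services can be joined. The sidecar function, ROUTES table, and app handlers are hypothetical:

```python
import uuid
from typing import Callable

TRACE_HEADER = "x-trace-id"  # hypothetical header name for this sketch
spans: list[tuple[str, str]] = []  # (trace_id, service), in the order requests arrive

Outbound = Callable[[str, dict], dict]


def sidecar(service: str, app: Callable[[dict, Outbound], dict]) -> Callable[[dict], dict]:
    """Wrap an app so its inbound and outbound requests share one trace ID."""

    def inbound(headers: dict) -> dict:
        # Reuse the caller's trace ID, or start a new trace at the edge.
        trace_id = headers.get(TRACE_HEADER) or uuid.uuid4().hex
        spans.append((trace_id, service))

        def outbound(target: str, out_headers: dict) -> dict:
            # The sidecar, not the app, stamps the trace header on outgoing calls.
            return ROUTES[target]({**out_headers, TRACE_HEADER: trace_id})

        return app(headers, outbound)

    return inbound


def user_app(headers: dict, call: Outbound) -> dict:
    return {"user": "Alice"}


def profile_app(headers: dict, call: Outbound) -> dict:
    # Application code only names the target; it never touches trace headers.
    return {"profile": call("user-service", {})}


ROUTES = {
    "user-service": sidecar("user-service", user_app),
    "profile-service": sidecar("profile-service", profile_app),
}

print(ROUTES["profile-service"]({}))  # {'profile': {'user': 'Alice'}}
print(len(spans), len({t for t, _ in spans}))  # 2 1
```

Both spans share one trace ID even though neither app handled it, which is the point of pushing propagation into the sidecar.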
from typing import Callable
from dataclasses import dataclass, field
import time
# Simulate a service mesh with sidecar proxies:
# - Each service has a sidecar that intercepts all calls
# - Sidecar adds: request tracing, circuit breaking, timeout enforcement
# - Collect metrics: latency histogram, success rate per route
Expected Output
See solution for expected output
Hints
Hint 1: The sidecar wraps a service handler. It measures latency, catches timeouts, records metrics.
Hint 2: Use a shared metrics store keyed by service name. The sidecar records each call result.
Implement event sourcing for an order management system where state is derived by replaying an event log.
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import Union


def _utc_now() -> str:
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp.
    return datetime.now(timezone.utc).isoformat()


# ── EVENTS ───────────────────────────────────────────────────────────────────
# Events are immutable facts, so the dataclasses are frozen.
@dataclass(frozen=True)
class OrderCreated:
    order_id: int
    customer_email: str
    occurred_at: str = field(default_factory=_utc_now)


@dataclass(frozen=True)
class ItemAdded:
    order_id: int
    item_id: int
    quantity: int
    price_cents: int
    occurred_at: str = field(default_factory=_utc_now)


@dataclass(frozen=True)
class OrderPaid:
    order_id: int
    transaction_id: str
    occurred_at: str = field(default_factory=_utc_now)


@dataclass(frozen=True)
class OrderCancelled:
    order_id: int
    reason: str
    occurred_at: str = field(default_factory=_utc_now)


Event = Union[OrderCreated, ItemAdded, OrderPaid, OrderCancelled]


# ── READ MODEL ────────────────────────────────────────────────────────────────
@dataclass
class OrderReadModel:
    order_id: int = 0
    customer_email: str = ""
    status: str = "NONE"
    items: list = field(default_factory=list)
    total_cents: int = 0


def apply_event(state: OrderReadModel, event: Event) -> OrderReadModel:
    """Return the state after one event; the input state is not mutated."""
    if isinstance(event, OrderCreated):
        return OrderReadModel(order_id=event.order_id, customer_email=event.customer_email, status="CREATED")
    if isinstance(event, ItemAdded):
        return replace(
            state,
            items=state.items + [{"item_id": event.item_id, "qty": event.quantity}],
            total_cents=state.total_cents + event.quantity * event.price_cents,
        )
    if isinstance(event, OrderPaid):
        return replace(state, status="PAID")
    if isinstance(event, OrderCancelled):
        return replace(state, status="CANCELLED", total_cents=0)
    return state


def rebuild_state(events: list[Event]) -> OrderReadModel:
    state = OrderReadModel()
    for event in events:
        state = apply_event(state, event)
    return state


# ── COMMAND HANDLER ───────────────────────────────────────────────────────────
class OrderCommandHandler:
    def __init__(self):
        self._event_log: list[Event] = []  # append-only

    def _events_for(self, order_id: int) -> list[Event]:
        return [e for e in self._event_log if e.order_id == order_id]

    def create_order(self, order_id: int, customer_email: str) -> None:
        self._event_log.append(OrderCreated(order_id, customer_email))

    def add_item(self, order_id: int, item_id: int, qty: int, price: int) -> None:
        # Commands validate against state rebuilt from the log, then append an event.
        if self.get_state(order_id).status == "CANCELLED":
            raise ValueError("Cannot add items to a cancelled order")
        self._event_log.append(ItemAdded(order_id, item_id, qty, price))

    def pay(self, order_id: int, txn_id: str) -> None:
        if self.get_state(order_id).status != "CREATED":
            raise ValueError("Only an open (CREATED) order can be paid")
        self._event_log.append(OrderPaid(order_id, txn_id))

    def get_state(self, order_id: int) -> OrderReadModel:
        return rebuild_state(self._events_for(order_id))

    def get_history(self, order_id: int) -> list[str]:
        return [type(e).__name__ for e in self._events_for(order_id)]


# Test
handler = OrderCommandHandler()
handler.create_order(1, "[email protected]")
handler.add_item(1, 42, 2, 999)
handler.add_item(1, 7, 1, 3000)
handler.pay(1, "txn_abc")
state = handler.get_state(1)
print(f"Order {state.order_id} state: {state.status}, items={len(state.items)}, total={state.total_cents}")
print(f"Event log: {len(handler._event_log)} events")
print(f"History: {' -> '.join(handler.get_history(1))}")
Solution
The solution is above. Event sourcing gives you:
- Complete audit log: Every change is an event. No data is ever deleted. Regulators love this.
- Time travel: Replay events up to any point in time to reconstruct historical state.
- Debugging: Reproduce any production issue by replaying its event sequence.
- CQRS (Command Query Responsibility Segregation): Commands write events; queries read from denormalized read models built by replaying events.
The trade-off: Reading current state requires replaying all events (mitigated by snapshots). Events are append-only so you can never "fix" a mistake — you append a correcting event.
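Snapshotting can be sketched as storing the folded state together with the number of events it already covers, then replaying only the events after that point. This assumes a deliberately simplified event shape (a hypothetical AmountAdded event plus hypothetical Snapshot, fold, and load helpers) rather than the full order events above:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AmountAdded:
    cents: int


@dataclass(frozen=True)
class Snapshot:
    version: int      # number of events already folded into total_cents
    total_cents: int


def fold(total: int, events: list[AmountAdded]) -> int:
    for e in events:
        total += e.cents
    return total


def load(log: list[AmountAdded], snapshot: Optional[Snapshot]) -> tuple[int, int]:
    """Return (total_cents, events_replayed); replay starts right after the snapshot."""
    start, total = (snapshot.version, snapshot.total_cents) if snapshot else (0, 0)
    tail = log[start:]
    return fold(total, tail), len(tail)


log = [AmountAdded(100) for _ in range(1000)]
snap = Snapshot(version=len(log), total_cents=fold(0, log))  # taken after 1000 events
log += [AmountAdded(250), AmountAdded(50)]                   # events appended since then

print(load(log, None))  # (100300, 1002): full replay
print(load(log, snap))  # (100300, 2): only the tail is replayed
```

The snapshot is only a cache: the event log stays the source of truth, so a snapshot can be discarded and rebuilt from the log at any time.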
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
# Implement event sourcing for an order:
# - State is derived by replaying events (not stored directly)
# - Events: OrderCreated, ItemAdded, OrderPaid, OrderCancelled
# - CommandHandler processes commands and appends events
# - ReadModel rebuilds current state by replaying event log
Expected Output
Order 1 state: PAID, items=2, total=4998
Event log: 4 events
History: OrderCreated -> ItemAdded -> ItemAdded -> OrderPaid
Hints
Hint 1: Events are immutable facts. Store them in a list. To get current state, start with empty state and apply each event in order.
Hint 2: Commands validate preconditions and produce events. Events do not validate — they just record what happened.
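The two hints fit together in a few lines: `apply` folds events without checking anything, while a command checks preconditions against the folded state before appending. A minimal sketch with hypothetical names (`OrderState`, `apply`, `current_state`, `pay`) and dict-shaped events; the exercise's own classes may differ:

```python
from dataclasses import dataclass, replace
from functools import reduce

@dataclass(frozen=True)
class OrderState:
    status: str = "NEW"
    total_cents: int = 0

def apply(state: OrderState, event: dict) -> OrderState:
    # Events are facts: apply never rejects them, it only records their effect.
    if event["type"] == "OrderCreated":
        return replace(state, status="CREATED")
    if event["type"] == "ItemAdded":
        return replace(state, total_cents=state.total_cents + event["price_cents"] * event["qty"])
    if event["type"] == "OrderPaid":
        return replace(state, status="PAID")
    return state

def current_state(log: list[dict]) -> OrderState:
    # Hint 1: start from the empty state and apply each event in order.
    return reduce(apply, log, OrderState())

def pay(log: list[dict], txn_id: str) -> None:
    # Hint 2: the command validates against current state, then appends an event.
    state = current_state(log)
    if state.status != "CREATED":
        raise ValueError(f"cannot pay an order in state {state.status}")
    if state.total_cents == 0:
        raise ValueError("cannot pay an empty order")
    log.append({"type": "OrderPaid", "txn_id": txn_id})

log = [{"type": "OrderCreated"}, {"type": "ItemAdded", "price_cents": 999, "qty": 2}]
pay(log, "txn_abc")
print(current_state(log))  # OrderState(status='PAID', total_cents=1998)
try:
    pay(log, "txn_again")
except ValueError as err:
    print(err)  # cannot pay an order in state PAID
```

A rejected command leaves the log untouched, so invalid requests never become facts.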
Build a complete service communication layer with registry, load balancing, circuit breaking, retry, and tracing.
from dataclasses import dataclass
from typing import Callable, Optional
import time
import uuid
# ── SERVICE REGISTRY ─────────────────────────────────────────────────────────
@dataclass
class ServiceInstance:
    host: str
    port: int
    @property
    def address(self) -> str:
        return f"{self.host}:{self.port}"
class ServiceRegistry:
    def __init__(self):
        self._services: dict[str, list[ServiceInstance]] = {}
    def register(self, name: str, host: str, port: int) -> None:
        self._services.setdefault(name, []).append(ServiceInstance(host, port))
    def discover(self, name: str) -> list[ServiceInstance]:
        return self._services.get(name, [])
# ── LOAD BALANCER ─────────────────────────────────────────────────────────────
class RoundRobinBalancer:
    def __init__(self):
        self._counters: dict[str, int] = {}
    def pick(self, instances: list[ServiceInstance]) -> Optional[ServiceInstance]:
        if not instances:
            return None
        key = str(instances)  # one rotating counter per distinct instance list
        idx = self._counters.get(key, 0) % len(instances)
        self._counters[key] = idx + 1
        return instances[idx]
# ── TRACING ───────────────────────────────────────────────────────────────────
@dataclass
class Span:
    trace_id: str
    service: str
    latency_ms: float
    status: str
class Tracer:
    def __init__(self):
        self._spans: list[Span] = []
    def record(self, span: Span) -> None:
        self._spans.append(span)
    def print_trace(self, trace_id: str) -> None:
        spans = [s for s in self._spans if s.trace_id == trace_id]
        for s in spans:
            print(f" [{s.trace_id[:8]}] {s.service} {s.status} ({s.latency_ms:.1f}ms)")
# ── SERVICE CLIENT ────────────────────────────────────────────────────────────
class ServiceClient:
    def __init__(self, registry: ServiceRegistry, balancer: RoundRobinBalancer, tracer: Tracer,
                 failure_threshold: int = 3, backoff_s: float = 0.05):
        self._registry = registry
        self._balancer = balancer
        self._tracer = tracer
        self._failure_threshold = failure_threshold
        self._backoff_s = backoff_s
        self._failures: dict[str, int] = {}  # consecutive failures per service
        self._circuit_open: dict[str, bool] = {}  # simplified: never half-opens or resets
    def call(self, service: str, handler: Callable, trace_id: str, retries: int = 2) -> dict:
        # Circuit breaker check comes first, before any instance is contacted.
        if self._circuit_open.get(service):
            return {"status": 503, "error": f"{service} circuit open"}
        last_error = "no attempts made"
        for attempt in range(retries + 1):  # first try plus `retries` retries
            # Re-pick on every attempt so a retry can land on a different instance.
            instance = self._balancer.pick(self._registry.discover(service))
            if not instance:
                return {"status": 503, "error": f"{service} not found"}
            start = time.perf_counter()
            try:
                result = handler(instance)
                elapsed = (time.perf_counter() - start) * 1000
                self._failures[service] = 0
                self._tracer.record(Span(trace_id, service, elapsed, "ok"))
                return result
            except Exception as e:
                elapsed = (time.perf_counter() - start) * 1000
                last_error = str(e)
                self._failures[service] = self._failures.get(service, 0) + 1
                self._tracer.record(Span(trace_id, service, elapsed, "error"))
                if self._failures[service] >= self._failure_threshold:
                    # Stop retrying as soon as the circuit opens.
                    self._circuit_open[service] = True
                    return {"status": 503, "error": f"{service} circuit opened: {last_error}"}
                if attempt < retries:
                    time.sleep(self._backoff_s * 2 ** attempt)  # exponential backoff
        return {"status": 500, "error": f"all retries failed: {last_error}"}
# ── SETUP ────────────────────────────────────────────────────────────────────
registry = ServiceRegistry()
registry.register("user-service", "user-svc-1", 8001)
registry.register("user-service", "user-svc-2", 8002)
registry.register("payment-service", "payment-svc-1", 9001)
balancer = RoundRobinBalancer()
tracer = Tracer()
client = ServiceClient(registry, balancer, tracer)
def user_handler(instance: ServiceInstance) -> dict:
    return {"status": 200, "user": {"id": 1, "name": "Alice"}}
def payment_handler(instance: ServiceInstance) -> dict:
    return {"status": 200, "transaction": "txn_abc"}
# The order service calls the user and payment services under one trace ID.
trace_id = str(uuid.uuid4())
print(f"Trace: {trace_id[:8]}")
u = client.call("user-service", user_handler, trace_id)
p = client.call("payment-service", payment_handler, trace_id)
print(f"User: {u['user']}, Payment: {p['transaction']}")
tracer.print_trace(trace_id)
# Show load balancing
trace_id2 = str(uuid.uuid4())
for _ in range(3):
    r = client.call("user-service", lambda i: {"status": 200, "instance": i.port}, trace_id2)
    print(f"LB picked port: {r['instance']}")
Solution
The solution demonstrates a simplified service communication layer. In production, these pieces are usually provided by infrastructure and libraries:
- Service registry: Consul, etcd, Kubernetes DNS
- Load balancing: Envoy, Nginx, AWS ALB
- Circuit breaker: Hystrix (Java), Polly (.NET)
- Retry with backoff: tenacity (Python)
- Tracing: Jaeger, Zipkin, AWS X-Ray
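The solution passes `trace_id` as a plain function argument, which only works inside one process. Across HTTP hops, trace context travels in request headers; one standard format is the W3C Trace Context `traceparent` header (`version-traceid-parentid-flags`), with Zipkin's B3 headers and AWS X-Ray's `X-Amzn-Trace-Id` as alternatives. A minimal sketch of building and parsing `traceparent`; the helper names are hypothetical, it accepts only version `00`, and real services would normally let a tracing SDK such as OpenTelemetry handle this:

```python
import re
import secrets
from typing import Optional

# W3C Trace Context: "00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>"
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")

def new_span_id() -> str:
    return secrets.token_hex(8)  # 8 random bytes -> 16 lowercase hex characters

def make_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"

def parse_traceparent(header: str) -> Optional[tuple[str, str, bool]]:
    match = TRACEPARENT_RE.match(header)
    if not match:
        return None  # malformed: the receiver should start a new trace instead
    trace_id, parent_id, flags = match.groups()
    if trace_id == "0" * 32 or parent_id == "0" * 16:
        return None  # all-zero IDs are invalid
    return trace_id, parent_id, bool(int(flags, 16) & 0x01)

# Caller (order service) starts a trace and sends its own span ID as the parent.
trace_id = secrets.token_hex(16)
outgoing = {"traceparent": make_traceparent(trace_id, new_span_id())}

# Callee (user service) keeps the trace ID and creates a fresh span of its own.
parsed = parse_traceparent(outgoing["traceparent"])
assert parsed is not None
incoming_trace_id, parent_span_id, sampled = parsed
child_header = make_traceparent(incoming_trace_id, new_span_id(), sampled)
print(child_header.split("-")[1] == trace_id)  # True: same trace across the hop
```

Recording `parent_span_id` alongside each span is what lets a tracing backend draw the call tree instead of the flat list that `Tracer.print_trace` shows.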
Why build it from scratch: understanding the components makes production issues much easier to debug. When circuit breakers start opening in a service mesh such as Istio, you already know the state machine behind them because you have implemented a version of it yourself.
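The solution's breaker opens after three consecutive failures but never closes again. Breakers such as Hystrix and Polly add a half-open state: after a cooldown they let trial calls through, close on success, and reopen on failure. A minimal sketch, assuming a hypothetical `CircuitBreaker` class with a failure threshold, a cooldown in seconds, and an injectable clock so the example runs without sleeping:

```python
import time
from typing import Callable

class CircuitBreaker:
    def __init__(self, threshold: int = 3, reset_after_s: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold
        self.reset_after_s = reset_after_s
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        # After the cooldown, move from open to half-open and allow trial calls.
        if self.state == "open" and self.clock() - self.opened_at >= self.reset_after_s:
            self.state = "half-open"
        return self.state != "open"

    def record_success(self) -> None:
        self.state, self.failures = "closed", 0

    def record_failure(self) -> None:
        self.failures += 1
        # A failed trial in half-open reopens immediately; otherwise use the threshold.
        if self.state == "half-open" or self.failures >= self.threshold:
            self.state, self.opened_at = "open", self.clock()

now = [0.0]  # fake clock for the example
breaker = CircuitBreaker(threshold=3, reset_after_s=5.0, clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
print(breaker.state, breaker.allow())  # open False
now[0] = 6.0
print(breaker.allow(), breaker.state)  # True half-open
breaker.record_success()
print(breaker.state)                   # closed
```

A caller checks `allow()` before each request and reports the outcome with `record_success()` or `record_failure()`; in `ServiceClient` that would replace the `_failures` and `_circuit_open` dictionaries with one breaker per service.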
# Build a complete service communication layer with:
# - Service registry (registration + discovery)
# - Load balancer (round-robin)
# - Circuit breaker per service
# - Retry with backoff
# - Request tracing
# Show order service calling user service and payment service
Expected Output
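See the solution for the expected output. Trace IDs and latencies change on every run; the three load-balancing lines print ports 8002, 8001, 8002, because the earlier user-service call already took port 8001.
Hints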
Hint 1: Service registry maps service name to a list of (host, port) tuples. Load balancer picks one using round-robin (rotating index).
Hint 2: Wrap each inter-service call: service registry lookup -> load balance -> circuit breaker check -> retry -> trace.
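The retry step in Hint 2 usually waits longer after each failure. A minimal sketch of exponential backoff with "full jitter" (a random wait between zero and the exponential bound), assuming a hypothetical `call_with_backoff` helper with an injectable `sleep` so the example runs instantly; in Python code the tenacity library provides the same idea through its `retry`, `stop_after_attempt`, and `wait_exponential` helpers:

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_backoff(fn: Callable[[], T], attempts: int = 4, base_s: float = 0.1,
                      cap_s: float = 2.0, sleep: Callable[[float], None] = time.sleep) -> T:
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:  # a real client would retry only transient errors
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Full jitter: wait a random time up to min(cap, base * 2^attempt).
            sleep(random.uniform(0, min(cap_s, base_s * 2 ** attempt)))
    raise AssertionError("unreachable")

calls = {"n": 0}

def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

delays: list[float] = []
print(call_with_backoff(flaky, sleep=delays.append))  # ok
print(calls["n"], len(delays))                        # 3 2
```

The jitter spreads out retries from many clients that failed at the same moment, so they do not all hit a recovering service in lockstep.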
