Python Hexagonal Architecture (Ports and Adapters): Practice Problems & Exercises
Practice: Hexagonal Architecture (Ports and Adapters)
Easy
Write a classify_component(name) function that returns "port" or "adapter" based on naming conventions.
def classify_component(name: str) -> str:
    if name.startswith("I") and name[1:2].isupper():
        return "port"
    return "adapter"

components = [
    "IPaymentGateway", "StripeAdapter", "IUserRepository",
    "PostgresUserRepository", "SendGridEmailAdapter", "IEmailSender",
]
for c in components:
    print(f"{c}: {classify_component(c)}")
Solution
def classify_component(name: str) -> str:
    if (name.startswith("I") and len(name) > 1 and name[1].isupper()) or name.endswith("Port"):
        return "port"
    return "adapter"
Ports vs Adapters:
- Port (inbound / driving): Defines how the outside world drives the application, e.g. IOrderService. A FastAPI route handler calls this port.
- Port (outbound / driven): Defines what the application needs from the outside world, e.g. IPaymentGateway. The application calls this port.
- Adapter: Concrete code that connects one side to the other. StripeAdapter implements IPaymentGateway using the Stripe SDK.
The hexagon has two sides:
- Left side (driving): External actors (HTTP, CLI, tests) call the application through inbound ports.
- Right side (driven): The application calls the outside world through outbound ports implemented by adapters.
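The two sides can be sketched in a few lines. This is a minimal illustration, not the lesson's reference code: the method names checkout and charge, FakeGatewayAdapter, and cli_driving_adapter are assumptions made up for the example.

```python
from typing import Protocol


class IPaymentGateway(Protocol):
    """Outbound (driven) port: what the application needs from outside."""
    def charge(self, amount_cents: int) -> str: ...


class IOrderService(Protocol):
    """Inbound (driving) port: how the outside world calls the application."""
    def checkout(self, amount_cents: int) -> str: ...


class FakeGatewayAdapter:
    """Driven adapter (hypothetical stand-in for a real StripeAdapter)."""
    def charge(self, amount_cents: int) -> str:
        return f"charged {amount_cents}"


class OrderService:
    """Application core: satisfies the inbound port, calls the outbound port."""
    def __init__(self, gateway: IPaymentGateway) -> None:
        self._gateway = gateway

    def checkout(self, amount_cents: int) -> str:
        return self._gateway.charge(amount_cents)


def cli_driving_adapter(service: IOrderService, argv: list[str]) -> str:
    """Driving adapter: translates CLI arguments into a port call."""
    return service.checkout(int(argv[0]))


result = cli_driving_adapter(OrderService(FakeGatewayAdapter()), ["1500"])
print(result)  # charged 1500
```

The core sits in the middle: the driving adapter depends on IOrderService, and the core depends on IPaymentGateway, so neither side knows the other's concrete class.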
Expected Output
IPaymentGateway: port
StripeAdapter: adapter
IUserRepository: port
PostgresUserRepository: adapter
SendGridEmailAdapter: adapter
IEmailSender: port
Hints
Hint 1: Ports are abstract interfaces (Protocol or ABC) that define what the application needs. Adapters are concrete implementations that connect to the real world.
Hint 2: Naming convention: ports are usually I-prefixed interfaces. Adapters are concrete classes that mention the technology (Stripe, Postgres, SendGrid).
Define an IEmailPort driven port and implement three adapters: SMTP (simulated), Console, and Spy (for testing).
from typing import Protocol
from dataclasses import dataclass, field

class IEmailPort(Protocol):
    def send(self, to: str, subject: str, body: str) -> None: ...

class SmtpEmailAdapter:
    """Simulates SMTP - in real life this would use smtplib."""
    def send(self, to: str, subject: str, body: str) -> None:
        print(f"SMTP: sent to {to}")

class ConsoleEmailAdapter:
    def send(self, to: str, subject: str, body: str) -> None:
        print(f"Console: [EMAIL] to {to}: {subject}")

@dataclass
class SpyEmailAdapter:
    _sent: list = field(default_factory=list)

    def send(self, to: str, subject: str, body: str) -> None:
        self._sent.append({"to": to, "subject": subject})

    def sent_count(self) -> int:
        return len(self._sent)

def send_welcome(notifier: IEmailPort, email: str) -> None:
    notifier.send(email, "Welcome!", "Thanks for signing up.")

send_welcome(SmtpEmailAdapter(), "[email protected]")
send_welcome(ConsoleEmailAdapter(), "[email protected]")
spy = SpyEmailAdapter()
send_welcome(spy, "[email protected]")
print(f"Logged: {spy.sent_count()} emails")
Solution
The code above is the complete solution. Key points:
The Spy adapter pattern is crucial for testing. Instead of mocking (unittest.mock.patch), you provide a real implementation that records calls. Tests assert on spy.sent_count() or spy._sent[0]["to"] — no mock framework needed.
Structural subtyping: None of the three adapters inherit from IEmailPort. Python's Protocol uses structural subtyping — if a class has the right methods with compatible signatures, it satisfies the protocol automatically.
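A test built on the spy might look like the sketch below. It repeats the port, the spy, and send_welcome so that it runs on its own; the test name and the user@example.com address are placeholders chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import Protocol


class IEmailPort(Protocol):
    def send(self, to: str, subject: str, body: str) -> None: ...


@dataclass
class SpyEmailAdapter:
    _sent: list = field(default_factory=list)

    def send(self, to: str, subject: str, body: str) -> None:
        # Record the call instead of sending anything
        self._sent.append({"to": to, "subject": subject})

    def sent_count(self) -> int:
        return len(self._sent)


def send_welcome(notifier: IEmailPort, email: str) -> None:
    notifier.send(email, "Welcome!", "Thanks for signing up.")


def test_send_welcome_sends_one_email() -> None:
    spy = SpyEmailAdapter()  # no inheritance from IEmailPort needed
    send_welcome(spy, "user@example.com")
    assert spy.sent_count() == 1
    assert spy._sent[0]["to"] == "user@example.com"
    assert spy._sent[0]["subject"] == "Welcome!"


test_send_welcome_sends_one_email()
```

Under pytest the final call is unnecessary, since the runner collects the test_ function by name.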
Expected Output
SMTP: sent to [email protected]
Console: [EMAIL] to [email protected]: Welcome!
Logged: 1 emails
Hints
Hint 1: A driven (outbound) port is a Protocol that the application uses to reach outside systems. Define it with typing.Protocol.
Hint 2: Write three implementations: SmtpEmailAdapter, ConsoleEmailAdapter, and SpyEmailAdapter (records calls for testing).
Implement an IBankingPort inbound port and a BankingService that satisfies it, with an in-memory store.
from typing import Protocol
from dataclasses import dataclass, field

class IBankingPort(Protocol):
    def create_account(self, email: str) -> int: ...
    def deposit(self, account_id: int, amount_cents: int) -> int: ...
    def withdraw(self, account_id: int, amount_cents: int) -> int: ...
    def get_balance(self, account_id: int) -> int: ...

@dataclass
class Account:
    id: int
    email: str
    balance: int = 0

@dataclass
class BankingService:
    _accounts: dict = field(default_factory=dict)
    _next_id: int = 1

    def create_account(self, email: str) -> int:
        acc = Account(self._next_id, email)
        self._accounts[self._next_id] = acc
        self._next_id += 1
        print(f"Created account {acc.id} for {acc.email}")
        return acc.id

    def deposit(self, account_id: int, amount_cents: int) -> int:
        acc = self._accounts[account_id]
        acc.balance += amount_cents
        print(f"Deposited {amount_cents}. New balance: {acc.balance}")
        return acc.balance

    def withdraw(self, account_id: int, amount_cents: int) -> int:
        acc = self._accounts[account_id]
        if acc.balance < amount_cents:
            raise ValueError("Insufficient funds")
        acc.balance -= amount_cents
        print(f"Withdrew {amount_cents}. New balance: {acc.balance}")
        return acc.balance

    def get_balance(self, account_id: int) -> int:
        return self._accounts[account_id].balance

svc: IBankingPort = BankingService()
acc_id = svc.create_account("[email protected]")
print(f"Balance: {svc.get_balance(acc_id)}")
svc.deposit(acc_id, 5000)
svc.withdraw(acc_id, 2000)
try:
    svc.withdraw(acc_id, 99999)
except ValueError as e:
    print(e)
Solution
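The solution is above. BankingService does not inherit from IBankingPort, yet a static type checker such as mypy accepts the annotation svc: IBankingPort = BankingService() because BankingService structurally matches the protocol. Python itself does not enforce the annotation at runtime.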
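The difference between static and runtime checking can be seen in a small sketch. IGreeterPort, EnglishGreeter, and NotAGreeter are hypothetical names used only for this illustration; typing.runtime_checkable is the standard-library decorator that opts a Protocol into isinstance() checks.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class IGreeterPort(Protocol):  # hypothetical port, for illustration only
    def greet(self, name: str) -> str: ...


class EnglishGreeter:
    def greet(self, name: str) -> str:
        return f"Hello, {name}"


class NotAGreeter:
    pass


# The annotation alone is not enforced at runtime: this assignment succeeds
# even though NotAGreeter has no greet(). A static checker would reject it.
wrong: IGreeterPort = NotAGreeter()  # type: ignore[assignment]

# isinstance() works only because of @runtime_checkable, and it checks that
# the method exists, not that its signature matches.
is_greeter = isinstance(EnglishGreeter(), IGreeterPort)
is_not_greeter = isinstance(NotAGreeter(), IGreeterPort)
print(is_greeter, is_not_greeter)  # True False
```

In practice the static check in CI does most of the work; a runtime isinstance() check is an optional extra guard at the composition root.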
Driving adapters (left side) would look like:
# Assumes app = FastAPI(), Depends imported from fastapi, and a get_service() provider defined elsewhere
@app.post("/accounts/{account_id}/deposit")
def deposit_endpoint(account_id: int, body: dict, svc: IBankingPort = Depends(get_service)):
    balance = svc.deposit(account_id, body["amount_cents"])
    return {"balance": balance}
The HTTP adapter calls IBankingPort.deposit() — it never knows about BankingService internals.
Expected Output
Created account 1 for [email protected]
Balance: 0
Deposited 5000. New balance: 5000
Withdrew 2000. New balance: 3000
Insufficient funds
Hints
Hint 1: An inbound (driving) port defines the application API that external actors call. The application service implements this port.
Hint 2: External actors (HTTP routers, CLI) call the service through IBankingPort. They never call domain objects directly.
Medium
Write a run_workflow function that uses an IProductRepository port, then verify that swapping two different adapter implementations produces identical results.
from typing import Protocol, Optional
from dataclasses import dataclass, field

@dataclass
class Product:
    id: int
    name: str
    price_cents: int

class IProductRepository(Protocol):
    def save(self, product: Product) -> None: ...
    def find(self, product_id: int) -> Optional[Product]: ...

class InMemoryProductRepository:
    def __init__(self):
        self._store: dict = {}

    def save(self, product: Product) -> None:
        self._store[product.id] = product
        print(f"[InMemory] Saved product {product.id}: {product.name}")

    def find(self, product_id: int) -> Optional[Product]:
        return self._store.get(product_id)

class ListProductRepository:
    def __init__(self):
        self._items: list = []

    def save(self, product: Product) -> None:
        self._items = [p for p in self._items if p.id != product.id]
        self._items.append(product)
        print(f"[List] Saved product {product.id}: {product.name}")

    def find(self, product_id: int) -> Optional[Product]:
        for p in self._items:
            if p.id == product_id:
                return p
        return None

def run_workflow(repo: IProductRepository, label: str) -> Optional[str]:
    repo.save(Product(1, "Widget", 999))
    found = repo.find(1)
    if found:
        print(f"[{label}] Found: {found.name}")
    return found.name if found else None

r1 = run_workflow(InMemoryProductRepository(), "InMemory")
print("---")
r2 = run_workflow(ListProductRepository(), "List")
print("---")
print(f"Both adapters behave identically: {r1 == r2}")
Solution
The solution is above. The key: run_workflow does not know which adapter it receives. Both satisfy IProductRepository structurally.
Contract testing with pytest:
import pytest

@pytest.mark.parametrize("repo", [
    InMemoryProductRepository(),
    ListProductRepository(),
])
def test_save_and_find(repo):
    repo.save(Product(1, "Widget", 999))
    found = repo.find(1)
    assert found is not None
    assert found.name == "Widget"
Every adapter must pass the same contract tests. A new RedisProductRepository must pass these tests before it can be used in production.
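One way to package the contract is a plain function that takes a factory, so every check starts from a fresh repository. The sketch below is an assumption about how such a harness could look, not part of the lesson: check_repository_contract and DictProductRepository are hypothetical names, and the overwrite-on-save rule is inferred from how both adapters above behave.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Protocol


@dataclass
class Product:
    id: int
    name: str
    price_cents: int


class IProductRepository(Protocol):
    def save(self, product: Product) -> None: ...
    def find(self, product_id: int) -> Optional[Product]: ...


class DictProductRepository:
    """Minimal adapter used only to exercise the contract."""
    def __init__(self) -> None:
        self._store: dict[int, Product] = {}

    def save(self, product: Product) -> None:
        self._store[product.id] = product

    def find(self, product_id: int) -> Optional[Product]:
        return self._store.get(product_id)


def check_repository_contract(make_repo: Callable[[], IProductRepository]) -> None:
    """Assertions every IProductRepository adapter is expected to satisfy."""
    repo = make_repo()  # fresh instance, so checks never share state
    assert repo.find(1) is None  # unknown id returns None
    repo.save(Product(1, "Widget", 999))
    assert repo.find(1) == Product(1, "Widget", 999)  # round trip
    repo.save(Product(1, "Widget v2", 1099))
    found = repo.find(1)
    assert found is not None and found.name == "Widget v2"  # save overwrites


check_repository_contract(DictProductRepository)
```

A new adapter joins the suite by passing its class (or any zero-argument factory) to the same function.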
Expected Output
[InMemory] Saved product 1: Widget
[InMemory] Found: Widget
---
[List] Saved product 1: Widget
[List] Found: Widget
---
Both adapters behave identically: True
Hints
Hint 1: Write the application logic (run_workflow) first. Then write two repository adapters. Verify they produce the same results.
Hint 2: A pure function that accepts IProductRepository and exercises all operations is the ideal contract test harness.
Implement a composite adapter that fans out to multiple notification channels, and a use case that only calls the port once.
from typing import Protocol
from dataclasses import dataclass, field

class INotificationPort(Protocol):
    def notify(self, user_id: int, message: str) -> None: ...

class EmailNotificationAdapter:
    def notify(self, user_id: int, message: str) -> None:
        print(f"[EMAIL] User {user_id}: {message}")

class SmsNotificationAdapter:
    def notify(self, user_id: int, message: str) -> None:
        print(f"[SMS] User {user_id}: {message}")

@dataclass
class SpyNotificationAdapter:
    _calls: list = field(default_factory=list)

    def notify(self, user_id: int, message: str) -> None:
        self._calls.append((user_id, message))

    def call_count(self) -> int:
        return len(self._calls)

class CompositeNotificationAdapter:
    def __init__(self, *adapters: INotificationPort):
        self._adapters = list(adapters)

    def notify(self, user_id: int, message: str) -> None:
        for adapter in self._adapters:
            adapter.notify(user_id, message)

class OrderReadyUseCase:
    def __init__(self, notifier: INotificationPort):
        self._notifier = notifier

    def execute(self, user_id: int, order_id: int) -> None:
        self._notifier.notify(user_id, "Your order is ready")

spy = SpyNotificationAdapter()
composite = CompositeNotificationAdapter(
    EmailNotificationAdapter(),
    SmsNotificationAdapter(),
    spy,
)
use_case = OrderReadyUseCase(composite)
use_case.execute(user_id=1, order_id=42)
print(f"Notifications sent: {spy.call_count()}")
Solution
The solution is above. Adding a new notification channel (push, Slack, webhook) requires zero changes to OrderReadyUseCase — only the composition root changes.
Why composite beats a direct list in the use case:
# Bad: use case knows about all channels
def execute(self, user_id, order_id):
    self._email.notify(...)  # changes when new channel added
    self._sms.notify(...)

# Good: composite hides fan-out
def execute(self, user_id, order_id):
    self._notifier.notify(...)  # never changes
from typing import Protocol
from dataclasses import dataclass, field

class INotificationPort(Protocol):
    def notify(self, user_id: int, message: str) -> None: ...

# Implement:
# 1. EmailNotificationAdapter
# 2. SmsNotificationAdapter
# 3. CompositeNotificationAdapter that fans out to both
# 4. OrderReadyUseCase that uses INotificationPort
Expected Output
[EMAIL] User 1: Your order is ready
[SMS] User 1: Your order is ready
Notifications sent: 1
Hints
Hint 1: The composite adapter is itself an adapter - it implements INotificationPort and holds a list of other INotificationPort instances.
Hint 2: Fan-out means calling all inner adapters. The use case only calls notify() once and all channels fire.
Implement a CachingProductAdapter that wraps any IProductPort and transparently caches results.
from typing import Protocol, Optional
from dataclasses import dataclass, field

class IProductPort(Protocol):
    def fetch(self, product_id: int) -> Optional[dict]: ...

class SlowProductAdapter:
    def __init__(self):
        self.call_count = 0
        self._db = {1: {"name": "Widget", "price": 999}, 2: {"name": "Gadget", "price": 1999}}

    def fetch(self, product_id: int) -> Optional[dict]:
        self.call_count += 1
        print(f"Slow fetch called for ID {product_id}")
        return self._db.get(product_id)

@dataclass
class CachingProductAdapter:
    _inner: IProductPort
    _cache: dict = field(default_factory=dict)

    def fetch(self, product_id: int) -> Optional[dict]:
        if product_id in self._cache:
            return self._cache[product_id]
        result = self._inner.fetch(product_id)
        if result is not None:
            self._cache[product_id] = result
        return result

# Compose
slow = SlowProductAdapter()
cached = CachingProductAdapter(_inner=slow)
r1 = cached.fetch(1)
print(f"Product: {r1['name']}")
r2 = cached.fetch(1)  # should hit cache
print(f"Product: {r2['name']} (from cache)")
print(f"Slow fetch calls total: {slow.call_count}")
Solution
The solution is above. The critical insight: CachingProductAdapter satisfies IProductPort structurally, so any code that accepts IProductPort will work with the cached version transparently.
Stacking adapters:
# You can stack multiple adapters
slow = SlowProductAdapter()
cached = CachingProductAdapter(_inner=slow)
logged = LoggingProductAdapter(_inner=cached) # logs then delegates to cache
use_case = GetProductUseCase(repo=logged)
Each layer adds behaviour without modifying the layers below it. This is the Decorator pattern applied at the port/adapter boundary.
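The stacking snippet refers to a LoggingProductAdapter that the exercise never defines. A minimal sketch of what it could look like follows; the class body is an assumption, and it records messages in a list instead of printing so the behaviour is easy to assert on. The port and the two other adapters are repeated in condensed form so the block runs on its own.

```python
from dataclasses import dataclass, field
from typing import Optional, Protocol


class IProductPort(Protocol):
    def fetch(self, product_id: int) -> Optional[dict]: ...


class SlowProductAdapter:
    def __init__(self) -> None:
        self.call_count = 0
        self._db = {1: {"name": "Widget", "price": 999}}

    def fetch(self, product_id: int) -> Optional[dict]:
        self.call_count += 1
        return self._db.get(product_id)


@dataclass
class CachingProductAdapter:
    _inner: IProductPort
    _cache: dict = field(default_factory=dict)

    def fetch(self, product_id: int) -> Optional[dict]:
        if product_id not in self._cache:
            result = self._inner.fetch(product_id)
            if result is None:
                return None  # misses are not cached
            self._cache[product_id] = result
        return self._cache[product_id]


@dataclass
class LoggingProductAdapter:
    """Hypothetical logging layer: records each call, then delegates."""
    _inner: IProductPort
    log: list = field(default_factory=list)

    def fetch(self, product_id: int) -> Optional[dict]:
        self.log.append(f"fetch({product_id})")
        return self._inner.fetch(product_id)


slow = SlowProductAdapter()
logged = LoggingProductAdapter(_inner=CachingProductAdapter(_inner=slow))
logged.fetch(1)
logged.fetch(1)
print(logged.log, slow.call_count)  # ['fetch(1)', 'fetch(1)'] 1
```

Both calls are logged because logging is the outermost layer, while the slow adapter is reached only once because the cache sits between them. Swapping the order of the wrappers would log only cache misses.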
TTL-based cache:
from time import monotonic  # monotonic clock: unaffected by system clock adjustments

@dataclass
class TtlCachingAdapter:
    _inner: IProductPort
    _ttl_seconds: int = 60
    _cache: dict = field(default_factory=dict)  # id -> (value, expires_at)

    def fetch(self, product_id: int) -> Optional[dict]:
        if product_id in self._cache:
            value, expires_at = self._cache[product_id]
            if monotonic() < expires_at:
                return value
        # Cache miss or expired entry: fetch again and refresh the expiry
        result = self._inner.fetch(product_id)
        if result is not None:
            self._cache[product_id] = (result, monotonic() + self._ttl_seconds)
        return result
from typing import Protocol, Optional
from dataclasses import dataclass, field

class IProductPort(Protocol):
    def fetch(self, product_id: int) -> Optional[dict]: ...

# Implement:
# 1. SlowProductAdapter (simulates a slow DB call with a counter)
# 2. CachingProductAdapter (wraps any IProductPort, caches results in a dict)
# Show that the slow adapter is only called once for repeated fetches of the same ID
Expected Output
Slow fetch called for ID 1
Product: Widget
Product: Widget (from cache)
Slow fetch calls total: 1
Hints
Hint 1: The caching adapter wraps an IProductPort. It checks its internal dict first. On a miss, it calls the inner adapter and stores the result.
Hint 2: This is the Decorator pattern applied to a hexagonal port — the cache is transparent to the caller.
Build a TodoService with two driving adapters: a CLI adapter and an HTTP adapter. Both drive the same hexagon.
from typing import Protocol
from dataclasses import dataclass, field

class ITodoPort(Protocol):
    def add(self, text: str) -> int: ...
    def complete(self, todo_id: int) -> bool: ...
    def list_pending(self) -> list[dict]: ...

@dataclass
class TodoService:
    _todos: dict = field(default_factory=dict)
    _next_id: int = 1

    def add(self, text: str) -> int:
        todo_id = self._next_id
        self._todos[todo_id] = {"id": todo_id, "text": text, "done": False}
        self._next_id += 1
        return todo_id

    def complete(self, todo_id: int) -> bool:
        if todo_id not in self._todos:
            return False
        self._todos[todo_id]["done"] = True
        return True

    def list_pending(self) -> list[dict]:
        return [t for t in self._todos.values() if not t["done"]]

class CliTodoAdapter:
    def __init__(self, service: ITodoPort):
        self._svc = service

    def handle(self, command: str) -> None:
        parts = command.strip().split(maxsplit=1)
        action = parts[0]
        if action == "add":
            todo_id = self._svc.add(parts[1])
            print(f"CLI: Added todo {todo_id}")
        elif action == "done":
            self._svc.complete(int(parts[1]))
            print(f"CLI: Completed {parts[1]}")
        elif action == "list":
            pending = self._svc.list_pending()
            print(f"CLI: Pending: {len(pending)}")

class HttpTodoAdapter:
    def __init__(self, service: ITodoPort):
        self._svc = service

    def handle(self, request: dict) -> dict:
        method = request["method"]
        path = request["path"]
        if method == "POST" and path == "/todos":
            todo_id = self._svc.add(request["body"]["text"])
            return {"status": 201, "body": {"id": todo_id, "text": request["body"]["text"]}}
        elif method == "GET" and path == "/todos":
            return {"status": 200, "body": {"todos": self._svc.list_pending()}}
        elif method == "POST" and path.startswith("/todos/") and path.endswith("/complete"):
            todo_id = int(path.split("/")[2])
            self._svc.complete(todo_id)
            return {"status": 200, "body": {"done": True}}
        return {"status": 404, "body": {"error": "not found"}}

svc = TodoService()
cli = CliTodoAdapter(svc)
http = HttpTodoAdapter(svc)
cli.handle("add Buy milk")
cli.handle("list")
r = http.handle({"method": "POST", "path": "/todos", "body": {"text": "Write tests"}})
print(f"HTTP {r['status']}: {r['body']}")
r = http.handle({"method": "GET", "path": "/todos", "body": {}})
print(f"HTTP {r['status']}: {r['body']}")
cli.handle("done 1")
r = http.handle({"method": "GET", "path": "/todos", "body": {}})
print(f"HTTP {r['status']}: {r['body']}")
Solution
The solution is above. Both adapters drive the same TodoService instance. The service is oblivious to whether it is being called from a CLI command, an HTTP request, or a test.
The hexagonal benefit in testing:
def test_add_and_complete():
    svc = TodoService()
    todo_id = svc.add("Buy milk")
    assert len(svc.list_pending()) == 1
    svc.complete(todo_id)
    assert len(svc.list_pending()) == 0
Tests call the service directly (through ITodoPort) without any adapter overhead. No HTTP server, no subprocess, no CLI parsing.
from typing import Protocol
from dataclasses import dataclass, field

# Application core (the hexagon)
class ITodoPort(Protocol):
    def add(self, text: str) -> int: ...
    def complete(self, todo_id: int) -> bool: ...
    def list_pending(self) -> list[dict]: ...

# Implement:
# 1. TodoService (the hexagon - satisfies ITodoPort)
# 2. CliTodoAdapter (parses command strings like "add Buy milk", "done 1", "list")
# 3. HttpTodoAdapter (processes request dicts with "method", "path", and "body" keys to simulate HTTP handling)
# Show both adapters driving the same TodoService
Expected Output
CLI: Added todo 1
CLI: Pending: 1
HTTP 201: {'id': 2, 'text': 'Write tests'}
HTTP 200: {'todos': [{'id': 1, 'text': 'Buy milk', 'done': False}, {'id': 2, 'text': 'Write tests', 'done': False}]}
CLI: Completed 1
HTTP 200: {'todos': [{'id': 2, 'text': 'Write tests', 'done': False}]}
Hints
Hint 1: Both adapters call the same ITodoPort methods. The CLI adapter parses strings; the HTTP adapter parses dicts. The TodoService never knows which adapter called it.
Hint 2: Keep adapters thin - they translate between the external format and the port API. No business logic in adapters.
Hard
Implement a RetryPaymentAdapter that wraps any IPaymentPort and automatically retries on failure with exponential backoff.
from typing import Protocol
from dataclasses import dataclass, field
import time

class IPaymentPort(Protocol):
    def charge(self, amount_cents: int, token: str) -> dict: ...

class FlakyPaymentAdapter:
    """Simulates an unreliable payment service."""
    def __init__(self, fail_first_n: int = 2):
        self._fail_first_n = fail_first_n
        self._call_count = 0

    def charge(self, amount_cents: int, token: str) -> dict:
        self._call_count += 1
        if self._call_count <= self._fail_first_n:
            raise ConnectionError("Payment service unavailable")
        return {"transaction_id": "txn_42", "status": "ok"}

class RetryPaymentAdapter:
    def __init__(self, inner: IPaymentPort, max_retries: int = 3, base_delay: float = 0.0):
        self._inner = inner
        self._max_retries = max_retries
        self._base_delay = base_delay  # 0.0 for tests, >0 for production

    def charge(self, amount_cents: int, token: str) -> dict:
        last_exc: Exception = RuntimeError("No attempts made")
        for attempt in range(1, self._max_retries + 1):
            try:
                result = self._inner.charge(amount_cents, token)
                print(f"Attempt {attempt} succeeded: {result}")
                return result
            except Exception as e:
                last_exc = e
                print(f"Attempt {attempt} failed: {e}")
                if attempt < self._max_retries and self._base_delay > 0:
                    time.sleep(self._base_delay * (2 ** (attempt - 1)))
        raise last_exc

flaky = FlakyPaymentAdapter(fail_first_n=2)
retry = RetryPaymentAdapter(inner=flaky, max_retries=3, base_delay=0.0)
retry.charge(9999, "tok_test")
Solution
The solution is above. The pattern is identical to the caching adapter: RetryPaymentAdapter wraps IPaymentPort, so it is transparent to any code that uses IPaymentPort.
Production configuration:
# Composition root
real_gateway = StripePaymentAdapter(api_key=settings.STRIPE_KEY)
resilient_gateway = RetryPaymentAdapter(
    inner=real_gateway,
    max_retries=3,
    base_delay=0.5,  # waits 0.5s after attempt 1 and 1.0s after attempt 2; no wait after the last attempt
)
circuit_breaker = CircuitBreakerPaymentAdapter(inner=resilient_gateway, threshold=5)
use_case = ChargeCustomerUseCase(payment=circuit_breaker)
Each adapter layer adds a resilience concern without any knowledge of the others. The use case ChargeCustomerUseCase only knows about IPaymentPort.
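The composition root above mentions a CircuitBreakerPaymentAdapter without defining it. A minimal sketch under stated assumptions follows: the class body, CircuitOpenError, and AlwaysFailingAdapter are hypothetical, and the breaker only counts consecutive failures. A production breaker would normally also reopen after a cool-down period, which is omitted here.

```python
from typing import Protocol


class IPaymentPort(Protocol):
    def charge(self, amount_cents: int, token: str) -> dict: ...


class CircuitOpenError(RuntimeError):
    """Raised when the breaker refuses to call the inner adapter."""


class CircuitBreakerPaymentAdapter:
    def __init__(self, inner: IPaymentPort, threshold: int = 5) -> None:
        self._inner = inner
        self._threshold = threshold
        self._consecutive_failures = 0

    def charge(self, amount_cents: int, token: str) -> dict:
        # Fail fast once too many calls in a row have failed
        if self._consecutive_failures >= self._threshold:
            raise CircuitOpenError("circuit open: payment calls suspended")
        try:
            result = self._inner.charge(amount_cents, token)
        except Exception:
            self._consecutive_failures += 1
            raise
        self._consecutive_failures = 0  # a success resets the failure count
        return result


class AlwaysFailingAdapter:
    """Test double that counts how often it is actually called."""
    def __init__(self) -> None:
        self.calls = 0

    def charge(self, amount_cents: int, token: str) -> dict:
        self.calls += 1
        raise ConnectionError("Payment service unavailable")


failing = AlwaysFailingAdapter()
breaker = CircuitBreakerPaymentAdapter(inner=failing, threshold=2)
errors = []
for _ in range(4):
    try:
        breaker.charge(100, "tok_test")
    except Exception as exc:
        errors.append(type(exc).__name__)
print(errors, failing.calls)
```

After two real failures the breaker stops calling the inner adapter, so the last two attempts raise CircuitOpenError and the inner call count stays at 2.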
from typing import Protocol, Optional
from dataclasses import dataclass, field
import time

class IPaymentPort(Protocol):
    def charge(self, amount_cents: int, token: str) -> dict: ...

# Implement:
# 1. FlakyPaymentAdapter (fails the first N calls, then succeeds)
# 2. RetryPaymentAdapter (wraps IPaymentPort, retries up to max_retries times with exponential backoff)
# Show two failed attempts followed by a successful third attempt
Expected Output
Attempt 1 failed: Payment service unavailable
Attempt 2 failed: Payment service unavailable
Attempt 3 succeeded: {'transaction_id': 'txn_42', 'status': 'ok'}
Hints
Hint 1: FlakyPaymentAdapter keeps a call counter. It raises an exception for the first N calls, then returns success.
Hint 2: RetryPaymentAdapter catches the exception, sleeps (or skips sleep in tests), and retries up to max_retries times. Use a loop, not recursion.
Build an architecture fitness function that parses Python source files and reports violations of hexagonal architecture rules.
import ast

ADAPTER_PREFIXES = ("adapter", "infrastructure", "adapters")
CORE_PREFIXES = ("app", "application", "domain", "core")

def get_imports(source: str) -> list[str]:
    tree = ast.parse(source)
    imports = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom) and node.module:
            imports.append(node.module)
        elif isinstance(node, ast.Import):
            for alias in node.names:
                imports.append(alias.name)
    return imports

def is_core(module_path: str) -> bool:
    return any(module_path.startswith(p) for p in CORE_PREFIXES)

def is_adapter(module_path: str) -> bool:
    return any(module_path.startswith(p) for p in ADAPTER_PREFIXES)

def check_hexagonal_rules(project: dict[str, str]) -> list[str]:
    violations = []
    for filepath, source in project.items():
        module_name = filepath.replace("/", ".").removesuffix(".py")
        if not is_core(module_name):
            continue
        for imp in get_imports(source):
            if is_adapter(imp):
                violations.append(
                    f"VIOLATION: {filepath} imports {imp} (core must not depend on adapters)"
                )
    return violations

# Test 1: clean project
clean_project = {
    "app/core.py": "from app.ports import IPaymentPort\n",
    "app/ports.py": "from typing import Protocol\n",
    "adapter/stripe_adapter.py": "from app.ports import IPaymentPort\n",
}
result = check_hexagonal_rules(clean_project)
print("No violations" if not result else "\n".join(result))
print("---")

# Test 2: violation
dirty_project = {
    "app/core.py": (
        "from adapter.stripe_adapter import StripeAdapter\n"
        "from adapter.sendgrid_adapter import SendGridAdapter\n"
    ),
    "adapter/stripe_adapter.py": "from app.ports import IPaymentPort\n",
    "adapter/sendgrid_adapter.py": "from app.ports import IEmailPort\n",
}
result = check_hexagonal_rules(dirty_project)
print("\n".join(result))
Solution
The solution is above. Run this as a pytest test in CI:
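def test_hexagonal_rules():
    from pathlib import Path
    project = {}
    for path in Path("src").rglob("*.py"):
        # Key each file by its path relative to src/ ("app/core.py", not "src/app/core.py"),
        # otherwise is_core() and is_adapter() never match and the test passes vacuously
        project[path.relative_to("src").as_posix()] = path.read_text()
    violations = check_hexagonal_rules(project)
    assert violations == [], "Architecture violations found:\n" + "\n".join(violations)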
Extending the checks:
def check_hexagonal_rules_extended(project):
    violations = []
    for filepath, source in project.items():
        module_name = filepath.replace("/", ".").removesuffix(".py")
        imports = get_imports(source)
        # Rule 1: core must not import adapters
        if is_core(module_name):
            for imp in imports:
                if is_adapter(imp):
                    violations.append(f"VIOLATION: {filepath} imports {imp}")
        # Rule 2: adapters must not import other adapters directly
        # (they should communicate through ports)
        if is_adapter(module_name):
            for imp in imports:
                if is_adapter(imp) and imp != module_name:
                    violations.append(f"ADAPTER COUPLING: {filepath} imports {imp}")
    return violations
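The exercise statement also asks for a circular-import check, which the solution does not cover. One possible approach is a depth-first search over the import graph, sketched below. find_import_cycles is a hypothetical helper, and the sketch assumes an import only counts when its module string exactly equals a module in the project dict, so a form like "from app import billing" is not resolved to app.billing.

```python
import ast


def get_imports(source: str) -> list[str]:
    imports = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module:
            imports.append(node.module)
        elif isinstance(node, ast.Import):
            imports.extend(alias.name for alias in node.names)
    return imports


def find_import_cycles(project: dict[str, str]) -> list[str]:
    """Report import cycles among the modules present in `project`."""
    graph = {}
    for filepath, source in project.items():
        module = filepath.replace("/", ".").removesuffix(".py")
        graph[module] = get_imports(source)

    violations = []
    done = set()  # modules whose imports have been fully explored

    def visit(module: str, stack: list[str]) -> None:
        if module in stack:
            # Reached a module already on the current path: that is a cycle
            cycle = stack[stack.index(module):] + [module]
            violations.append("CYCLE: " + " -> ".join(cycle))
            return
        if module in done or module not in graph:
            return  # already checked, or an external module
        for imported in graph[module]:
            visit(imported, stack + [module])
        done.add(module)

    for module in graph:
        visit(module, [])
    return violations


cyclic_project = {
    "app/orders.py": "from app.billing import Invoice\n",
    "app/billing.py": "import app.orders\n",
    "app/ports.py": "from typing import Protocol\n",
}
print(find_import_cycles(cyclic_project))
```

To restrict the check to core modules, as the exercise asks, the project dict can be filtered with is_core() before it is passed in.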
import ast
from pathlib import Path

# Given a mock project layout as a dict of filename -> source code,
# write a function that verifies:
# 1. Application core modules do not import from adapter modules
# 2. No circular imports between core modules
def check_hexagonal_rules(project: dict[str, str]) -> list[str]:
    """Return a list of violation messages."""
    pass
Expected Output
No violations
---
VIOLATION: app/core.py imports adapter.stripe_adapter (core must not depend on adapters)
VIOLATION: app/core.py imports adapter.sendgrid_adapter (core must not depend on adapters)
Hints
Hint 1: Use ast.parse() and ast.walk() to find all ImportFrom nodes in each file. Check if the imported module is in the "adapter" namespace while the importing file is in the "app" (core) namespace.
Hint 2: Core modules are those under app/, application/, domain/, or core/. A violation occurs when one of them imports from adapter/, adapters/, or infrastructure/.
Define an async IAsyncCachePort, implement two adapters, and use them in an async use case.
import asyncio
from typing import Protocol, Optional
from dataclasses import dataclass, field

class IAsyncCachePort(Protocol):
    async def get(self, key: str) -> Optional[str]: ...
    async def set(self, key: str, value: str) -> None: ...

class InMemoryAsyncCacheAdapter:
    def __init__(self):
        self._store: dict = {}

    async def get(self, key: str) -> Optional[str]:
        return self._store.get(key)

    async def set(self, key: str, value: str) -> None:
        self._store[key] = value

class LoggingAsyncCacheAdapter:
    def __init__(self, inner: IAsyncCachePort):
        self._inner = inner

    async def get(self, key: str) -> Optional[str]:
        result = await self._inner.get(key)
        print(f"[LOG] GET {key} -> {result}")
        return result

    async def set(self, key: str, value: str) -> None:
        print(f"[LOG] SET {key} = {value}")
        await self._inner.set(key, value)

async def get_user_name(cache: IAsyncCachePort, user_id: int) -> Optional[str]:
    key = f"user:{user_id}"
    name = await cache.get(key)
    if name:
        print(f"Cache hit: {name}")
    else:
        print(f"Cache miss for user:{user_id}")
    return name

async def main():
    mem = InMemoryAsyncCacheAdapter()
    logged = LoggingAsyncCacheAdapter(mem)
    await logged.set("user:1", "Alice")
    await get_user_name(logged, 1)
    await get_user_name(logged, 99)

asyncio.run(main())
Solution
The solution is above. Async protocols work identically to sync ones in terms of hexagonal architecture — the only difference is await at the call sites.
Testing async adapters:
import pytest

@pytest.mark.asyncio
async def test_cache_miss():
    cache = InMemoryAsyncCacheAdapter()
    result = await cache.get("missing_key")
    assert result is None

@pytest.mark.asyncio
async def test_cache_round_trip():
    cache = InMemoryAsyncCacheAdapter()
    await cache.set("x", "hello")
    assert await cache.get("x") == "hello"
Redis adapter (production):
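class RedisAsyncCacheAdapter:
    def __init__(self, client):  # async Redis client, e.g. redis.asyncio.Redis (formerly aioredis)
        self._client = client

    async def get(self, key: str) -> Optional[str]:
        value = await self._client.get(key)
        # Compare against None so an empty cached value is not treated as a miss;
        # assumes the client returns bytes (no decode_responses)
        return value.decode() if value is not None else None

    async def set(self, key: str, value: str) -> None:
        await self._client.set(key, value)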
Same interface, zero changes to the use case.
import asyncio
from typing import Protocol, Optional
from dataclasses import dataclass, field

# Define async versions of the port and two adapters:
# 1. IAsyncCachePort (async Protocol)
# 2. InMemoryAsyncCacheAdapter
# 3. LoggingAsyncCacheAdapter (wraps IAsyncCachePort and logs all calls)
# 4. An async use case that uses IAsyncCachePort
Expected Output
[LOG] SET user:1 = Alice
[LOG] GET user:1 -> Alice
Cache hit: Alice
[LOG] GET user:99 -> None
Cache miss for user:99
Hints
Hint 1: Async Protocols work the same way as sync ones: just declare the methods with async def. Structural subtyping is still verified by a static type checker, not at runtime.
Hint 2: The logging adapter wraps IAsyncCachePort with async methods that await the inner adapter. Use async def get/set in both the adapter and the wrapper.
Build a complete hexagonal order processing system with inbound and outbound ports and full adapter implementations.
from typing import Protocol, Optional
from dataclasses import dataclass, field

# ── PORTS ─────────────────────────────────────────────────────────────────────
class IOrderService(Protocol):
    def place_order(self, customer: str, item_id: int, qty: int, payment_token: str) -> dict: ...
    def get_order(self, order_id: int) -> Optional[dict]: ...

class IInventoryPort(Protocol):
    def reserve(self, item_id: int, qty: int) -> bool: ...
    def release(self, item_id: int, qty: int) -> None: ...

class IPaymentPort(Protocol):
    def charge(self, token: str, amount_cents: int) -> dict: ...

class INotificationPort(Protocol):
    def notify(self, customer: str, message: str) -> None: ...

# ── APPLICATION CORE ──────────────────────────────────────────────────────────
@dataclass
class OrderService:
    _inventory: IInventoryPort
    _payment: IPaymentPort
    _notifier: INotificationPort
    _orders: dict = field(default_factory=dict)
    _next_id: int = 1
    PRICES = {1: 1000, 2: 2500, 3: 500}  # item_id -> price_cents

    def place_order(self, customer: str, item_id: int, qty: int, payment_token: str) -> dict:
        if not self._inventory.reserve(item_id, qty):
            return {"error": "insufficient inventory"}
        amount = self.PRICES.get(item_id, 0) * qty
        try:
            txn = self._payment.charge(payment_token, amount)
        except Exception as e:
            # Payment failed: give the reserved stock back
            self._inventory.release(item_id, qty)
            return {"error": str(e)}
        order_id = self._next_id
        self._next_id += 1
        order = {"id": order_id, "customer": customer, "item_id": item_id, "qty": qty, "txn": txn["id"]}
        self._orders[order_id] = order
        self._notifier.notify(customer, f"Order {order_id} confirmed!")
        return order

    def get_order(self, order_id: int) -> Optional[dict]:
        return self._orders.get(order_id)

# ── ADAPTERS ──────────────────────────────────────────────────────────────────
class InMemoryInventory:
    def __init__(self, stock: dict):
        self._stock = dict(stock)

    def reserve(self, item_id: int, qty: int) -> bool:
        available = self._stock.get(item_id, 0)
        if available < qty:
            return False
        self._stock[item_id] -= qty
        return True

    def release(self, item_id: int, qty: int) -> None:
        self._stock[item_id] = self._stock.get(item_id, 0) + qty

class FakePaymentAdapter:
    def __init__(self, should_fail: bool = False):
        self._should_fail = should_fail
        self._counter = 0

    def charge(self, token: str, amount_cents: int) -> dict:
        if self._should_fail:
            raise RuntimeError("Payment declined")
        self._counter += 1
        return {"id": f"txn_{self._counter}", "status": "ok"}

class ConsoleNotifier:
    def notify(self, customer: str, message: str) -> None:
        print(f"[NOTIFY] {customer}: {message}")

class CliOrderAdapter:
    def __init__(self, svc: IOrderService):
        self._svc = svc

    def handle(self, cmd: str) -> None:
        parts = cmd.split()
        if parts[0] == "order":
            result = self._svc.place_order(parts[1], int(parts[2]), int(parts[3]), parts[4])
            print(f"CLI result: {result}")
        elif parts[0] == "get":
            print(f"CLI get: {self._svc.get_order(int(parts[1]))}")

# ── COMPOSITION ROOT ──────────────────────────────────────────────────────────
inventory = InMemoryInventory({1: 10, 2: 5, 3: 100})
payment = FakePaymentAdapter()
notifier = ConsoleNotifier()
svc = OrderService(inventory, payment, notifier)
cli = CliOrderAdapter(svc)
# The example.com addresses are placeholders; the customer must be a single token without spaces
cli.handle("order alice@example.com 1 2 tok_visa")
cli.handle("get 1")
cli.handle("order bob@example.com 2 10 tok_mc")  # fails: only 5 in stock
Solution
The solution is above. Replacing an adapter touches only the composition root: import the new adapter and construct it there.
# Switch to real payment gateway
from adapters.stripe import StripePaymentAdapter
payment = StripePaymentAdapter(api_key=settings.STRIPE_KEY)
# Switch to PostgreSQL inventory
from adapters.postgres import PostgresInventoryAdapter
inventory = PostgresInventoryAdapter(session=db_session)
# Everything else unchanged
svc = OrderService(inventory, payment, notifier)
The hexagonal architecture pays off because:
- You can test OrderService with any combination of fake adapters: no real DB, no real Stripe.
- You can switch payment gateways (Stripe to PayPal) without touching OrderService.
- The CliOrderAdapter and any future HttpOrderAdapter both call the same IOrderService port.
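The first point can be illustrated with a test function that drives OrderService directly, using only fakes. This is a sketch under assumptions: OrderService is a condensed copy of the core from the solution, while DecliningPayment, SpyNotifier, the test name, and the customer@example.com address are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Protocol


class IInventoryPort(Protocol):
    def reserve(self, item_id: int, qty: int) -> bool: ...
    def release(self, item_id: int, qty: int) -> None: ...


class IPaymentPort(Protocol):
    def charge(self, token: str, amount_cents: int) -> dict: ...


class INotificationPort(Protocol):
    def notify(self, customer: str, message: str) -> None: ...


@dataclass
class OrderService:  # condensed copy of the application core
    _inventory: IInventoryPort
    _payment: IPaymentPort
    _notifier: INotificationPort
    _orders: dict = field(default_factory=dict)
    _next_id: int = 1
    PRICES = {1: 1000, 2: 2500, 3: 500}

    def place_order(self, customer: str, item_id: int, qty: int, payment_token: str) -> dict:
        if not self._inventory.reserve(item_id, qty):
            return {"error": "insufficient inventory"}
        try:
            txn = self._payment.charge(payment_token, self.PRICES.get(item_id, 0) * qty)
        except Exception as e:
            self._inventory.release(item_id, qty)
            return {"error": str(e)}
        order = {"id": self._next_id, "customer": customer, "item_id": item_id,
                 "qty": qty, "txn": txn["id"]}
        self._orders[self._next_id] = order
        self._next_id += 1
        self._notifier.notify(customer, f"Order {order['id']} confirmed!")
        return order


class InMemoryInventory:
    def __init__(self, stock: dict) -> None:
        self.stock = dict(stock)

    def reserve(self, item_id: int, qty: int) -> bool:
        if self.stock.get(item_id, 0) < qty:
            return False
        self.stock[item_id] -= qty
        return True

    def release(self, item_id: int, qty: int) -> None:
        self.stock[item_id] = self.stock.get(item_id, 0) + qty


class DecliningPayment:
    """Fake payment adapter that always fails."""
    def charge(self, token: str, amount_cents: int) -> dict:
        raise RuntimeError("Payment declined")


@dataclass
class SpyNotifier:
    calls: list = field(default_factory=list)

    def notify(self, customer: str, message: str) -> None:
        self.calls.append((customer, message))


def test_failed_payment_releases_inventory() -> None:
    inventory = InMemoryInventory({1: 10})
    notifier = SpyNotifier()
    svc = OrderService(inventory, DecliningPayment(), notifier)

    result = svc.place_order("customer@example.com", 1, 2, "tok_test")

    assert result == {"error": "Payment declined"}
    assert inventory.stock[1] == 10  # the reservation was rolled back
    assert notifier.calls == []      # no confirmation was sent


test_failed_payment_releases_inventory()
```

The test exercises the failure path that is hardest to trigger against real infrastructure, and it needs no CLI parsing, database, or payment gateway.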
# Build a complete hexagonal order system with:
# Inbound port: IOrderService (place_order, get_order, cancel_order)
# Outbound ports: IInventoryPort, IPaymentPort, INotificationPort
# Application core: OrderService implementing IOrderService
# Adapters: InMemoryInventory, FakePayment, ConsoleNotifier
# Drive it with two adapters: CliOrderAdapter and a test function
Expected Output
See solution for expected output
Hints
Hint 1: Start with all port interfaces, then build the OrderService core, then build adapters one by one, then compose at the bottom.
Hint 2: The OrderService should call IInventoryPort.reserve(), IPaymentPort.charge(), and INotificationPort.notify() in sequence. If payment fails, release the inventory reservation.
