Python SOLID Principles in Python —: Practice Problems & Exercises
Practice: SOLID Principles in Python — Engineering Patterns for Maintainable Code
← Back to lessonEasy
UserManager violates the Single Responsibility Principle by handling user creation, email, database storage, and reporting. Refactor to separate concerns.
class UserRepository:
def save_to_db(self, user: dict) -> None:
print(f"Saving {user} to database")
class UserNotifier:
def send_welcome_email(self, email: str) -> None:
print(f"Sending welcome email to {email}")
class UserManager:
def create_user(self, name: str, email: str) -> dict:
return {"name": name, "email": email}
repo = UserRepository()
notifier = UserNotifier()
repo.save_to_db({"name": "Alice", "email": "[email protected]"})
notifier.send_welcome_email("[email protected]")Solution
class UserRepository:
"""Handles persistence — only reason to change: DB layer changes."""
def save_to_db(self, user: dict) -> None:
print(f"Saving {user} to database")
def find_by_email(self, email: str) -> dict | None:
return None # stub
class UserNotifier:
"""Handles notifications — only reason to change: notification channel changes."""
def send_welcome_email(self, email: str) -> None:
print(f"Sending welcome email to {email}")
class UserReportGenerator:
"""Handles reporting — only reason to change: report format changes."""
def generate(self) -> str:
return "User report: ..."
class UserManager:
"""Handles user creation logic — orchestrates the other classes."""
def __init__(self, repo: UserRepository, notifier: UserNotifier) -> None:
self._repo = repo
self._notifier = notifier
def create_user(self, name: str, email: str) -> dict:
user = {"name": name, "email": email}
self._repo.save_to_db(user)
self._notifier.send_welcome_email(email)
return user
repo = UserRepository()
notifier = UserNotifier()
Explanation: SRP states every class should have one reason to change. UserManager had four reasons. By extracting UserRepository, UserNotifier, and UserReportGenerator, each class changes only when its own concern changes — DB schema, notification template, or report format — without touching any other class.
class UserManager:
def create_user(self, name: str, email: str) -> dict:
return {"name": name, "email": email}
def send_welcome_email(self, email: str) -> None:
print(f"Sending welcome email to {email}")
def save_to_db(self, user: dict) -> None:
print(f"Saving {user} to database")
def generate_report(self) -> str:
return "User report: ..."
# TODO: refactor into two focused classes:
# UserRepository (save_to_db)
# UserNotifier (send_welcome_email)
# Keep UserManager for create_user only
class UserRepository:
pass
class UserNotifier:
pass
repo = UserRepository()
notifier = UserNotifier()
repo.save_to_db({"name": "Alice", "email": "[email protected]"})
notifier.send_welcome_email("[email protected]")Expected Output
Saving {'name': 'Alice', 'email': '[email protected]'} to database\nSending welcome email to [email protected]Hints
Hint 1: SRP: each class should have exactly one reason to change.
Hint 2: Move save_to_db to UserRepository and send_welcome_email to UserNotifier.
The if/elif chain in DiscountCalculator violates OCP — adding a new discount type requires modifying the method. Run the code as-is, then in the solution observe how to fix it with a strategy dict.
class DiscountCalculator:
def calculate(self, customer_type: str, price: float) -> float:
if customer_type == "regular":
return price
elif customer_type == "vip":
return price * 0.9
elif customer_type == "employee":
return price * 0.7
raise ValueError(f"Unknown type: {customer_type}")
calc = DiscountCalculator()
print(calc.calculate("vip", 100.0))
print(calc.calculate("employee", 100.0))Solution
from typing import Protocol
class DiscountStrategy(Protocol):
def apply(self, price: float) -> float: ...
class RegularDiscount:
def apply(self, price: float) -> float:
return price
class VIPDiscount:
def apply(self, price: float) -> float:
return price * 0.9
class EmployeeDiscount:
def apply(self, price: float) -> float:
return price * 0.7
# Adding a new type: just create a new class and register it
class StudentDiscount:
def apply(self, price: float) -> float:
return price * 0.8
class DiscountCalculator:
def __init__(self) -> None:
self._strategies: dict[str, DiscountStrategy] = {
"regular": RegularDiscount(),
"vip": VIPDiscount(),
"employee": EmployeeDiscount(),
"student": StudentDiscount(),
}
def register(self, name: str, strategy: DiscountStrategy) -> None:
self._strategies[name] = strategy
def calculate(self, customer_type: str, price: float) -> float:
if customer_type not in self._strategies:
raise ValueError(f"Unknown type: {customer_type}")
return self._strategies[customer_type].apply(price)
calc = DiscountCalculator()
print(calc.calculate("vip", 100.0)) # 90.0
print(calc.calculate("employee", 100.0)) # 70.0
print(calc.calculate("student", 100.0)) # 80.0
Explanation: The OCP-compliant version registers strategies in a dict. Adding a new customer type means adding a new class and registering it — zero modification to DiscountCalculator. The class is closed for modification (core logic unchanged) but open for extension (new strategies via register()).
class DiscountCalculator:
def calculate(self, customer_type: str, price: float) -> float:
if customer_type == "regular":
return price
elif customer_type == "vip":
return price * 0.9
elif customer_type == "employee":
return price * 0.7
# Adding a new customer type requires modifying this method
raise ValueError(f"Unknown type: {customer_type}")
calc = DiscountCalculator()
print(calc.calculate("vip", 100.0))
print(calc.calculate("employee", 100.0))Expected Output
90.0\n70.0Hints
Hint 1: OCP: open for extension, closed for modification.
Hint 2: Replace the if/elif chain with a dictionary of strategies or polymorphism.
Run the code and observe the LSP violation: resize_and_area(sq) returns 25 instead of 50. The Square subtype does not honour the Rectangle contract that setting width and height independently is valid.
class Rectangle:
def __init__(self, width: float, height: float):
self.width = width
self.height = height
def area(self) -> float:
return self.width * self.height
class Square(Rectangle):
def __init__(self, side: float):
super().__init__(side, side)
@property
def width(self):
return self._width
@width.setter
def width(self, value):
self._width = value
self._height = value
@property
def height(self):
return self._height
@height.setter
def height(self, value):
self._height = value
self._width = value
def resize_and_area(r: Rectangle) -> float:
r.width = 10
r.height = 5
return r.area()
rect = Rectangle(3, 4)
sq = Square(3)
print(resize_and_area(rect)) # 50.0
print(resize_and_area(sq)) # 25.0 — LSP violationSolution
# The LSP fix: do NOT make Square inherit Rectangle.
# Model them as independent shapes sharing a common interface.
from abc import ABC, abstractmethod
class Shape(ABC):
@abstractmethod
def area(self) -> float:
pass
class Rectangle(Shape):
def __init__(self, width: float, height: float) -> None:
self.width = width
self.height = height
def area(self) -> float:
return self.width * self.height
class Square(Shape):
def __init__(self, side: float) -> None:
self.side = side
def area(self) -> float:
return self.side ** 2
# resize_and_area now only works on Rectangle (correct contract)
def resize_and_area(r: Rectangle) -> float:
r.width = 10
r.height = 5
return r.area()
rect = Rectangle(3, 4)
sq = Square(3)
print(resize_and_area(rect)) # 50.0
# Square is NOT a Rectangle — passing sq would be a type error
# print(resize_and_area(sq)) # wrong usage, mypy would catch this
# But both share the Shape interface:
shapes: list[Shape] = [rect, sq]
for s in shapes:
print(s.area()) # 50.0, 9.0
Explanation: The classic "Square is-a Rectangle" hierarchy violates LSP because Square must override setters to maintain its invariant, breaking Rectangle's postcondition (independent width and height). The correct design removes the inheritance: Square and Rectangle share a Shape ABC but are not related to each other. A Square is not a substitutable Rectangle.
class Rectangle:
def __init__(self, width: float, height: float):
self.width = width
self.height = height
def area(self) -> float:
return self.width * self.height
class Square(Rectangle):
def __init__(self, side: float):
super().__init__(side, side)
@property
def width(self):
return self._width
@width.setter
def width(self, value):
self._width = value
self._height = value # keeps square constraint
@property
def height(self):
return self._height
@height.setter
def height(self, value):
self._height = value
self._width = value
def resize_and_area(r: Rectangle) -> float:
r.width = 10
r.height = 5
return r.area()
rect = Rectangle(3, 4)
sq = Square(3)
print(resize_and_area(rect)) # expect 50
print(resize_and_area(sq)) # expect 50, but gets 25 — LSP violationExpected Output
50.0\n25.0Hints
Hint 1: The Square overrides setters to enforce the square constraint, breaking the Rectangle contract.
Hint 2: LSP requires substituting a subtype for its parent without changing program correctness.
Split the fat Worker ABC into three small focused ABCs. Robot should implement only Workable, and Human all three.
from abc import ABC, abstractmethod
class Workable(ABC):
@abstractmethod
def work(self) -> str: pass
class Eatable(ABC):
@abstractmethod
def eat(self) -> str: pass
class Sleepable(ABC):
@abstractmethod
def sleep(self) -> str: pass
class Human(Workable, Eatable, Sleepable):
def work(self) -> str: return "human working"
def eat(self) -> str: return "human eating"
def sleep(self) -> str: return "human sleeping"
class Robot(Workable):
def work(self) -> str: return "robot working"
robot = Robot()
print(robot.work())Solution
from abc import ABC, abstractmethod
class Workable(ABC):
@abstractmethod
def work(self) -> str: pass
class Eatable(ABC):
@abstractmethod
def eat(self) -> str: pass
class Sleepable(ABC):
@abstractmethod
def sleep(self) -> str: pass
class Human(Workable, Eatable, Sleepable):
def work(self) -> str: return "human working"
def eat(self) -> str: return "human eating"
def sleep(self) -> str: return "human sleeping"
class Robot(Workable):
def work(self) -> str: return "robot working"
# No eat() or sleep() — not needed, not forced
# Functions targeted at specific capabilities:
def make_work(w: Workable) -> None:
print(w.work())
def feed(e: Eatable) -> None:
print(e.eat())
make_work(Human()) # human working
make_work(Robot()) # robot working
feed(Human()) # human eating
# feed(Robot()) # mypy would catch this — Robot is not Eatable
robot = Robot()
print(robot.work()) # robot working
Explanation: ISP says no class should be forced to implement methods it does not use. The fat Worker ABC forced Robot to raise NotImplementedError on eat() and sleep() — a code smell and runtime trap. Small focused ABCs mean each class implements only what it genuinely provides.
from abc import ABC, abstractmethod
# Fat interface — not all implementors need all methods
class Worker(ABC):
@abstractmethod
def work(self) -> str: pass
@abstractmethod
def eat(self) -> str: pass
@abstractmethod
def sleep(self) -> str: pass
class Robot(Worker):
def work(self) -> str:
return "robot working"
def eat(self) -> str:
raise NotImplementedError("Robots don't eat")
def sleep(self) -> str:
raise NotImplementedError("Robots don't sleep")
# TODO: split into Workable, Eatable, Sleepable ABCs
# Then create Human(Workable, Eatable, Sleepable) and
# Robot(Workable) that only implements what it needs.
robot = Robot()
print(robot.work())Expected Output
robot workingHints
Hint 1: Create three focused ABCs: Workable, Eatable, Sleepable, each with one @abstractmethod.
Hint 2: Robot only inherits Workable. Human inherits all three.
Medium
Implement PaymentService using DIP — it should depend on the Logger abstraction (Protocol), not any concrete logger. Swap in a ConsoleLogger at construction time.
from typing import Protocol
class Logger(Protocol):
def log(self, msg: str) -> None: ...
class ConsoleLogger:
def log(self, msg: str) -> None:
print(f"[CONSOLE] {msg}")
class FileLogger:
def log(self, msg: str) -> None:
print(f"[FILE] {msg}")
class PaymentService:
def __init__(self, logger: Logger) -> None:
self._logger = logger
def charge(self, amount: float) -> None:
self._logger.log(f"Charging {amount}")
print(f"Charged {amount}")
svc = PaymentService(ConsoleLogger())
svc.charge(99.99)Solution
from typing import Protocol
class Logger(Protocol):
def log(self, msg: str) -> None: ...
class ConsoleLogger:
def log(self, msg: str) -> None:
print(f"[CONSOLE] {msg}")
class FileLogger:
def log(self, msg: str) -> None:
print(f"[FILE] {msg}")
class NullLogger:
"""Used in tests — discards all messages."""
def log(self, msg: str) -> None:
pass
class PaymentService:
def __init__(self, logger: Logger) -> None:
self._logger = logger
def charge(self, amount: float) -> None:
self._logger.log(f"Charging {amount}")
print(f"Charged {amount}")
# Production: use console logger
svc = PaymentService(ConsoleLogger())
svc.charge(99.99)
# Different environment: swap in file logger
svc2 = PaymentService(FileLogger())
svc2.charge(49.00)
# Tests: use null logger (no output noise)
svc3 = PaymentService(NullLogger())
svc3.charge(10.00)
Explanation: DIP states high-level modules should not depend on low-level modules — both should depend on abstractions. PaymentService depends on the Logger Protocol, not ConsoleLogger. This makes the service testable (inject a null or capturing logger in tests), environment-agnostic (file vs. console), and closed for modification when the logging implementation changes.
from typing import Protocol
class Logger(Protocol):
def log(self, msg: str) -> None: ...
class ConsoleLogger:
def log(self, msg: str) -> None:
print(f"[CONSOLE] {msg}")
class FileLogger:
def log(self, msg: str) -> None:
print(f"[FILE] {msg}")
class PaymentService:
def __init__(self, logger):
# TODO: store logger
pass
def charge(self, amount: float) -> None:
# TODO: log "Charging X" and print "Charged X"
pass
svc = PaymentService(ConsoleLogger())
svc.charge(99.99)Expected Output
[CONSOLE] Charging 99.99\nCharged 99.99Hints
Hint 1: Store logger as self._logger.
Hint 2: Call self._logger.log(...) inside charge().
Run the existing code, then add a MarkdownFormatter that formats the dict as a markdown table. Reporter should work with it without any modification.
import json
from typing import Protocol
class Formatter(Protocol):
def format(self, data: dict) -> str: ...
class JSONFormatter:
def format(self, data: dict) -> str:
return json.dumps(data)
class HTMLFormatter:
def format(self, data: dict) -> str:
rows = "".join(f"<li>{k}: {v}</li>" for k, v in data.items())
return f"<ul>{rows}</ul>"
class MarkdownFormatter:
def format(self, data: dict) -> str:
header = "| Key | Value |\n|-----|-------|\n"
rows = "".join(f"| {k} | {v} |\n" for k, v in data.items())
return header + rows
class Reporter:
def __init__(self, formatter: Formatter) -> None:
self._formatter = formatter
def report(self, data: dict) -> str:
return self._formatter.format(data)
data = {"users": 42, "active": 38}
r1 = Reporter(JSONFormatter())
r2 = Reporter(HTMLFormatter())
r3 = Reporter(MarkdownFormatter())
print(r1.report(data))
print(r2.report(data))
print(r3.report(data))Solution
import json
from typing import Protocol
class Formatter(Protocol):
def format(self, data: dict) -> str: ...
class JSONFormatter:
def format(self, data: dict) -> str:
return json.dumps(data)
class HTMLFormatter:
def format(self, data: dict) -> str:
rows = "".join(f"<li>{k}: {v}</li>" for k, v in data.items())
return f"<ul>{rows}</ul>"
class MarkdownFormatter:
def format(self, data: dict) -> str:
header = "| Key | Value |\n|-----|-------|\n"
rows = "".join(f"| {k} | {v} |\n" for k, v in data.items())
return header + rows
class CSVFormatter:
def format(self, data: dict) -> str:
keys = ",".join(data.keys())
values = ",".join(str(v) for v in data.values())
return f"{keys}\n{values}"
class Reporter:
def __init__(self, formatter: Formatter) -> None:
self._formatter = formatter
def report(self, data: dict) -> str:
return self._formatter.format(data)
data = {"users": 42, "active": 38}
for fmt in [JSONFormatter(), HTMLFormatter(), MarkdownFormatter(), CSVFormatter()]:
print(Reporter(fmt).report(data))
print("---")
Explanation: Reporter never changes — it is closed for modification. Adding MarkdownFormatter and CSVFormatter extends the system without touching any existing class. The Protocol is the abstraction that makes this possible: any object with a matching format() method satisfies it, enabling open-world extension without a shared base class.
from typing import Protocol, List
class Formatter(Protocol):
def format(self, data: dict) -> str: ...
class JSONFormatter:
def format(self, data: dict) -> str:
import json
return json.dumps(data)
class HTMLFormatter:
def format(self, data: dict) -> str:
rows = "".join(f"<li>{k}: {v}</li>" for k, v in data.items())
return f"<ul>{rows}</ul>"
class Reporter:
def __init__(self, formatter: Formatter):
self._formatter = formatter
def report(self, data: dict) -> str:
return self._formatter.format(data)
data = {"users": 42, "active": 38}
r1 = Reporter(JSONFormatter())
r2 = Reporter(HTMLFormatter())
print(r1.report(data))
print(r2.report(data))Expected Output
{"users": 42, "active": 38}\n<ul><li>users: 42</li><li>active: 38</li></ul>Hints
Hint 1: Reporter is already OCP-compliant — run it and observe.
Hint 2: Then add a MarkdownFormatter without touching Reporter at all.
Implement BoundedStack.push() so it respects the Stack contract: a successful push always increases size() by 1. Enforce the capacity limit by raising OverflowError (not by silently dropping items).
class Stack:
def __init__(self):
self._items = []
def push(self, item) -> None:
self._items.append(item)
def pop(self):
return self._items.pop()
def size(self) -> int:
return len(self._items)
class BoundedStack(Stack):
def __init__(self, max_size: int):
super().__init__()
self.max_size = max_size
def push(self, item) -> None:
if len(self._items) >= self.max_size:
raise OverflowError(f"Stack is full (max_size={self.max_size})")
super().push(item)
bs = BoundedStack(max_size=2)
bs.push("a")
bs.push("b")
print(bs.size())
try:
bs.push("c")
except OverflowError as e:
print(f"OverflowError: {e}")Solution
class Stack:
"""Base contract: push always increases size by 1."""
def __init__(self) -> None:
self._items: list = []
def push(self, item) -> None:
self._items.append(item)
def pop(self):
if not self._items:
raise IndexError("pop from empty stack")
return self._items.pop()
def size(self) -> int:
return len(self._items)
class BoundedStack(Stack):
"""LSP-compliant: extends Stack with capacity constraint."""
def __init__(self, max_size: int) -> None:
super().__init__()
self.max_size = max_size
def push(self, item) -> None:
# Strengthening preconditions (capacity check) is LSP-safe
# as long as successful pushes preserve the parent postcondition
if self.size() >= self.max_size:
raise OverflowError(f"Stack is full (max_size={self.max_size})")
super().push(item) # delegates — postcondition preserved
# BoundedStack is substitutable for Stack:
def fill_stack(s: Stack, items: list) -> None:
for item in items:
s.push(item)
bs = BoundedStack(max_size=3)
fill_stack(bs, ["x", "y", "z"])
print(bs.size()) # 3
try:
fill_stack(bs, ["overflow"])
except OverflowError as e:
print(f"OverflowError: {e}")
Explanation: LSP allows strengthening preconditions (raising OverflowError before capacity is reached) as long as successful operations preserve the parent's postconditions (size increases by 1). Silent dropping is the LSP violation — it would change size() behaviour unexpectedly. An explicit exception is honest: the caller knows the operation failed.
class Stack:
def __init__(self):
self._items = []
def push(self, item) -> None:
self._items.append(item)
def pop(self):
return self._items.pop()
def size(self) -> int:
return len(self._items)
class BoundedStack(Stack):
def __init__(self, max_size: int):
super().__init__()
self.max_size = max_size
def push(self, item) -> None:
# TODO: enforce max_size, but keep the push contract:
# after a successful push, size() == old_size + 1
# Raise OverflowError if at capacity (do NOT silently drop items)
pass
bs = BoundedStack(max_size=2)
bs.push("a")
bs.push("b")
print(bs.size())
try:
bs.push("c")
except OverflowError as e:
print(f"OverflowError: {e}")Expected Output
2\nOverflowError: Stack is full (max_size=2)Hints
Hint 1: Check if len(self._items) >= self.max_size before pushing.
Hint 2: Raising OverflowError is LSP-compliant because it is a precondition violation — it does NOT alter the postcondition of successful pushes.
Verify the ISP design: cache_write and cache_read accept only the narrow interface they need. InMemoryStore satisfies both, but each function is type-safe at the narrower level.
from typing import Protocol
class Readable(Protocol):
def read(self, key: str) -> str: ...
class Writable(Protocol):
def write(self, key: str, value: str) -> None: ...
class InMemoryStore:
def __init__(self):
self._data = {}
def read(self, key: str) -> str:
return self._data.get(key, "")
def write(self, key: str, value: str) -> None:
self._data[key] = value
def cache_write(store: Writable, key: str, value: str) -> None:
store.write(key, value)
def cache_read(store: Readable, key: str) -> str:
return store.read(key)
store = InMemoryStore()
cache_write(store, "user:1", "Alice")
print(cache_read(store, "user:1"))Solution
from typing import Protocol
class Readable(Protocol):
def read(self, key: str) -> str: ...
class Writable(Protocol):
def write(self, key: str, value: str) -> None: ...
class InMemoryStore:
def __init__(self) -> None:
self._data: dict[str, str] = {}
def read(self, key: str) -> str:
return self._data.get(key, "")
def write(self, key: str, value: str) -> None:
self._data[key] = value
class ReadOnlyStore:
"""Only satisfies Readable — cannot be passed to cache_write."""
def __init__(self, data: dict[str, str]) -> None:
self._data = data
def read(self, key: str) -> str:
return self._data.get(key, "")
def cache_write(store: Writable, key: str, value: str) -> None:
store.write(key, value)
def cache_read(store: Readable, key: str) -> str:
return store.read(key)
store = InMemoryStore()
cache_write(store, "user:1", "Alice")
print(cache_read(store, "user:1")) # Alice
ro = ReadOnlyStore({"config": "prod"})
print(cache_read(ro, "config")) # prod
# cache_write(ro, "x", "y") # mypy error — ReadOnlyStore has no write()
Explanation: ISP-compliant interfaces make each function's capability requirements explicit. A function that only reads should never accidentally call write — narrow interfaces make this a compile-time guarantee (via mypy) rather than a convention. InMemoryStore satisfies both interfaces because it has both methods, but callers only see what they need.
from typing import Protocol
class Readable(Protocol):
def read(self, key: str) -> str: ...
class Writable(Protocol):
def write(self, key: str, value: str) -> None: ...
class ReadWriteStore(Readable, Writable, Protocol):
pass
class InMemoryStore:
def __init__(self):
self._data = {}
def read(self, key: str) -> str:
return self._data.get(key, "")
def write(self, key: str, value: str) -> None:
self._data[key] = value
def cache_write(store: Writable, key: str, value: str) -> None:
store.write(key, value)
def cache_read(store: Readable, key: str) -> str:
return store.read(key)
store = InMemoryStore()
cache_write(store, "user:1", "Alice")
print(cache_read(store, "user:1"))Expected Output
AliceHints
Hint 1: cache_write accepts only Writable — it cannot accidentally call read().
Hint 2: cache_read accepts only Readable — it cannot accidentally call write().
Hard
Implement UserService.activate_user() with full DIP: it depends on UserRepo and EmailSender protocols. Use the provided fakes to test in isolation.
from typing import Protocol
class UserRepo(Protocol):
def get(self, user_id: int) -> dict: ...
def save(self, user: dict) -> None: ...
class EmailSender(Protocol):
def send(self, to: str, subject: str, body: str) -> None: ...
class UserService:
def __init__(self, repo: UserRepo, emailer: EmailSender) -> None:
self._repo = repo
self._emailer = emailer
def activate_user(self, user_id: int) -> None:
user = self._repo.get(user_id)
user["active"] = True
self._repo.save(user)
self._emailer.send(
to=user["email"],
subject="Account Activated",
body=f"Hello, your account {user_id} is now active."
)
class InMemoryUserRepo:
def __init__(self):
self._users = {1: {"id": 1, "email": "[email protected]", "active": False}}
def get(self, user_id: int) -> dict:
return dict(self._users[user_id])
def save(self, user: dict) -> None:
self._users[user["id"]] = dict(user)
class FakeEmailSender:
def __init__(self):
self.sent = []
def send(self, to: str, subject: str, body: str) -> None:
self.sent.append({"to": to, "subject": subject})
repo = InMemoryUserRepo()
emailer = FakeEmailSender()
svc = UserService(repo, emailer)
svc.activate_user(1)
print(repo.get(1)["active"])
print(emailer.sent[0]["subject"])Solution
from typing import Protocol
class UserRepo(Protocol):
def get(self, user_id: int) -> dict: ...
def save(self, user: dict) -> None: ...
class EmailSender(Protocol):
def send(self, to: str, subject: str, body: str) -> None: ...
class UserService:
def __init__(self, repo: UserRepo, emailer: EmailSender) -> None:
self._repo = repo
self._emailer = emailer
def activate_user(self, user_id: int) -> None:
user = self._repo.get(user_id)
if user.get("active"):
return # idempotent
user["active"] = True
self._repo.save(user)
self._emailer.send(
to=user["email"],
subject="Account Activated",
body=f"Your account (id={user_id}) is now active."
)
# --- Infrastructure (production would be postgres + SMTP) ---
class InMemoryUserRepo:
def __init__(self) -> None:
self._users: dict[int, dict] = {
}
def get(self, user_id: int) -> dict:
return dict(self._users[user_id])
def save(self, user: dict) -> None:
self._users[user["id"]] = dict(user)
class FakeEmailSender:
def __init__(self) -> None:
self.sent: list[dict] = []
def send(self, to: str, subject: str, body: str) -> None:
self.sent.append({"to": to, "subject": subject, "body": body})
# Test-like usage:
repo = InMemoryUserRepo()
emailer = FakeEmailSender()
svc = UserService(repo, emailer)
svc.activate_user(1)
assert repo.get(1)["active"] is True
assert emailer.sent[0]["subject"] == "Account Activated"
# Idempotent:
svc.activate_user(1)
assert len(emailer.sent) == 1 # no second email
print(repo.get(1)["active"]) # True
print(emailer.sent[0]["subject"]) # Account Activated
Explanation: UserService has zero knowledge of databases or email providers. It depends entirely on the two Protocols. In production you inject a PostgresUserRepo and SMTPEmailSender. In tests you inject the in-memory fakes. The service logic is tested without a database or network — which is the direct payoff of DIP.
from typing import Protocol
class UserRepo(Protocol):
def get(self, user_id: int) -> dict: ...
def save(self, user: dict) -> None: ...
class EmailSender(Protocol):
def send(self, to: str, subject: str, body: str) -> None: ...
class UserService:
def __init__(self, repo: UserRepo, emailer: EmailSender):
# TODO: store both dependencies
pass
def activate_user(self, user_id: int) -> None:
# TODO: fetch user, set active=True, save, then send email
pass
class InMemoryUserRepo:
def __init__(self):
self._users = {1: {"id": 1, "email": "[email protected]", "active": False}}
def get(self, user_id: int) -> dict:
return dict(self._users[user_id])
def save(self, user: dict) -> None:
self._users[user["id"]] = dict(user)
class FakeEmailSender:
def __init__(self):
self.sent = []
def send(self, to: str, subject: str, body: str) -> None:
self.sent.append({"to": to, "subject": subject})
repo = InMemoryUserRepo()
emailer = FakeEmailSender()
svc = UserService(repo, emailer)
svc.activate_user(1)
print(repo.get(1)["active"])
print(emailer.sent[0]["subject"])Expected Output
True\nAccount ActivatedHints
Hint 1: In __init__, store self._repo and self._emailer.
Hint 2: In activate_user: user = self._repo.get(user_id), set user["active"] = True, self._repo.save(user), then self._emailer.send(...).
Identify and fix all three SOLID violations in UserSystem: SRP (mixed concerns), OCP (hardcoded algorithm), and DIP (depends on concretions). Refactor into four focused classes.
import json, hashlib
from typing import Protocol
class PasswordHasher(Protocol):
def hash(self, password: str) -> str: ...
class UserStore(Protocol):
def save(self, user: dict) -> None: ...
class Notifier(Protocol):
def notify(self, username: str) -> None: ...
class MD5Hasher:
def hash(self, password: str) -> str:
return hashlib.md5(password.encode()).hexdigest()
class PrintUserStore:
def save(self, user: dict) -> None:
print(f"DB: INSERT {json.dumps(user)}")
class PrintNotifier:
def notify(self, username: str) -> None:
print(f"EMAIL: Welcome {username}")
class UserRegistrationService:
def __init__(
self,
hasher: PasswordHasher,
store: UserStore,
notifier: Notifier,
) -> None:
self._hasher = hasher
self._store = store
self._notifier = notifier
def register(self, username: str, password: str) -> dict:
pw_hash = self._hasher.hash(password)
user = {"username": username, "pw_hash": pw_hash}
self._store.save(user)
self._notifier.notify(username)
return user
svc = UserRegistrationService(
hasher=MD5Hasher(),
store=PrintUserStore(),
notifier=PrintNotifier(),
)
user = svc.register("alice", "secret")
print(user["username"])Solution
import json, hashlib
from typing import Protocol
# Abstractions (Protocols)
class PasswordHasher(Protocol):
def hash(self, password: str) -> str: ...
class UserStore(Protocol):
def save(self, user: dict) -> None: ...
class Notifier(Protocol):
def notify(self, username: str) -> None: ...
# Concrete implementations
class MD5Hasher:
def hash(self, password: str) -> str:
return hashlib.md5(password.encode()).hexdigest()
class SHA256Hasher:
"""OCP: swap algorithm without touching service."""
def hash(self, password: str) -> str:
return hashlib.sha256(password.encode()).hexdigest()
class PrintUserStore:
def save(self, user: dict) -> None:
print(f"DB: INSERT {json.dumps(user)}")
class PrintNotifier:
def notify(self, username: str) -> None:
print(f"EMAIL: Welcome {username}")
class NullNotifier:
"""For testing: discards notifications."""
def notify(self, username: str) -> None:
pass
class UserRegistrationService:
"""SRP: only handles registration orchestration."""
def __init__(
self,
hasher: PasswordHasher,
store: UserStore,
notifier: Notifier,
) -> None:
self._hasher = hasher
self._store = store
self._notifier = notifier
def register(self, username: str, password: str) -> dict:
pw_hash = self._hasher.hash(password)
user = {"username": username, "pw_hash": pw_hash}
self._store.save(user)
self._notifier.notify(username)
return user
svc = UserRegistrationService(
hasher=MD5Hasher(),
store=PrintUserStore(),
notifier=PrintNotifier(),
)
user = svc.register("alice", "secret")
print(user["username"]) # alice
Explanation: Three violations fixed: SRP — each class now has one concern (hashing, storing, notifying). OCP — swap MD5Hasher for SHA256Hasher by passing a different object at construction; no service code changes. DIP — UserRegistrationService depends on three Protocols, not on hashlib, print(), or any specific mailer. Every component can be tested independently with fakes.
import json, hashlib
class UserSystem:
"""
This class violates SRP, OCP, and DIP.
Identify all three violations and refactor.
"""
def register(self, username: str, password: str) -> dict:
# Hash inline (OCP violation: algorithm hardcoded)
pw_hash = hashlib.md5(password.encode()).hexdigest()
user = {"username": username, "pw_hash": pw_hash}
# Save inline (SRP violation: business logic + persistence)
print(f"DB: INSERT {json.dumps(user)}")
# Notify inline (SRP + DIP violation: hardcoded mechanism)
print(f"EMAIL: Welcome {username}")
return user
# TODO: refactor into:
# - PasswordHasher (Protocol + MD5 impl)
# - UserStore (Protocol + PrintStore impl)
# - Notifier (Protocol + PrintNotifier impl)
# - UserRegistrationService(hasher, store, notifier)
svc = UserRegistrationService(
hasher=MD5Hasher(),
store=PrintUserStore(),
notifier=PrintNotifier(),
)
user = svc.register("alice", "secret")
print(user["username"])Expected Output
DB: INSERT {"username": "alice", "pw_hash": "5ebe2294ecd0e0f08eab7690d2a6ee69"}
EMAIL: Welcome alice
aliceHints
Hint 1: Create three Protocols: PasswordHasher, UserStore, Notifier.
Hint 2: Create concrete implementations: MD5Hasher, PrintUserStore, PrintNotifier.
Hint 3: UserRegistrationService accepts all three via constructor injection.
Implement a SOLID-compliant EventBus. Multiple handlers can subscribe to the same event. Publishing an event calls all subscribed handlers in order. The bus itself never changes when new handlers are added.
from typing import Protocol
from collections import defaultdict
class EventHandler(Protocol):
def handle(self, event: str, data: dict) -> None: ...
class EventBus:
def __init__(self) -> None:
self._handlers: dict[str, list] = defaultdict(list)
def subscribe(self, event: str, handler: EventHandler) -> None:
self._handlers[event].append(handler)
def publish(self, event: str, data: dict) -> None:
for handler in self._handlers.get(event, []):
handler.handle(event, data)
class LogHandler:
def handle(self, event: str, data: dict) -> None:
print(f"LOG [{event}]: {data}")
class AuditHandler:
def handle(self, event: str, data: dict) -> None:
print(f"AUDIT [{event}]: user={data.get('user')}")
bus = EventBus()
bus.subscribe("user.created", LogHandler())
bus.subscribe("user.created", AuditHandler())
bus.publish("user.created", {"user": "alice", "plan": "pro"})Solution
from typing import Protocol
from collections import defaultdict
class EventHandler(Protocol):
def handle(self, event: str, data: dict) -> None: ...
class EventBus:
"""OCP: adding new handlers requires zero changes to EventBus."""
def __init__(self) -> None:
self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
def subscribe(self, event: str, handler: EventHandler) -> None:
self._handlers[event].append(handler)
def unsubscribe(self, event: str, handler: EventHandler) -> None:
self._handlers[event] = [
h for h in self._handlers[event] if h is not handler
]
def publish(self, event: str, data: dict) -> None:
for handler in list(self._handlers[event]):
handler.handle(event, data)
class LogHandler:
def handle(self, event: str, data: dict) -> None:
print(f"LOG [{event}]: {data}")
class AuditHandler:
def handle(self, event: str, data: dict) -> None:
print(f"AUDIT [{event}]: user={data.get('user')}")
class MetricsHandler:
"""OCP extension: added without touching EventBus."""
def __init__(self) -> None:
self.counts: dict[str, int] = defaultdict(int)
def handle(self, event: str, data: dict) -> None:
self.counts[event] += 1
bus = EventBus()
metrics = MetricsHandler()
bus.subscribe("user.created", LogHandler())
bus.subscribe("user.created", AuditHandler())
bus.subscribe("user.created", metrics)
bus.subscribe("order.placed", LogHandler())
bus.subscribe("order.placed", metrics)
bus.publish("user.created", {"user": "alice", "plan": "pro"})
bus.publish("order.placed", {"user": "alice", "amount": 99})
print(dict(metrics.counts)) # {'user.created': 1, 'order.placed': 1}
Explanation: EventBus exemplifies all five SOLID principles working together. SRP: it only routes events. OCP: new handlers added by calling subscribe(), not by changing EventBus. LSP: any EventHandler is substitutable. ISP: EventHandler is a single-method Protocol. DIP: EventBus depends on the EventHandler Protocol abstraction, not on concrete handler classes. Adding MetricsHandler required zero modification to any existing class.
from typing import Protocol, List
class EventHandler(Protocol):
def handle(self, event: str, data: dict) -> None: ...
class EventBus:
def __init__(self):
self._handlers: dict = {}
def subscribe(self, event: str, handler: EventHandler) -> None:
# TODO: add handler to the list for event
pass
def publish(self, event: str, data: dict) -> None:
# TODO: call all handlers registered for event
pass
class LogHandler:
def handle(self, event: str, data: dict) -> None:
print(f"LOG [{event}]: {data}")
class AuditHandler:
def handle(self, event: str, data: dict) -> None:
print(f"AUDIT [{event}]: user={data.get('user')}")
bus = EventBus()
bus.subscribe("user.created", LogHandler())
bus.subscribe("user.created", AuditHandler())
bus.publish("user.created", {"user": "alice", "plan": "pro"})Expected Output
LOG [user.created]: {'user': 'alice', 'plan': 'pro'}\nAUDIT [user.created]: user=aliceHints
Hint 1: Use a defaultdict(list) or setdefault to store handlers per event name.
Hint 2: In publish(), iterate self._handlers.get(event, []) and call handler.handle(event, data).
