Python Plugin Systems Practice Problems & Exercises
Practice: Plugin Systems
← Back to lessonEasy
Build a simple formatter plugin registry for exporting data in different formats.
from typing import Callable
_EXPORTERS: dict[str, Callable] = {}
def register_exporter(name: str) -> Callable:
def decorator(fn: Callable) -> Callable:
_EXPORTERS[name] = fn
return fn
return decorator
def export(format_name: str, data: list[dict]) -> str:
if format_name not in _EXPORTERS:
return f"Unknown plugin: {format_name}"
return _EXPORTERS[format_name](data)
# Plugin implementations
@register_exporter("csv")
def csv_exporter(data: list[dict]) -> str:
if not data:
return ""
headers = ",".join(data[0].keys())
rows = [",".join(str(v) for v in row.values()) for row in data]
return headers + "\n" + "\n".join(rows)
@register_exporter("json")
def json_exporter(data: list[dict]) -> str:
import json
return json.dumps(data)
@register_exporter("xml")
def xml_exporter(data: list[dict]) -> str:
rows = "".join(
"<row>" + "".join(f"<{k}>{v}</{k}>" for k, v in row.items()) + "</row>"
for row in data
)
return f"<data>{rows}</data>"
data = [{"name": "Alice", "age": 30}]
print(f"Registered plugins: {sorted(_EXPORTERS.keys())}")
print(f"csv output: {export('csv', data)}")
print(f"json output: {export('json', data)}")
print(export('parquet', data))Solution
The solution is above. This is the simplest possible plugin system: a decorator registers a callable into a global dict, and the dispatcher looks up by name.
Why this works for simple cases:
- Adding a new format requires only implementing the function and adding
@register_exporter("format_name"). - The core
export()function never needs to change. - Plugins can live in separate files and be imported lazily.
Scaling problem: As the app grows, you need to discover plugins without importing every possible module. That is where entry_points and importlib.metadata come in.
Expected Output
Registered plugins: ['csv', 'json', 'xml']
csv output: name,age
Alice,30
json output: [{"name": "Alice", "age": 30}]
Unknown plugin: parquetHints
Hint 1: A plugin registry is just a dict mapping plugin name to a callable or class. Register plugins by name; retrieve and call by name.
Hint 2: Use a module-level dict so plugins can register themselves by calling register_exporter() from anywhere.
Use __init_subclass__ to automatically register validator plugins when they are defined.
import re
class Validator:
_registry: dict = {}
def __init_subclass__(cls, name: str = "", **kwargs):
super().__init_subclass__(**kwargs)
if name:
Validator._registry[name] = cls()
def validate(self, value: str) -> bool:
raise NotImplementedError
@classmethod
def get(cls, name: str) -> "Validator":
if name not in cls._registry:
raise KeyError(f"No validator: {name}")
return cls._registry[name]
@classmethod
def available(cls) -> list[str]:
return sorted(cls._registry.keys())
# Plugins - auto-registered on class definition
class EmailValidator(Validator, name="email"):
def validate(self, value: str) -> bool:
return bool(re.match(r"^[^@]+@[^@]+\.[^@]+$", value))
class PhoneValidator(Validator, name="phone"):
def validate(self, value: str) -> bool:
return bool(re.match(r"^\+?[\d\s\-]{7,15}$", value))
class UrlValidator(Validator, name="url"):
def validate(self, value: str) -> bool:
return value.startswith("https://") or value.startswith("http://")
print(f"Available validators: {Validator.available()}")
print(f"email valid: {Validator.get('email').validate('[email protected]')}")
print(f"phone valid: {Validator.get('phone').validate('+1 555 0100')}")
print(f"url valid: {Validator.get('url').validate('not-a-url')}")Solution
The solution is above. __init_subclass__ fires when the class statement is evaluated — no explicit registration call needed from the plugin author.
The name argument: Passed as a keyword in the class definition — class EmailValidator(Validator, name="email"). Python routes keyword arguments in class definitions to __init_subclass__.
Preventing base class self-registration: if name: guards against the case where Validator itself (with no name argument) tries to register.
When to use __init_subclass__ vs decorator: Use __init_subclass__ when plugins must inherit from a base class anyway (for a shared interface). Use decorators when plugins are standalone functions or do not share an inheritance hierarchy.
Expected Output
Available validators: ['email', 'phone', 'url']\nemail valid: True\nphone valid: True\nurl valid: FalseHints
Hint 1: __init_subclass__ is called automatically when a class is subclassed. Override it in the base class to register every subclass in a global registry.
Hint 2: Each subclass must define a class-level name attribute. The base class stores them in a class variable dict.
Implement a plugin loader that discovers and imports plugin modules from a list of module paths, allowing them to self-register.
import importlib
import sys
import types
# Global registry (shared with plugins)
_EXPORTERS: dict = {}
def register_exporter(name: str):
def decorator(fn):
_EXPORTERS[name] = fn
return fn
return decorator
def load_plugins(module_paths: list[str]) -> None:
for path in module_paths:
module = importlib.import_module(path)
print(f"Loaded plugin from: {path}")
# Simulate plugin modules (normally in separate .py files)
def _make_plugin_module(mod_name: str, exporter_name: str, fn) -> None:
mod = types.ModuleType(mod_name)
mod.register_exporter = register_exporter
# Simulate: @register_exporter(exporter_name) def export(data): ...
register_exporter(exporter_name)(fn)
sys.modules[mod_name] = mod
_make_plugin_module("plugins.csv_plugin", "csv", lambda data: ",".join(str(r) for r in data))
_make_plugin_module("plugins.json_plugin", "json", lambda data: str(data))
load_plugins(["plugins.csv_plugin", "plugins.json_plugin"])
print(f"Available: {sorted(_EXPORTERS.keys())}")Solution
The solution above uses importlib.import_module — the standard way to dynamically load Python modules. In a real project, the plugin modules would be separate .py files, and loading them triggers their @register_exporter decorators.
Real project structure:
myapp/
plugins/
__init__.py
csv_plugin.py ← @register_exporter("csv") def export(...): ...
json_plugin.py ← @register_exporter("json") def export(...): ...
# Loader (in app startup):
load_plugins(["myapp.plugins.csv_plugin", "myapp.plugins.json_plugin"])
Auto-discovery from a directory:
from pathlib import Path
import importlib
def discover_plugins(package: str, plugin_dir: Path) -> None:
for path in plugin_dir.glob("*_plugin.py"):
module_name = f"{package}.{path.stem}"
importlib.import_module(module_name)
Expected Output
Loaded plugin from: plugins.csv_plugin\nLoaded plugin from: plugins.json_plugin\nAvailable: ['csv', 'json']Hints
Hint 1: Use importlib.import_module(module_path) to load a module by dotted path string. After import, the module registers itself via the @register_exporter decorator.
Hint 2: The plugin loader just needs a list of module paths to import. It does not need to know anything about what is in them.
Medium
Implement a PluginManager that manages plugin lifecycle: on_load, execute, and on_unload.
from typing import Protocol
from dataclasses import dataclass, field
class Plugin(Protocol):
name: str
def on_load(self) -> None: ...
def on_unload(self) -> None: ...
def execute(self, data: dict) -> dict: ...
class PluginManager:
def __init__(self):
self._plugins: list = []
def load(self, plugin) -> None:
print(f"Loading: {plugin.name}")
plugin.on_load()
self._plugins.append(plugin)
def execute_all(self, data: dict) -> dict:
result = dict(data)
for plugin in self._plugins:
result = plugin.execute(result)
print(f"Executed {plugin.name}: {result}")
return result
def unload_all(self) -> None:
for plugin in reversed(self._plugins):
print(f"Unloading: {plugin.name}")
plugin.on_unload()
self._plugins.clear()
# Plugin implementations
class AuditLoggerPlugin:
name = "audit_logger"
def on_load(self): pass
def on_unload(self): pass
def execute(self, data: dict) -> dict:
return {**data, "audited": True}
class DataEnricherPlugin:
name = "data_enricher"
def on_load(self): pass
def on_unload(self): pass
def execute(self, data: dict) -> dict:
return {**data, "enriched": True}
manager = PluginManager()
manager.load(AuditLoggerPlugin())
manager.load(DataEnricherPlugin())
manager.execute_all({"user": "alice"})
manager.unload_all()Solution
The solution is above. Lifecycle hooks (on_load, on_unload) allow plugins to acquire and release resources. Examples:
AuditLoggerPlugin.on_load(): open a log fileAuditLoggerPlugin.on_unload(): flush and close the log fileCachePlugin.on_load(): connect to RedisCachePlugin.on_unload(): close the Redis connection
LIFO unloading order: Plugins are unloaded in reverse order (LIFO) because later plugins may depend on earlier ones. If plugin B was loaded after A and depends on A, unloading A first would break B.
from typing import Protocol
from dataclasses import dataclass, field
class Plugin(Protocol):
name: str
def on_load(self) -> None: ...
def on_unload(self) -> None: ...
def execute(self, data: dict) -> dict: ...
class PluginManager:
"""Manages plugin lifecycle: load, execute, unload."""
passExpected Output
Loading: audit_logger\nLoading: data_enricher\nExecuted audit_logger: {'user': 'alice', 'audited': True}\nExecuted data_enricher: {'user': 'alice', 'audited': True, 'enriched': True}\nUnloading: data_enricher\nUnloading: audit_loggerHints
Hint 1: The PluginManager keeps an ordered list of loaded plugins. load() calls on_load(); unload_all() calls on_unload() in reverse order.
Hint 2: execute_all() passes data through each plugin in sequence — each plugin transforms the dict and returns it.
Simulate the importlib.metadata entry_points mechanism for discovering plugins across installed packages.
from typing import Callable
import importlib
import sys
import types
FAKE_ENTRY_POINTS: dict[str, dict[str, str]] = {}
def declare_entry_point(group: str, name: str, target: str) -> None:
FAKE_ENTRY_POINTS.setdefault(group, {})[name] = target
def load_entry_points(group: str) -> dict[str, Callable]:
result = {}
for name, target in FAKE_ENTRY_POINTS.get(group, {}).items():
module_path, fn_name = target.rsplit(":", 1)
module = importlib.import_module(module_path)
result[name] = getattr(module, fn_name)
print(f"{name} exporter loaded")
return result
# Simulate two "packages" registering exporters
def _register_fake_modules():
csv_mod = types.ModuleType("csv_pkg.plugin")
def csv_export(data):
headers = ",".join(data[0].keys())
rows = [",".join(str(v) for v in row.values()) for row in data]
return headers + "\n" + "\n".join(rows)
csv_mod.export = csv_export
sys.modules["csv_pkg.plugin"] = csv_mod
json_mod = types.ModuleType("json_pkg.plugin")
import json as _json
json_mod.export = lambda data: _json.dumps(data, separators=(",", ":"))
sys.modules["json_pkg.plugin"] = json_mod
_register_fake_modules()
# Packages declare their entry points (normally in pyproject.toml)
declare_entry_point("myapp.exporters", "csv", "csv_pkg.plugin:export")
declare_entry_point("myapp.exporters", "json", "json_pkg.plugin:export")
# App discovers and loads plugins
exporters = load_entry_points("myapp.exporters")
data = [{"name": "Alice", "age": 30}]
print(f"csv result: {exporters['csv'](data)}")Solution
The solution above shows how importlib.metadata entry_points work. In production:
pyproject.toml:
[project.entry-points."myapp.exporters"]
csv = "csv_exporter_pkg.plugin:export"
json = "json_exporter_pkg.plugin:export"
Discovery code:
from importlib.metadata import entry_points
def load_exporters() -> dict[str, Callable]:
result = {}
for ep in entry_points(group="myapp.exporters"):
result[ep.name] = ep.load() # imports and returns the callable
return result
The power: Installing pip install csv-exporter-plugin is enough to make the new exporter available. The core application needs zero changes. This is how pytest plugins, Flask extensions, and Sphinx extensions all work.
from typing import Callable
import importlib
# Simulate the entry_points discovery mechanism:
# In real projects, packages declare entry points in pyproject.toml:
# [project.entry-points."myapp.exporters"]
# csv = "myapp.plugins.csv:export"
#
# Simulate this with a fake metadata registry and implement
# a load_entry_points(group) function.
FAKE_ENTRY_POINTS: dict[str, dict[str, str]] = {}
def declare_entry_point(group: str, name: str, target: str) -> None:
pass
def load_entry_points(group: str) -> dict[str, Callable]:
passExpected Output
csv exporter loaded\njson exporter loaded\ncsv result: name,age\nAlice,30Hints
Hint 1: FAKE_ENTRY_POINTS[group][name] = "module_path:function_name". load_entry_points splits on ":", imports the module, and getattr() the function.
Hint 2: Use importlib.import_module() to get the module, then getattr(module, fn_name) to get the callable.
Implement a plugin dependency resolver that returns a safe load order using topological sort.
from dataclasses import dataclass, field
@dataclass
class PluginMeta:
name: str
depends_on: list[str] = field(default_factory=list)
def resolve_load_order(plugins: list[PluginMeta]) -> list[str]:
# Build in-degree and adjacency
in_degree = {p.name: 0 for p in plugins}
dependents: dict[str, list[str]] = {p.name: [] for p in plugins}
for p in plugins:
for dep in p.depends_on:
in_degree[p.name] += 1
dependents[dep].append(p.name)
# Kahn's algorithm
queue = [name for name, degree in in_degree.items() if degree == 0]
order = []
while queue:
node = queue.pop(0)
order.append(node)
for dep_name in dependents[node]:
in_degree[dep_name] -= 1
if in_degree[dep_name] == 0:
queue.append(dep_name)
if len(order) != len(plugins):
raise ValueError("Circular dependency detected")
return order
# Test 1: valid order
plugins = [
PluginMeta("frontend", depends_on=["api", "auth"]),
PluginMeta("api", depends_on=["database", "cache"]),
PluginMeta("auth", depends_on=["database"]),
PluginMeta("database"),
PluginMeta("cache"),
]
print(f"Load order: {resolve_load_order(plugins)}")
# Test 2: cycle
cyclic = [
PluginMeta("A", depends_on=["B"]),
PluginMeta("B", depends_on=["C"]),
PluginMeta("C", depends_on=["A"]),
]
try:
resolve_load_order(cyclic)
except ValueError as e:
print(e)Solution
The solution is above. Kahn's algorithm runs in O(V + E) time where V is the number of plugins and E is the total number of dependency edges.
Why topological sort matters for plugins:
- Plugin B that depends on Plugin A must load AFTER A.
- If A registers a hook that B wants to subscribe to, A must already be running when B loads.
- If the graph has a cycle (A depends on B, B depends on A), no valid load order exists — this is a hard error.
Real-world usage:
- pytest plugins declare
pytest11entry points and can specify ordering withtryfirst/trylastmarkers. - Django apps in
INSTALLED_APPSmust be ordered correctly for migrations and signal registration.
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class PluginMeta:
name: str
depends_on: list[str] = field(default_factory=list)
def resolve_load_order(plugins: list[PluginMeta]) -> list[str]:
"""Return plugin names in dependency-safe load order.
Raise ValueError if there is a circular dependency.
"""
passExpected Output
Load order: ['database', 'auth', 'cache', 'api', 'frontend']\nCircular dependency detectedHints
Hint 1: This is a topological sort problem. Use Kahns algorithm: start with nodes that have no dependencies, remove them from the graph, repeat.
Hint 2: If the final sorted list is shorter than the input, there is a cycle (some nodes were never reachable from the zero-in-degree set).
Implement a versioned plugin system where plugins declare their required API version and the loader validates compatibility.
from dataclasses import dataclass
class VersionError(Exception):
pass
@dataclass
class VersionedPlugin:
name: str
api_version: str # e.g. "1.0", "1.2"
def on_load(self) -> None:
pass
def execute(self, data: str) -> str:
return f"[{self.name}] {data}"
class VersionedPluginLoader:
def __init__(self, host_api_version: str):
self._host_major, self._host_minor = self._parse(host_api_version)
self._loaded: list[VersionedPlugin] = []
def _parse(self, version: str) -> tuple[int, int]:
parts = version.split(".")
return int(parts[0]), int(parts[1])
def load(self, plugin: VersionedPlugin) -> None:
p_major, p_minor = self._parse(plugin.api_version)
if p_major != self._host_major:
raise VersionError(
f"{plugin.name} requires API {plugin.api_version}, "
f"host is {self._host_major}.{self._host_minor}"
)
if p_minor > self._host_minor:
raise VersionError(
f"{plugin.name} requires API {plugin.api_version}, "
f"host is {self._host_major}.{self._host_minor}"
)
plugin.on_load()
self._loaded.append(plugin)
print(f"Plugin v{plugin.api_version} loaded: {plugin.name}")
def execute_all(self, data: str) -> list[str]:
return [p.execute(data) for p in self._loaded]
loader = VersionedPluginLoader("1.3")
loader.load(VersionedPlugin("text_analyzer", "1.0"))
loader.load(VersionedPlugin("sentiment_analyzer", "1.2"))
try:
loader.load(VersionedPlugin("image_processor", "2.0"))
except VersionError as e:
print(f"VersionError: {e}")
try:
loader.load(VersionedPlugin("legacy_plugin", "0.9"))
except VersionError as e:
print(f"VersionError: {e}")Solution
The solution is above. The compatibility rule: major must match exactly, plugin minor must be less than or equal to host minor.
This follows semantic versioning (semver):
- Major version bump: breaking changes — old plugins will not work.
- Minor version bump: new features, backward compatible — plugins targeting older minor versions still work.
In production, use the packaging library:
from packaging.version import Version
from packaging.specifiers import SpecifierSet
plugin_requires = ">=1.0,<2.0" # from plugin metadata
host_version = "1.3.0"
spec = SpecifierSet(plugin_requires)
if Version(host_version) not in spec:
raise VersionError(f"Host {host_version} does not satisfy {plugin_requires}")
from dataclasses import dataclass
from typing import Protocol
# Implement a versioned plugin system:
# - Plugins declare a PLUGIN_API_VERSION
# - The PluginLoader checks compatibility: major version must match
# - Minor version on the plugin can be <= host minor version
# - Load succeeds if compatible, raises VersionError if notExpected Output
Plugin v1.0 loaded: text_analyzer\nPlugin v1.2 loaded: sentiment_analyzer\nVersionError: image_processor requires API 2.0, host is 1.3\nVersionError: legacy_plugin requires API 0.9, host is 1.3Hints
Hint 1: Parse version strings into (major, minor) tuples for comparison. A plugin is compatible if plugin.major == host.major and plugin.minor <= host.minor.
Hint 2: Define a VersionError exception. The loader checks each plugin before calling on_load().
Hard
Build a hot-reload plugin system where plugins can be reloaded at runtime without restarting the application.
import importlib
import sys
import types
from typing import Callable
class HotReloadPluginSystem:
def __init__(self):
self._registry: dict[str, Callable] = {}
self._modules: dict[str, types.ModuleType] = {}
self._reload_counts: dict[str, int] = {}
def load(self, module_path: str, fn_name: str, plugin_name: str) -> None:
module = importlib.import_module(module_path)
self._modules[plugin_name] = module
self._registry[plugin_name] = getattr(module, fn_name)
self._reload_counts[plugin_name] = 1
def reload(self, plugin_name: str, fn_name: str) -> None:
module = self._modules[plugin_name]
importlib.reload(module)
self._registry[plugin_name] = getattr(module, fn_name)
self._reload_counts[plugin_name] += 1
def execute(self, plugin_name: str, *args) -> str:
return self._registry[plugin_name](*args)
def reload_count(self, plugin_name: str) -> int:
return self._reload_counts.get(plugin_name, 0)
# Simulate plugin module v1
def _install_v1():
mod = types.ModuleType("demo_plugin")
def process(text: str) -> str:
return text.upper()
mod.process = process
sys.modules["demo_plugin"] = mod
def _upgrade_to_v2():
mod = sys.modules["demo_plugin"]
def process(text: str) -> str:
return text.lower() # changed behavior
mod.process = process
# Simulate importlib.reload updating the module
return mod
_install_v1()
system = HotReloadPluginSystem()
system.load("demo_plugin", "process", "text_processor")
print(f"v1 loaded: result = {system.execute('text_processor', 'Hello World')}")
# Simulate upgrading the plugin
_upgrade_to_v2()
importlib.reload(sys.modules["demo_plugin"])
system.reload("text_processor", "process")
print(f"v2 reloaded: result = {system.execute('text_processor', 'Hello World')}")
print(f"Reload count: {system.reload_count('text_processor')}")Solution
The solution is above. importlib.reload() re-executes the module's top-level code and updates the sys.modules cache — any new def, class, or variable assignments reflect the latest version.
Production considerations:
- Modules with module-level state (singletons, caches) may behave unexpectedly on reload.
- Thread safety: ensure no other thread is executing plugin code during reload.
- File watching: use
watchdogorinotifyto trigger reload when the plugin.pyfile changes.
Watching for changes:
import time
import os
def watch_and_reload(plugin_path: str, plugin_name: str, fn_name: str, system: HotReloadPluginSystem):
last_mtime = os.path.getmtime(plugin_path)
while True:
time.sleep(1)
mtime = os.path.getmtime(plugin_path)
if mtime != last_mtime:
system.reload(plugin_name, fn_name)
last_mtime = mtime
print(f"Auto-reloaded: {plugin_name}")
import importlib
import sys
from typing import Callable
# Implement a plugin system where plugins can be reloaded at runtime:
# - load_plugin(module_path) imports and activates a plugin
# - reload_plugin(module_path) re-imports the module and updates the registry
# - The registry always holds the latest version of each plugin callableExpected Output
v1 loaded: result = HELLO WORLD\nv2 reloaded: result = hello world\nReload count: 2Hints
Hint 1: Use importlib.import_module() for initial load. For reload, use importlib.reload() which re-executes the module code and updates the module in sys.modules.
Hint 2: Store a reference to the module object, not the function. Re-fetch the function via getattr() after reload so callers get the new version.
Build a plugin sandbox that enforces execution timeouts and catches failures, returning structured result objects.
import signal
import time
from dataclasses import dataclass
from typing import Callable, Any, Optional
@dataclass
class PluginResult:
name: str
success: bool
value: Any = None
error: Optional[str] = None
elapsed_seconds: float = 0.0
class TimeoutError(Exception):
pass
class PluginSandbox:
def __init__(self, timeout_seconds: float = 1.0):
self._timeout = timeout_seconds
def _timeout_handler(self, signum, frame):
raise TimeoutError(f"Execution timed out after {self._timeout}s")
def run(self, name: str, fn: Callable, *args, **kwargs) -> PluginResult:
start = time.perf_counter()
try:
# Set alarm (Unix only; on Windows, use threading.Timer)
old_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
signal.setitimer(signal.ITIMER_REAL, self._timeout)
try:
result = fn(*args, **kwargs)
finally:
signal.setitimer(signal.ITIMER_REAL, 0)
signal.signal(signal.SIGALRM, old_handler)
elapsed = time.perf_counter() - start
return PluginResult(name=name, success=True, value=result, elapsed_seconds=elapsed)
except TimeoutError:
elapsed = time.perf_counter() - start
return PluginResult(name=name, success=False, error=f"timeout after {self._timeout}s", elapsed_seconds=elapsed)
except Exception as e:
elapsed = time.perf_counter() - start
return PluginResult(name=name, success=False, error=str(e), elapsed_seconds=elapsed)
sandbox = PluginSandbox(timeout_seconds=0.5)
def fast_plugin(): return 42
def slow_plugin():
time.sleep(2.0)
return "done"
def crash_plugin(): return 1 / 0
plugins = [("fast_plugin", fast_plugin), ("slow_plugin", slow_plugin), ("crash_plugin", crash_plugin)]
for name, fn in plugins:
result = sandbox.run(name, fn)
if result.success:
print(f"{name}: success in ~{result.elapsed_seconds:.1f}s, result={result.value}")
elif "timeout" in (result.error or ""):
print(f"{name}: timeout after {sandbox._timeout}s")
else:
print(f"{name}: error={result.error}")
print(f"All {len(plugins)} plugins attempted")Solution
The solution is above. Note that signal.SIGALRM is only available on Unix/macOS.
Cross-platform timeout using threads:
import threading
class ThreadedSandbox:
def run(self, name: str, fn: Callable, timeout: float = 1.0) -> PluginResult:
result_holder = []
error_holder = []
def target():
try:
result_holder.append(fn())
except Exception as e:
error_holder.append(e)
t = threading.Thread(target=target, daemon=True)
t.start()
t.join(timeout=timeout)
if t.is_alive():
return PluginResult(name=name, success=False, error="timeout")
if error_holder:
return PluginResult(name=name, success=False, error=str(error_holder[0]))
return PluginResult(name=name, success=True, value=result_holder[0])
The threading approach works cross-platform but cannot forcibly terminate a thread — the plugin keeps running in the background after timeout.
import signal
import time
from typing import Callable, Any
# Implement a plugin sandbox that:
# 1. Limits execution time (timeout)
# 2. Catches exceptions and returns error results instead of crashing
# 3. Provides execution metrics (time taken, success/failure)
# 4. Runs plugins in isolation (exception in one does not affect others)Expected Output
fast_plugin: success in ~0.0s, result=42\nslow_plugin: timeout after 0.5s\ncrash_plugin: error=division by zero\nAll 3 plugins attemptedHints
Hint 1: Use signal.alarm(seconds) on Unix to set a timeout. Handle SIGALRM to raise TimeoutError. Reset with signal.alarm(0) after execution.
Hint 2: Wrap each plugin call in try/except to catch all exceptions. Record start/end time for metrics.
Implement a plugin configuration schema and validator that checks required fields, types, and fills in defaults.
from dataclasses import dataclass, field
from typing import Any, Optional, Type
@dataclass
class ConfigField:
name: str
type_: Type
required: bool = True
default: Any = None
description: str = ""
@dataclass
class PluginConfigSchema:
fields: list[ConfigField] = field(default_factory=list)
def field(self, name: str, type_: Type, required: bool = True, default: Any = None, description: str = "") -> "PluginConfigSchema":
self.fields.append(ConfigField(name, type_, required, default, description))
return self
class PluginConfigValidator:
def __init__(self, schema: PluginConfigSchema):
self._schema = schema
def validate(self, config: dict) -> tuple[bool, list[str], dict]:
errors = []
result = {}
for f in self._schema.fields:
if f.name in config:
value = config[f.name]
if not isinstance(value, f.type_):
errors.append(f"{f.name}: expected {f.type_.__name__}, got {type(value).__name__}")
else:
result[f.name] = value
elif f.required:
errors.append(f"{f.name}: required field missing")
else:
result[f.name] = f.default
return len(errors) == 0, errors, result
# Define email plugin schema
email_schema = (
PluginConfigSchema()
.field("host", str, required=True)
.field("port", int, required=True)
.field("use_tls", bool, required=True)
.field("timeout", int, required=False, default=30)
)
validator = PluginConfigValidator(email_schema)
# Valid config
ok, errors, result = validator.validate({"host": "smtp.gmail.com", "port": 587, "use_tls": True})
print(f"email_plugin config valid: {result}")
# Invalid config
ok, errors, result = validator.validate({"host": "smtp.gmail.com", "port": "587"})
print(f"Validation errors: {errors}")
# Minimal config - defaults filled
ok, errors, result = validator.validate({"host": "localhost", "port": 25, "use_tls": False})
print(f"Default filled: {result}")Solution
The solution is above. In production, use pydantic models instead of hand-rolled validators:
from pydantic import BaseModel, Field
class EmailPluginConfig(BaseModel):
host: str
port: int
use_tls: bool
timeout: int = 30
# Validation
try:
cfg = EmailPluginConfig(**user_provided_dict)
except ValidationError as e:
print(e.errors())
Pydantic gives you: type coercion, nested models, custom validators, JSON schema export, and IDE support — all for free.
from dataclasses import dataclass, field
from typing import Any, Optional
# Implement a plugin configuration system where:
# - Each plugin declares a config schema (required fields, types, defaults)
# - The PluginConfigValidator validates config dicts against schemas
# - Missing required fields and wrong types produce clear error messages
# - Default values are filled in for optional fieldsExpected Output
email_plugin config valid: {'host': 'smtp.gmail.com', 'port': 587, 'use_tls': True, 'timeout': 30}\nValidation errors: ['port: expected int, got str', 'use_tls: required field missing']\nDefault filled: {'host': 'localhost', 'port': 25, 'use_tls': False, 'timeout': 30}Hints
Hint 1: Define a ConfigField dataclass with name, type_, required, default. The validator iterates over schema fields and checks the config dict.
Hint 2: For type checking, use isinstance(value, field.type_). For missing required fields, check if key is in config.
Build a complete, production-quality plugin framework with metadata, version checking, dependency resolution, lifecycle management, and a data pipeline.
from dataclasses import dataclass, field
from typing import Optional, Any
from abc import ABC, abstractmethod
# ── PLUGIN BASE ───────────────────────────────────────────────────────────────
@dataclass
class PluginMetadata:
name: str
version: str
description: str
depends_on: list[str] = field(default_factory=list)
min_api_version: str = "1.0"
class PluginBase(ABC):
metadata: PluginMetadata
def on_load(self) -> None:
pass
def on_unload(self) -> None:
pass
@abstractmethod
def process(self, data: list[dict]) -> list[dict]:
pass
# ── PLUGIN REGISTRY ───────────────────────────────────────────────────────────
class PluginRegistry:
HOST_API_VERSION = (1, 3)
def __init__(self):
self._registered: dict[str, type] = {} # name -> class
self._loaded: dict[str, PluginBase] = {} # name -> instance (load order)
def register(self, cls: type) -> type:
meta = cls.metadata
self._registered[meta.name] = cls
return cls
def _check_api_version(self, meta: PluginMetadata) -> None:
major, minor = map(int, meta.min_api_version.split("."))
h_major, h_minor = self.HOST_API_VERSION
if major != h_major or minor > h_minor:
raise ValueError(
f"{meta.name} requires API {meta.min_api_version}, "
f"host is {h_major}.{h_minor}"
)
def _resolve_order(self, names: list[str]) -> list[str]:
in_degree = {n: 0 for n in names}
dependents: dict[str, list[str]] = {n: [] for n in names}
for n in names:
for dep in self._registered[n].metadata.depends_on:
in_degree[n] += 1
dependents.setdefault(dep, []).append(n)
queue = [n for n, d in in_degree.items() if d == 0]
order = []
while queue:
node = queue.pop(0)
order.append(node)
for dep_name in dependents.get(node, []):
in_degree[dep_name] -= 1
if in_degree[dep_name] == 0:
queue.append(dep_name)
if len(order) != len(names):
raise ValueError("Circular plugin dependencies detected")
return order
def load_all(self) -> None:
order = self._resolve_order(list(self._registered.keys()))
for name in order:
cls = self._registered[name]
self._check_api_version(cls.metadata)
instance = cls()
instance.on_load()
self._loaded[name] = instance
print(f"Loaded: {name} v{cls.metadata.version}")
def unload_all(self) -> None:
for name in reversed(list(self._loaded.keys())):
self._loaded[name].on_unload()
print(f"Unloaded: {name}")
self._loaded.clear()
def get_pipeline(self) -> list[PluginBase]:
return list(self._loaded.values())
# ── PLUGINS ───────────────────────────────────────────────────────────────────
registry = PluginRegistry()
@registry.register
class FilterPlugin(PluginBase):
metadata = PluginMetadata("filter", "1.0.0", "Removes rows with missing values")
def process(self, data: list[dict]) -> list[dict]:
result = [row for row in data if all(v is not None for v in row.values())]
print(f" Filter: {len(data)} -> {len(result)} rows")
return result
@registry.register
class TransformPlugin(PluginBase):
metadata = PluginMetadata("transform", "1.1.0", "Normalizes string fields", depends_on=["filter"])
def process(self, data: list[dict]) -> list[dict]:
result = [{k: v.strip().lower() if isinstance(v, str) else v for k, v in row.items()} for row in data]
print(f" Transform: normalized {len(result)} rows")
return result
@registry.register
class AggregatorPlugin(PluginBase):
metadata = PluginMetadata("aggregator", "1.2.0", "Adds row count metadata", depends_on=["transform"])
def process(self, data: list[dict]) -> list[dict]:
for i, row in enumerate(data):
row["_row_index"] = i
print(f" Aggregator: indexed {len(data)} rows")
return data
# ── DATA PIPELINE ─────────────────────────────────────────────────────────────
class DataPipeline:
def __init__(self, registry: PluginRegistry):
self._registry = registry
def run(self, data: list[dict]) -> list[dict]:
result = data
for plugin in self._registry.get_pipeline():
result = plugin.process(result)
return result
# ── MAIN ──────────────────────────────────────────────────────────────────────
registry.load_all()
print("---")
pipeline = DataPipeline(registry)
raw_data = [
{"name": " Alice ", "score": 95},
{"name": None, "score": 80},
{"name": "BOB", "score": 72},
]
result = pipeline.run(raw_data)
print(f"Final result: {result}")
print("---")
registry.unload_all()Solution
The solution is a complete plugin framework demonstrating:
- Metadata-driven design: Each plugin declares its version, description, and dependencies in
PluginMetadata— no implicit coupling. - Decorator registration:
@registry.registerkeeps plugin definition clean and explicit. - Dependency-safe load order: Topological sort ensures
filterloads beforetransformbeforeaggregator. - API version checking: Plugins with incompatible
min_api_versionare rejected at load time. - Clean lifecycle:
on_load/on_unloadhooks bracket the plugin's active lifetime, with LIFO unloading. - Pipeline composition:
DataPipelineis agnostic to specific plugins — it callsprocess()on whatever the registry loaded.
To add a new plugin: implement PluginBase, add @registry.register, declare dependencies. Zero changes to the framework.
# Build a complete plugin framework for a data processing pipeline:
# - PluginBase: abstract base with metadata (name, version, description)
# - PluginRegistry: discover, validate versions, resolve load order, manage lifecycle
# - DataPipeline: runs data through all loaded plugins in order
# - Three plugins: FilterPlugin, TransformPlugin, AggregatorPlugin
# - Show the full lifecycle: register -> load -> execute -> unloadExpected Output
See solution for expected outputHints
Hint 1: PluginRegistry combines the version checker, dependency resolver, and lifecycle manager from earlier problems.
Hint 2: DataPipeline.run(data) passes data through each plugin.process(data) in load order and collects results.
