Plugin Systems - Building Extensible Applications
Here is an application that processes files in different formats. Study how it handles adding a new format.
# processor.py
class FileProcessor:
    def process(self, filepath: str) -> dict:
        if filepath.endswith(".csv"):
            return self._process_csv(filepath)
        elif filepath.endswith(".json"):
            return self._process_json(filepath)
        elif filepath.endswith(".xml"):
            return self._process_xml(filepath)
        elif filepath.endswith(".parquet"):
            return self._process_parquet(filepath)
        # Adding YAML? Edit this file.
        # Adding TOML? Edit this file again.
        # Adding Excel? Edit this file yet again.
        else:
            raise ValueError(f"Unsupported format: {filepath}")
Every new format requires modifying FileProcessor. A third-party developer who wants to add Avro support must fork your repository. This violates the Open/Closed Principle: a class should be open for extension but closed for modification, and here every extension requires a modification.
A plugin system solves this. New formats are registered externally - in separate packages, separate files, or even separate teams - without touching FileProcessor.
What You Will Learn
- How entry_points in pyproject.toml enable cross-package plugin discovery
- Using importlib.metadata to load plugins at runtime
- The stevedore library for production-grade plugin management
- __init_subclass__ for automatic registration of subclasses
- Building a complete extensible CLI tool with plugins
- Plugin lifecycle management: load, initialize, run, cleanup
- Ordering plugins by dependencies using topological sort
Prerequisites
- Solid understanding of Python classes, inheritance, and metaclasses
- Familiarity with pyproject.toml and Python packaging
- Experience with abstract base classes and typing.Protocol
- Understanding of dependency injection (previous lesson)
Part 1 - The Plugin Architecture Pattern
A plugin system has three components: the host application that defines extension points, the plugin interface that specifies the contract, and the plugins themselves that implement the contract.
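The contract does not have to be an abstract base class. As a minimal sketch, assuming a hypothetical RecordReader protocol and LinesPlugin class (neither is part of this lesson's codebase), typing.Protocol lets the host check the contract structurally, so a plugin does not need to import anything from the host:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RecordReader(Protocol):
    """Hypothetical structural contract: anything with these members qualifies."""

    name: str

    def read(self, filepath: str) -> list[dict[str, Any]]: ...


class LinesPlugin:
    """Hypothetical plugin; note that it does not inherit from RecordReader."""

    name = "lines"

    def read(self, filepath: str) -> list[dict[str, Any]]:
        with open(filepath, encoding="utf-8") as f:
            return [{"line": line.rstrip("\n")} for line in f]


class NotAPlugin:
    """Lacks both `name` and `read`, so it fails the structural check."""


def accept(candidate: object) -> bool:
    # isinstance() on a runtime_checkable Protocol only checks that the
    # members exist; it does not verify signatures or return types.
    return isinstance(candidate, RecordReader)
```

The trade-off is weaker enforcement: an ABC refuses to instantiate an incomplete plugin, while a protocol check only confirms that the expected attribute names are present.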
The Plugin Interface
# core/plugin_interface.py
from abc import ABC, abstractmethod
from typing import Any


class FileFormatPlugin(ABC):
    """Contract that all file format plugins must satisfy."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable name of the format."""
        ...

    @property
    @abstractmethod
    def extensions(self) -> list[str]:
        """File extensions this plugin handles (e.g., ['.csv', '.tsv'])."""
        ...

    @abstractmethod
    def read(self, filepath: str) -> list[dict[str, Any]]:
        """Read a file and return a list of records."""
        ...

    @abstractmethod
    def write(self, filepath: str, records: list[dict[str, Any]]) -> None:
        """Write records to a file."""
        ...

    def validate(self, filepath: str) -> bool:
        """Optional: validate file before reading. Default returns True."""
        return True
Part 2 - __init_subclass__ for Automatic Registration
The simplest plugin discovery mechanism in Python: when a class inherits from your base class, __init_subclass__ fires automatically.
# core/registry.py
from typing import Any


class PluginRegistry:
    """Base class that auto-registers all subclasses."""

    _plugins: dict[str, type] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Only register concrete implementations (those with 'name')
        if hasattr(cls, "name") and isinstance(cls.name, property):
            # Cannot read property on uninstantiated class, skip abstract
            pass
        elif hasattr(cls, "name"):
            PluginRegistry._plugins[cls.name] = cls

    @classmethod
    def get_plugin(cls, name: str) -> type:
        if name not in cls._plugins:
            raise KeyError(
                f"No plugin named '{name}'. Available: {list(cls._plugins.keys())}"
            )
        return cls._plugins[name]

    @classmethod
    def all_plugins(cls) -> dict[str, type]:
        return dict(cls._plugins)

    @classmethod
    def clear(cls) -> None:
        """Reset registry (useful for testing)."""
        cls._plugins.clear()
Using the Registry
# plugins/csv_plugin.py
import csv
from core.registry import PluginRegistry


class CsvPlugin(PluginRegistry):
    name = "csv"
    extensions = [".csv", ".tsv"]

    def read(self, filepath: str) -> list[dict]:
        with open(filepath, newline="") as f:
            delimiter = "\t" if filepath.endswith(".tsv") else ","
            reader = csv.DictReader(f, delimiter=delimiter)
            return list(reader)

    def write(self, filepath: str, records: list[dict]) -> None:
        if not records:
            return
        with open(filepath, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=records[0].keys())
            writer.writeheader()
            writer.writerows(records)

# plugins/json_plugin.py
import json
from core.registry import PluginRegistry


class JsonPlugin(PluginRegistry):
    name = "json"
    extensions = [".json"]

    def read(self, filepath: str) -> list[dict]:
        with open(filepath) as f:
            data = json.load(f)
        return data if isinstance(data, list) else [data]

    def write(self, filepath: str, records: list[dict]) -> None:
        with open(filepath, "w") as f:
            json.dump(records, f, indent=2)

# The host application - no if/elif chain
from core.registry import PluginRegistry

# Import plugins so __init_subclass__ fires
import plugins.csv_plugin
import plugins.json_plugin

# Discover all registered plugins
print(PluginRegistry.all_plugins())
# {'csv': <class 'plugins.csv_plugin.CsvPlugin'>, 'json': <class 'plugins.json_plugin.JsonPlugin'>}

# Use a plugin
plugin_cls = PluginRegistry.get_plugin("csv")
plugin = plugin_cls()
records = plugin.read("data.csv")
:::tip When __init_subclass__ Works Well
This pattern works when all plugins live in the same codebase or are imported explicitly. For cross-package plugin discovery (third-party plugins installed via pip), you need entry_points.
:::
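A variation worth knowing: __init_subclass__ also receives keyword arguments from the class statement, which makes registration explicit instead of relying on attribute inspection. The sketch below is illustrative only; FormatBase and the format_name keyword are hypothetical names, not part of the registry above.

```python
from typing import Any, Optional


class FormatBase:
    """Hypothetical base class: subclasses opt in by passing format_name."""

    registry: dict[str, type] = {}

    def __init_subclass__(cls, format_name: Optional[str] = None, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        if format_name is None:
            return  # intermediate or abstract class: do not register
        if format_name in FormatBase.registry:
            raise ValueError(f"Duplicate plugin name: {format_name!r}")
        FormatBase.registry[format_name] = cls


class DelimitedBase(FormatBase):
    """Shared helper class; no format_name, so it stays out of the registry."""


class CsvFormat(DelimitedBase, format_name="csv"):
    delimiter = ","


class TsvFormat(DelimitedBase, format_name="tsv"):
    delimiter = "\t"


class StrictCsvFormat(CsvFormat):
    """Passes no format_name, so "csv" still maps to CsvFormat."""
```

With the attribute-based registry above, a subclass of CsvPlugin that does not define its own name inherits "csv" and replaces the original registry entry. Passing the name in the class statement avoids that surprise, at the cost of a slightly less familiar syntax.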
Part 3 - Entry Points and importlib.metadata
Python's packaging system includes a plugin discovery mechanism called entry points. Any installed package can declare entry points in its pyproject.toml, and any application can discover them at runtime.
Declaring Entry Points (Plugin Side)
# In the plugin package's pyproject.toml
[project]
name = "my-csv-plugin"
version = "1.0.0"
[project.entry-points."fileprocessor.formats"]
csv = "my_csv_plugin:CsvPlugin"
tsv = "my_csv_plugin:TsvPlugin"
The key "fileprocessor.formats" is the group name - a namespace that the host application uses to find plugins. The values are module:attribute references to plugin classes.
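To see what a module:attribute reference resolves to without installing anything, an EntryPoint object can be built by hand. This is a sketch for illustration: the group name demo.formats is made up, and the standard-library function json.dumps stands in for a plugin class.

```python
from importlib.metadata import EntryPoint

# Equivalent to this line in a pyproject.toml entry-points table:
#   dumps = "json:dumps"
ep = EntryPoint(name="dumps", value="json:dumps", group="demo.formats")

# load() imports the module named before the colon, then looks up the
# attribute named after it.
loaded = ep.load()

print(ep.name, ep.group, loaded.__name__)
```

Nothing is imported until load() is called, which is why a host can list available plugins cheaply and pay the import cost only for the ones it uses.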
Discovering Entry Points (Host Side)
# core/discovery.py
import logging
from importlib.metadata import entry_points
from typing import Any

logger = logging.getLogger(__name__)


def discover_plugins(group: str) -> dict[str, Any]:
    """
    Discover all plugins registered under a given entry point group.
    Returns a dict mapping plugin names to loaded plugin classes/objects.
    """
    discovered = {}
    eps = entry_points(group=group)
    for ep in eps:
        try:
            plugin_cls = ep.load()  # imports the module and gets the attribute
            discovered[ep.name] = plugin_cls
        except Exception as e:
            # Log but do not crash - one bad plugin should not break the app
            logger.warning(f"Failed to load plugin '{ep.name}': {e}")
    return discovered


# Using discovered plugins
plugins = discover_plugins("fileprocessor.formats")
# {'csv': <class 'CsvPlugin'>, 'tsv': <class 'TsvPlugin'>}
csv_plugin = plugins["csv"]()
records = csv_plugin.read("data.csv")
Full Plugin Manager
# core/plugin_manager.py
from importlib.metadata import entry_points
from typing import Any, Optional
import logging
import os

from core.plugin_interface import FileFormatPlugin

logger = logging.getLogger(__name__)


class PluginManager:
    """Manages plugin discovery, loading, and access."""

    def __init__(self, group: str) -> None:
        self._group = group
        self._plugins: dict[str, FileFormatPlugin] = {}
        self._extension_map: dict[str, FileFormatPlugin] = {}

    def discover_and_load(self) -> None:
        """Discover all plugins via entry points and instantiate them."""
        eps = entry_points(group=self._group)
        for ep in eps:
            try:
                plugin_cls = ep.load()
                if not issubclass(plugin_cls, FileFormatPlugin):
                    logger.warning(
                        f"Plugin '{ep.name}' does not implement FileFormatPlugin"
                    )
                    continue
                plugin = plugin_cls()
                self._plugins[ep.name] = plugin
                for ext in plugin.extensions:
                    if ext in self._extension_map:
                        logger.warning(
                            f"Extension '{ext}' already handled by "
                            f"'{self._extension_map[ext].name}', "
                            f"overriding with '{ep.name}'"
                        )
                    self._extension_map[ext] = plugin
                logger.info(f"Loaded plugin: {ep.name} (extensions: {plugin.extensions})")
            except Exception as e:
                logger.error(f"Failed to load plugin '{ep.name}': {e}")

    def get_by_name(self, name: str) -> Optional[FileFormatPlugin]:
        return self._plugins.get(name)

    def get_by_extension(self, ext: str) -> Optional[FileFormatPlugin]:
        return self._extension_map.get(ext)

    def list_plugins(self) -> list[str]:
        return list(self._plugins.keys())

    def process_file(self, filepath: str) -> list[dict[str, Any]]:
        """Automatically select the right plugin based on file extension."""
        _, ext = os.path.splitext(filepath)
        plugin = self.get_by_extension(ext)
        if plugin is None:
            raise ValueError(
                f"No plugin handles extension '{ext}'. "
                f"Available: {list(self._extension_map.keys())}"
            )
        if not plugin.validate(filepath):
            raise ValueError(f"File validation failed for {filepath}")
        return plugin.read(filepath)

# main.py
from core.plugin_manager import PluginManager

manager = PluginManager("fileprocessor.formats")
manager.discover_and_load()
# Loaded plugin: csv (extensions: ['.csv', '.tsv'])
# Loaded plugin: json (extensions: ['.json'])
records = manager.process_file("data.csv")  # auto-selects CsvPlugin
:::note Entry Points Work Across Packages
The power of entry points is that a completely separate pip-installable package can register a plugin. Run pip install my-avro-plugin, and the next time your app starts, the Avro plugin is discovered automatically. No configuration changes are needed.
:::
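One practical wrinkle: process_file looks up the extension exactly as os.path.splitext returns it, so DATA.CSV would not match a plugin registered for .csv. A hedged sketch of one way to normalize before lookup follows; normalize_extension is a hypothetical helper, not part of the manager above.

```python
from pathlib import PurePath


def normalize_extension(filepath: str) -> str:
    """Return the final suffix in lowercase, e.g. 'DATA.CSV' -> '.csv'.

    Only the last suffix is considered, so 'archive.tar.gz' yields '.gz'.
    Files without a suffix yield an empty string.
    """
    return PurePath(filepath).suffix.lower()
```

For the lookup to stay consistent, the extensions each plugin declares would need the same lowercase normalization when they are added to the extension map.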
Part 4 - The Stevedore Library
OpenStack's stevedore library wraps importlib.metadata with a richer API for plugin management.
pip install stevedore
Driver Pattern (One Plugin Selected)
# When you need exactly one plugin for a given name
from stevedore import driver


def load_format_driver(format_name: str, filepath: str) -> list[dict]:
    mgr = driver.DriverManager(
        namespace="fileprocessor.formats",
        name=format_name,
        invoke_on_load=False,
    )
    plugin = mgr.driver()
    return plugin.read(filepath)


# Usage
records = load_format_driver("csv", "data.csv")
Extension Pattern (All Plugins Loaded)
# When you want to invoke all plugins (e.g., validation hooks)
from stevedore import ExtensionManager


def validate_with_all_plugins(data: dict) -> list[str]:
    """Run all registered validators and collect errors."""
    errors = []
    mgr = ExtensionManager(
        namespace="myapp.validators",
        invoke_on_load=False,
    )

    def _validate(ext, data):
        plugin = ext.obj or ext.plugin()
        result = plugin.validate(data)
        if not result.is_valid:
            errors.extend(result.errors)

    mgr.map(_validate, data)
    return errors
Named Extensions Pattern
# When you want specific plugins by name
from stevedore import NamedExtensionManager


def load_specific_plugins(names: list[str]):
    mgr = NamedExtensionManager(
        namespace="fileprocessor.formats",
        names=names,
        invoke_on_load=True,
    )
    return {ext.name: ext.obj for ext in mgr}


# Load only CSV and JSON, skip others
plugins = load_specific_plugins(["csv", "json"])
| Stevedore Manager | Behavior | Use Case |
|---|---|---|
| DriverManager | Loads exactly one plugin by name | Strategy pattern (select one format) |
| ExtensionManager | Loads all plugins in a namespace | Hooks, validators, event listeners |
| NamedExtensionManager | Loads specific named plugins | Feature flags, user-selected plugins |
| HookManager | Loads every plugin registered under one shared name | Several packages contributing to the same hook |
| EnabledExtensionManager | Loads all, filters by predicate | Conditional activation |
Part 5 - Building an Extensible CLI Tool
Let us build a complete example: a data pipeline CLI that supports pluggable transformations.
The Plugin Interface
# pipeline/transform_interface.py
from abc import ABC, abstractmethod
from typing import Any


class TransformPlugin(ABC):
    """Interface for data transformation plugins."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique identifier for this transform."""
        ...

    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable description."""
        ...

    @abstractmethod
    def transform(self, records: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Transform a list of records and return the result."""
        ...

    @property
    def dependencies(self) -> list[str]:
        """Names of transforms that must run before this one. Default: none."""
        return []
Built-In Plugins
# pipeline/transforms/filter_nulls.py
from pipeline.transform_interface import TransformPlugin


class FilterNullsTransform(TransformPlugin):
    name = "filter_nulls"
    description = "Remove records where any field is None or empty string"

    def transform(self, records: list[dict]) -> list[dict]:
        return [
            r for r in records
            if all(v is not None and v != "" for v in r.values())
        ]

# pipeline/transforms/normalize_emails.py
from pipeline.transform_interface import TransformPlugin


class NormalizeEmailsTransform(TransformPlugin):
    name = "normalize_emails"
    description = "Lowercase and strip whitespace from email fields"

    def transform(self, records: list[dict]) -> list[dict]:
        result = []
        for record in records:
            new_record = dict(record)
            for key, value in new_record.items():
                if "email" in key.lower() and isinstance(value, str):
                    new_record[key] = value.strip().lower()
            result.append(new_record)
        return result

# pipeline/transforms/deduplicate.py
from pipeline.transform_interface import TransformPlugin


class DeduplicateTransform(TransformPlugin):
    name = "deduplicate"
    description = "Remove duplicate records based on all fields"
    dependencies = ["filter_nulls"]  # run after nulls are removed

    def transform(self, records: list[dict]) -> list[dict]:
        seen = set()
        unique = []
        for record in records:
            # Requires hashable field values (strings, numbers, None)
            key = tuple(sorted(record.items()))
            if key not in seen:
                seen.add(key)
                unique.append(record)
        return unique
Entry Point Registration
# pyproject.toml
[project.entry-points."pipeline.transforms"]
filter_nulls = "pipeline.transforms.filter_nulls:FilterNullsTransform"
normalize_emails = "pipeline.transforms.normalize_emails:NormalizeEmailsTransform"
deduplicate = "pipeline.transforms.deduplicate:DeduplicateTransform"
The Pipeline Runner with Dependency Ordering
# pipeline/runner.py
from importlib.metadata import entry_points
from typing import Any
import logging

from pipeline.transform_interface import TransformPlugin

logger = logging.getLogger(__name__)


class Pipeline:
    def __init__(self) -> None:
        self._transforms: dict[str, TransformPlugin] = {}

    def discover(self) -> None:
        """Load all transform plugins from entry points."""
        eps = entry_points(group="pipeline.transforms")
        for ep in eps:
            try:
                cls = ep.load()
                plugin = cls()
                self._transforms[plugin.name] = plugin
                logger.info(f"Loaded transform: {plugin.name}")
            except Exception as e:
                logger.error(f"Failed to load transform '{ep.name}': {e}")

    def register(self, plugin: TransformPlugin) -> None:
        """Manually register a plugin (useful for testing)."""
        self._transforms[plugin.name] = plugin

    def _topological_sort(self, names: list[str]) -> list[str]:
        """Sort transforms respecting dependency ordering."""
        visited: set[str] = set()
        order: list[str] = []
        visiting: set[str] = set()

        def visit(name: str) -> None:
            if name in visited:
                return
            if name in visiting:
                raise ValueError(f"Circular dependency detected involving '{name}'")
            if name not in self._transforms:
                raise ValueError(f"Unknown transform: '{name}'")
            visiting.add(name)
            for dep in self._transforms[name].dependencies:
                if dep in names or dep in self._transforms:
                    visit(dep)
            visiting.remove(name)
            visited.add(name)
            order.append(name)

        for name in names:
            visit(name)
        return order

    def run(
        self,
        records: list[dict[str, Any]],
        transform_names: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Run the pipeline, optionally with only specific transforms."""
        if transform_names is None:
            transform_names = list(self._transforms.keys())
        ordered = self._topological_sort(transform_names)
        logger.info(f"Running transforms in order: {ordered}")
        result = records
        for name in ordered:
            transform = self._transforms[name]
            before_count = len(result)
            result = transform.transform(result)
            after_count = len(result)
            logger.info(
                f" {name}: {before_count} -> {after_count} records"
            )
        return result
The CLI
# pipeline/cli.py
import click
import json
from pipeline.runner import Pipeline


@click.group()
def cli():
    """Data pipeline with pluggable transforms."""
    pass


@cli.command()
def list_transforms():
    """List all available transform plugins."""
    pipeline = Pipeline()
    pipeline.discover()
    for name, plugin in sorted(pipeline._transforms.items()):
        deps = f" (after: {', '.join(plugin.dependencies)})" if plugin.dependencies else ""
        click.echo(f" {name}: {plugin.description}{deps}")


@cli.command()
@click.argument("input_file")
@click.argument("output_file")
@click.option("--transforms", "-t", multiple=True, help="Specific transforms to run")
def run(input_file: str, output_file: str, transforms: tuple[str, ...]):
    """Run the pipeline on a file."""
    pipeline = Pipeline()
    pipeline.discover()
    with open(input_file) as f:
        records = json.load(f)
    transform_names = list(transforms) if transforms else None
    result = pipeline.run(records, transform_names)
    with open(output_file, "w") as f:
        json.dump(result, f, indent=2)
    click.echo(f"Processed {len(records)} -> {len(result)} records")
# Usage
$ python -m pipeline list-transforms
deduplicate: Remove duplicate records based on all fields (after: filter_nulls)
filter_nulls: Remove records where any field is None or empty string
normalize_emails: Lowercase and strip whitespace from email fields
$ python -m pipeline run input.json output.json -t filter_nulls -t deduplicate
# Running transforms in order: ['filter_nulls', 'deduplicate']
# filter_nulls: 1000 -> 873 records
# deduplicate: 873 -> 841 records
# Processed 1000 -> 841 records
Part 6 - Plugin Lifecycle Management
Production plugins often need initialization (connect to services) and cleanup (close connections, flush buffers).
# core/lifecycle.py
from abc import ABC, abstractmethod
from typing import Any


class ManagedPlugin(ABC):
    """Plugin with lifecycle hooks."""

    @abstractmethod
    def name(self) -> str: ...

    def on_load(self) -> None:
        """Called when the plugin is first loaded. Setup resources here."""
        pass

    def on_init(self, config: dict[str, Any]) -> None:
        """Called with plugin-specific configuration."""
        pass

    @abstractmethod
    def execute(self, *args, **kwargs) -> Any:
        """The plugin's main operation."""
        ...

    def on_cleanup(self) -> None:
        """Called during shutdown. Release resources here."""
        pass

    def health_check(self) -> bool:
        """Called periodically to verify plugin is healthy."""
        return True
# core/managed_runner.py
import logging
from typing import Any

from core.lifecycle import ManagedPlugin

logger = logging.getLogger(__name__)


class ManagedPluginRunner:
    """Manages the full lifecycle of plugins."""

    def __init__(self) -> None:
        self._plugins: list[ManagedPlugin] = []
        self._initialized: set[str] = set()

    def load(self, plugin: ManagedPlugin) -> None:
        """Load a plugin and call on_load."""
        try:
            plugin.on_load()
            self._plugins.append(plugin)
            logger.info(f"Plugin loaded: {plugin.name()}")
        except Exception as e:
            logger.error(f"Plugin {plugin.name()} failed to load: {e}")
            raise

    def initialize_all(self, configs: dict[str, dict[str, Any]]) -> None:
        """Initialize all plugins with their configurations."""
        for plugin in self._plugins:
            config = configs.get(plugin.name(), {})
            try:
                plugin.on_init(config)
                self._initialized.add(plugin.name())
                logger.info(f"Plugin initialized: {plugin.name()}")
            except Exception as e:
                logger.error(f"Plugin {plugin.name()} failed to initialize: {e}")

    def execute_all(self, *args, **kwargs) -> list[Any]:
        """Execute all initialized plugins."""
        results = []
        for plugin in self._plugins:
            if plugin.name() not in self._initialized:
                logger.warning(f"Skipping uninitialized plugin: {plugin.name()}")
                continue
            try:
                result = plugin.execute(*args, **kwargs)
                results.append(result)
            except Exception as e:
                logger.error(f"Plugin {plugin.name()} execution failed: {e}")
        return results

    def cleanup_all(self) -> None:
        """Cleanup all plugins in reverse order."""
        for plugin in reversed(self._plugins):
            try:
                plugin.on_cleanup()
                logger.info(f"Plugin cleaned up: {plugin.name()}")
            except Exception as e:
                logger.error(f"Plugin {plugin.name()} cleanup failed: {e}")
        self._plugins.clear()
        self._initialized.clear()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.cleanup_all()
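The reverse-order cleanup in cleanup_all mirrors what the standard library's contextlib.ExitStack does for registered callbacks. As a minimal sketch, with a hypothetical Recorder class standing in for real plugins:

```python
from contextlib import ExitStack


class Recorder:
    """Hypothetical stand-in plugin that records lifecycle calls in a shared log."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    def on_load(self) -> None:
        self.log.append(f"load:{self.name}")

    def on_cleanup(self) -> None:
        self.log.append(f"cleanup:{self.name}")


def run_with_cleanup(names: list[str]) -> list[str]:
    log: list[str] = []
    with ExitStack() as stack:
        for name in names:
            plugin = Recorder(name, log)
            plugin.on_load()
            # Callbacks run in last-in, first-out order when the block exits,
            # even if an exception is raised inside it.
            stack.callback(plugin.on_cleanup)
        log.append("run")
    return log


print(run_with_cleanup(["db", "cache"]))
# ['load:db', 'load:cache', 'run', 'cleanup:cache', 'cleanup:db']
```

One difference from cleanup_all: ExitStack still runs the remaining callbacks when one of them raises, but it then propagates the exception instead of logging and swallowing it.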
Example: Database Export Plugin with Lifecycle
import json

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

from core.lifecycle import ManagedPlugin


class DatabaseExportPlugin(ManagedPlugin):
    def name(self) -> str:
        return "db_export"

    def on_load(self) -> None:
        self._engine = None
        self._session = None

    def on_init(self, config: dict) -> None:
        self._engine = create_engine(config["database_url"])
        Session = sessionmaker(bind=self._engine)
        self._session = Session()

    def execute(self, records: list[dict]) -> int:
        # Insert each record, then commit once at the end
        for record in records:
            self._session.execute(
                text("INSERT INTO exports (data) VALUES (:data)"),
                {"data": json.dumps(record)},
            )
        self._session.commit()
        return len(records)

    def on_cleanup(self) -> None:
        if self._session:
            self._session.close()
        if self._engine:
            self._engine.dispose()

    def health_check(self) -> bool:
        try:
            self._session.execute(text("SELECT 1"))
            return True
        except Exception:
            return False
:::danger Always Handle Plugin Failures Gracefully
A crashing plugin should never bring down the host application. Catch exceptions at every lifecycle boundary. Log the error, disable the plugin, and continue operating.
:::
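The advice to log the error, disable the plugin, and continue can be sketched as a small wrapper. This is an illustration under assumptions: IsolatingRunner and its disabled set are hypothetical and deliberately simpler than ManagedPluginRunner above.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


class IsolatingRunner:
    """Hypothetical runner: a plugin that raises is disabled, not fatal."""

    def __init__(self) -> None:
        self._plugins: dict[str, Callable[[Any], Any]] = {}
        self.disabled: set[str] = set()

    def register(self, name: str, func: Callable[[Any], Any]) -> None:
        self._plugins[name] = func

    def run_all(self, payload: Any) -> dict[str, Any]:
        results: dict[str, Any] = {}
        for name, func in self._plugins.items():
            if name in self.disabled:
                continue
            try:
                results[name] = func(payload)
            except Exception:
                # logger.exception records the traceback for later debugging.
                logger.exception("Plugin %r failed; disabling it", name)
                self.disabled.add(name)
        return results


def broken(_: Any) -> Any:
    raise RuntimeError("boom")


runner = IsolatingRunner()
runner.register("upper", lambda s: s.upper())
runner.register("broken", broken)
first = runner.run_all("hello")   # "broken" raises once and is disabled
second = runner.run_all("again")  # "broken" is skipped without being called
```

Whether a plugin should stay disabled for the rest of the process or be retried later is a policy decision this sketch leaves open.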
Part 7 - Dependency Ordering Between Plugins
When plugins depend on each other, you need topological sorting to determine the correct execution order.
# core/topo_sort.py
def topological_sort(
    items: dict[str, list[str]],
) -> list[str]:
    """
    Sort items respecting dependencies.

    Args:
        items: mapping of item name -> list of dependency names
    Returns:
        Ordered list of item names (dependencies first)
    Raises:
        ValueError: if circular dependencies are detected
    """
    in_degree: dict[str, int] = {name: 0 for name in items}
    dependents: dict[str, list[str]] = {name: [] for name in items}
    for name, deps in items.items():
        for dep in deps:
            if dep not in items:
                raise ValueError(f"'{name}' depends on unknown item '{dep}'")
            in_degree[name] += 1
            dependents[dep].append(name)

    # Start with items that have no dependencies
    queue = [name for name, deg in in_degree.items() if deg == 0]
    result: list[str] = []
    while queue:
        # Sort for deterministic ordering among equal-priority items
        queue.sort()
        current = queue.pop(0)
        result.append(current)
        for dependent in dependents[current]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)

    if len(result) != len(items):
        remaining = set(items.keys()) - set(result)
        raise ValueError(f"Circular dependency detected among: {remaining}")
    return result


# Usage
plugins = {
    "csv_reader": [],  # no dependencies
    "filter_nulls": ["csv_reader"],
    "normalize": ["filter_nulls"],
    "deduplicate": ["normalize"],
    "export": ["deduplicate"],
}
order = topological_sort(plugins)
# ['csv_reader', 'filter_nulls', 'normalize', 'deduplicate', 'export']
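On Python 3.9 and later, the standard library's graphlib.TopologicalSorter offers similar behavior without hand-written bookkeeping. A minimal sketch using the same example graph follows; the has_cycle helper is a hypothetical name added for illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Same shape as above: item name -> names it depends on.
plugins = {
    "csv_reader": [],
    "filter_nulls": ["csv_reader"],
    "normalize": ["filter_nulls"],
    "deduplicate": ["normalize"],
    "export": ["deduplicate"],
}

# static_order() yields each item after all of its dependencies.
order = list(TopologicalSorter(plugins).static_order())
print(order)
# ['csv_reader', 'filter_nulls', 'normalize', 'deduplicate', 'export']


def has_cycle(graph: dict[str, list[str]]) -> bool:
    """Return True if the dependency graph contains a cycle."""
    try:
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        return True
    return False
```

Two differences are worth noting. The order among independent items is not sorted alphabetically as it is in the hand-written function, and a dependency that is missing from the mapping is treated as an extra node instead of raising an error.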
Part 8 - Testing Plugin Systems
Testing Individual Plugins
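# tests/test_transforms.py
import pytest
from pipeline.transforms.filter_nulls import FilterNullsTransform
from pipeline.transforms.deduplicate import DeduplicateTransform


def test_filter_nulls_removes_none_values():
    plugin = FilterNullsTransform()
    # Sample records are illustrative; only Alice has no None or empty field.
    records = [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Bob", "email": None},
        {"name": "Charlie", "email": ""},
    ]
    result = plugin.transform(records)
    assert len(result) == 1
    assert result[0]["name"] == "Alice"


def test_deduplicate_removes_exact_duplicates():
    plugin = DeduplicateTransform()
    # Sample records are illustrative; the first two are identical.
    records = [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Bob", "email": "bob@example.com"},
    ]
    result = plugin.transform(records)
    assert len(result) == 2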
Testing Plugin Discovery
# tests/test_pipeline.py
import pytest

from pipeline.runner import Pipeline
from pipeline.transform_interface import TransformPlugin
from pipeline.transforms.filter_nulls import FilterNullsTransform
from pipeline.transforms.deduplicate import DeduplicateTransform


def test_pipeline_respects_dependency_order():
    pipeline = Pipeline()
    # Register in wrong order - pipeline should sort them
    pipeline.register(DeduplicateTransform())  # depends on filter_nulls
    pipeline.register(FilterNullsTransform())  # no dependencies
    records = [
        {"name": "Alice"},
        {"name": None},
        {"name": "Alice"},  # duplicate
    ]
    result = pipeline.run(records)
    assert len(result) == 1  # null filtered, then deduplicated


def test_pipeline_detects_circular_dependency():
    class PluginA(TransformPlugin):
        name = "a"
        description = "A"
        dependencies = ["b"]

        def transform(self, records):
            return records

    class PluginB(TransformPlugin):
        name = "b"
        description = "B"
        dependencies = ["a"]

        def transform(self, records):
            return records

    pipeline = Pipeline()
    pipeline.register(PluginA())
    pipeline.register(PluginB())
    with pytest.raises(ValueError, match="Circular"):
        pipeline.run([{"x": 1}])
Testing with a Fake Plugin
class CountingPlugin(TransformPlugin):
    """Test plugin that counts how many times it was called."""

    name = "counter"
    description = "Counts invocations"

    def __init__(self):
        self.call_count = 0

    def transform(self, records: list[dict]) -> list[dict]:
        self.call_count += 1
        return records


def test_pipeline_calls_each_plugin_once():
    counter = CountingPlugin()
    pipeline = Pipeline()
    pipeline.register(counter)
    pipeline.run([{"x": 1}])
    assert counter.call_count == 1
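None of the tests above exercise entry point discovery itself. One hedged way to do so without installing packages is to inject the lookup function, in the spirit of the dependency injection lesson. In this sketch discover and its finder parameter are hypothetical, and the standard-library function json.loads stands in for a plugin class.

```python
from importlib.metadata import EntryPoint, entry_points
from typing import Any, Callable, Iterable


def discover(
    group: str,
    finder: Callable[..., Iterable[EntryPoint]] = entry_points,
) -> dict[str, Any]:
    """Load every entry point in `group`, skipping the ones that fail."""
    loaded: dict[str, Any] = {}
    for ep in finder(group=group):
        try:
            loaded[ep.name] = ep.load()
        except Exception:
            continue  # a real host would log this failure
    return loaded


def fake_finder(group: str) -> list[EntryPoint]:
    """Test double that returns hand-built entry points."""
    return [
        EntryPoint(name="good", value="json:loads", group=group),
        EntryPoint(name="bad", value="no_such_module_for_test:Missing", group=group),
    ]


def test_discover_skips_broken_plugins() -> None:
    plugins = discover("demo.transforms", finder=fake_finder)
    assert list(plugins) == ["good"]
    assert plugins["good"]("[1, 2]") == [1, 2]
```

The same idea applies to Pipeline.discover: accepting the finder as a parameter would let the test suite cover the failure path without touching the installed environment.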
Key Takeaways
- __init_subclass__ provides zero-config registration for plugins that live in the same codebase. Subclass a base class and the plugin is automatically discovered.
- Entry points enable cross-package plugins: third-party packages register plugins in their pyproject.toml, and the host application discovers them via importlib.metadata.entry_points().
- Stevedore simplifies production plugin management with patterns like Driver (one plugin), Extension (all plugins), and Named (specific plugins).
- Plugin interfaces should be minimal and stable: adding a method to a plugin interface breaks all existing plugins. Prefer optional methods with default implementations.
- Lifecycle management (load, init, execute, cleanup) is essential for plugins that hold resources like database connections or file handles.
- Topological sorting resolves plugin dependencies: when plugins depend on each other, sort them before execution and detect circular dependencies early.
- Always handle plugin failures gracefully: catch exceptions at every boundary, log errors, and continue operating without the failed plugin.
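The takeaway about minimal, stable interfaces can be demonstrated directly. In this sketch all names are hypothetical: V2Abstract adds a new required method to an interface, while V2Optional adds the same method with a default implementation.

```python
from abc import ABC, abstractmethod


class V2Abstract(ABC):
    """Adds a new required method: every existing plugin must change."""

    @abstractmethod
    def read(self) -> str: ...

    @abstractmethod
    def validate(self) -> bool: ...


class V2Optional(ABC):
    """Adds the same method with a default: existing plugins keep working."""

    @abstractmethod
    def read(self) -> str: ...

    def validate(self) -> bool:
        return True


def make_plugin(base: type) -> object:
    """Build a plugin written before validate() existed: it only implements read()."""

    class OldPlugin(base):
        def read(self) -> str:
            return "data"

    return OldPlugin()


def breaks_old_plugins(base: type) -> bool:
    try:
        make_plugin(base)
    except TypeError:  # raised when abstract methods remain unimplemented
        return True
    return False
```

Here breaks_old_plugins(V2Abstract) returns True because the old plugin can no longer be instantiated, while the same plugin built on V2Optional works and inherits the default validate().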
Graded Practice Challenges
Level 1 - Identify the Pattern
Question 1: What happens when a new class inherits from PluginRegistry but does not define a name attribute?
Answer
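The __init_subclass__ hook still fires, but hasattr(cls, "name") returns False, so neither branch runs and the class is not added to _plugins. This is by design: abstract intermediate classes should not be registered as plugins. Note the flip side: a subclass of an already-registered plugin inherits that plugin's name and replaces its registry entry unless it defines a name of its own.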
Question 2: A third-party developer installs their Avro plugin package but the host application does not find it. What is the most likely cause?
Answer
The most likely cause is that the entry point group name in the plugin's pyproject.toml does not match the group name the host application is scanning. For example, the plugin declares [project.entry-points."file_processor.formats"] (underscore) but the host calls entry_points(group="fileprocessor.formats") (no underscore). Other causes: the package is installed in a different virtual environment, or it was never installed at all. Entry points are read from installed package metadata, so a source checkout must be installed (for example with pip install -e .) and reinstalled after its entry points change.
Question 3: Why does the DeduplicateTransform declare dependencies = ["filter_nulls"]?
Answer
Deduplication compares records by their field values. Records with missing values are a problem for that comparison: {"name": None} and {"name": ""} both mean "no name" but do not compare equal, so they would survive as two distinct records. Running filter_nulls first removes such records, and deduplicate then operates on clean data. The dependencies declaration ensures the pipeline runner executes the transforms in that order via topological sort.
Level 2 - Refactoring Challenge
Take this hardcoded notification system and refactor it into a plugin architecture:
class NotificationSender:
    def send(self, user: User, message: str):
        # Email
        smtp = smtplib.SMTP("smtp.company.com")
        # SMS
        twilio.send(user.phone, message)
        # Slack
        requests.post(SLACK_WEBHOOK, json={"text": f"{user.name}: {message}"})
        # Push notification
        firebase.send(user.device_token, message)
Design a NotificationPlugin interface, implement each channel as a plugin, use entry points for registration, and allow enabling/disabling channels per user preference.
Level 3 - Design Challenge
Design a plugin system for a web scraping framework that supports:
- Pluggable page parsers (HTML, JSON API, PDF)
- Pluggable storage backends (filesystem, S3, database)
- Pluggable rate limiters (fixed window, sliding window, token bucket)
- Middleware plugins (logging, caching, retry, proxy rotation)
Define the plugin interfaces, the entry point groups, the lifecycle hooks, and the dependency ordering. How do you handle plugin configuration (each plugin needs different settings)?
What's Next
In the next lesson, Configuration Management - Environment-Driven Apps, we will explore how to externalize application configuration, validate it at startup, manage secrets, and follow the 12-factor config principle - which is essential for any pluggable, deployable system.
