Plugin Systems - Building Extensible Applications

Here is an application that processes files in different formats. Study how it handles adding a new format.

# processor.py
class FileProcessor:
    def process(self, filepath: str) -> dict:
        if filepath.endswith(".csv"):
            return self._process_csv(filepath)
        elif filepath.endswith(".json"):
            return self._process_json(filepath)
        elif filepath.endswith(".xml"):
            return self._process_xml(filepath)
        elif filepath.endswith(".parquet"):
            return self._process_parquet(filepath)
        # Adding YAML? Edit this file.
        # Adding TOML? Edit this file again.
        # Adding Excel? Edit this file yet again.
        else:
            raise ValueError(f"Unsupported format: {filepath}")

Every new format requires modifying FileProcessor. A third-party developer who wants to add Avro support must fork your repository. This violates the Open/Closed Principle, which says a class should be open for extension but closed for modification: here the only way to extend FileProcessor is to edit it.

A plugin system solves this. New formats are registered externally, in separate files, in separate packages, or even by separate teams, without touching FileProcessor.
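As a rough illustration of the idea, before the real mechanisms are introduced, here is a minimal sketch in which handlers register themselves by file extension and the processor only performs a lookup. The `register_format` decorator and the `_HANDLERS` table are hypothetical names invented for this sketch; they are not part of the lesson's later code.

```python
import os
from typing import Callable

# Hypothetical registry: maps a file extension to a handler function.
_HANDLERS: dict[str, Callable[[str], dict]] = {}


def register_format(extension: str) -> Callable:
    """Decorator that registers a handler for one file extension."""
    def decorator(func: Callable[[str], dict]) -> Callable[[str], dict]:
        _HANDLERS[extension] = func
        return func
    return decorator


class FileProcessor:
    def process(self, filepath: str) -> dict:
        # Look the handler up instead of walking an if/elif chain.
        _, ext = os.path.splitext(filepath)
        handler = _HANDLERS.get(ext)
        if handler is None:
            raise ValueError(f"Unsupported format: {filepath}")
        return handler(filepath)


# This registration could live in a different module or package.
@register_format(".yaml")
def process_yaml(filepath: str) -> dict:
    # Placeholder body: a real handler would parse the file.
    return {"format": "yaml", "path": filepath}
```

Adding a format now means adding a decorated function somewhere; FileProcessor itself never changes.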

What You Will Learn

  • How entry_points in pyproject.toml enable cross-package plugin discovery
  • Using importlib.metadata to load plugins at runtime
  • The stevedore library for production-grade plugin management
  • __init_subclass__ for automatic registration of subclasses
  • Building a complete extensible CLI tool with plugins
  • Plugin lifecycle management: load, initialize, run, cleanup
  • Ordering plugins by dependencies using topological sort

Prerequisites

  • Solid understanding of Python classes, inheritance, and metaclasses
  • Familiarity with pyproject.toml and Python packaging
  • Experience with abstract base classes and typing.Protocol
  • Understanding of dependency injection (previous lesson)

Part 1 - The Plugin Architecture Pattern

A plugin system has three components: the host application that defines extension points, the plugin interface that specifies the contract, and the plugins themselves that implement the contract.

The Plugin Interface

# core/plugin_interface.py
from abc import ABC, abstractmethod
from typing import Any


class FileFormatPlugin(ABC):
    """Contract that all file format plugins must satisfy."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable name of the format."""
        ...

    @property
    @abstractmethod
    def extensions(self) -> list[str]:
        """File extensions this plugin handles (e.g., ['.csv', '.tsv'])."""
        ...

    @abstractmethod
    def read(self, filepath: str) -> list[dict[str, Any]]:
        """Read a file and return a list of records."""
        ...

    @abstractmethod
    def write(self, filepath: str, records: list[dict[str, Any]]) -> None:
        """Write records to a file."""
        ...

    def validate(self, filepath: str) -> bool:
        """Optional: validate file before reading. Default returns True."""
        return True

Part 2 - __init_subclass__ for Automatic Registration

The simplest plugin discovery mechanism in Python: whenever a class inherits from your base class, Python calls the base class's __init_subclass__ hook as the subclass is being defined.

# core/registry.py
from typing import Any


class PluginRegistry:
    """Base class that auto-registers all subclasses."""

    _plugins: dict[str, type] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Only register concrete implementations (those with 'name')
        if hasattr(cls, "name") and isinstance(cls.name, property):
            # Cannot read property on uninstantiated class, skip abstract
            pass
        elif hasattr(cls, "name"):
            PluginRegistry._plugins[cls.name] = cls

    @classmethod
    def get_plugin(cls, name: str) -> type:
        if name not in cls._plugins:
            raise KeyError(
                f"No plugin named '{name}'. Available: {list(cls._plugins.keys())}"
            )
        return cls._plugins[name]

    @classmethod
    def all_plugins(cls) -> dict[str, type]:
        return dict(cls._plugins)

    @classmethod
    def clear(cls) -> None:
        """Reset registry (useful for testing)."""
        cls._plugins.clear()

Using the Registry

# plugins/csv_plugin.py
import csv
from core.registry import PluginRegistry


class CsvPlugin(PluginRegistry):
    name = "csv"
    extensions = [".csv", ".tsv"]

    def read(self, filepath: str) -> list[dict]:
        with open(filepath, newline="") as f:
            delimiter = "\t" if filepath.endswith(".tsv") else ","
            reader = csv.DictReader(f, delimiter=delimiter)
            return list(reader)

    def write(self, filepath: str, records: list[dict]) -> None:
        if not records:
            return
        with open(filepath, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=records[0].keys())
            writer.writeheader()
            writer.writerows(records)


# plugins/json_plugin.py
import json
from core.registry import PluginRegistry


class JsonPlugin(PluginRegistry):
    name = "json"
    extensions = [".json"]

    def read(self, filepath: str) -> list[dict]:
        with open(filepath) as f:
            data = json.load(f)
        return data if isinstance(data, list) else [data]

    def write(self, filepath: str, records: list[dict]) -> None:
        with open(filepath, "w") as f:
            json.dump(records, f, indent=2)

# The host application - no if/elif chain
from core.registry import PluginRegistry
# Import plugins so __init_subclass__ fires
import plugins.csv_plugin
import plugins.json_plugin

# Discover all registered plugins
print(PluginRegistry.all_plugins())
# {'csv': <class 'plugins.csv_plugin.CsvPlugin'>, 'json': <class 'plugins.json_plugin.JsonPlugin'>}

# Use a plugin
plugin_cls = PluginRegistry.get_plugin("csv")
plugin = plugin_cls()
records = plugin.read("data.csv")

:::tip When __init_subclass__ Works Well
This pattern works when all plugins live in the same codebase or are imported explicitly. For cross-package plugin discovery (third-party plugins installed via pip), you need entry_points.
:::
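Because registration only happens when a plugin module is actually imported, a same-codebase setup still needs something to trigger those imports. One common approach, sketched here under the assumption that all plugins live as direct submodules of a single package, is to import every submodule at startup. The helper name `import_all_submodules` is made up for this example.

```python
import importlib
import pkgutil
from types import ModuleType


def import_all_submodules(package: ModuleType) -> list[str]:
    """Import every direct submodule of `package` and return their names."""
    imported = []
    # iter_modules scans the package's directories without importing anything.
    for info in pkgutil.iter_modules(package.__path__, prefix=package.__name__ + "."):
        importlib.import_module(info.name)  # triggers __init_subclass__ in that module
        imported.append(info.name)
    return sorted(imported)
```

Calling `import_all_submodules(plugins)` once at startup would stand in for the explicit `import plugins.csv_plugin` lines, so a new plugin file is picked up without editing the host.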

Part 3 - Entry Points and importlib.metadata

Python's packaging system includes a plugin discovery mechanism called entry points. Any installed package can declare entry points in its pyproject.toml, and any application can discover them at runtime.

Declaring Entry Points (Plugin Side)

# In the plugin package's pyproject.toml
[project]
name = "my-csv-plugin"
version = "1.0.0"

[project.entry-points."fileprocessor.formats"]
csv = "my_csv_plugin:CsvPlugin"
tsv = "my_csv_plugin:TsvPlugin"

The key "fileprocessor.formats" is the group name - a namespace that the host application uses to find plugins. The values are module:attribute references to plugin classes.
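To see how a module:attribute reference is resolved, it can help to build an entry point by hand. The sketch below uses the standard library's `json:loads` as a stand-in for a real plugin reference such as `my_csv_plugin:CsvPlugin`; the name and group passed here are illustrative only, and nothing needs to be installed for it to run.

```python
from importlib.metadata import EntryPoint

# Construct an entry point manually; normally packaging metadata provides these.
ep = EntryPoint(name="json", value="json:loads", group="fileprocessor.formats")

print(ep.module)  # the part before the colon
print(ep.attr)    # the part after the colon

# load() imports the module, then looks up the attribute on it.
loader = ep.load()
print(loader('{"a": 1}'))
```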

Discovering Entry Points (Host Side)

# core/discovery.py
import logging
from importlib.metadata import entry_points
from typing import Any

logger = logging.getLogger(__name__)


def discover_plugins(group: str) -> dict[str, Any]:
    """
    Discover all plugins registered under a given entry point group.

    Returns a dict mapping plugin names to loaded plugin classes/objects.
    """
    discovered: dict[str, Any] = {}
    eps = entry_points(group=group)

    for ep in eps:
        try:
            plugin_cls = ep.load()  # imports the module and gets the attribute
            discovered[ep.name] = plugin_cls
        except Exception as e:
            # Log but do not crash - one bad plugin should not break the app
            logger.warning(f"Failed to load plugin '{ep.name}': {e}")

    return discovered

# Using discovered plugins
plugins = discover_plugins("fileprocessor.formats")
# {'csv': <class 'my_csv_plugin.CsvPlugin'>, 'tsv': <class 'my_csv_plugin.TsvPlugin'>}

csv_plugin = plugins["csv"]()
records = csv_plugin.read("data.csv")
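One portability detail: the `entry_points(group=...)` keyword form was added in Python 3.10. On Python 3.8 and 3.9, `entry_points()` takes no selection arguments and returns a dict keyed by group name. If a host has to run on those older versions, a small shim along these lines is one option; the function name `entry_points_for` is an assumption of this sketch.

```python
import sys
from importlib.metadata import entry_points


def entry_points_for(group: str) -> list:
    """Return the entry points in `group` on both old and new Python versions."""
    if sys.version_info >= (3, 10):
        return list(entry_points(group=group))
    # Python 3.8/3.9: entry_points() returns a dict of group name -> entry points.
    return list(entry_points().get(group, []))
```

A group with no registered plugins yields an empty list on either branch, so callers can iterate over the result without special cases.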

Full Plugin Manager

# core/plugin_manager.py
import logging
import os
from importlib.metadata import entry_points
from typing import Any, Optional

from core.plugin_interface import FileFormatPlugin

logger = logging.getLogger(__name__)


class PluginManager:
    """Manages plugin discovery, loading, and access."""

    def __init__(self, group: str) -> None:
        self._group = group
        self._plugins: dict[str, FileFormatPlugin] = {}
        self._extension_map: dict[str, FileFormatPlugin] = {}

    def discover_and_load(self) -> None:
        """Discover all plugins via entry points and instantiate them."""
        eps = entry_points(group=self._group)

        for ep in eps:
            try:
                plugin_cls = ep.load()
                # Guard with isinstance: issubclass() raises TypeError on non-classes
                if not (isinstance(plugin_cls, type)
                        and issubclass(plugin_cls, FileFormatPlugin)):
                    logger.warning(
                        f"Plugin '{ep.name}' does not implement FileFormatPlugin"
                    )
                    continue

                plugin = plugin_cls()
                self._plugins[ep.name] = plugin

                for ext in plugin.extensions:
                    if ext in self._extension_map:
                        logger.warning(
                            f"Extension '{ext}' already handled by "
                            f"'{self._extension_map[ext].name}', "
                            f"overriding with '{ep.name}'"
                        )
                    self._extension_map[ext] = plugin

                logger.info(f"Loaded plugin: {ep.name} (extensions: {plugin.extensions})")

            except Exception as e:
                logger.error(f"Failed to load plugin '{ep.name}': {e}")

    def get_by_name(self, name: str) -> Optional[FileFormatPlugin]:
        return self._plugins.get(name)

    def get_by_extension(self, ext: str) -> Optional[FileFormatPlugin]:
        return self._extension_map.get(ext)

    def list_plugins(self) -> list[str]:
        return list(self._plugins.keys())

    def process_file(self, filepath: str) -> list[dict[str, Any]]:
        """Automatically select the right plugin based on file extension."""
        _, ext = os.path.splitext(filepath)

        plugin = self.get_by_extension(ext)
        if plugin is None:
            raise ValueError(
                f"No plugin handles extension '{ext}'. "
                f"Available: {list(self._extension_map.keys())}"
            )

        if not plugin.validate(filepath):
            raise ValueError(f"File validation failed for {filepath}")

        return plugin.read(filepath)

# main.py
from core.plugin_manager import PluginManager

manager = PluginManager("fileprocessor.formats")
manager.discover_and_load()
# Loaded plugin: csv (extensions: ['.csv', '.tsv'])
# Loaded plugin: json (extensions: ['.json'])

records = manager.process_file("data.csv")  # auto-selects CsvPlugin

:::note Entry Points Work Across Packages
The power of entry points is that a completely separate pip-installable package can register a plugin. Run pip install my-avro-plugin, and the next time your app starts, the Avro plugin is automatically discovered. No configuration changes needed.
:::

Part 4 - The Stevedore Library

OpenStack's stevedore library wraps importlib.metadata with a richer API for plugin management.

pip install stevedore

Driver Pattern (One Plugin Selected)

# When you need exactly one plugin for a given name
from stevedore import driver


def load_format_driver(format_name: str, filepath: str) -> list[dict]:
    mgr = driver.DriverManager(
        namespace="fileprocessor.formats",
        name=format_name,
        invoke_on_load=False,
    )
    # With invoke_on_load=False, mgr.driver is the plugin class; call it to instantiate
    plugin = mgr.driver()
    return plugin.read(filepath)


# Usage
records = load_format_driver("csv", "data.csv")

Extension Pattern (All Plugins Loaded)

# When you want to invoke all plugins (e.g., validation hooks)
from stevedore import ExtensionManager


def validate_with_all_plugins(data: dict) -> list[str]:
    """Run all registered validators and collect errors."""
    errors = []

    mgr = ExtensionManager(
        namespace="myapp.validators",
        invoke_on_load=False,
    )

    def _validate(ext, data):
        # ext.obj is only set when invoke_on_load=True; otherwise instantiate here
        plugin = ext.obj or ext.plugin()
        result = plugin.validate(data)
        if not result.is_valid:
            errors.extend(result.errors)

    # Note: map() raises stevedore.exception.NoMatches if no extensions were found
    mgr.map(_validate, data)
    return errors

Named Extensions Pattern

# When you want specific plugins by name
from stevedore import NamedExtensionManager


def load_specific_plugins(names: list[str]):
    mgr = NamedExtensionManager(
        namespace="fileprocessor.formats",
        names=names,
        invoke_on_load=True,
    )
    return {ext.name: ext.obj for ext in mgr}


# Load only CSV and JSON, skip others
plugins = load_specific_plugins(["csv", "json"])

| Stevedore Manager | Behavior | Use Case |
| --- | --- | --- |
| DriverManager | Loads exactly one plugin by name | Strategy pattern (select one format) |
| ExtensionManager | Loads all plugins in a namespace | Hooks, validators, event listeners |
| NamedExtensionManager | Loads specific named plugins | Feature flags, user-selected plugins |
| HookManager | Loads every plugin registered under one shared name and calls each | Pipeline processing |
| EnabledExtensionManager | Loads only the plugins that pass a check function | Conditional activation |

Part 5 - Building an Extensible CLI Tool

Let us build a complete example: a data pipeline CLI that supports pluggable transformations.

The Plugin Interface

# pipeline/transform_interface.py
from abc import ABC, abstractmethod
from typing import Any


class TransformPlugin(ABC):
    """Interface for data transformation plugins."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique identifier for this transform."""
        ...

    @property
    @abstractmethod
    def description(self) -> str:
        """Human-readable description."""
        ...

    @abstractmethod
    def transform(self, records: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Transform a list of records and return the result."""
        ...

    @property
    def dependencies(self) -> list[str]:
        """Names of transforms that must run before this one. Default: none."""
        return []

Built-In Plugins

# pipeline/transforms/filter_nulls.py
from pipeline.transform_interface import TransformPlugin


class FilterNullsTransform(TransformPlugin):
    # Plain class attributes satisfy the abstract properties on TransformPlugin
    name = "filter_nulls"
    description = "Remove records where any field is None or empty string"

    def transform(self, records: list[dict]) -> list[dict]:
        return [
            r for r in records
            if all(v is not None and v != "" for v in r.values())
        ]


# pipeline/transforms/normalize_emails.py
from pipeline.transform_interface import TransformPlugin


class NormalizeEmailsTransform(TransformPlugin):
    name = "normalize_emails"
    description = "Lowercase and strip whitespace from email fields"

    def transform(self, records: list[dict]) -> list[dict]:
        result = []
        for record in records:
            new_record = dict(record)
            for key, value in new_record.items():
                if "email" in key.lower() and isinstance(value, str):
                    new_record[key] = value.strip().lower()
            result.append(new_record)
        return result


# pipeline/transforms/deduplicate.py
from pipeline.transform_interface import TransformPlugin


class DeduplicateTransform(TransformPlugin):
    name = "deduplicate"
    description = "Remove duplicate records based on all fields"
    dependencies = ["filter_nulls"]  # run after nulls are removed

    def transform(self, records: list[dict]) -> list[dict]:
        seen = set()
        unique = []
        for record in records:
            # Requires hashable field values (strings, numbers, None, ...)
            key = tuple(sorted(record.items()))
            if key not in seen:
                seen.add(key)
                unique.append(record)
        return unique

Entry Point Registration

# pyproject.toml
[project.entry-points."pipeline.transforms"]
filter_nulls = "pipeline.transforms.filter_nulls:FilterNullsTransform"
normalize_emails = "pipeline.transforms.normalize_emails:NormalizeEmailsTransform"
deduplicate = "pipeline.transforms.deduplicate:DeduplicateTransform"

The Pipeline Runner with Dependency Ordering

# pipeline/runner.py
from importlib.metadata import entry_points
from typing import Any
import logging

from pipeline.transform_interface import TransformPlugin

logger = logging.getLogger(__name__)


class Pipeline:
    def __init__(self) -> None:
        self._transforms: dict[str, TransformPlugin] = {}

    def discover(self) -> None:
        """Load all transform plugins from entry points."""
        eps = entry_points(group="pipeline.transforms")
        for ep in eps:
            try:
                cls = ep.load()
                plugin = cls()
                self._transforms[plugin.name] = plugin
                logger.info(f"Loaded transform: {plugin.name}")
            except Exception as e:
                logger.error(f"Failed to load transform '{ep.name}': {e}")

    def register(self, plugin: TransformPlugin) -> None:
        """Manually register a plugin (useful for testing)."""
        self._transforms[plugin.name] = plugin

    def _topological_sort(self, names: list[str]) -> list[str]:
        """Sort transforms respecting dependency ordering."""
        visited: set[str] = set()
        order: list[str] = []
        visiting: set[str] = set()

        def visit(name: str) -> None:
            if name in visited:
                return
            if name in visiting:
                raise ValueError(f"Circular dependency detected involving '{name}'")
            if name not in self._transforms:
                raise ValueError(f"Unknown transform: '{name}'")

            visiting.add(name)

            for dep in self._transforms[name].dependencies:
                if dep in names or dep in self._transforms:
                    visit(dep)

            visiting.remove(name)
            visited.add(name)
            order.append(name)

        for name in names:
            visit(name)

        return order

    def run(
        self,
        records: list[dict[str, Any]],
        transform_names: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Run the pipeline, optionally with only specific transforms."""
        if transform_names is None:
            transform_names = list(self._transforms.keys())

        ordered = self._topological_sort(transform_names)
        logger.info(f"Running transforms in order: {ordered}")

        result = records
        for name in ordered:
            transform = self._transforms[name]
            before_count = len(result)
            result = transform.transform(result)
            after_count = len(result)
            logger.info(
                f"  {name}: {before_count} -> {after_count} records"
            )

        return result

The CLI

# pipeline/cli.py
import json

import click

from pipeline.runner import Pipeline


@click.group()
def cli():
    """Data pipeline with pluggable transforms."""
    pass


@cli.command()
def list_transforms():
    """List all available transform plugins."""
    pipeline = Pipeline()
    pipeline.discover()
    for name, plugin in sorted(pipeline._transforms.items()):
        deps = f" (after: {', '.join(plugin.dependencies)})" if plugin.dependencies else ""
        click.echo(f"  {name}: {plugin.description}{deps}")


@cli.command()
@click.argument("input_file")
@click.argument("output_file")
@click.option("--transforms", "-t", multiple=True, help="Specific transforms to run")
def run(input_file: str, output_file: str, transforms: tuple[str, ...]):
    """Run the pipeline on a file."""
    pipeline = Pipeline()
    pipeline.discover()

    with open(input_file) as f:
        records = json.load(f)

    transform_names = list(transforms) if transforms else None
    result = pipeline.run(records, transform_names)

    with open(output_file, "w") as f:
        json.dump(result, f, indent=2)

    click.echo(f"Processed {len(records)} -> {len(result)} records")


# Lets the module be run directly with `python -m pipeline.cli`
if __name__ == "__main__":
    cli()

# Usage
$ python -m pipeline.cli list-transforms
  deduplicate: Remove duplicate records based on all fields (after: filter_nulls)
  filter_nulls: Remove records where any field is None or empty string
  normalize_emails: Lowercase and strip whitespace from email fields

$ python -m pipeline.cli run input.json output.json -t filter_nulls -t deduplicate
# Running transforms in order: ['filter_nulls', 'deduplicate']
#   filter_nulls: 1000 -> 873 records
#   deduplicate: 873 -> 841 records
# Processed 1000 -> 841 records

Part 6 - Plugin Lifecycle Management

Production plugins often need initialization (connect to services) and cleanup (close connections, flush buffers).

# core/lifecycle.py
from abc import ABC, abstractmethod
from typing import Any


class ManagedPlugin(ABC):
    """Plugin with lifecycle hooks."""

    @abstractmethod
    def name(self) -> str: ...

    def on_load(self) -> None:
        """Called when the plugin is first loaded. Setup resources here."""
        pass

    def on_init(self, config: dict[str, Any]) -> None:
        """Called with plugin-specific configuration."""
        pass

    @abstractmethod
    def execute(self, *args, **kwargs) -> Any:
        """The plugin's main operation."""
        ...

    def on_cleanup(self) -> None:
        """Called during shutdown. Release resources here."""
        pass

    def health_check(self) -> bool:
        """Called periodically to verify plugin is healthy."""
        return True

# core/managed_runner.py
import logging
from typing import Any

from core.lifecycle import ManagedPlugin

logger = logging.getLogger(__name__)


class ManagedPluginRunner:
    """Manages the full lifecycle of plugins."""

    def __init__(self) -> None:
        self._plugins: list[ManagedPlugin] = []
        self._initialized: set[str] = set()

    def load(self, plugin: ManagedPlugin) -> None:
        """Load a plugin and call on_load."""
        try:
            plugin.on_load()
            self._plugins.append(plugin)
            logger.info(f"Plugin loaded: {plugin.name()}")
        except Exception as e:
            logger.error(f"Plugin {plugin.name()} failed to load: {e}")
            raise

    def initialize_all(self, configs: dict[str, dict[str, Any]]) -> None:
        """Initialize all plugins with their configurations."""
        for plugin in self._plugins:
            config = configs.get(plugin.name(), {})
            try:
                plugin.on_init(config)
                self._initialized.add(plugin.name())
                logger.info(f"Plugin initialized: {plugin.name()}")
            except Exception as e:
                logger.error(f"Plugin {plugin.name()} failed to initialize: {e}")

    def execute_all(self, *args, **kwargs) -> list[Any]:
        """Execute all initialized plugins."""
        results = []
        for plugin in self._plugins:
            if plugin.name() not in self._initialized:
                logger.warning(f"Skipping uninitialized plugin: {plugin.name()}")
                continue
            try:
                result = plugin.execute(*args, **kwargs)
                results.append(result)
            except Exception as e:
                logger.error(f"Plugin {plugin.name()} execution failed: {e}")
        return results

    def cleanup_all(self) -> None:
        """Cleanup all plugins in reverse order."""
        for plugin in reversed(self._plugins):
            try:
                plugin.on_cleanup()
                logger.info(f"Plugin cleaned up: {plugin.name()}")
            except Exception as e:
                logger.error(f"Plugin {plugin.name()} cleanup failed: {e}")
        self._plugins.clear()
        self._initialized.clear()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Runs cleanup even if the with-block raised
        self.cleanup_all()

Example: Database Export Plugin with Lifecycle

import json

from sqlalchemy import text

from core.lifecycle import ManagedPlugin


class DatabaseExportPlugin(ManagedPlugin):
    def name(self) -> str:
        return "db_export"

    def on_load(self) -> None:
        self._engine = None
        self._session = None

    def on_init(self, config: dict) -> None:
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker

        self._engine = create_engine(config["database_url"])
        Session = sessionmaker(bind=self._engine)
        self._session = Session()

    def execute(self, records: list[dict]) -> int:
        # Insert records one by one, then commit once
        for record in records:
            self._session.execute(
                text("INSERT INTO exports (data) VALUES (:data)"),
                {"data": json.dumps(record)},
            )
        self._session.commit()
        return len(records)

    def on_cleanup(self) -> None:
        if self._session:
            self._session.close()
        if self._engine:
            self._engine.dispose()

    def health_check(self) -> bool:
        # Also returns False before on_init, while the session is still None
        try:
            self._session.execute(text("SELECT 1"))
            return True
        except Exception:
            return False

:::danger Always Handle Plugin Failures Gracefully
A crashing plugin should never bring down the host application. Catch exceptions at every lifecycle boundary. Log the error, disable the plugin, and continue operating.
:::
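The "log, disable, continue" policy can be sketched independently of the lifecycle classes above. The `IsolatedPlugins` helper below is hypothetical and deliberately simplified: plugins are plain callables, and a plugin that raises once is skipped on every later run.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


class IsolatedPlugins:
    """Hypothetical helper: runs plugin callables and disables any that raise."""

    def __init__(self) -> None:
        self._plugins: dict[str, Callable[..., Any]] = {}
        self.disabled: set[str] = set()

    def add(self, name: str, func: Callable[..., Any]) -> None:
        self._plugins[name] = func

    def run_all(self, *args: Any) -> dict[str, Any]:
        results: dict[str, Any] = {}
        for name, func in self._plugins.items():
            if name in self.disabled:
                continue  # failed earlier; do not call it again
            try:
                results[name] = func(*args)
            except Exception:
                # Log with traceback, disable the plugin, keep going.
                logger.exception("Plugin %r failed; disabling it", name)
                self.disabled.add(name)
        return results
```

Whether a failed plugin should stay disabled or be retried later is a design choice; a production host might, for instance, re-enable it after a passing health check.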

Part 7 - Dependency Ordering Between Plugins

When plugins depend on each other, you need topological sorting to determine the correct execution order.

# core/topo_sort.py
def topological_sort(
    items: dict[str, list[str]],
) -> list[str]:
    """
    Sort items respecting dependencies (Kahn's algorithm).

    Args:
        items: mapping of item name -> list of dependency names

    Returns:
        Ordered list of item names (dependencies first)

    Raises:
        ValueError: if circular dependencies are detected
    """
    # in_degree counts unmet dependencies; dependents is the reverse mapping
    in_degree: dict[str, int] = {name: 0 for name in items}
    dependents: dict[str, list[str]] = {name: [] for name in items}

    for name, deps in items.items():
        for dep in deps:
            if dep not in items:
                raise ValueError(f"'{name}' depends on unknown item '{dep}'")
            in_degree[name] += 1
            dependents[dep].append(name)

    # Start with items that have no dependencies
    queue = [name for name, deg in in_degree.items() if deg == 0]
    result: list[str] = []

    while queue:
        # Sort for deterministic ordering among equal-priority items
        queue.sort()
        current = queue.pop(0)
        result.append(current)

        for dependent in dependents[current]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)

    # Items stuck with unmet dependencies can only be part of a cycle
    if len(result) != len(items):
        remaining = set(items.keys()) - set(result)
        raise ValueError(f"Circular dependency detected among: {remaining}")

    return result

# Usage
plugins = {
    "csv_reader": [],  # no dependencies
    "filter_nulls": ["csv_reader"],
    "normalize": ["filter_nulls"],
    "deduplicate": ["normalize"],
    "export": ["deduplicate"],
}

order = topological_sort(plugins)
# ['csv_reader', 'filter_nulls', 'normalize', 'deduplicate', 'export']
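If you would rather not maintain the sort yourself, the standard library offers `graphlib.TopologicalSorter` (Python 3.9+), which accepts the same "name -> dependencies" mapping. This is an alternative to the hand-written function above rather than what the lesson's code uses; a short sketch:

```python
from graphlib import CycleError, TopologicalSorter

# Same shape as the input above: name -> list of dependency names.
plugins = {
    "csv_reader": [],
    "filter_nulls": ["csv_reader"],
    "normalize": ["filter_nulls"],
    "deduplicate": ["normalize"],
    "export": ["deduplicate"],
}

# static_order() yields dependencies before the items that need them.
order = list(TopologicalSorter(plugins).static_order())
print(order)

try:
    list(TopologicalSorter({"a": ["b"], "b": ["a"]}).static_order())
except CycleError as exc:
    # The second element of args holds the nodes forming the cycle.
    print(f"cycle detected: {exc.args[1]}")
```

One behavioral difference to be aware of: graphlib treats a dependency that is not a key in the mapping as an extra node instead of raising, so the "unknown item" check would have to be done separately.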

Part 8 - Testing Plugin Systems

Testing Individual Plugins

# tests/test_transforms.py
import pytest
from pipeline.transforms.filter_nulls import FilterNullsTransform
from pipeline.transforms.deduplicate import DeduplicateTransform


def test_filter_nulls_removes_none_values():
    plugin = FilterNullsTransform()
    records = [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": None, "email": "bob@example.com"},
        {"name": "Charlie", "email": ""},
    ]
    result = plugin.transform(records)
    assert len(result) == 1
    assert result[0]["name"] == "Alice"


def test_deduplicate_removes_exact_duplicates():
    plugin = DeduplicateTransform()
    records = [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Alice", "email": "alice@example.com"},  # duplicate
        {"name": "Bob", "email": "bob@example.com"},
    ]
    result = plugin.transform(records)
    assert len(result) == 2

Testing Plugin Discovery

# tests/test_pipeline.py
import pytest

from pipeline.runner import Pipeline
from pipeline.transform_interface import TransformPlugin
from pipeline.transforms.filter_nulls import FilterNullsTransform
from pipeline.transforms.deduplicate import DeduplicateTransform


def test_pipeline_respects_dependency_order():
    pipeline = Pipeline()
    # Register in wrong order - pipeline should sort them
    pipeline.register(DeduplicateTransform())  # depends on filter_nulls
    pipeline.register(FilterNullsTransform())  # no dependencies

    records = [
        {"name": "Alice"},
        {"name": None},
        {"name": "Alice"},  # duplicate
    ]
    result = pipeline.run(records)
    assert len(result) == 1  # null filtered, then deduplicated


def test_pipeline_detects_circular_dependency():
    class PluginA(TransformPlugin):
        name = "a"
        description = "A"
        dependencies = ["b"]
        def transform(self, records): return records

    class PluginB(TransformPlugin):
        name = "b"
        description = "B"
        dependencies = ["a"]
        def transform(self, records): return records

    pipeline = Pipeline()
    pipeline.register(PluginA())
    pipeline.register(PluginB())

    with pytest.raises(ValueError, match="Circular"):
        pipeline.run([{"x": 1}])

Testing with a Fake Plugin

from pipeline.runner import Pipeline
from pipeline.transform_interface import TransformPlugin


class CountingPlugin(TransformPlugin):
    """Test plugin that counts how many times it was called."""
    name = "counter"
    description = "Counts invocations"

    def __init__(self):
        self.call_count = 0

    def transform(self, records: list[dict]) -> list[dict]:
        self.call_count += 1
        return records


def test_pipeline_calls_each_plugin_once():
    counter = CountingPlugin()
    pipeline = Pipeline()
    pipeline.register(counter)
    pipeline.run([{"x": 1}])
    assert counter.call_count == 1

Key Takeaways

  • __init_subclass__ provides zero-config registration for plugins that live in the same codebase. Subclass a base class and the plugin is automatically discovered.
  • Entry points enable cross-package plugins: third-party packages register plugins in their pyproject.toml, and the host application discovers them via importlib.metadata.entry_points().
  • Stevedore simplifies production plugin management with patterns like Driver (one plugin), Extension (all plugins), and Named (specific plugins).
  • Plugin interfaces should be minimal and stable: adding an abstract method to a plugin interface breaks every existing plugin that does not implement it. Prefer optional methods with default implementations.
  • Lifecycle management (load, init, execute, cleanup) is essential for plugins that hold resources like database connections or file handles.
  • Topological sorting resolves plugin dependencies: when plugins depend on each other, sort them before execution and detect circular dependencies early.
  • Always handle plugin failures gracefully: catch exceptions at every boundary, log errors, and continue operating without the failed plugin.
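The point about interface stability can be checked directly. In this sketch, all class names are invented for illustration: an "old" plugin that implements only read is written against two evolved versions of an interface, one that adds validate as an abstract method and one that adds it with a default body.

```python
from abc import ABC, abstractmethod


class BreakingInterface(ABC):
    """Evolved interface that adds validate() as a new *abstract* method."""

    @abstractmethod
    def read(self, path: str) -> str: ...

    @abstractmethod
    def validate(self, path: str) -> bool: ...


class CompatibleInterface(ABC):
    """Evolved interface that adds validate() with a default implementation."""

    @abstractmethod
    def read(self, path: str) -> str: ...

    def validate(self, path: str) -> bool:
        return True


# "Old" plugins: written before validate() existed, so they only define read().
class OldPluginOnBreaking(BreakingInterface):
    def read(self, path: str) -> str:
        return "data"


class OldPluginOnCompatible(CompatibleInterface):
    def read(self, path: str) -> str:
        return "data"


plugin = OldPluginOnCompatible()  # still instantiates
print(plugin.validate("file.csv"))  # falls back to the default

try:
    OldPluginOnBreaking()  # abstract validate() is unimplemented
except TypeError as exc:
    print(f"old plugin broke: {exc}")
```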

Graded Practice Challenges

Level 1 - Identify the Pattern

Question 1: What happens when a new class inherits from PluginRegistry but does not define a name attribute?

Answer

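The __init_subclass__ hook still fires, but hasattr(cls, "name") is False, so neither branch runs and the class is not added to _plugins. This is by design: abstract intermediate classes should not be registered as plugins. Note that hasattr also sees inherited attributes, so a class that subclasses an already-registered plugin (for example, one that extends CsvPlugin without redefining name) is registered under the parent's name and replaces it in the registry.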

Question 2: A third-party developer installs their Avro plugin package but the host application does not find it. What is the most likely cause?

Answer

The most likely cause is that the entry point group name in the plugin's pyproject.toml does not match the group name the host application is scanning. For example, the plugin declares [project.entry-points."file_processor.formats"] (underscore) but the host calls entry_points(group="fileprocessor.formats") (no underscore). Other causes: the package is installed in a different virtual environment, or the installed metadata is stale. Entry points are read from installed package metadata, so after adding or changing them in pyproject.toml the package must be reinstalled, even when it is installed in editable mode.

Question 3: Why does the DeduplicateTransform declare dependencies = ["filter_nulls"]?

Answer

Deduplication compares records by their field values, so it should operate on clean data. Running filter_nulls first removes incomplete records such as {"name": None, "email": "bob@example.com"} before they are compared, so deduplicate processes fewer records and never has to decide whether two incomplete records count as the same. The dependency declaration ensures the pipeline runner executes transforms in the correct order via topological sort.

Level 2 - Refactoring Challenge

Take this hardcoded notification system and refactor it into a plugin architecture:

class NotificationSender:
    def send(self, user: User, message: str):
        # Email
        smtp = smtplib.SMTP("smtp.company.com")
        smtp.sendmail("noreply@company.com", user.email, message)
        # SMS
        twilio.send(user.phone, message)
        # Slack
        requests.post(SLACK_WEBHOOK, json={"text": f"{user.name}: {message}"})
        # Push notification
        firebase.send(user.device_token, message)

Design a NotificationPlugin interface, implement each channel as a plugin, use entry points for registration, and allow enabling/disabling channels per user preference.

Level 3 - Design Challenge

Design a plugin system for a web scraping framework that supports:

  • Pluggable page parsers (HTML, JSON API, PDF)
  • Pluggable storage backends (filesystem, S3, database)
  • Pluggable rate limiters (fixed window, sliding window, token bucket)
  • Middleware plugins (logging, caching, retry, proxy rotation)

Define the plugin interfaces, the entry point groups, the lifecycle hooks, and the dependency ordering. How do you handle plugin configuration (each plugin needs different settings)?

What's Next

In the next lesson, Configuration Management - Environment-Driven Apps, we will explore how to externalize application configuration, validate it at startup, manage secrets, and follow the 12-factor config principle - which is essential for any pluggable, deployable system.

© 2026 EngineersOfAI. All rights reserved.