Prefect and Modern Orchestration
200 DAGs, One Full Day, Zero Value Delivered
A team had been running Apache Airflow for three years. The system worked. Nobody was questioning whether it should be replaced. But when a new data engineer joined and was asked to write their first pipeline, something interesting happened.
The engineer spent 45 minutes reading the Airflow documentation. Then she created a Python file in the dags/ directory, defined a DAG object, instantiated Operators, wired them with >> operators, pushed the file to the repository, waited for the CI pipeline to deploy it, waited for the Airflow Scheduler to pick up the new file (up to five minutes), refreshed the UI, and discovered the DAG was not showing up. She had placed the DAG inside an if __name__ == "__main__": block, which the Scheduler skips. She fixed it, deployed again, waited again. The DAG appeared. She ran it. A task failed with a serialization error she had not seen before - one of her return values was a custom object that Airflow could not serialize into XCom.
She had spent the entire day and had not yet written any of the actual business logic.
Meanwhile, on the same team, another engineer was evaluating Prefect. He opened a Python file, wrote a function, added a @flow decorator, added @task to each step, and ran python pipeline.py from the terminal. In three minutes he had a working local execution with logs, timing, retry configuration, and a visual run report in the browser. He deployed it to their test environment with a single CLI command.
The contrast made the team pay attention. This lesson is about what Prefect does differently, how its architecture works, and when you should choose it.
What Prefect Is
Prefect is a Python-native workflow orchestration framework created by Jeremiah Lowin in 2018 and open-sourced in 2019. The founding thesis was that workflows should be plain Python - not a separate DSL, not XML configuration, not a framework that demands you restructure your code to fit its patterns. Any Python function can become a Prefect task with one decorator. Any function that calls tasks becomes a flow.
This is meaningfully different from Airflow's model. In Airflow, you define a DAG object, instantiate Operator objects with configuration dictionaries, and wire them with >> operators. The dependency graph is static and must be fully defined at parse time. The "code" is really configuration - it describes a pipeline structure rather than implementing logic directly.
In Prefect, you write normal Python. Functions call other functions. The framework observes the execution, records state, handles retries, and provides observability - but it does not require you to restructure your code to accommodate its abstractions.
# Airflow pattern - configuration-heavy, framework-centric
from airflow.operators.python import PythonOperator
from airflow import DAG
with DAG("my_pipeline", ...) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_fn)
    transform = PythonOperator(task_id="transform", python_callable=transform_fn)
    load = PythonOperator(task_id="load", python_callable=load_fn)
    extract >> transform >> load
# Prefect pattern - code-centric, framework is lightweight
from prefect import flow, task
@task
def extract() -> dict: ...
@task
def transform(data: dict) -> dict: ...
@task
def load(data: dict) -> None: ...
@flow
def my_pipeline():
    raw = extract()
    processed = transform(raw)
    load(processed)
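The Prefect version can be run with python pipeline.py (once the file ends with an if __name__ == "__main__": guard that calls my_pipeline()) or by calling my_pipeline() from any Python session - no separately deployed infrastructure is required for local testing. The Airflow version needs a configured Airflow environment before it can run.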
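To make the idea of a framework that "observes the execution" concrete, here is a toy decorator written with only the standard library. It wraps a plain function, retries it on failure, and records each state transition. This is a sketch of the general pattern and not Prefect's implementation; `toy_task`, `RUN_LOG`, and `flaky_extract` are hypothetical names introduced for illustration.

```python
import functools
import time

RUN_LOG = []  # hypothetical in-memory stand-in for an orchestrator's state store


def toy_task(retries=0, retry_delay_seconds=0.0):
    """Wrap a plain function so each attempt is recorded and failures are retried."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                RUN_LOG.append((fn.__name__, attempt, "Running"))
                try:
                    result = fn(*args, **kwargs)
                except Exception:
                    RUN_LOG.append((fn.__name__, attempt, "Failed"))
                    if attempt == retries:
                        raise  # out of retries: surface the original error
                    time.sleep(retry_delay_seconds)
                else:
                    RUN_LOG.append((fn.__name__, attempt, "Completed"))
                    return result
        return wrapper
    return decorator


_calls = {"n": 0}


@toy_task(retries=2)
def flaky_extract():
    """Fail on the first call, succeed on the second."""
    _calls["n"] += 1
    if _calls["n"] == 1:
        raise ConnectionError("transient failure")
    return {"rows": 3}


result = flaky_extract()
```

The decorated function is still called like any other Python function, which is the property that makes local runs and unit tests straightforward.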
Historical Context
The broader context for Prefect's creation was the "second-generation orchestrator" movement of the late 2010s. Airflow, Oozie, and Luigi had proven the value of workflow orchestration, but they shared a common design: configuration-heavy, infrastructure-dependent, difficult to test locally, and tightly coupled between the "define the pipeline" and "run the pipeline" concerns.
Prefect's first generation (open-sourced in 2019 and later versioned as 1.x) introduced the flow/task model. Prefect 2.0 (2022, codenamed "Orion") was a near-complete rewrite - the architecture shifted from a monolithic server model to a decoupled control plane + execution model. Over the 2.x series, Prefect introduced Work Pools, the Deployment system, and the prefect deploy CLI that made going from local code to scheduled production deployment a one-command operation.
In 2024, Prefect 3.0 refined the model further, improving performance, strengthening the artifact system, and deepening dbt and cloud integrations.
Core Concepts
Flows
A flow is a Python function decorated with @flow. It is the top-level orchestration unit - equivalent to an Airflow DAG. A flow can contain tasks, sub-flows, and arbitrary Python.
from prefect import flow
from prefect.logging import get_run_logger
from datetime import timedelta
@flow(
    name="churn-feature-pipeline",
    description="Compute churn prediction features from raw user events",
    retries=1,
    retry_delay_seconds=300,
    timeout_seconds=3600,  # fail if the flow takes more than 1 hour
)
def churn_feature_pipeline(train_date: str, feature_bucket: str = "s3://data/features"):
    logger = get_run_logger()
    logger.info(f"Starting feature pipeline for {train_date}")
    # tasks are called like normal Python functions
    raw = extract_events(train_date)
    validated = validate_raw_data(raw)
    features = compute_features(validated, train_date)
    upload_features(features, feature_bucket, train_date)
    logger.info("Feature pipeline complete")
Tasks
A task is a Python function decorated with @task. It is the individual unit of work - equivalent to an Airflow Operator instance. Tasks get retry logic, caching, timeout handling, and are individually tracked in the Prefect UI.
import pandas as pd  # imported at module level because the return annotation uses pd.DataFrame
from prefect import task
from prefect.artifacts import create_table_artifact
@task(
    name="extract-user-events",
    retries=3,
    retry_delay_seconds=60,
    retry_jitter_factor=0.5,  # add randomness to avoid retry thundering herd
    timeout_seconds=600,
    tags=["extraction", "s3"],
)
def extract_events(train_date: str) -> pd.DataFrame:
    """Pull raw user events from S3 for the given date."""
    path = f"s3://data/events/raw/date={train_date}/events.parquet"
    df = pd.read_parquet(path)
    # Prefect artifacts - logged to the UI for observability
    create_table_artifact(
        key="raw-events-sample",
        table=df.head(10).to_dict("records"),
        description="Sample of extracted raw events",
    )
    return df
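The retry settings above combine a base delay with a jitter factor so that many failed tasks do not all retry at the same instant. A minimal sketch of that idea follows. Uniform jitter is an assumption made for this illustration, and Prefect's own sampling strategy may differ; `jittered_delay` is a hypothetical helper.

```python
import random


def jittered_delay(base_seconds, jitter_factor, rng=random):
    """Return a delay in [base, base * (1 + jitter_factor)].

    Uniform jitter is an assumption of this sketch, not a statement
    about how any particular orchestrator samples its delays.
    """
    if base_seconds < 0 or jitter_factor < 0:
        raise ValueError("base_seconds and jitter_factor must be non-negative")
    return base_seconds * (1 + rng.uniform(0, jitter_factor))


rng = random.Random(7)  # seeded so the example is reproducible
delays = [jittered_delay(60, 0.5, rng) for _ in range(5)]
```

With a 60-second base and a factor of 0.5, every delay falls between 60 and 90 seconds, which spreads retries out instead of stacking them.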
Task Caching
Prefect supports result caching - if a task has already run successfully with the same inputs, return the cached result rather than re-running. This is particularly useful for expensive transformations during development when you want to iterate on downstream tasks without re-running upstream ones.
import pandas as pd
from prefect import task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
    cache_key_fn=task_input_hash,  # hash all inputs to form the cache key
    cache_expiration=timedelta(hours=24),  # cache valid for 24 hours
)
def compute_features(raw: pd.DataFrame, train_date: str) -> pd.DataFrame:
    """Expensive feature computation - cached for 24 hours on same inputs."""
    # ...heavy computation...
    return features
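The caching behaviour described above can be sketched with the standard library: hash the inputs into a key, store the result with an expiry, and skip the call when an unexpired entry exists. This is an illustration of the technique only; `_CACHE`, `input_hash`, and `cached_call` are hypothetical names, and real orchestrators persist results in a storage backend rather than a dict.

```python
import hashlib
import json
import time

_CACHE = {}  # key -> (expires_at, value); hypothetical in-memory result store


def input_hash(fn_name, kwargs):
    """Build a deterministic key from the function name and its JSON-serialisable inputs."""
    payload = json.dumps({"fn": fn_name, "kwargs": kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cached_call(fn, ttl_seconds, **kwargs):
    """Return a cached value when an unexpired entry exists; otherwise call fn and store it."""
    key = input_hash(fn.__name__, kwargs)
    entry = _CACHE.get(key)
    if entry is not None and entry[0] > time.time():
        return entry[1]
    value = fn(**kwargs)
    _CACHE[key] = (time.time() + ttl_seconds, value)
    return value


calls = []


def compute(train_date):
    calls.append(train_date)
    return f"features-for-{train_date}"


first = cached_call(compute, 86_400, train_date="2024-02-15")
second = cached_call(compute, 86_400, train_date="2024-02-15")  # same inputs: cache hit
third = cached_call(compute, 86_400, train_date="2024-02-16")   # new input: recomputed
```

Only the first and third calls execute `compute`, because the second call has the same inputs and its cache entry has not expired.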
Prefect Architecture
The Decoupled Architecture
Prefect separates the control plane (the API that knows about deployments, schedules, and run states) from the execution plane (the workers that actually run flow code). This is the fundamental architectural difference from Airflow.
In Airflow, the Scheduler runs inside your infrastructure and pushes tasks to workers. If the Scheduler crashes, execution stalls. The Scheduler must have access to both the metadata database and the DAG files.
In Prefect, the API (control plane) can be hosted separately from your execution environment. Workers poll the API for work rather than receiving pushed tasks. This means: the API can be Prefect Cloud (fully managed) while execution happens inside your VPC; the worker process is stateless and can crash and restart without losing run state; and local development uses the same worker model as production - there is no separate local execution path.
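The pull-based model can be illustrated with a few lines of standard-library Python. A queue stands in for the API's list of scheduled runs and a dict stands in for the run-state table. All names here (`scheduled_runs`, `run_states`, `submit_run`, `poll_once`) are hypothetical, and the sketch leaves out networking, leases, and heartbeats.

```python
import queue

# Hypothetical stand-ins: the queue plays the role of the API's scheduled runs,
# and the dict plays the role of the run-state table kept by the control plane.
scheduled_runs = queue.Queue()
run_states = {}


def submit_run(run_id, fn):
    """Control-plane side: record the run and make it available for pickup."""
    run_states[run_id] = "Scheduled"
    scheduled_runs.put((run_id, fn))


def poll_once():
    """Worker side: ask for work, execute it, and report the final state back."""
    try:
        run_id, fn = scheduled_runs.get_nowait()
    except queue.Empty:
        return None  # nothing to do; a real worker would sleep and poll again
    run_states[run_id] = "Running"
    try:
        fn()
        run_states[run_id] = "Completed"
    except Exception:
        run_states[run_id] = "Failed"
    return run_id


submit_run("run-1", lambda: None)
submit_run("run-2", lambda: 1 / 0)

processed = []
while (rid := poll_once()) is not None:
    processed.append(rid)
```

Because state lives on the control-plane side, the worker holds nothing that would be lost if it were restarted between polls.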
Work Pools - Where Execution Happens
A Work Pool defines the execution environment. Workers in a pool poll the Prefect API and run any flow runs dispatched to that pool.
| Pool Type | Where Flows Run | Best For |
|---|---|---|
| Process | Local subprocess on the worker machine | Development, simple pipelines |
| Docker | Docker containers on the worker machine | Dependency isolation |
| Kubernetes | Kubernetes Jobs | Production, resource isolation |
| ECS | AWS ECS Tasks | AWS-native production |
| Vertex AI | Google Vertex AI | GCP, ML training with managed infra |
| Modal | Modal serverless | Event-driven, fast cold-start |
# Create a Kubernetes work pool
prefect work-pool create --type kubernetes k8s-production
# Start a worker that pulls from that pool
prefect worker start --pool k8s-production
Workers are lightweight processes - they poll the API, receive a flow run assignment, and launch the actual execution according to the pool's infrastructure configuration. A worker does not run the flow itself (for Kubernetes pools) - it creates a Kubernetes Job and monitors it.
Deployments
A deployment is a registered version of a flow with a schedule, parameters, and an associated work pool. It answers the question: "when should this flow run, where should it run, and with what inputs?"
# prefect.yaml - deployment configuration
name: churn-feature-pipeline-prod
version: "1.2.0"
tags:
  - ml
  - churn
  - production
deployments:
  - name: weekly-churn-features
    description: "Weekly churn feature computation - every Monday at 1am"
    entrypoint: pipelines/churn_features.py:churn_feature_pipeline
    schedules:
      - cron: "0 1 * * 1"  # 1am every Monday
        timezone: "UTC"
    parameters:
      feature_bucket: "s3://data/features"
    work_pool:
      name: k8s-production
      job_variables:
        image: "registry.company.com/ml-pipelines:v1.2.0"
        cpu_request: "4"
        memory_request: "16Gi"
pull:
  - prefect.deployments.steps.git_clone:
      repository: "https://github.com/company/ml-pipelines.git"
      branch: "main"
# Register the deployment with the Prefect API
prefect deploy --name weekly-churn-features
# Trigger a one-off run with parameter overrides
prefect deployment run "churn-feature-pipeline/weekly-churn-features" \
--param train_date=2024-02-15 \
--param feature_bucket=s3://data/features/backfill
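To sanity-check what the cron expression `0 1 * * 1` in the deployment means, the next run time can be computed by hand with the standard library. The helper below is a hypothetical sketch that handles only this one expression (Mondays at 01:00 UTC); it is not a general cron parser.

```python
from datetime import datetime, timedelta, timezone


def next_monday_1am_utc(now):
    """Return the first Monday 01:00 UTC strictly after `now` (cron: 0 1 * * 1)."""
    now = now.astimezone(timezone.utc)
    candidate = now.replace(hour=1, minute=0, second=0, microsecond=0)
    # weekday(): Monday == 0, so this moves forward to the coming Monday
    candidate += timedelta(days=(0 - candidate.weekday()) % 7)
    if candidate <= now:
        candidate += timedelta(days=7)
    return candidate


# 2024-02-15 is a Thursday, so the next run falls on Monday 2024-02-19.
nxt = next_monday_1am_utc(datetime(2024, 2, 15, 12, 0, tzinfo=timezone.utc))
```

A one-off run triggered with `--param train_date=...` is independent of this schedule; the schedule only determines when the control plane creates runs on its own.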
A Full ML Pipeline in Prefect
Here is a complete Prefect flow that mirrors the Airflow ML training pipeline from the previous lesson - data validation, feature engineering, training, evaluation, and conditional deployment:
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact
from prefect.logging import get_run_logger
from prefect.blocks.notifications import SlackWebhook
from datetime import timedelta
import pandas as pd
import mlflow
import json
# --------------------------------------------------------------------------
# Tasks
# --------------------------------------------------------------------------
@task(retries=3, retry_delay_seconds=60, name="validate-training-data")
def validate_training_data(train_date: str) -> dict:
    """Validate training data quality. Raises ValueError on failure."""
    logger = get_run_logger()
    path = f"s3://data/features/churn/date={train_date}/features.parquet"
    df = pd.read_parquet(path)
    checks = {
        "row_count": len(df),
        "max_null_rate": float(df.isnull().mean().max()),
        "label_rate": float(df["churned"].mean()),
    }
    if checks["row_count"] < 10_000:
        raise ValueError(f"Insufficient rows: {checks['row_count']} (need 10,000+)")
    if checks["max_null_rate"] > 0.05:
        raise ValueError(f"High null rate: {checks['max_null_rate']:.2%} (limit: 5%)")
    logger.info(f"Data quality passed: {checks}")
    return {**checks, "s3_path": path}
@task(name="run-feature-engineering")
def run_feature_engineering(metadata: dict, train_date: str) -> str:
    """Generate feature matrix for training."""
    import numpy as np
    df = pd.read_parquet(metadata["s3_path"])
    df["days_since_last_login"] = (
        pd.Timestamp(train_date) - pd.to_datetime(df["last_login_date"])
    ).dt.days
    df["payment_failure_rate"] = (
        df["payment_failures_30d"] / df["payment_attempts_30d"].clip(lower=1)
    )
    feature_cols = [
        "days_since_last_login", "payment_failure_rate",
        "avg_session_length_7d", "support_tickets_90d", "plan_tier",
    ]
    output = df[feature_cols + ["churned"]].dropna(subset=feature_cols)
    output_path = f"s3://data/features/training/churn/date={train_date}/train.parquet"
    output.to_parquet(output_path, index=False)
    return output_path
@task(
    name="train-model",
    retries=1,
    retry_delay_seconds=600,
    timeout_seconds=14400,  # 4 hour timeout
)
def train_model(feature_path: str, train_date: str) -> dict:
    """Train churn model and log to MLflow. Returns run metadata."""
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import roc_auc_score
    logger = get_run_logger()
    df = pd.read_parquet(feature_path)
    feature_cols = [c for c in df.columns if c != "churned"]
    X_train, X_test, y_train, y_test = train_test_split(
        df[feature_cols], df["churned"], test_size=0.2, stratify=df["churned"], random_state=42
    )
    mlflow.set_tracking_uri("https://mlflow.internal")
    mlflow.set_experiment("churn_weekly_training")
    with mlflow.start_run(run_name=f"churn_{train_date}") as run:
        mlflow.log_params({"n_estimators": 300, "learning_rate": 0.05, "train_date": train_date})
        model = GradientBoostingClassifier(n_estimators=300, learning_rate=0.05, random_state=42)
        model.fit(X_train, y_train)
        y_prob = model.predict_proba(X_test)[:, 1]
        auc = roc_auc_score(y_test, y_prob)
        mlflow.log_metric("val_auc", auc)
        mlflow.sklearn.log_model(model, "model", registered_model_name="churn_prediction")
        run_id = run.info.run_id
    logger.info(f"Training complete - run_id: {run_id}, val_auc: {auc:.4f}")
    return {"run_id": run_id, "val_auc": auc, "train_date": train_date}
@task(name="evaluate-champion-challenger")
def evaluate_champion_challenger(training_result: dict) -> dict:
    """Compare new model vs champion. Return evaluation with deploy decision."""
    client = mlflow.tracking.MlflowClient(tracking_uri="https://mlflow.internal")
    challenger_auc = training_result["val_auc"]
    champions = client.get_latest_versions("churn_prediction", stages=["Production"])
    if not champions:
        return {**training_result, "champion_auc": 0.0, "improvement": challenger_auc, "deploy": True}
    champion_run = client.get_run(champions[0].run_id)
    champion_auc = float(champion_run.data.metrics.get("val_auc", 0.0))
    improvement = challenger_auc - champion_auc
    return {
        **training_result,
        "champion_auc": champion_auc,
        "improvement": improvement,
        "deploy": improvement >= 0.005,
    }
@task(name="deploy-model")
def deploy_model(evaluation: dict) -> None:
    """Promote challenger to Production in MLflow registry."""
    if not evaluation["deploy"]:
        return
    client = mlflow.tracking.MlflowClient(tracking_uri="https://mlflow.internal")
    # train_model registers the challenger without assigning a stage, so look the
    # new version up by its run ID rather than assuming it is already in "Staging".
    run_id = evaluation["run_id"]
    versions = client.search_model_versions(f"run_id='{run_id}'")
    if not versions:
        raise RuntimeError(f"No registered model version found for run {run_id}")
    client.transition_model_version_stage(
        "churn_prediction", versions[0].version, "Production", archive_existing_versions=True
    )
    get_run_logger().info(f"Deployed v{versions[0].version} - AUC {evaluation['val_auc']:.4f}")
@task(name="create-pipeline-report")
def create_pipeline_report(evaluation: dict) -> None:
    """Log a Markdown artifact summarising the pipeline run."""
    action = "DEPLOYED" if evaluation["deploy"] else "SKIPPED"
    create_markdown_artifact(
        key="pipeline-summary",
        markdown=f"""
## Churn Model Pipeline - {evaluation['train_date']}
| Metric | Value |
|--------|-------|
| Challenger AUC | `{evaluation['val_auc']:.4f}` |
| Champion AUC | `{evaluation['champion_auc']:.4f}` |
| Improvement | `{evaluation['improvement']:+.4f}` |
| Decision | **{action}** |
""",
        description="Churn model training pipeline summary",
    )
# --------------------------------------------------------------------------
# Flow
# --------------------------------------------------------------------------
@flow(
    name="churn-model-training",
    description="Weekly churn model retraining - data validation, training, champion/challenger deployment",
    retries=0,
    timeout_seconds=28800,  # 8 hour overall timeout
)
def churn_model_training(train_date: str) -> None:
    logger = get_run_logger()
    logger.info(f"Starting churn model training for {train_date}")
    # Each step is a task - Prefect tracks state, retries, timing
    metadata = validate_training_data(train_date)
    feature_path = run_feature_engineering(metadata, train_date)
    training_result = train_model(feature_path, train_date)
    evaluation = evaluate_champion_challenger(training_result)
    deploy_model(evaluation)
    create_pipeline_report(evaluation)
    # Notification
    slack = SlackWebhook.load("ml-alerts")
    action = "DEPLOYED" if evaluation["deploy"] else "SKIPPED"
    slack.notify(
        f":robot_face: Churn Pipeline - {action}\n"
        f"Challenger: {evaluation['val_auc']:.4f} | Champion: {evaluation['champion_auc']:.4f} | Δ {evaluation['improvement']:+.4f}"
    )
if __name__ == "__main__":
    from datetime import date
    churn_model_training(train_date=str(date.today()))
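The champion/challenger rule inside the evaluation task is easier to unit-test when pulled out into a pure function with no MLflow dependency. The sketch below mirrors the 0.005 AUC threshold used above; `should_deploy` and `MIN_IMPROVEMENT` are hypothetical names, and representing "no champion yet" as `None` is an assumption of this example.

```python
MIN_IMPROVEMENT = 0.005  # same AUC threshold the evaluation task applies


def should_deploy(challenger_auc, champion_auc=None, min_improvement=MIN_IMPROVEMENT):
    """Return (deploy, improvement); with no champion, the challenger is deployed."""
    if champion_auc is None:
        return True, challenger_auc
    improvement = challenger_auc - champion_auc
    return improvement >= min_improvement, improvement


clear_win = should_deploy(0.88, 0.86)
marginal = should_deploy(0.861, 0.86)
first_model = should_deploy(0.80)
```

Keeping the decision separate from the registry calls means the threshold can be tested in milliseconds, while the Prefect task remains a thin wrapper around it.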
Prefect Server vs. Prefect Cloud
| Feature | Prefect Server (self-hosted) | Prefect Cloud (managed) |
|---|---|---|
| Cost | Free | Free tier + paid plans |
| Operations | You manage the server | Prefect manages it |
| High availability | You configure it | Built-in |
| SSO / RBAC | Limited | Full enterprise features |
| Automation triggers | Yes | Yes + event webhooks |
| Data residency | On your infra | Prefect servers (EU option) |
For teams with strict data residency requirements or existing Kubernetes infrastructure, self-hosted Prefect Server is straightforward:
# Self-hosted setup: start the server directly on the host
prefect server start
# or run it in a container
docker run -d \
  -p 4200:4200 \
  -e PREFECT_UI_ENABLED=true \
  prefecthq/prefect:3-latest \
  prefect server start --host 0.0.0.0
Automations - Event-Driven Flows
Prefect automations trigger flow runs based on events. This is particularly useful for data-driven scheduling: run the training pipeline when a new data file arrives, not on a fixed clock.
# Illustrative automation definition - normally created via the Prefect Cloud UI or API
# Trigger the training pipeline when a new feature file is uploaded to S3
# Treat the schema, event name, and resource fields below as a sketch to adapt (Prefect 3.x);
# an S3 upload reaches Prefect only if something, such as a webhook, emits the event
automations:
  - name: trigger-training-on-new-features
    description: "Run model training when new feature file arrives"
    trigger:
      type: event
      expect:
        - prefect.block.s3-bucket.new-object
      match:
        "prefect.resource.name": "s3://data/features/churn/date=*/features.parquet"
    actions:
      - type: run-deployment
        deployment: "churn-model-training/weekly-churn-features"
        parameters:
          train_date: "{{ event.resource['s3.object.key'].split('date=')[1].split('/')[0] }}"
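The template expression in the `train_date` parameter above pulls the partition date out of an object key with two `split` calls. The same logic in plain Python, with validation added, looks like the sketch below. `train_date_from_key` is a hypothetical helper, and the key layout is taken from the paths used in this lesson.

```python
from datetime import date


def train_date_from_key(object_key):
    """Extract and validate the `date=YYYY-MM-DD` partition value from an object key."""
    if "date=" not in object_key:
        raise ValueError(f"no date partition in key: {object_key!r}")
    raw = object_key.split("date=")[1].split("/")[0]
    # date.fromisoformat raises ValueError on malformed dates such as "2024-13-40"
    return date.fromisoformat(raw).isoformat()


parsed = train_date_from_key("features/churn/date=2024-02-15/features.parquet")
```

Validating the value before it becomes a flow parameter turns a malformed key into an immediate, readable error instead of a failed read several tasks later.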
You can also trigger flows programmatically using the Prefect API client:
import asyncio
from prefect.client.orchestration import get_client
async def trigger_training_for_date(train_date: str):
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(
            "churn-model-training/weekly-churn-features"
        )
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            parameters={"train_date": train_date},
            name=f"churn-training-{train_date}",
        )
        print(f"Triggered flow run: {flow_run.id}")
asyncio.run(trigger_training_for_date("2024-02-15"))
Prefect + dbt Integration
Running dbt models as Prefect tasks with result tracking and artifact logging:
from prefect import flow, task
from prefect_dbt.cli.commands import DbtCoreOperation
@task(name="run-dbt-models", retries=2)
def run_dbt_transformations(target: str = "prod") -> None:
    """Run all dbt models tagged as 'daily'."""
    with DbtCoreOperation(
        commands=[f"dbt run --select tag:daily --target {target}"],
        project_dir="/opt/dbt/project",
        profiles_dir="/opt/dbt",
    ) as dbt_op:
        dbt_process = dbt_op.trigger()
        # wait_for_completion() raises RuntimeError if dbt exits with a non-zero code
        dbt_process.wait_for_completion()
@task(name="run-dbt-tests")
def run_dbt_tests() -> None:
    """Run dbt data quality tests after transformation."""
    with DbtCoreOperation(
        commands=["dbt test --select tag:daily"],
        project_dir="/opt/dbt/project",
    ) as dbt_op:
        dbt_process = dbt_op.trigger()
        dbt_process.wait_for_completion()
@flow(name="daily-dbt-pipeline")
def daily_dbt_pipeline():
    # .submit() returns a future, which wait_for uses to order the tasks explicitly
    transform = run_dbt_transformations.submit()
    run_dbt_tests(wait_for=[transform])
if __name__ == "__main__":
    daily_dbt_pipeline()
Prefect vs. Airflow - Honest Comparison
| Dimension | Airflow | Prefect |
|---|---|---|
| Local development | Needs Airflow installation, no easy local run | python flow.py works immediately |
| DAG definition | Python configuration DSL (Operators, >>) | Plain Python functions with decorators |
| Scheduler overhead | Scheduler parses all DAG files every 30s | Flows are deployed as artifacts - no parse loop |
| Learning curve | Steep - many concepts before first working pipeline | Gentle - @flow + @task is the entire core API |
| Ecosystem maturity | 10+ years, 100s of providers, battle-tested | 5 years, growing provider ecosystem |
| Sensor ecosystem | Rich - S3, GCS, SFTP, ExternalTaskSensor, etc. | Less mature - often need to write custom polling |
| Data-aware scheduling | Datasets (Airflow 2.4+) | Automations (event-driven triggers) |
| Backfill | Native airflow dags backfill command | Manual - trigger runs programmatically |
| Self-hosting complexity | High (Scheduler + Webserver + Workers + DB + Broker) | Low (single server binary or Docker Compose) |
| Enterprise features | Astronomer (managed Airflow) is mature | Prefect Cloud has strong enterprise tier |
When to choose Airflow:
- Mature ecosystem with specific provider integrations (Salesforce, SAP, legacy systems)
- Large existing Airflow investment - migration cost outweighs developer experience gains
- Complex sensor-based workflows with many external dependencies
- Team has deep Airflow expertise
When to choose Prefect:
- New projects, greenfield data platforms
- Data science and ML teams who want to write pipelines without learning framework-specific APIs
- Pipelines that need to be runnable locally without infrastructure
- Teams who prioritize fast iteration over maximum ecosystem coverage
:::danger The "local feels great but production is different" trap
The biggest risk with Prefect is that the zero-friction local experience can mask production complexity. A flow that runs perfectly with python flow.py may have subtle issues when deployed with a Kubernetes work pool: the container image is stale, environment variables are not injected, result storage is not configured, or the flow's S3 credentials come from a local ~/.aws/ profile that does not exist in the container.
Always test the full deployment path - prefect deploy + prefect worker start + trigger a run - in a staging environment before going to production. The local run is a development convenience, not a production test.
:::
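One inexpensive guard against the trap described above is a preflight check that fails fast when the runtime environment is missing settings the flow depends on. The sketch below uses only the standard library; `missing_settings`, `preflight`, and the `REQUIRED` list are hypothetical, so substitute the variables your own flow actually needs.

```python
import os


def missing_settings(required, environ=None):
    """Return the names in `required` that are unset or empty in the environment."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


def preflight(required, environ=None):
    """Raise early, with a readable message, if required settings are absent."""
    missing = missing_settings(required, environ)
    if missing:
        raise RuntimeError("missing required settings: " + ", ".join(missing))


# Example variable names only; adjust to your deployment.
REQUIRED = ["AWS_DEFAULT_REGION", "MLFLOW_TRACKING_URI"]
fake_container_env = {"AWS_DEFAULT_REGION": "eu-west-1"}
gaps = missing_settings(REQUIRED, fake_container_env)
```

Calling `preflight(REQUIRED)` as the first statement of a flow surfaces a misconfigured container in seconds rather than after the first task has already spent time reading data.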
Error Handling and Observability
Prefect provides multiple observability primitives:
import pandas as pd
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact, create_link_artifact
from prefect.logging import get_run_logger
@task(name="feature-extraction")
def extract_features(date: str) -> pd.DataFrame:
    logger = get_run_logger()
    try:
        df = pd.read_parquet(f"s3://data/features/{date}/features.parquet")
        logger.info(f"Loaded {len(df)} rows")
        # Prefect artifacts - visible in UI, attached to this task run
        create_markdown_artifact(
            key="data-quality-report",
            markdown=f"| Metric | Value |\n|--------|-------|\n| Rows | {len(df)} |\n| Columns | {len(df.columns)} |",
            description="Feature extraction data quality summary",
        )
        create_link_artifact(
            key="source-data-location",
            link=f"https://s3.console.aws.amazon.com/s3/buckets/data?prefix=features/{date}/",
            description="S3 location of source feature data",
        )
        return df
    except FileNotFoundError as e:
        logger.error(f"Feature file missing for {date}: {e}")
        raise  # re-raise - Prefect will mark task as failed and retry per task config
Prefect also supports result persistence - storing task return values in a results backend (S3, GCS, local filesystem) so they can be referenced by downstream tasks even across flow runs:
import pandas as pd
from prefect import task
@task(
    # Slug of a saved S3Bucket block from prefect-aws; the block name "prefect-results" is assumed
    result_storage="s3-bucket/prefect-results",
    result_serializer="pickle",
    persist_result=True,
)
def expensive_computation(date: str) -> pd.DataFrame:
    # The return value is persisted to S3, so it can be retrieved after the run finishes
    # Reusing it on a later run additionally requires a matching cache key (see Task Caching)
    ...
Interview Q&A
Q: What is the core architectural difference between Prefect and Airflow?
The fundamental difference is where the control plane lives and how work is dispatched. Airflow co-locates the Scheduler (control plane) with the execution infrastructure - the Scheduler pushes tasks to workers via a message broker. Everything runs in your infrastructure. Prefect separates the control plane (the API server) from the execution plane (workers). Workers poll the Prefect API for work rather than receiving pushed tasks. This means the control plane can be hosted externally (Prefect Cloud) while execution happens in your VPC, and it means the worker is a stateless process that can crash and restart without losing run state. For developers, the most visible difference is that Prefect flows are plain Python functions runnable locally without any infrastructure, while Airflow DAGs require an Airflow installation to run.
Q: How does Prefect handle task dependencies compared to Airflow?
In Airflow, task dependencies are explicitly declared using the >> operator or set_upstream()/set_downstream() methods. The dependency graph must be fully defined at parse time and cannot change at runtime. In Prefect, dependencies are implicit - they are inferred from data flow. When one task's return value is passed as an argument to another task, Prefect knows the second task depends on the first and will not start it until the first succeeds. You can also declare explicit dependencies without data passing using wait_for=[upstream_task_future]. This Python-native dependency model is more intuitive for engineers who think in terms of function composition rather than DAG construction.
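The idea that dependencies follow data can be demonstrated with `concurrent.futures` from the standard library: a downstream step that receives an upstream future cannot begin its work until that future resolves. This is an analogy for the mechanism rather than Prefect code; `run_after` is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor

order = []


def extract():
    order.append("extract")
    return [1, 2, 3]


def transform(rows):
    order.append("transform")
    return [r * 10 for r in rows]


def run_after(upstream_future, fn):
    """Resolve the upstream future first, then call fn with its result."""
    return fn(upstream_future.result())


with ThreadPoolExecutor(max_workers=2) as pool:
    raw_future = pool.submit(extract)
    # transform is handed the future, so it cannot start until extract has finished
    transformed_future = pool.submit(run_after, raw_future, transform)
    transformed = transformed_future.result()
```

No ordering was declared anywhere; the order falls out of which value each step needs.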
Q: What is a Prefect Work Pool and why does it matter?
A Work Pool defines the execution environment where flow runs are dispatched. It is the answer to "where does this flow actually run?" - a Kubernetes cluster, an ECS cluster, local subprocesses, or serverless infrastructure like Modal. Workers are processes that join a work pool, poll the Prefect API for flow run assignments, and launch executions according to the pool's infrastructure type. The work pool abstraction decouples the flow definition from the execution environment - the same flow can be scheduled against a Process pool for development and a Kubernetes pool for production simply by changing the deployment configuration. This separation also means you can have multiple work pools with different infrastructure characteristics (a GPU pool and a CPU pool) and route specific deployments to the appropriate pool.
Q: How would you implement a data quality gate in Prefect (equivalent to Airflow's ShortCircuitOperator)?
Raise an exception in the validation task and catch it in the flow to decide whether to continue. Prefect does not have a direct equivalent to ShortCircuitOperator because its model is Python-native - you use normal Python control flow:
@task(name="validate-data")
def validate_data(date: str) -> dict:
    # ...checks...
    if row_count < 10_000:
        raise ValueError(f"Insufficient data: {row_count} rows")
    return metadata
@flow
def training_pipeline(date: str):
    try:
        metadata = validate_data(date)
    except Exception:
        notify_data_quality_failure(date)
        return  # early return - downstream tasks never run
    # Continue only if validation passed
    features = compute_features(metadata)
    train_model(features)
Alternatively, call the task with return_state=True and branch on the returned state, or wrap an upstream future in Prefect's allow_failure() annotation so that a downstream task still runs when the upstream one fails.
Q: What is a Prefect Deployment and how does it differ from just running a flow?
Running a flow directly (python flow.py or my_flow()) executes it immediately in the current Python process. A Deployment is a registered configuration that tells the Prefect control plane: "run this flow on this schedule, with these parameters, in this work pool, from this code location." Deployments persist in the Prefect API - they survive restarts, can be triggered via the UI or API, support parameter overrides at trigger time, and can be versioned. When you prefect deploy, you package the flow's entrypoint, parameters, schedule, and infrastructure configuration into a Deployment record. Workers then pull that configuration and execute the flow on the correct infrastructure. The deployment is how you go from "a Python script that works locally" to "a scheduled, monitored, retriggerable production pipeline."
Q: How does Prefect's event-driven automation system compare to Airflow's data-aware scheduling with Datasets?
Airflow Datasets are a tight coupling within the Airflow ecosystem - a producing DAG marks a Dataset as updated, and a consuming DAG is triggered by that update. Both DAGs must be in the same Airflow deployment and the dependency is tracked in Airflow's metadata database. This is elegant for intra-Airflow dependencies, but an external system cannot mark a Dataset as updated unless it goes through Airflow itself (for example, its REST API in recent releases).
Prefect Automations are more general - they can trigger on Prefect events (flow run completed, work queue backed up), external webhook events, or schedule-based conditions. You can trigger a Prefect flow from an S3 event notification, a Slack message, or an API call from any external system. The tradeoff is that Airflow's Dataset model is simpler and more auditable for intra-pipeline dependencies, while Prefect's Automation model is more flexible for cross-system event-driven workflows.
Q: How does Prefect handle backfill compared to Airflow?
Airflow has a native airflow dags backfill CLI command that creates historical DAG runs for a date range. Prefect does not have an equivalent native command - you handle backfill programmatically by triggering multiple flow runs with different parameter values:
from prefect.client.orchestration import get_client
import asyncio
from datetime import date, timedelta
async def backfill(start_date: date, end_date: date, deployment_name: str):
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(deployment_name)
        current = start_date
        while current <= end_date:
            await client.create_flow_run_from_deployment(
                deployment.id,
                parameters={"train_date": str(current)},
                name=f"backfill-{current}",
            )
            current += timedelta(days=1)
asyncio.run(backfill(date(2024, 1, 1), date(2024, 1, 31), "churn-model-training/weekly-churn-features"))
This is more verbose but more flexible - you can control concurrency, add delays between runs, and target specific date ranges without Airflow's implicit catch-up behavior. The key requirement (as in Airflow) is that your tasks must be idempotent for backfill to be safe.
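The flexibility mentioned above (choosing the date range and controlling concurrency) comes down to two small pieces of plain Python: generating the parameter values and grouping them into batches. The helpers below are a hypothetical sketch with no Prefect dependency; each batch would then be submitted with the client call shown earlier.

```python
from datetime import date, timedelta


def backfill_dates(start, end, step_days=1):
    """Yield ISO date strings from start to end inclusive, step_days apart."""
    if step_days < 1:
        raise ValueError("step_days must be >= 1")
    current = start
    while current <= end:
        yield current.isoformat()
        current += timedelta(days=step_days)


def batched(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


# Weekly runs for January 2024, starting on Monday 2024-01-01, two at a time.
dates = list(backfill_dates(date(2024, 1, 1), date(2024, 1, 31), step_days=7))
batches = batched(dates, 2)
```

Submitting one batch, waiting for it to finish, and then submitting the next is a simple way to cap how many backfill runs compete for the work pool at once.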
Prefect Blocks - Reusable Infrastructure Config
Prefect Blocks store reusable configuration and credentials - database connection strings, S3 bucket names, Slack webhooks, Docker image references. Blocks are stored in the Prefect server or Prefect Cloud and referenced by name in your flows.
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.blocks.notifications import SlackWebhook
from prefect_aws import S3Bucket  # the S3 block lives in the prefect-aws integration
# Create and register blocks (done once, typically via Prefect UI or CLI)
# prefect block register -m prefect_aws
# Using a registered S3 block in a flow
@task
def write_results(df, date: str) -> str:
    s3_block = S3Bucket.load("ml-data-prod")  # resolves from Prefect server
    output_path = f"processed/{date}/results.parquet"
    # e.g. s3_block.bucket_name == "company-ml-data"
    df.to_parquet(f"s3://{s3_block.bucket_name}/{output_path}")
    return output_path
# Using a Secret block - credentials never appear in code
@task
def fetch_from_api(date: str) -> dict:
    api_key = Secret.load("internal-api-key").get()
    import requests
    resp = requests.get(
        "https://api.internal/events",
        headers={"Authorization": f"Bearer {api_key}"},
        params={"date": date},
    )
    resp.raise_for_status()
    return resp.json()
# Slack notification block
@flow
def pipeline_with_notification(date: str):
    try:
        result = run_pipeline(date)
    except Exception as e:
        slack = SlackWebhook.load("ml-alerts")
        slack.notify(f":red_circle: Pipeline failed for {date}: {e}")
        raise
Blocks are stored centrally and resolved by name at run time, so you can update a block's value (e.g., rotate a credential) without changing any flow code. All flows that reference the block by name pick up the new value on the next run.
Concurrent Task Execution and .map()
Prefect runs tasks concurrently when you submit them to the task runner and there is no data dependency between them; a plain task call blocks until that task finishes. For explicit fan-out across a list of items, use .map() or call .submit() once per item:
from prefect import flow, task

@task
def process_partition(partition_key: str) -> dict:
    """Process one partition - runs in parallel with other partitions."""
    import pandas as pd
    df = pd.read_parquet(f"s3://data/partitions/{partition_key}/data.parquet")
    result = df.groupby("user_id").agg({"revenue": "sum", "sessions": "count"})
    output_path = f"s3://data/processed/{partition_key}/aggregated.parquet"
    result.to_parquet(output_path)
    return {"partition": partition_key, "output": output_path, "rows": len(result)}

@task
def list_partitions(date: str) -> list[str]:
    """Discover all partitions for the given date."""
    import boto3
    s3 = boto3.client("s3")
    response = s3.list_objects_v2(Bucket="data", Prefix=f"partitions/date={date}/", Delimiter="/")
    # Keep each key relative to "partitions/" (including the date= segment)
    # so process_partition can rebuild the full S3 path from it
    return [cp["Prefix"][len("partitions/"):].rstrip("/") for cp in response.get("CommonPrefixes", [])]

@task
def merge_results(results: list[dict]) -> None:
    """Merge all partition results into a single output."""
    import pandas as pd
    paths = [r["output"] for r in results]
    combined = pd.concat([pd.read_parquet(p) for p in paths])
    combined.to_parquet("s3://data/daily/merged.parquet")
    print(f"Merged {len(combined)} rows from {len(paths)} partitions")

@flow(name="parallel-partition-processing")
def process_all_partitions(date: str) -> None:
    partitions = list_partitions(date)
    # Submit all partition tasks concurrently - Prefect manages the fan-out
    futures = [process_partition.submit(p) for p in partitions]
    # Collect results - waits for all futures to complete
    results = [f.result() for f in futures]
    merge_results(results)
The submit() pattern submits tasks to Prefect's task runner without blocking - each task runs as soon as it is submitted and the flow continues. Using .result() later collects the output and waits if the task has not yet completed.
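The submit-then-collect shape is the same one used by Python's concurrent.futures, so its behavior can be observed without a Prefect install. The sketch below is only an analogy using the standard library, not Prefect's implementation; process_partition here is a hypothetical stand-in that sleeps to simulate I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def process_partition(key: str) -> dict:
    """Hypothetical stand-in for a partition task; sleeps to simulate I/O."""
    time.sleep(0.1)
    return {"partition": key, "rows": len(key)}


partitions = ["a", "bb", "ccc", "dddd"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a future immediately; the work proceeds in the background
    futures = [pool.submit(process_partition, p) for p in partitions]
    # result() blocks only until that particular future has finished
    results = [f.result() for f in futures]
elapsed = time.perf_counter() - start
```

Because all four calls overlap, elapsed should typically be close to the duration of one task rather than the sum of all four, and results keeps the order in which the work was submitted.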
Prefect Concurrency Limits
Similar to Airflow Pools, Prefect provides concurrency limits at the tag level - limiting how many task runs with a given tag can execute simultaneously across all flow runs:
from prefect import task

@task(
    tags=["gpu-training"],  # tag for concurrency limit
    name="train-model-variant",
)
def train_variant(config: dict) -> dict:
    # This task will only run if the "gpu-training" concurrency limit allows it
    # Set the limit via the Prefect UI or CLI: prefect concurrency-limit create gpu-training 4
    ...

# Run in a shell, not in Python: create a concurrency limit - max 4 gpu-training tasks across all flows
prefect concurrency-limit create gpu-training 4
This prevents your GPU cluster from being overloaded when multiple flows compete for training resources simultaneously.
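The effect of such a limit can be pictured as a shared pool of slots. The following sketch uses a threading.BoundedSemaphore to imitate a limit of 4 and records the peak number of simultaneous runs; it is a standard-library illustration of the semantics, not how Prefect enforces limits, and train_variant here is a hypothetical stand-in that only sleeps.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 4  # mirrors: prefect concurrency-limit create gpu-training 4
slots = threading.BoundedSemaphore(LIMIT)
counter_lock = threading.Lock()
running = 0
peak = 0


def train_variant(variant_id: int) -> int:
    """Hypothetical training task that must hold a slot while it runs."""
    global running, peak
    with slots:  # blocks while LIMIT tasks already hold a slot
        with counter_lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # simulated training work
        with counter_lock:
            running -= 1
    return variant_id


# Ten workers compete, but at most LIMIT of them hold a slot at any moment
with ThreadPoolExecutor(max_workers=10) as pool:
    done = list(pool.map(train_variant, range(10)))
```

Even with ten workers available, peak never exceeds LIMIT; the remaining tasks wait for a slot instead of failing.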
Testing Prefect Flows
Because Prefect flows are plain Python, testing is straightforward. Tasks can be called directly in tests without any Prefect infrastructure:
# tests/test_churn_pipeline.py
import pytest
import pandas as pd
from unittest.mock import patch
from pipelines.churn_features import validate_training_data, compute_features

# Test tasks directly - no Prefect server needed
def test_validate_training_data_passes_with_good_data():
    """validate_training_data should return metadata for clean data."""
    good_df = pd.DataFrame({
        "user_id": range(15_000),
        "churned": ([1] * 1500) + ([0] * 13500),  # 10% churn rate
        "last_login_date": "2024-01-01",
        "payment_failures_30d": 0,
        "payment_attempts_30d": 1,
    })
    with patch("pandas.read_parquet", return_value=good_df):
        result = validate_training_data.fn("2024-01-15")  # .fn() bypasses Prefect overhead
    assert result["row_count"] == 15_000
    assert result["max_null_rate"] < 0.05

def test_validate_training_data_raises_on_insufficient_rows():
    """validate_training_data should raise ValueError when row count is below threshold."""
    small_df = pd.DataFrame({"user_id": range(500), "churned": 0, "last_login_date": "2024-01-01",
                             "payment_failures_30d": 0, "payment_attempts_30d": 1})
    with patch("pandas.read_parquet", return_value=small_df):
        with pytest.raises(ValueError, match="Insufficient rows"):
            validate_training_data.fn("2024-01-15")

# Test the full flow in a local context
from prefect.testing.utilities import prefect_test_harness

def test_full_flow_runs_without_errors():
    """Integration test - runs the full flow locally with mocked external calls."""
    with prefect_test_harness():  # runs against a temporary, isolated Prefect backend for the test
        with patch("pandas.read_parquet") as mock_read, \
             patch("pandas.DataFrame.to_parquet") as mock_write, \
             patch("mlflow.start_run"):
            mock_read.return_value = pd.DataFrame({
                "user_id": range(15_000),
                "churned": [1] * 1500 + [0] * 13500,
                "last_login_date": "2024-01-01",
                "payment_failures_30d": 0,
                "payment_attempts_30d": 1,
                "avg_session_length_7d": 30.0,
                "support_tickets_90d": 0,
                "plan_tier": "basic",
            })
            # Run the flow - all tasks execute in-process
            from pipelines.churn_features import churn_feature_pipeline
            churn_feature_pipeline(train_date="2024-01-15")
The .fn() method on a Prefect task calls the underlying Python function directly, bypassing all Prefect retry/caching logic. This is ideal for unit tests where you want to test the business logic without the orchestration layer.
The prefect_test_harness() context manager runs flows against a temporary local database instead of your real Prefect backend - useful for integration tests that need to verify flow state, task counts, and artifact creation without polluting production run history.
Prefect Recipes for Common ML Patterns
Pattern: Retry Only the Failed Partition
When processing large numbers of partitions and some fail, you do not want to re-run the entire flow - only the failed partitions. Submit every partition, then wait on each future and inspect its final state instead of calling .result() (which would raise on the first failure), and re-submit only the failures:
from prefect import flow

# list_partitions and process_partition are the tasks defined in the previous section

@flow
def resilient_partition_processing(date: str) -> None:
    partitions = list_partitions(date)
    futures = {p: process_partition.submit(p) for p in partitions}
    failed_partitions = []
    for partition, future in futures.items():
        state = future.wait()  # Prefect 2.x: returns the final state without raising
        if state.is_failed():
            failed_partitions.append(partition)
    if failed_partitions:
        # Retry failed partitions once with a different strategy: extra task-level retries
        retry_task = process_partition.with_options(retries=2, retry_delay_seconds=30)
        retry_futures = {p: retry_task.submit(p) for p in failed_partitions}
        for partition, future in retry_futures.items():
            state = future.wait()
            if state.is_failed():
                raise RuntimeError(f"Partition {partition} failed after retry")
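The collect-then-resubmit logic can be exercised in isolation with the standard library. The sketch below mirrors the pattern using concurrent.futures rather than Prefect futures; process_partition, run_with_one_retry, and the attempts counter are hypothetical names, and the partition "p2" is rigged to fail on its first attempt only.

```python
from concurrent.futures import ThreadPoolExecutor

attempts: dict[str, int] = {}


def process_partition(key: str) -> str:
    """Hypothetical partition job: 'p2' fails on its first attempt only."""
    attempts[key] = attempts.get(key, 0) + 1
    if key == "p2" and attempts[key] == 1:
        raise RuntimeError("transient failure")
    return f"{key}-ok"


def run_with_one_retry(keys: list[str]) -> tuple[dict[str, str], list[str]]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = {k: pool.submit(process_partition, k) for k in keys}
        # exception() waits for completion and returns None on success,
        # so one failure does not stop us from inspecting the other futures
        failed = [k for k, f in futures.items() if f.exception() is not None]
        results = {k: f.result() for k, f in futures.items() if k not in failed}
        # Re-submit only the partitions that failed
        retries = {k: pool.submit(process_partition, k) for k in failed}
        for k, f in retries.items():
            results[k] = f.result()  # raises if the retry fails as well
    return results, failed


results, failed = run_with_one_retry(["p1", "p2", "p3"])
```

Successful partitions run exactly once, and only "p2" is executed a second time.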
Pattern: Conditional Sub-flow
Prefect supports calling flows from within flows (sub-flows). This is useful for composing large pipelines from independently deployable pieces:
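from prefect import flow
from pipelines.feature_engineering import feature_engineering_flow
from pipelines.model_training import model_training_flow

# notify_deployment_success is assumed to be defined elsewhere

@flow(name="full-ml-platform-pipeline")
def full_pipeline(date: str) -> None:
    # Sub-flows called like this run synchronously: each call blocks until the sub-flow finishes.
    # To run independent sub-flows concurrently, define them as async flows and await them together.
    feature_result = feature_engineering_flow(date=date)
    training_result = model_training_flow(
        date=date,
        feature_path=feature_result["output_path"],
    )
    # Conditional step: notify only when training actually deployed a model
    if training_result["deployed"]:
        notify_deployment_success(training_result)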
Sub-flows appear as separate flow runs in the Prefect UI, with their own task-level observability, while still being linked to the parent run for end-to-end tracing.
