Prefect

The 3 AM Incident

It is 3:17 AM on a Tuesday. Your phone buzzes. The on-call alert reads: "Training pipeline failed - step 4 of 11." You open your laptop, pull up Airflow, and stare at a sea of red squares. The logs span 40,000 lines. The error itself is buried somewhere around line 31,000. You scroll. You scroll more. The error turns out to be a transient S3 timeout that would have resolved itself on a retry - except your Airflow DAG has no retry logic because adding retries to a PythonOperator in Airflow 1.x requires wrapping everything in boilerplate you never had time to write.

You fix the pipeline by manually re-running the failed task from the Airflow UI - except Airflow 1.x cannot resume from the middle of a DAG run. It re-runs from the beginning. Two hours of GPU time, wasted. Your model training job finishes at 5 AM, your model evaluation completes at 6 AM, and by the time the deployment step runs, it is already your standup. You are explaining to your team why the Friday model did not deploy until Wednesday.

This is not a hypothetical. This is what MLOps looked like for thousands of teams running Airflow 1.x pipelines in 2019 and 2020. Airflow was built for data engineering workflows - ETL jobs, database syncs, reporting pipelines. It was never designed for ML workflows, where steps are long-running, expensive, stateful, and need to resume from the middle. The pain was real and widespread.

Prefect was built by engineers who felt exactly this pain. Its founding premise was simple: orchestration should not fight your Python code. Your workflows should be Python-first, not DAG-first. Retries should be one decorator. Caching should be one argument. Observability should come for free. You should never have to restart from the beginning because step 4 failed.

This lesson covers Prefect 2.x and 3.x - the modern rewrite that threw away the Airflow-inspired architecture and rebuilt everything from scratch. By the end, you will understand how to build a complete ML pipeline in Prefect, deploy it with work pools, add production-grade retries and caching, and monitor it through Prefect Cloud's observability dashboard.


Why Prefect Exists

Before Prefect, the dominant tool was Apache Airflow. Airflow defined workflows as Directed Acyclic Graphs (DAGs) using a Python DSL that felt like Python but was not really Python - you could not use loops, conditionals, or dynamic values at runtime because the DAG structure was evaluated at import time, not at execution time.

The problems with Airflow for ML workflows were structural:

  • No dynamic task generation at runtime - you had to know your task graph shape when you wrote the DAG
  • No first-class retry with backoff - you needed extensive configuration per-operator
  • No resumption from failure - re-running a failed DAG run restarted from the beginning
  • Heavy infrastructure overhead - Airflow needed a metadata database, a scheduler process, a web server, and at least one worker process just to run a single task
  • Confusing state model - the difference between a DAG run state and a task instance state was non-obvious
  • Airflow 1.x was actively hostile to modern Python - no async support, no type hints in the core API

Prefect 1.x tried to solve these problems while staying close to Airflow's concepts. The result was better than Airflow, but still conceptually similar. Prefect 2.x (released 2022) was a clean-slate rewrite. The team looked at what ML engineers actually needed and built a new system from those requirements.

:::tip The Core Insight Prefect's core insight is that a workflow is just a Python function that calls other Python functions. The orchestrator should add observability, retries, and scheduling around normal Python code - not force you to rewrite your code in a special DSL. :::


Historical Context

Prefect was founded in 2018 by Jeremiah Lowin, who had previously led data infrastructure at a hedge fund. The 1.x series launched publicly in 2019 and gained traction as a "better Airflow" - it had a cleaner API, a commercial cloud offering, and first-class support for dynamic pipelines.

The real shift came in 2022 with Prefect 2.x, codenamed "Orion." The rewrite took the team nearly two years. The key design decisions that defined Prefect 2.x:

  • Flows are just decorated Python functions - no DAG class, no operator pattern
  • Tasks are just decorated Python functions - no inheritance, no special base class
  • State is tracked server-side - the execution environment does not need a database
  • Deployments are first-class - scheduling and infrastructure concerns are separate from flow logic
  • Work pools decouple infrastructure from pipelines - one pool can serve many deployments

Prefect 3.x (released 2024) refined the model further, with better performance, improved async support, and a cleaner API for results and artifacts.


Core Concepts

Flows and Tasks

Everything in Prefect starts with two decorators: @flow and @task.

from prefect import flow, task

@task
def fetch_training_data(dataset_path: str) -> dict:
    """Download and validate training data."""
    import pandas as pd
    df = pd.read_parquet(dataset_path)
    return {"shape": df.shape, "columns": list(df.columns), "data": df}

@flow(name="ml-training-pipeline")
def training_pipeline(dataset_path: str, model_name: str = "xgboost"):
    data = fetch_training_data(dataset_path)
    print(f"Loaded dataset: {data['shape']}")

When you call training_pipeline(...), Prefect:

  1. Creates a flow run in the Prefect backend (local SQLite or Prefect Cloud)
  2. Tracks each task invocation as a task run
  3. Records state transitions (Pending → Running → Completed or Failed)
  4. Persists results if configured

The critical difference from Airflow: this is real Python. You can call fetch_training_data in a loop, in a conditional, inside a try/except. The task graph is determined at runtime, not at import time.
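
To make the tracking steps above concrete, here is a toy, plain-Python sketch of a decorator that records state transitions for each call. It is not Prefect's implementation: `tracked_task` and `RUN_LOG` are hypothetical names used only for illustration. It also shows what a runtime-determined graph means in practice, since the number of task runs comes from an ordinary loop rather than anything declared up front.

```python
from functools import wraps

RUN_LOG: list[tuple[str, str]] = []  # (task name, state) pairs, in order

def tracked_task(fn):
    """Hypothetical stand-in for @task: record state transitions per call."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        RUN_LOG.append((fn.__name__, "Pending"))
        RUN_LOG.append((fn.__name__, "Running"))
        try:
            result = fn(*args, **kwargs)
        except Exception:
            RUN_LOG.append((fn.__name__, "Failed"))
            raise
        RUN_LOG.append((fn.__name__, "Completed"))
        return result
    return wrapper

@tracked_task
def square(x: int) -> int:
    return x * x

def pipeline(n: int) -> list[int]:
    # The number of task runs is decided here, at runtime, by an ordinary loop.
    return [square(i) for i in range(n)]

outputs = pipeline(3)
completed = [name for name, state in RUN_LOG if state == "Completed"]
```

A real orchestrator stores these transitions in a backend instead of a list, which is what lets a UI show them after the process exits.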

Task Runners

Task runners determine how tasks execute - sequentially, concurrently, or distributed across a cluster.

from prefect import flow, task
# Prefect 2.x runner names. In Prefect 3.x, SequentialTaskRunner was removed and
# ThreadPoolTaskRunner supersedes ConcurrentTaskRunner.
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner
from prefect_dask import DaskTaskRunner

@task
def train_fold(fold_id: int, X_train, y_train, X_val, y_val) -> dict:
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.metrics import roc_auc_score

    model = GradientBoostingClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    val_preds = model.predict_proba(X_val)[:, 1]
    auc = roc_auc_score(y_val, val_preds)
    return {"fold": fold_id, "auc": auc, "model": model}

# Sequential - one fold at a time
@flow(task_runner=SequentialTaskRunner())
def sequential_cv_pipeline(n_folds: int = 5):
    results = []
    for fold_id in range(n_folds):
        # .submit() returns a future; `...` stands for the fold's train/val data
        result = train_fold.submit(fold_id, ...)
        results.append(result)
    return [r.result() for r in results]

# Concurrent - all folds in parallel (thread-based)
@flow(task_runner=ConcurrentTaskRunner())
def concurrent_cv_pipeline(n_folds: int = 5):
    futures = [train_fold.submit(fold_id, ...) for fold_id in range(n_folds)]
    return [f.result() for f in futures]

# Dask - distributed across a Dask cluster
@flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
def distributed_cv_pipeline(n_folds: int = 5):
    futures = [train_fold.submit(fold_id, ...) for fold_id in range(n_folds)]
    return [f.result() for f in futures]

:::note When to Use Each Runner

  • SequentialTaskRunner: debugging, development, when tasks have side effects that conflict
  • ConcurrentTaskRunner: I/O-bound tasks (API calls, database reads) - thread-based, not CPU-bound
  • DaskTaskRunner: CPU-bound parallel tasks (model training folds, feature engineering) - true multiprocessing :::
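
The submit-then-collect pattern used with the concurrent runners has the same shape as Python's standard `concurrent.futures` API. The sketch below uses a thread pool directly, with a hypothetical `score_fold` function standing in for real training. It illustrates the pattern only and does not involve Prefect.

```python
from concurrent.futures import ThreadPoolExecutor

def score_fold(fold_id: int) -> dict:
    """Hypothetical stand-in for a per-fold training task."""
    return {"fold": fold_id, "score": fold_id * 10}

def run_folds(n_folds: int) -> list[dict]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() returns immediately with a future, much like task.submit()
        futures = [pool.submit(score_fold, i) for i in range(n_folds)]
        # result() blocks until that future finishes; output order follows submit order
        return [f.result() for f in futures]

fold_results = run_folds(5)
```

Collecting results in submission order keeps the output deterministic even when the folds finish out of order.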

Retries with Exponential Backoff

This is where Prefect truly shines compared to Airflow. Retries are a single decorator argument:

import time
from prefect import task
from prefect.tasks import exponential_backoff

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),
    retry_jitter_factor=0.5,  # adds randomness to avoid thundering herd
)
def call_feature_store_api(entity_ids: list[str]) -> dict:
    """Fetch features from an external feature store API."""
    import httpx

    response = httpx.post(
        "https://feature-store.internal/batch",
        json={"entity_ids": entity_ids},
        timeout=30.0,
    )
    response.raise_for_status()
    return response.json()

The exponential_backoff(backoff_factor=2) creates delays of approximately [2, 4, 8] seconds between attempts, with the retry_jitter_factor adding up to 50% randomness to each delay. This prevents all failed tasks from hammering a recovering service simultaneously.
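
The delay arithmetic can be checked with a few lines of plain Python. This is a sketch of the schedule described above, not Prefect's code: `backoff_delays` and `with_jitter` are hypothetical helpers, and the uniform jitter model is an assumption (Prefect's actual jitter distribution may differ).

```python
import random

def backoff_delays(backoff_factor: float, retries: int) -> list[float]:
    """Base delays: backoff_factor * 2**attempt for each retry attempt."""
    return [backoff_factor * 2**attempt for attempt in range(retries)]

def with_jitter(delay: float, jitter_factor: float, rng: random.Random) -> float:
    """Assumed jitter model: stretch the delay by a uniform 0..jitter_factor fraction."""
    return delay * (1 + rng.uniform(0, jitter_factor))

base = backoff_delays(backoff_factor=2, retries=3)   # base schedule before jitter
rng = random.Random(0)  # seeded so the sketch is repeatable
jittered = [with_jitter(d, 0.5, rng) for d in base]  # each value lies in [d, 1.5 * d]
```

Because each task draws its own jitter, tasks that failed at the same moment spread their retries out instead of retrying in lockstep.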

You can also retry on specific exceptions only:

from prefect import task
import boto3
from botocore.exceptions import ClientError

@task(
    retries=5,
    retry_delay_seconds=30,
    retry_condition_fn=lambda task, run, state: (
        isinstance(state.result(raise_on_failure=False), ClientError)
        and state.result(raise_on_failure=False).response["Error"]["Code"]
        in ["ThrottlingException", "RequestLimitExceeded"]
    ),
)
def read_from_s3(bucket: str, key: str) -> bytes:
    s3 = boto3.client("s3")
    return s3.get_object(Bucket=bucket, Key=key)["Body"].read()
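
The decision logic behind a retry condition is easier to read, and to unit test, as a named predicate. The following plain-Python sketch shows the idea without Prefect or AWS: `FakeClientError` is a hypothetical stand-in that mimics only the `.response` shape of botocore's `ClientError`, and `run_with_retries` is an illustrative loop with delays omitted.

```python
class FakeClientError(Exception):
    """Hypothetical stand-in for botocore's ClientError (same .response shape)."""
    def __init__(self, code: str):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}

RETRYABLE_CODES = {"ThrottlingException", "RequestLimitExceeded"}

def is_retryable(exc: Exception) -> bool:
    """Retry only throttling-style errors; everything else fails immediately."""
    return (
        isinstance(exc, FakeClientError)
        and exc.response["Error"]["Code"] in RETRYABLE_CODES
    )

def run_with_retries(fn, retries: int):
    """Call fn; retry up to `retries` times, but only when is_retryable says so."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception as exc:
            if attempts > retries or not is_retryable(exc):
                raise

calls = {"n": 0}

def flaky_read() -> bytes:
    # Fails with a throttling error twice, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeClientError("ThrottlingException")
    return b"payload"

data, attempts_used = run_with_retries(flaky_read, retries=5)
```

Errors such as a permissions failure are re-raised on the first attempt, since retrying them only delays the inevitable.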

Caching with Cache Key Functions

Caching lets you skip re-running expensive tasks when their inputs have not changed - critical for ML pipelines where data ingestion and feature engineering can take hours.

from prefect import task
from prefect.tasks import task_input_hash
from datetime import timedelta
import hashlib

# Simple caching - uses task inputs as the cache key
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def load_raw_features(dataset_version: str, split: str) -> dict:
    """Load features - cached for 24 hours if same version+split."""
    import pandas as pd
    df = pd.read_parquet(f"s3://my-bucket/features/{dataset_version}/{split}.parquet")
    return df.to_dict(orient="split")

# Custom cache key - hash based on file content, not just filename
def file_content_cache_key(context, parameters):
    import boto3
    s3 = boto3.client("s3")
    bucket = parameters["bucket"]
    key = parameters["key"]
    head = s3.head_object(Bucket=bucket, Key=key)
    etag = head["ETag"]
    return hashlib.md5(f"{bucket}/{key}/{etag}".encode()).hexdigest()

@task(cache_key_fn=file_content_cache_key, cache_expiration=timedelta(days=7))
def compute_embeddings(bucket: str, key: str, model_name: str) -> list:
    """Compute embeddings - only re-runs if the file actually changed."""
    # ... expensive embedding computation
    pass
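
The mechanics of key-based caching with expiration can be sketched in plain Python. This is a simplified model under stated assumptions, not Prefect's implementation: `input_hash`, `cached_call`, and the in-memory `_CACHE` dict are hypothetical, and inputs are assumed to be JSON-serializable.

```python
import hashlib
import json
import time

_CACHE: dict[str, tuple[float, object]] = {}  # key -> (stored_at, result)

def input_hash(fn_name: str, params: dict) -> str:
    """Build a cache key from the function name and its JSON-serializable inputs."""
    payload = json.dumps({"fn": fn_name, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def cached_call(fn, params: dict, ttl_seconds: float, now=time.time):
    """Return (result, was_cached); re-run fn on a new key or an expired entry."""
    key = input_hash(fn.__name__, params)
    entry = _CACHE.get(key)
    if entry is not None and now() - entry[0] < ttl_seconds:
        return entry[1], True
    result = fn(**params)
    _CACHE[key] = (now(), result)
    return result, False

def load_features(dataset_version: str, split: str) -> str:
    return f"{dataset_version}/{split}"  # placeholder for an expensive load

first = cached_call(load_features, {"dataset_version": "v1", "split": "train"}, 60)
second = cached_call(load_features, {"dataset_version": "v1", "split": "train"}, 60)
changed = cached_call(load_features, {"dataset_version": "v2", "split": "train"}, 60)
```

Here the second call is served from the cache, while the call with a different `dataset_version` produces a new key and runs again. An entry older than `ttl_seconds` is treated the same way as a missing one.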

Results Persistence

By default, task results live in memory and are garbage-collected when the flow run completes. For ML pipelines where tasks produce large artifacts (datasets, model weights), you want to persist results to storage:

from prefect import flow, task
# prefect.filesystems.S3 is the Prefect 2.x block. It was later deprecated in
# favor of S3Bucket from the prefect-aws package, which Prefect 3.x uses instead.
from prefect.filesystems import S3

# Configure result storage (do this once, or set in Prefect Cloud UI)
result_storage = S3(bucket_path="my-bucket/prefect-results")

@task(result_storage=result_storage, persist_result=True)
def train_model(X_train, y_train, hyperparams: dict):
    from sklearn.ensemble import RandomForestClassifier

    model = RandomForestClassifier(**hyperparams)
    model.fit(X_train, y_train)
    return model  # serialized to S3 automatically

@task(persist_result=True)
def evaluate_model(model, X_test, y_test) -> dict:
    from sklearn.metrics import classification_report
    preds = model.predict(X_test)
    return classification_report(y_test, preds, output_dict=True)
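
Conceptually, persisting a result means serializing the return value to a storage location under a key and reading it back later. The sketch below shows that round trip with `pickle` and a temporary local directory standing in for a bucket prefix. `persist_result` and `load_result` are hypothetical helpers, not Prefect functions, and the metric values are made up for illustration.

```python
import pickle
import tempfile
from pathlib import Path

def persist_result(storage_dir: Path, key: str, value: object) -> Path:
    """Serialize a result to <storage_dir>/<key>.pkl and return its path."""
    path = storage_dir / f"{key}.pkl"
    path.write_bytes(pickle.dumps(value))
    return path

def load_result(path: Path) -> object:
    """Read a previously persisted result back into memory."""
    return pickle.loads(path.read_bytes())

with tempfile.TemporaryDirectory() as tmp:
    storage = Path(tmp)  # local stand-in for a bucket prefix
    metrics = {"roc_auc": 0.91, "f1": 0.78}  # made-up values for illustration
    saved_path = persist_result(storage, "evaluate-model", metrics)
    restored = load_result(saved_path)
```

Once a result lives in storage rather than in process memory, a later run or a different machine can pick it up, which is what makes cached and resumable pipelines possible.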

Complete ML Pipeline Example

Here is a full, production-ready ML pipeline in Prefect - migrating from a broken Airflow 1.x pattern. This pipeline covers data ingestion, feature engineering, model training, evaluation, and conditional deployment.

# ml_pipeline.py
import logging
from datetime import timedelta
from typing import Optional

import pandas as pd
import numpy as np
from prefect import flow, task, get_run_logger
from prefect.artifacts import create_table_artifact, create_markdown_artifact
from prefect.tasks import task_input_hash, exponential_backoff
from prefect.task_runners import ConcurrentTaskRunner

logger = logging.getLogger(__name__)


# ──────────────────────────────────────────────────────────────
# Step 1: Data Ingestion
# ──────────────────────────────────────────────────────────────

@task(
    name="ingest-raw-data",
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=12),
    tags=["data", "ingestion"],
)
def ingest_raw_data(
    source_path: str,
    date_partition: str,
) -> pd.DataFrame:
    log = get_run_logger()
    log.info(f"Ingesting data from {source_path} for partition {date_partition}")

    df = pd.read_parquet(f"{source_path}/{date_partition}/")
    log.info(f"Loaded {len(df):,} rows, {df.shape[1]} columns")

    # Validate schema
    required_cols = ["user_id", "event_type", "timestamp", "amount", "label"]
    missing = set(required_cols) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    return df


# ──────────────────────────────────────────────────────────────
# Step 2: Feature Engineering
# ──────────────────────────────────────────────────────────────

@task(
    name="engineer-features",
    retries=2,
    retry_delay_seconds=10,
    tags=["features"],
)
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    log = get_run_logger()
    log.info("Starting feature engineering")

    df = df.copy()
    df["hour_of_day"] = pd.to_datetime(df["timestamp"]).dt.hour
    df["day_of_week"] = pd.to_datetime(df["timestamp"]).dt.dayofweek
    df["log_amount"] = np.log1p(df["amount"])

    # Rolling aggregates per user.
    # Note: window=7 covers each user's last 7 rows (transactions), not 7
    # calendar days, despite the "_7d" suffix in the column names.
    df = df.sort_values(["user_id", "timestamp"])
    df["user_txn_count_7d"] = (
        df.groupby("user_id")["amount"]
        .transform(lambda x: x.rolling(window=7, min_periods=1).count())
    )
    df["user_avg_amount_7d"] = (
        df.groupby("user_id")["amount"]
        .transform(lambda x: x.rolling(window=7, min_periods=1).mean())
    )

    feature_cols = [
        "hour_of_day", "day_of_week", "log_amount",
        "user_txn_count_7d", "user_avg_amount_7d",
    ]
    log.info(f"Engineered {len(feature_cols)} features")
    return df[feature_cols + ["label"]]


# ──────────────────────────────────────────────────────────────
# Step 3: Train/Val Split
# ──────────────────────────────────────────────────────────────

@task(name="split-dataset", tags=["data"])
def split_dataset(
    df: pd.DataFrame,
    val_fraction: float = 0.2,
    random_seed: int = 42,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    log = get_run_logger()

    feature_cols = [c for c in df.columns if c != "label"]
    X = df[feature_cols]
    y = df["label"]

    from sklearn.model_selection import train_test_split
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=val_fraction, random_state=random_seed, stratify=y
    )
    log.info(f"Train: {len(X_train):,} | Val: {len(X_val):,}")
    return X_train, X_val, y_train, y_val


# ──────────────────────────────────────────────────────────────
# Step 4: Model Training
# ──────────────────────────────────────────────────────────────

@task(
    name="train-model",
    tags=["training"],
    persist_result=True,
)
def train_model(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    hyperparams: dict,
) -> object:
    log = get_run_logger()
    log.info(f"Training with hyperparams: {hyperparams}")

    from sklearn.ensemble import GradientBoostingClassifier
    model = GradientBoostingClassifier(**hyperparams, random_state=42)
    model.fit(X_train, y_train)

    log.info("Model training complete")
    return model


# ──────────────────────────────────────────────────────────────
# Step 5: Evaluation
# ──────────────────────────────────────────────────────────────

@task(name="evaluate-model", tags=["evaluation"])
def evaluate_model(
    model,
    X_val: pd.DataFrame,
    y_val: pd.Series,
) -> dict:
    log = get_run_logger()

    from sklearn.metrics import roc_auc_score, average_precision_score, f1_score

    val_proba = model.predict_proba(X_val)[:, 1]
    val_preds = (val_proba >= 0.5).astype(int)

    metrics = {
        "roc_auc": round(roc_auc_score(y_val, val_proba), 4),
        "avg_precision": round(average_precision_score(y_val, val_proba), 4),
        "f1": round(f1_score(y_val, val_preds), 4),
    }
    log.info(f"Evaluation results: {metrics}")

    # Create a Prefect artifact - shows up in the UI
    create_table_artifact(
        key="model-metrics",
        table=[{"metric": k, "value": v} for k, v in metrics.items()],
        description="Validation set evaluation metrics",
    )
    return metrics


# ──────────────────────────────────────────────────────────────
# Step 6: Conditional Deployment
# ──────────────────────────────────────────────────────────────

@task(name="deploy-model", tags=["deployment"])
def deploy_model(
    model,
    metrics: dict,
    auc_threshold: float = 0.85,
) -> Optional[str]:
    log = get_run_logger()

    if metrics["roc_auc"] < auc_threshold:
        log.warning(
            f"Model AUC {metrics['roc_auc']:.4f} below threshold {auc_threshold}. "
            "Skipping deployment."
        )
        create_markdown_artifact(
            key="deployment-decision",
            markdown=f"**SKIPPED** - AUC {metrics['roc_auc']:.4f} < threshold {auc_threshold}",
        )
        return None

    import pickle, boto3, datetime
    model_key = f"models/fraud-detector/{datetime.date.today()}/model.pkl"
    s3 = boto3.client("s3")
    s3.put_object(
        Bucket="my-models-bucket",
        Key=model_key,
        Body=pickle.dumps(model),
    )
    log.info(f"Model deployed to s3://my-models-bucket/{model_key}")

    create_markdown_artifact(
        key="deployment-decision",
        markdown=f"**DEPLOYED** to `s3://my-models-bucket/{model_key}`\n\nAUC: {metrics['roc_auc']:.4f}",
    )
    return model_key


# ──────────────────────────────────────────────────────────────
# Main Flow
# ──────────────────────────────────────────────────────────────

@flow(
    name="fraud-detection-training",
    description="End-to-end fraud detection model training and deployment",
    task_runner=ConcurrentTaskRunner(),
    log_prints=True,
)
def fraud_detection_pipeline(
    source_path: str = "s3://data-lake/fraud/events",
    date_partition: str = "2024-01-15",
    n_estimators: int = 200,
    max_depth: int = 5,
    learning_rate: float = 0.05,
    auc_threshold: float = 0.85,
):
    # Sequential flow - each step depends on the previous
    raw_df = ingest_raw_data(source_path, date_partition)
    feature_df = engineer_features(raw_df)
    X_train, X_val, y_train, y_val = split_dataset(feature_df)

    hyperparams = {
        "n_estimators": n_estimators,
        "max_depth": max_depth,
        "learning_rate": learning_rate,
    }
    model = train_model(X_train, y_train, hyperparams)
    metrics = evaluate_model(model, X_val, y_val)
    deployment_key = deploy_model(model, metrics, auc_threshold)

    return {"metrics": metrics, "deployment": deployment_key}


if __name__ == "__main__":
    result = fraud_detection_pipeline(
        source_path="./data/fraud",
        date_partition="2024-01-15",
    )
    print(f"Pipeline complete: {result}")

Deployments and Work Pools

Running a flow locally is useful for development. Production requires deployments - a way to schedule flows, parameterize them, and run them on managed infrastructure.

# deploy.py
# The flow lives in ml_pipeline.py (see the listing above)
from ml_pipeline import fraud_detection_pipeline

# Create a deployment from the flow
if __name__ == "__main__":
    fraud_detection_pipeline.deploy(
        name="fraud-detector-daily",
        work_pool_name="kubernetes-pool",
        # Container-based work pools need an image; deploy() builds and pushes it by default
        image="my-registry/fraud-detector:latest",
        cron="0 2 * * *",  # Run at 2 AM daily
        parameters={
            "source_path": "s3://data-lake/fraud/events",
            "auc_threshold": 0.87,
        },
        tags=["production", "fraud"],
        description="Daily fraud detection model retraining",
    )

Work pools represent infrastructure - they decouple your pipeline logic from where it runs:

# Create a Kubernetes work pool
prefect work-pool create kubernetes-pool --type kubernetes

# Start a worker that polls the work pool
prefect worker start --pool kubernetes-pool

The worker process runs wherever you want your tasks to execute - on a VM, in a Kubernetes pod, or on your laptop during development. The Prefect server (cloud or self-hosted) coordinates scheduling and state tracking without needing to be on the same network as the worker.
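
The division of labor between a work pool and a worker can be modeled in a few lines. This is a deliberately simplified, hypothetical model (the `WorkPool` and `Worker` classes below are not Prefect classes): the pool holds scheduled runs plus an infrastructure type, and the worker polls the pool and launches whatever it finds.

```python
from collections import deque
from dataclasses import dataclass, field

@dataclass
class WorkPool:
    """Hypothetical model: a named queue of scheduled runs plus infra settings."""
    name: str
    infra_type: str
    queue: deque = field(default_factory=deque)

    def schedule(self, flow_name: str, parameters: dict) -> None:
        self.queue.append((flow_name, parameters))

@dataclass
class Worker:
    """Hypothetical model: polls one pool and launches whatever it finds."""
    pool: WorkPool
    launched: list = field(default_factory=list)

    def poll_once(self) -> bool:
        if not self.pool.queue:
            return False  # nothing scheduled; a real worker would sleep and retry
        flow_name, parameters = self.pool.queue.popleft()
        # A real worker would start a process, container, or Kubernetes Job here.
        self.launched.append(f"{self.pool.infra_type}:{flow_name}")
        return True

pool = WorkPool(name="kubernetes-pool", infra_type="kubernetes")
pool.schedule("fraud-detection-training", {"auc_threshold": 0.87})
worker = Worker(pool)
picked_up = worker.poll_once()  # finds the scheduled run and launches it
idle = worker.poll_once()       # queue is now empty
```

Because the worker pulls work rather than having it pushed, the server only needs to be reachable from the worker, not the other way around.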

# prefect.yaml - deployment configuration file
name: fraud-detector-daily
prefect-version: "3.0.0"

build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.3.0
      image_name: my-registry/fraud-detector
      tag: latest
      dockerfile: Dockerfile

push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.3.0
      image_name: "{{ build_image.image_name }}"
      tag: "{{ build_image.tag }}"

deployments:
  - name: fraud-detector-daily
    work_pool:
      name: kubernetes-pool
      job_variables:
        image: "{{ build_image.image }}"
        # cpu_request / memory_request assume the work pool's base job
        # template has been customized to expose these variables
        cpu_request: "2000m"
        memory_request: "4Gi"
    schedules:
      - cron: "0 2 * * *"
        timezone: "UTC"
    parameters:
      source_path: "s3://data-lake/fraud/events"
      auc_threshold: 0.87

Prefect Cloud vs Self-Hosted

Prefect Cloud is the managed SaaS offering. The API server, dashboard, and metadata store are hosted by Prefect. Your code still runs in your infrastructure - only state metadata and coordination traffic crosses to Prefect Cloud. This is the recommended starting point for most teams.

Self-hosted Prefect Server runs the same API server in your own infrastructure. You manage a PostgreSQL database and the server process. Use this when you have strict data residency requirements or when Prefect Cloud's pricing does not work at your scale.


Flow Architecture Diagram


Prefect vs Airflow: API Comparison

The table below shows equivalent patterns in Airflow 2.x vs Prefect 2.x/3.x:

| Pattern | Airflow 2.x | Prefect 2.x/3.x |
|---------|-------------|-----------------|
| Define workflow | `@dag` decorator | `@flow` decorator |
| Define task | `@task` decorator | `@task` decorator |
| Retry | `@task(retries=3, retry_delay=timedelta(seconds=30))` | `@task(retries=3, retry_delay_seconds=30)` |
| Exponential backoff | `retry_exponential_backoff=True` on the task | `retry_delay_seconds=exponential_backoff(backoff_factor=2)` |
| Caching | No built-in result caching (`trigger_rule` only controls when a task runs) | `@task(cache_key_fn=task_input_hash)` |
| Dynamic tasks | Limited - `expand()` in 2.3+ | Full Python: loops, conditionals, recursion |
| Concurrency | Depends on the executor (Local, Celery, Kubernetes) | Built-in `ConcurrentTaskRunner` |
| Distributed | Requires CeleryExecutor setup | Plug in `DaskTaskRunner` |
| Results | XComs - 48KB limit by default | Arbitrary - stored in S3/GCS/local |
| Observability | Basic logs + Gantt chart | Full artifact tracking, metrics, Markdown |

Production Engineering Notes

Subflow Patterns for Modular Pipelines

Break large pipelines into subflows that can run independently:

from prefect import flow

@flow(name="data-ingestion")
def data_ingestion_flow(date: str) -> dict:
    raw = ingest_raw_data("s3://...", date)
    features = engineer_features(raw)
    return {"features": features, "date": date}

@flow(name="model-training")
def model_training_flow(features_ref: dict, hyperparams: dict) -> dict:
    X_train, X_val, y_train, y_val = split_dataset(features_ref["features"])
    model = train_model(X_train, y_train, hyperparams)
    metrics = evaluate_model(model, X_val, y_val)
    return {"model": model, "metrics": metrics}

@flow(name="master-pipeline")
def master_pipeline(date: str):
    # Subflows appear as nested flow runs in the UI
    data_result = data_ingestion_flow(date)
    training_result = model_training_flow(data_result, {"n_estimators": 200})
    deploy_model(training_result["model"], training_result["metrics"])

Secrets Management

from prefect import task
from prefect.blocks.system import Secret

@task
def fetch_from_database():
    # Secrets are stored encrypted in Prefect Cloud / Server
    db_password = Secret.load("production-db-password").get()
    conn_string = f"postgresql://user:{db_password}@db-host:5432/mydb"
    # ... use connection

Observability with Artifacts

from prefect import task
from prefect.artifacts import create_markdown_artifact, create_link_artifact
import matplotlib.pyplot as plt
import io, base64

@task
def create_training_report(model, metrics: dict, feature_importance: dict):
    # Embed a plot as base64 in a Markdown artifact
    fig, ax = plt.subplots(figsize=(8, 4))
    features = list(feature_importance.keys())
    importance = list(feature_importance.values())
    ax.barh(features, importance)
    ax.set_title("Feature Importance")

    buf = io.BytesIO()
    plt.savefig(buf, format="png", bbox_inches="tight")
    buf.seek(0)
    img_b64 = base64.b64encode(buf.read()).decode()

    create_markdown_artifact(
        key="training-report",
        markdown=f"""
## Training Report

| Metric | Value |
|--------|-------|
| ROC AUC | {metrics['roc_auc']:.4f} |
| Avg Precision | {metrics['avg_precision']:.4f} |
| F1 Score | {metrics['f1']:.4f} |

![Feature Importance](data:image/png;base64,{img_b64})
""",
    )

Common Mistakes

:::danger Returning Large DataFrames from Tasks Do not return large pandas DataFrames or NumPy arrays from tasks without deciding where the result should live. By default, task results are held in memory for the life of the flow run, and they are serialized whenever they are persisted or shipped to another process (for example with DaskTaskRunner). A 2 GB DataFrame can therefore exist as the original, a serialized copy, and a deserialized copy at the same time - roughly tripling your memory usage.

Fix: Configure result storage with S3/GCS, or use result serializers that write to disk. :::

:::danger Using Global State in Tasks Tasks must be stateless. Do not use module-level variables that tasks mutate:

# WRONG - global state breaks when tasks run concurrently
model_cache = {}

@task
def get_model(model_id: str):
    if model_id not in model_cache:  # Race condition!
        model_cache[model_id] = load_model(model_id)
    return model_cache[model_id]

Fix: Use Prefect's caching (cache_key_fn) or pass models explicitly between tasks. :::
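
When an in-process shared cache is genuinely unavoidable, a lock around the check-and-set removes the race shown above. Note that this is a different technique from the Prefect-native fixes just described, and it only helps within a single process (it does nothing for Dask workers or separate containers). The `load_model` function here is a hypothetical stand-in for an expensive loader.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_model_cache: dict[str, object] = {}
_cache_lock = threading.Lock()
load_calls = []  # records every real load, for demonstration

def load_model(model_id: str) -> dict:
    """Hypothetical expensive loader."""
    load_calls.append(model_id)
    return {"id": model_id}

def get_model(model_id: str) -> dict:
    # Check-and-set happens under one lock, so only one thread ever loads.
    with _cache_lock:
        if model_id not in _model_cache:
            _model_cache[model_id] = load_model(model_id)
        return _model_cache[model_id]

with ThreadPoolExecutor(max_workers=8) as pool:
    models = list(pool.map(get_model, ["fraud-v1"] * 20))
```

All twenty callers receive the same object and the loader runs once. The trade-off is that a single lock serializes loads for every model ID, which is acceptable for a sketch but may need per-key locks under heavier use.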

:::warning Calling Tasks Outside a Flow What happens when you call a @task-decorated function outside a @flow depends on the Prefect version. In Prefect 2.x the call raises a RuntimeError; to run the underlying function there, call ingest_raw_data.fn(...), which executes plain Python with no retries, caching, or state tracking. Prefect 3.x allows the direct call and records it as a task run, but that run is not grouped under any flow run.

# Prefect 2.x: .fn() bypasses Prefect - no retries, no caching, no tracking
result = ingest_raw_data.fn("s3://bucket", "2024-01-15")

# This runs WITH Prefect tracking, as part of a flow run
@flow
def my_flow():
    result = ingest_raw_data("s3://bucket", "2024-01-15")

:::

:::warning Ignoring Task Runner Memory Limits The ConcurrentTaskRunner uses threads, not processes. CPU-bound tasks (model training, feature engineering with heavy NumPy) will NOT be parallelized - the GIL prevents it. Use DaskTaskRunner for CPU-bound workloads. :::


Interview Questions and Answers

Q1: How does Prefect's task execution model differ from Airflow's DAG model?

Airflow evaluates the DAG structure at import time - the task graph is static and determined before any data exists. Prefect evaluates the flow function at runtime, so the task graph is determined by the actual execution of your Python code. This means Prefect flows can use Python loops to dynamically generate tasks, branch based on real data values, and call the same task multiple times with different inputs. Airflow 2.x added limited dynamic task expansion via task.expand(), but Prefect's model is fundamentally more Pythonic.

Q2: What is the difference between a Work Pool and a Worker in Prefect?

A Work Pool is a configuration object that defines the infrastructure type (Kubernetes, Docker, local process, etc.) and its settings. It does not execute anything - it is a template. A Worker is a long-running process that polls a specific Work Pool for flow runs to execute. When Prefect Cloud schedules a run, it places it in the work pool's queue. A worker picks it up and launches it on the pool's infrastructure: a subprocess on the worker's machine for a process pool, a container for a Docker pool, or a Job in the cluster for a Kubernetes pool. This decoupling means your pipeline code does not need to know about infrastructure details, and you can change where pipelines run by changing the work pool configuration.

Q3: Explain Prefect's caching mechanism. When does a cached task re-run?

Prefect caching works by computing a cache key for a task run based on the cache_key_fn argument. The built-in task_input_hash function hashes the task's name and its inputs. When a task runs, Prefect checks whether a completed run with the same cache key exists within the cache_expiration window. If yes, the task returns the cached result without executing. The cache key can be any function - you can hash file ETags, database row counts, or any other signal that represents "the inputs have changed." Tasks re-run when the cache key changes or when the expiration window expires.

Q4: How do you handle secrets in Prefect without hardcoding credentials?

Prefect has a Blocks system for storing configuration and secrets. Secrets are stored encrypted in Prefect Cloud or your self-hosted server. You create a secret block via the UI or CLI and reference it by name in your code using Secret.load("my-secret").get(). For infrastructure credentials (AWS, GCP), you can store them as credential blocks (e.g., AWSCredentials) that are automatically used by integrations like prefect-aws. The credentials never appear in your code or version control.

Q5: How would you migrate an Airflow 1.x DAG to Prefect?

The migration has four steps. First, identify each PythonOperator task in the DAG and convert it to a @task-decorated function - the Python callables are often reusable as-is. Second, convert the DAG definition to a @flow function and replace the >> dependency syntax with direct Python function calls (Prefect infers dependencies from the values passed between tasks). Third, add retry logic and caching to tasks that previously had none - this is where you get immediate value. Fourth, create a deployment to replace the schedule that the Airflow scheduler used to run. The most common trap is Airflow's XCom pattern - in Airflow, tasks communicate via XComs stored in the metadata database. In Prefect, tasks return values directly and pass them as function arguments, which is more natural and removes the 48KB XCom size limit.

Q6: What happens when a Prefect task raises an exception?

When a task raises an exception, Prefect catches it and transitions the task run to a Failed state. The exception is stored in the task run's state data. If the task has retries configured, Prefect schedules a retry after the retry_delay_seconds period. After all retries are exhausted, the task run remains in Failed state. The parent flow run then receives a Failed state for the task future, and if the flow function does not catch this, the flow run itself transitions to Failed. You can handle task failures gracefully using .result(raise_on_failure=False) on task futures, or by wrapping task calls in try/except blocks within the flow.

© 2026 EngineersOfAI. All rights reserved.