

How Netflix Runs 10,000 ML Experiments a Year

The year is 2018. Netflix's ML platform team has a problem that sounds like a good problem to have: too many data scientists doing too much ML. More than 100 researchers are running experiments - recommendation models, content ranking, A/B test analysis, content demand forecasting - across a hybrid infrastructure of laptops, on-premise clusters, and AWS. Every researcher has their own way of organizing code. Experiments cannot be reproduced because the environment that produced them is a snowflake. Jobs that need GPU instances require filing tickets with the infrastructure team. When a researcher leaves, their work is often lost because it lives in a Jupyter notebook on their laptop.

The infrastructure team tried Airflow. It failed the researchers immediately - the DAG model was incompatible with the way data scientists think. A researcher wants to write a loop that trains 50 model variants with different hyperparameters. In Airflow, that requires defining 50 tasks statically at DAG parse time. The mental model is wrong for exploration-heavy work. Luigi was similar. Kubernetes Jobs were too low-level. The researchers needed something that felt like Python, not like infrastructure.

Metaflow was born from this frustration. The core idea was radical in its simplicity: a workflow is just a Python class, and each step is just a method. You write Python the way you always have. Metaflow handles the hard parts - moving data between steps, running steps on cloud compute, versioning every run, and making old results reproducible. The framework was designed so that a data scientist could go from laptop experimentation to running on 100 AWS Batch nodes with a single decorator change.

Netflix open-sourced Metaflow in 2019. It has since been adopted by hundreds of companies, and Outerbounds, a company founded by members of the original Metaflow team, offers a managed platform that provides the infrastructure layer. What makes Metaflow distinctive is its philosophy: the framework should adapt to the researcher's workflow, not the other way around. You do not need to learn a new DAG DSL, a YAML configuration language, or a platform-specific API. You write Python classes.

This lesson covers Metaflow's core abstractions - the Flow class, the @step decorator, branching with join, artifact persistence through attributes assigned to self, and cloud execution with @batch and @kubernetes - culminating in a complete Netflix-style feature store population flow with parallel branches, parameters, and Cards for documentation.



Why Metaflow Exists

The fundamental problem Metaflow solves is the gap between local ML development and production execution. This gap manifests in three ways:

The Reproducibility Gap: A model trained last month cannot be retrained because nobody remembers the exact data slice, preprocessing parameters, and library versions used. The experiment ran on someone's laptop and was never tracked.

The Scale Gap: A training job that takes 2 minutes on a data sample takes 20 hours on one machine when run on the full dataset. The researcher needs 100 machines for 12 minutes, but accessing those machines requires infrastructure tickets, custom job submission scripts, and manual log aggregation.

The Collaboration Gap: Researcher A built a great feature engineering pipeline. Researcher B rebuilds it from scratch because A's code is not structured in a way that is easy to import and run. Knowledge does not accumulate.

Metaflow addresses all three:

  • Every step's outputs are automatically persisted as named artifacts, versioned by run ID
  • The same flow runs locally or on AWS Batch/Kubernetes with a single decorator
  • Flows are Python classes that can be imported, extended, and run by anyone on the team

:::tip The Key Philosophy Metaflow's philosophy is "best practices by default, not by policy." Instead of telling engineers what to do, it makes the right thing the easy thing. Persisting your step output requires no extra code - it happens automatically when you assign self.x = value. :::


Historical Context

Metaflow was built at Netflix between 2016 and 2018 by a team led by Ville Tuulos (who later co-founded Outerbounds). The team studied how data scientists actually worked - not how ML textbooks said they should work - and designed the framework around observed patterns.

The earliest version used Amazon S3 for artifact storage and AWS Batch for compute. The framework was built to be incrementally adoptable: you could start running a flow locally and add cloud execution later by adding a decorator. This incremental path was crucial for adoption - researchers did not have to rewrite their code to get cloud scale.

Metaflow was open-sourced in December 2019. Outerbounds, the company Tuulos co-founded in 2021, offers a managed Metaflow platform that adds a hosted metadata service, an artifact store, and a UI for browsing runs and artifacts.


Core Concepts: The Flow-Step Model

Defining a Flow

Every Metaflow workflow is a Python class that inherits from FlowSpec. Each method decorated with @step is a pipeline step, and every flow must define a step named start and a step named end. Steps are connected by calling self.next() as the last statement of each step (except end).

```python
from metaflow import FlowSpec, step


class SimpleMLFlow(FlowSpec):

    @step
    def start(self):
        """Entry point - always the first step."""
        self.dataset_version = "v2024-03-01"
        print(f"Starting pipeline with dataset {self.dataset_version}")
        self.next(self.load_data)

    @step
    def load_data(self):
        """Load the training dataset."""
        import pandas as pd
        # self.anything is automatically persisted between steps
        self.df = pd.read_parquet(f"s3://data-lake/features/{self.dataset_version}.parquet")
        self.num_rows = len(self.df)
        print(f"Loaded {self.num_rows:,} rows")
        self.next(self.train_model)

    @step
    def train_model(self):
        """Train the model on the loaded data."""
        from sklearn.ensemble import RandomForestClassifier
        feature_cols = [c for c in self.df.columns if c != "label"]
        X = self.df[feature_cols]
        y = self.df["label"]

        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.model.fit(X, y)
        self.next(self.end)

    @step
    def end(self):
        """Final step - always required."""
        print(f"Training complete. Model: {self.model}")


if __name__ == "__main__":
    SimpleMLFlow()
```

Run this flow with:

```bash
python flow.py run                    # local execution
python flow.py run --with batch       # every step on AWS Batch
python flow.py run --with kubernetes  # every step on Kubernetes
```

Artifact Persistence

The most powerful feature of Metaflow is implicit artifact persistence. Anything you assign to self inside a step is automatically serialized and stored in the artifact store (S3 or local filesystem). When the next step starts, Metaflow deserializes the previous step's artifacts and makes them available as attributes.

```python
@step
def train_model(self):
    from sklearn.ensemble import RandomForestClassifier

    # self.df was set in load_data - Metaflow automatically restored it
    feature_cols = [c for c in self.df.columns if c != "label"]
    model = RandomForestClassifier()
    model.fit(self.df[feature_cols], self.df["label"])

    # These will be available in ALL subsequent steps
    self.model = model
    self.feature_columns = feature_cols
    self.next(self.evaluate)
```

This works across machine boundaries. If load_data ran on Machine A and train_model runs on Machine B (via AWS Batch), Metaflow reads the artifacts from S3 and reconstructs them transparently.
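To make the mechanism concrete, here is a toy, pure-Python model of implicit artifact persistence. It is not Metaflow's implementation: ToyDatastore, StepState, run_linear_flow, and the two step functions are hypothetical names. It captures the two ideas described above: after each step, every attribute on self is pickled and saved under that step's identity, and the next step starts from a fresh object rehydrated from the saved state, as if it were running on another machine. Like Metaflow's own datastore, it stores blobs content-addressed by a hash, so an unchanged artifact is stored only once.

```python
import hashlib
import pickle


class ToyDatastore:
    """Hypothetical in-memory stand-in for an artifact store such as S3."""

    def __init__(self):
        self.blobs = {}  # content hash -> pickled bytes (content-addressed)
        self.tasks = {}  # "flow/run/step" -> {artifact name: content hash}

    def save(self, pathspec, artifacts):
        index = {}
        for name, value in artifacts.items():
            blob = pickle.dumps(value)
            key = hashlib.sha1(blob).hexdigest()
            self.blobs.setdefault(key, blob)  # an unchanged value is stored only once
            index[name] = key
        self.tasks[pathspec] = index

    def load(self, pathspec):
        return {name: pickle.loads(self.blobs[key])
                for name, key in self.tasks[pathspec].items()}


class StepState:
    """Plain object playing the role of `self` inside a step."""


def run_linear_flow(steps, store, flow="ToyFlow", run_id="1"):
    """Run step functions in order, persisting and restoring state between them."""
    previous = None
    for step_fn in steps:
        state = StepState()  # fresh object, as if on a different machine
        if previous is not None:
            state.__dict__.update(store.load(previous))  # rehydrate prior artifacts
        step_fn(state)
        previous = f"{flow}/{run_id}/{step_fn.__name__}"
        store.save(previous, vars(state))  # persist every attribute, no explicit save call
    return previous


def start(self):
    self.dataset_version = "v2024-03-01"
    self.rows = [1, 2, 3]


def train(self):
    self.total = sum(self.rows)  # `rows` was restored from the store


store = ToyDatastore()
last = run_linear_flow([start, train], store)
print(store.load(last))  # {'dataset_version': 'v2024-03-01', 'rows': [1, 2, 3], 'total': 6}
```

The train step never saves anything explicitly, yet its output includes both the inherited artifacts and its own, and the two inherited values are stored once even though two steps recorded them.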


Key Decorators

@retry, @timeout, and @catch

```python
import random

from metaflow import FlowSpec, step, retry, timeout, catch


class RobustFlow(FlowSpec):

    @step
    def start(self):
        # Hypothetical entity IDs to look up in the feature service
        self.entity_ids = [101, 102, 103]
        self.next(self.call_external_api)

    @retry(times=3, minutes_between_retries=2)
    @timeout(minutes=30)
    @step
    def call_external_api(self):
        """Fetch features from an external API with retries."""
        import httpx
        response = httpx.post(
            "https://feature-store.internal/batch",
            json={"entity_ids": self.entity_ids},
            timeout=60.0,
        )
        response.raise_for_status()  # turn HTTP errors into exceptions so @retry fires
        self.features = response.json()
        self.next(self.risky_step)

    @catch(var="exception_info")
    @step
    def risky_step(self):
        """If this step fails, store the exception and continue."""
        # self.exception_info holds the exception if one occurs, None otherwise
        if random.random() < 0.5:  # stand-in for an operation that can fail
            raise RuntimeError("simulated failure")
        self.result = len(self.features)
        self.next(self.end)

    @step
    def end(self):
        if self.exception_info:
            print(f"risky_step failed: {self.exception_info}")
        else:
            print(f"Result: {self.result}")


if __name__ == "__main__":
    RobustFlow()
```
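As a rough mental model (a plain-Python sketch, not how Metaflow implements these decorators), @retry(times=N) gives a step up to N extra attempts after the first failure, and @catch turns a failure that survives every retry into a stored exception so the flow can continue. The helper run_with_retry_and_catch and the make_flaky factory are hypothetical; the wait between retries is a parameter so the example runs instantly.

```python
import time


def run_with_retry_and_catch(fn, times=3, seconds_between_retries=0.0):
    """Toy model of @retry(times=...) combined with @catch.

    Calls fn for 1 initial attempt plus up to `times` retries. Returns
    (result, exception_info, attempts): exception_info is None on success,
    mirroring how @catch leaves its variable None when the step succeeds,
    and holds the last exception if every attempt failed.
    """
    last_exc = None
    for attempt in range(1, times + 2):  # 1 initial attempt + `times` retries
        try:
            return fn(), None, attempt
        except Exception as exc:  # in this model, any exception triggers a retry
            last_exc = exc
            if attempt <= times:
                time.sleep(seconds_between_retries)
    return None, last_exc, times + 1


def make_flaky(failures_before_success):
    """Hypothetical operation that fails a fixed number of times, then succeeds."""
    calls = {"n": 0}

    def op():
        calls["n"] += 1
        if calls["n"] <= failures_before_success:
            raise ConnectionError(f"transient failure #{calls['n']}")
        return "ok"

    return op


print(run_with_retry_and_catch(make_flaky(2)))   # succeeds on the 3rd attempt
print(run_with_retry_and_catch(make_flaky(10)))  # exhausts all retries; exception is caught
```

With times=3, a step can run at most four times in total, so pair a generous @timeout with a small retry count to bound the worst-case runtime.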

@resources for Cloud Compute

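```python
from metaflow import FlowSpec, step, batch, resources, kubernetes


def train_with_gpu():
    """Hypothetical placeholder for your GPU training code."""
    return [0.0] * 10


def run_batch_inference(model_weights, test_data):
    """Hypothetical placeholder for your batched inference code."""
    return [sum(model_weights) for _ in test_data]


class ResourceIntensiveFlow(FlowSpec):

    @step
    def start(self):
        self.test_data = list(range(1000))  # stand-in for real test data
        self.next(self.train_large_model)

    @resources(cpu=4, memory=16000, gpu=1)  # 4 vCPUs, 16 GB RAM (memory is in MB), 1 GPU
    @batch(image="my-registry/ml-training:cuda12")
    @step
    def train_large_model(self):
        """Runs on AWS Batch with 4 vCPUs, 16 GB RAM, and 1 GPU."""
        self.model_weights = train_with_gpu()
        self.next(self.run_inference)

    @resources(cpu=1, memory=4000, gpu=1)
    @kubernetes(namespace="ml-team", image="my-registry/gpu-inference:latest")
    @step
    def run_inference(self):
        """Runs on a Kubernetes pod with 1 GPU."""
        self.predictions = run_batch_inference(self.model_weights, self.test_data)
        self.next(self.end)

    @step
    def end(self):
        print("Done")


if __name__ == "__main__":
    ResourceIntensiveFlow()
```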

@conda for Environment Management

```python
# Run with: python conda_flow.py --environment=conda run
from metaflow import FlowSpec, step, conda, conda_base


@conda_base(python="3.11.0")
class IsolatedEnvironmentFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.preprocess)

    @conda(libraries={"scikit-learn": "1.3.0", "pandas": "2.1.0"})
    @step
    def preprocess(self):
        """Runs in an isolated conda environment with specific library versions."""
        import sklearn  # pinned to 1.3.0 by @conda
        print(f"scikit-learn {sklearn.__version__}")
        self.next(self.train)

    @conda(libraries={"xgboost": "2.0.0", "shap": "0.43.0"})
    @step
    def train(self):
        """Different conda environment for this step."""
        import xgboost as xgb  # pinned to 2.0.0 by @conda
        print(f"xgboost {xgb.__version__}")
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    IsolatedEnvironmentFlow()
```

Branching: The Foreach Pattern

Metaflow's foreach enables dynamic task parallelism - running the same step with different inputs in parallel. This is perfect for hyperparameter search, multi-fold cross-validation, or processing multiple data shards.
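Stripped of scheduling and cloud execution, the data flow of foreach and join can be modeled in a few lines of plain Python. This is a hypothetical sketch (run_foreach_join, train_variant, and pick_best are not Metaflow APIs, and the "training" is a fake score): each branch starts from a copy of the parent's state plus an input attribute holding one list item, and the join receives the list of finished branch states and decides what to keep.

```python
import copy
from types import SimpleNamespace


def run_foreach_join(parent, foreach_attr, branch_step, join_step):
    """Toy model of foreach/join data flow (sequential and in-process)."""
    branches = []
    for item in getattr(parent, foreach_attr):
        # Each branch sees the parent's artifacts plus `input` = one list item
        state = copy.deepcopy(parent)
        state.input = item
        branch_step(state)
        branches.append(state)
    joined = SimpleNamespace()  # in this model the join starts empty and picks what to keep
    join_step(joined, branches)
    return joined


def train_variant(self):
    # Stand-in for training: a fake score that depends on the hyperparameter
    self.lr = self.input
    self.score = round(1.0 - abs(self.input - 0.01) * 10, 3)


def pick_best(self, inputs):
    self.scores = {inp.lr: inp.score for inp in inputs}
    self.best_lr = max(inputs, key=lambda inp: inp.score).lr


parent = SimpleNamespace(learning_rates=[0.001, 0.01, 0.1])
result = run_foreach_join(parent, "learning_rates", train_variant, pick_best)
print(result.best_lr)  # 0.01
```

The deep copy is the point of the model: branches cannot see or modify each other's state, so the only way to combine their results is through the join.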

```python
from metaflow import FlowSpec, step, Parameter
import numpy as np


class CrossValidationFlow(FlowSpec):

    n_folds = Parameter("n_folds", default=5, type=int)
    n_estimators = Parameter("n_estimators", default=100, type=int)

    @step
    def start(self):
        import pandas as pd
        self.df = pd.read_parquet("s3://data-lake/training.parquet")
        # Create contiguous fold assignments; the last fold absorbs the remainder rows
        n = len(self.df)
        fold_size = n // self.n_folds
        self.folds = [
            {
                "fold_id": i,
                "val_start": i * fold_size,
                "val_end": n if i == self.n_folds - 1 else (i + 1) * fold_size,
            }
            for i in range(self.n_folds)
        ]
        # foreach="folds" creates one parallel branch per item in self.folds
        self.next(self.train_fold, foreach="folds")

    @step
    def train_fold(self):
        """Runs in parallel - one execution per fold."""
        import pandas as pd
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.metrics import roc_auc_score

        fold_info = self.input  # the current fold dict
        fold_id = fold_info["fold_id"]
        val_start = fold_info["val_start"]
        val_end = fold_info["val_end"]

        train_df = pd.concat([self.df.iloc[:val_start], self.df.iloc[val_end:]])
        val_df = self.df.iloc[val_start:val_end]

        feature_cols = [c for c in self.df.columns if c != "label"]
        X_train, y_train = train_df[feature_cols], train_df["label"]
        X_val, y_val = val_df[feature_cols], val_df["label"]

        model = RandomForestClassifier(n_estimators=self.n_estimators, random_state=42)
        model.fit(X_train, y_train)

        val_preds = model.predict_proba(X_val)[:, 1]
        auc = roc_auc_score(y_val, val_preds)

        self.fold_id = fold_id
        self.fold_auc = auc
        self.fold_model = model
        self.next(self.join_folds)

    @step
    def join_folds(self, inputs):
        """Aggregates results from all parallel branches."""
        # inputs holds the results of each parallel train_fold execution
        aucs = [inp.fold_auc for inp in inputs]
        fold_ids = [inp.fold_id for inp in inputs]

        self.cv_auc_scores = dict(zip(fold_ids, aucs))
        self.mean_auc = float(np.mean(aucs))
        self.std_auc = float(np.std(aucs))

        # Pick the best model
        best_fold = max(inputs, key=lambda inp: inp.fold_auc)
        self.best_model = best_fold.fold_model
        self.best_fold_id = best_fold.fold_id

        print(f"Cross-validation AUC: {self.mean_auc:.4f} ± {self.std_auc:.4f}")
        self.next(self.end)

    @step
    def end(self):
        print(f"Best fold: {self.best_fold_id}, AUC: {self.cv_auc_scores[self.best_fold_id]:.4f}")


if __name__ == "__main__":
    CrossValidationFlow()
```

Run this:

```bash
# Local - runs folds as parallel local processes (add --max-workers 1 to run them one at a time while debugging)
python cv_flow.py run --n_folds 5

# AWS Batch - runs all folds in parallel on separate machines
python cv_flow.py run --with batch --n_folds 5 --n_estimators 200
```
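The fold arithmetic is easy to get subtly wrong: with fold_size = n // n_folds, the last n % n_folds rows only fall into a validation fold if the last fold is extended to n. A standalone version of the fold-boundary logic (fold_bounds is a hypothetical helper, not part of Metaflow) makes that easy to check:

```python
def fold_bounds(n, n_folds):
    """Contiguous K-fold validation ranges; the last fold absorbs the remainder rows."""
    if not 1 <= n_folds <= n:
        raise ValueError("need 1 <= n_folds <= n")
    fold_size = n // n_folds
    folds = []
    for i in range(n_folds):
        val_start = i * fold_size
        val_end = n if i == n_folds - 1 else (i + 1) * fold_size
        folds.append({"fold_id": i, "val_start": val_start, "val_end": val_end})
    return folds


for fold in fold_bounds(n=103, n_folds=5):
    print(fold)
```

Contiguous folds also assume the rows are not sorted by label or time; shuffling first (or using scikit-learn's KFold with shuffle=True) avoids folds that each see a biased slice of the data.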

Complete Example: Netflix-Style Feature Store Population

This is a realistic feature store population flow, modeled on the kind of pipeline that runs nightly to populate a feature store with user activity features for a recommendation system.

```python
# feature_store_population_flow.py
"""
Netflix-style feature store population flow.
Runs on AWS Batch nightly. Computes user activity features
for the past 7, 14, and 30 day windows, then joins and writes
to the feature store.
"""

from metaflow import (
    FlowSpec, step, Parameter, batch, resources, retry,
    timeout, card, current
)


FEATURE_STORE_TABLE = "user_activity_features"
S3_DATA_LAKE = "s3://data-lake/user-events"
FEATURE_STORE_URI = "s3://feature-store/user-activity"


class FeatureStorePopulationFlow(FlowSpec):
    """
    Populates the user activity feature store with rolling window features.
    Runs on AWS Batch. Each window computes in parallel.
    """

    run_date = Parameter(
        "run_date",
        help="Date to compute features for (YYYY-MM-DD)",
        default="2024-03-01",
    )
    lookback_windows = Parameter(
        "lookback_windows",
        help="Comma-separated list of day windows",
        default="7,14,30",
    )
    sample_fraction = Parameter(
        "sample_fraction",
        help="Fraction of users to process (1.0 = all)",
        default=1.0,
        type=float,
    )

    # ──────────────────────────────────────────────────────────
    # Step 1: Initialize and load user list
    # ──────────────────────────────────────────────────────────

    @step
    def start(self):
        """Parse parameters and load the active user list."""
        import pandas as pd

        self.windows = [int(w) for w in self.lookback_windows.split(",")]
        print(f"Computing features for windows: {self.windows} days")
        print(f"Run date: {self.run_date}")

        # Load active user IDs from data lake
        users_df = pd.read_parquet(
            f"{S3_DATA_LAKE}/active_users/{self.run_date}.parquet"
        )

        if self.sample_fraction < 1.0:
            users_df = users_df.sample(frac=self.sample_fraction, random_state=42)

        self.user_ids = users_df["user_id"].tolist()
        self.num_users = len(self.user_ids)
        print(f"Processing {self.num_users:,} users")

        # Branch into parallel window computation
        self.next(self.compute_window_features, foreach="windows")

    # ──────────────────────────────────────────────────────────
    # Step 2: Compute features for each window (parallel)
    # ──────────────────────────────────────────────────────────

    @retry(times=2, minutes_between_retries=5)
    @timeout(hours=2)
    @resources(cpu=8, memory=32000)
    @batch(
        image="my-registry/feature-engineering:latest",
        queue="ml-feature-engineering",
    )
    @step
    def compute_window_features(self):
        """
        Compute rolling window features for a specific day window.
        Runs on AWS Batch - one job per window, in parallel.
        """
        import pandas as pd
        from datetime import datetime, timedelta

        window_days = self.input  # e.g., 7, 14, or 30
        self.window_days = window_days

        run_dt = datetime.strptime(self.run_date, "%Y-%m-%d")
        # Inclusive window: exactly window_days daily partitions, ending on run_date
        start_dt = run_dt - timedelta(days=window_days - 1)

        print(f"Computing {window_days}-day features ({start_dt.date()} to {run_dt.date()})")

        # Load event data for the window
        events_dfs = []
        current_dt = start_dt
        while current_dt <= run_dt:
            date_str = current_dt.strftime("%Y-%m-%d")
            try:
                day_df = pd.read_parquet(f"{S3_DATA_LAKE}/daily/{date_str}.parquet")
                events_dfs.append(day_df)
            except FileNotFoundError:
                pass  # Some dates may not have data (e.g., system downtime)
            current_dt += timedelta(days=1)

        if not events_dfs:
            raise RuntimeError(f"No event data found for the {window_days}-day window")
        events_df = pd.concat(events_dfs, ignore_index=True)
        # Filter to active users only
        events_df = events_df[events_df["user_id"].isin(self.user_ids)]

        print(f"  Events in window: {len(events_df):,}")

        # Compute per-user aggregates
        user_features = (
            events_df.groupby("user_id").agg(
                total_events=("event_type", "count"),
                unique_content_count=("content_id", "nunique"),
                total_watch_minutes=("watch_duration_s", lambda x: x.sum() / 60),
                session_count=("session_id", "nunique"),
                play_events=("event_type", lambda x: (x == "play").sum()),
                search_events=("event_type", lambda x: (x == "search").sum()),
                skip_events=("event_type", lambda x: (x == "skip").sum()),
            )
            .reset_index()
        )

        # Add derived features
        # Average session length, measured in events per session
        user_features["avg_session_length"] = user_features["total_events"] / (
            user_features.pop("session_count").clip(lower=1)
        )

        user_features["engagement_score"] = (
            user_features["play_events"] * 2
            + user_features["total_watch_minutes"] * 0.1
            - user_features["skip_events"] * 0.5
        ).clip(lower=0)

        user_features["content_diversity"] = (
            user_features["unique_content_count"] / user_features["total_events"].clip(lower=1)
        )

        # Rename columns to include window suffix
        suffix = f"_{window_days}d"
        feature_cols = [c for c in user_features.columns if c != "user_id"]
        user_features = user_features.rename(
            columns={c: f"{c}{suffix}" for c in feature_cols}
        )

        self.window_features = user_features
        self.window_row_count = len(user_features)
        self.window_event_count = len(events_df)

        print(f"  Computed features for {self.window_row_count:,} users")
        self.next(self.join_windows)

    # ──────────────────────────────────────────────────────────
    # Step 3: Join all window results
    # ──────────────────────────────────────────────────────────

    @step
    def join_windows(self, inputs):
        """Join features from all window computations."""
        # Artifacts are not propagated through a join automatically;
        # carry forward the user count that validate_features needs.
        self.num_users = inputs[0].num_users

        # Collect stats from each window
        self.window_stats = {
            inp.window_days: {
                "rows": inp.window_row_count,
                "events": inp.window_event_count,
            }
            for inp in inputs
        }

        # Join all window feature DataFrames on user_id
        joined = inputs[0].window_features
        for inp in inputs[1:]:
            joined = joined.merge(
                inp.window_features, on="user_id", how="outer"
            )

        # Fill NaN for users with no events in some windows
        joined = joined.fillna(0)
        joined["run_date"] = self.run_date

        self.joined_features = joined
        self.total_feature_cols = len(
            [c for c in joined.columns if c not in ("user_id", "run_date")]
        )
        print(f"Joined features: {len(joined):,} users, {self.total_feature_cols} feature columns")
        self.next(self.validate_features)

    # ──────────────────────────────────────────────────────────
    # Step 4: Validate feature quality
    # ──────────────────────────────────────────────────────────

    @step
    def validate_features(self):
        """Run quality checks on computed features before writing to store."""
        df = self.joined_features
        issues = []

        # Check for unexpected nulls
        null_pct = df.isnull().mean()
        high_null_cols = null_pct[null_pct > 0.05].to_dict()
        if high_null_cols:
            issues.append(f"High null columns: {high_null_cols}")

        # Check for negative values where unexpected
        engagement_cols = [c for c in df.columns if "engagement_score" in c]
        for col in engagement_cols:
            if (df[col] < 0).any():
                issues.append(f"Negative engagement scores in {col}")

        # Check user count against expected
        expected_users = self.num_users
        actual_users = len(df)
        coverage_pct = actual_users / max(expected_users, 1) * 100
        if coverage_pct < 95:
            issues.append(f"Low user coverage: {coverage_pct:.1f}% (expected >95%)")

        self.validation_issues = issues
        self.coverage_pct = coverage_pct
        self.passed_validation = len(issues) == 0

        if issues:
            print(f"Validation WARNINGS ({len(issues)} issues):")
            for issue in issues:
                print(f"  - {issue}")
        else:
            print(f"Validation PASSED - coverage {coverage_pct:.1f}%")

        self.next(self.write_to_feature_store)

    # ──────────────────────────────────────────────────────────
    # Step 5: Write to Feature Store
    # ──────────────────────────────────────────────────────────

    @retry(times=3, minutes_between_retries=2)
    @timeout(minutes=30)
    @resources(cpu=4, memory=16000)
    @batch(queue="ml-feature-store-writes")
    @step
    def write_to_feature_store(self):
        """Write computed features to the feature store (S3 + DynamoDB index)."""
        import boto3

        df = self.joined_features
        output_path = f"{FEATURE_STORE_URI}/{self.run_date}/user_activity.parquet"

        # Write to S3 (pandas needs s3fs installed to write to s3:// paths)
        df.to_parquet(output_path, index=False)
        print(f"Wrote {len(df):,} rows to {output_path}")

        # Update the feature store index (points to latest run)
        dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
        table = dynamodb.Table("feature-store-index")
        table.put_item(
            Item={
                "feature_group": FEATURE_STORE_TABLE,
                "run_date": self.run_date,
                "s3_path": output_path,
                "num_users": len(df),
                "num_features": self.total_feature_cols,
                # DynamoDB rejects Python floats, so store the percentage as a string
                "coverage_pct": str(round(self.coverage_pct, 2)),
                "run_id": current.run_id,
            }
        )

        self.output_path = output_path
        print(f"Feature store index updated for {FEATURE_STORE_TABLE}")
        self.next(self.report)

    # ──────────────────────────────────────────────────────────
    # Step 6: Generate Card Report
    # ──────────────────────────────────────────────────────────

    @card(type="default")
    @step
    def report(self):
        """Generate a Metaflow Card documenting this run's results."""
        from metaflow.cards import Table, Markdown

        current.card.append(Markdown("# Feature Store Population Report"))
        current.card.append(Markdown(f"**Run date**: {self.run_date}"))
        current.card.append(Markdown(f"**Metaflow Run ID**: {current.run_id}"))
        current.card.append(Markdown(f"**Output path**: `{self.output_path}`"))

        # Summary table
        current.card.append(Markdown("## Pipeline Summary"))
        summary_rows = [
            ["Users processed", f"{len(self.joined_features):,}"],
            ["Feature columns", str(self.total_feature_cols)],
            ["User coverage", f"{self.coverage_pct:.1f}%"],
            ["Validation passed", "YES" if self.passed_validation else "NO"],
        ]
        current.card.append(Table(summary_rows, headers=["Metric", "Value"]))

        # Per-window stats
        current.card.append(Markdown("## Window Statistics"))
        window_rows = [
            [f"{days}d", f"{stats['rows']:,}", f"{stats['events']:,}"]
            for days, stats in sorted(self.window_stats.items())
        ]
        current.card.append(Table(window_rows, headers=["Window", "Users", "Events"]))

        if self.validation_issues:
            current.card.append(Markdown("## Validation Warnings"))
            for issue in self.validation_issues:
                current.card.append(Markdown(f"- {issue}"))

        self.next(self.end)

    @step
    def end(self):
        """Pipeline complete."""
        print("\nFeature store population complete:")
        print(f"  Run date: {self.run_date}")
        print(f"  Users: {len(self.joined_features):,}")
        print(f"  Features: {self.total_feature_cols}")
        print(f"  Output: {self.output_path}")
        print(f"  Run ID: {current.run_id}")


if __name__ == "__main__":
    FeatureStorePopulationFlow()
```
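Two pieces of compute_window_features are worth checking in isolation. First, the date loop is inclusive at both ends, so starting at run_dt - timedelta(days=window_days) would read window_days + 1 daily partitions, while starting window_days - 1 days back reads exactly window_days. Second, the derived features are simple arithmetic on the per-user aggregates. The sketch below re-implements both in plain Python using the same formulas as the flow; window_dates and derived_features are hypothetical helper names, not part of Metaflow or the flow above.

```python
from datetime import date, timedelta


def window_dates(run_date, window_days):
    """Daily partitions for a window ending on run_date, inclusive at both ends."""
    start = run_date - timedelta(days=window_days - 1)
    return [start + timedelta(days=i) for i in range(window_days)]


def derived_features(play_events, skip_events, total_watch_minutes,
                     unique_content_count, total_events):
    """Same formulas as the flow's engagement_score and content_diversity columns."""
    engagement = max(0.0, play_events * 2 + total_watch_minutes * 0.1 - skip_events * 0.5)
    diversity = unique_content_count / max(total_events, 1)
    return {"engagement_score": engagement, "content_diversity": diversity}


days = window_dates(date(2024, 3, 1), 7)
print(days[0], days[-1], len(days))  # 2024-02-24 2024-03-01 7
print(derived_features(play_events=10, skip_events=4, total_watch_minutes=120.0,
                       unique_content_count=6, total_events=20))
```

Keeping formulas like these in small pure functions also makes them unit-testable without launching a flow.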

Running and Inspecting Flows

```bash
# Launch from your machine with a 1% sample (steps decorated with @batch still run on AWS Batch)
python feature_store_population_flow.py run \
    --run_date 2024-03-01 \
    --lookback_windows "7,14,30" \
    --sample_fraction 0.01  # 1% sample for testing

# Run every step on AWS Batch (full scale)
python feature_store_population_flow.py run \
    --with batch \
    --run_date 2024-03-01 \
    --lookback_windows "7,14,30"

# View logs for one step of run 3 (logs takes <run_id>/<step_name>)
python feature_store_population_flow.py logs 3/compute_window_features
```

```python
# Inspect past run results from Python
from metaflow import Flow, Run

flow = Flow("FeatureStorePopulationFlow")

# Get the latest run (in your current namespace)
latest_run = flow.latest_run
print(f"Latest run: {latest_run.id}, successful: {latest_run.successful}")

# Access artifacts from a completed run (replace 3 with a real run ID)
run = Run("FeatureStorePopulationFlow/3")
print(f"Users processed: {run['start'].task.data.num_users}")
print(f"Feature columns: {run['end'].task.data.total_feature_cols}")
print(f"Validation issues: {run['end'].task.data.validation_issues}")

# Load the features DataFrame from a specific run
features_df = run["join_windows"].task.data.joined_features
print(features_df.head())
```



Namespaces for Team Isolation

Metaflow namespaces prevent experiments from different users from polluting each other's artifact history:

```python
from metaflow import Flow, namespace, default_namespace

# Access runs from a specific user's namespace
namespace("user:alice")
alice_flow = Flow("FeatureStorePopulationFlow")
alice_latest = alice_flow.latest_run

# Access runs tagged "production" (for example, runs launched with --tag production)
namespace("production")
prod_flow = Flow("FeatureStorePopulationFlow")
prod_latest = prod_flow.latest_run

# namespace(None) switches to the global namespace, where all runs are visible
namespace(None)

# Reset to your personal namespace
default_namespace()
```

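A namespace is simply a tag that the Client API filters on. In production, CI/CD pipelines launch or deploy flows so that their runs carry a production tag, while researchers run in their personal user:<username> namespace. Code that selects the production namespace therefore sees Flow.latest_run as the latest production run, never a researcher's experimental run.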
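The toy model below illustrates that filtering. ToyRun and latest_run are hypothetical stand-ins, not Metaflow classes, and the tags are made up; the sketch only shows why selecting the production namespace keeps a researcher's newer experimental run out of latest_run, and why the global namespace (None) does not.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRun:
    run_id: int
    tags: set = field(default_factory=set)


def latest_run(runs, namespace):
    """Return the newest run visible in `namespace`; None means global (no filter)."""
    visible = [r for r in runs if namespace is None or namespace in r.tags]
    return max(visible, key=lambda r: r.run_id, default=None)


runs = [
    ToyRun(1, {"user:ci-bot", "production"}),
    ToyRun(2, {"user:alice"}),                 # Alice's experiment
    ToyRun(3, {"user:ci-bot", "production"}),
    ToyRun(4, {"user:alice"}),                 # newer experiment by Alice
]

print(latest_run(runs, "production").run_id)  # 3
print(latest_run(runs, "user:alice").run_id)  # 4
print(latest_run(runs, None).run_id)          # 4: the global namespace sees everything
```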


Common Mistakes

:::danger Modifying Branch Artifacts in a join Step In a join step, inputs exposes the artifacts of each branch, which were already persisted when that branch finished. Changing them inside the join does not alter the stored branch artifacts, and artifacts are not propagated through a join automatically: only values you assign to self (or carry forward with self.merge_artifacts(inputs)) reach subsequent steps. A common mistake is trying to loop back over individual branch results and modify them.

```python
@step
def join_step(self, inputs):
    # WRONG - iterating and modifying won't change branch artifacts
    for inp in inputs:
        inp.result = inp.result * 2  # Does nothing useful

    # CORRECT - collect results into a new artifact on self
    self.all_results = [inp.result for inp in inputs]
    self.next(self.end)
```

:::

:::danger Using Non-Serializable Objects as Artifacts Metaflow serializes step artifacts using pickle. Objects that cannot be pickled - database connections, file handles, generator objects, lambda functions - will cause the step to fail when Metaflow tries to persist them.

```python
@step
def bad_step(self):
    import sqlite3
    self.connection = sqlite3.connect("db.sqlite")  # Cannot pickle! Will fail.
    self.next(self.next_step)
```

Keep connections and file handles in local variables, close them before the step ends, and assign only serializable results to self (for example, query results or a connection string). :::
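Since artifacts are pickled, a quick way to find the offending attribute is to try pickling candidate values before assigning them to self. This is a debugging sketch, not a Metaflow API: find_unpicklable is a hypothetical helper, and it checks only the standard pickle module.

```python
import pickle
import sqlite3


def find_unpicklable(artifacts):
    """Return {name: error} for each value that the standard pickle module rejects."""
    problems = {}
    for name, value in artifacts.items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # PicklingError, TypeError, or AttributeError, depending on the object
            problems[name] = f"{type(exc).__name__}: {exc}"
    return problems


conn = sqlite3.connect(":memory:")
candidates = {
    "row_count": conn.execute("SELECT 42").fetchone()[0],  # plain int: fine to persist
    "feature_names": ["a", "b"],                           # list of str: fine
    "connection": conn,                                    # database connection: fails
    "rows": (x for x in range(3)),                         # generator: fails
    "transform": lambda x: x * 2,                          # lambda: fails
}
for name, error in find_unpicklable(candidates).items():
    print(f"{name} -> {error}")
conn.close()
```

Note the pattern in row_count: the query result is persisted, while the connection that produced it stays local to the step.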

:::warning Forgetting @step on Every Method Metaflow only treats methods with @step as pipeline steps. A method without @step is just a helper method - it will not appear in the flow graph and its results will not be tracked. If another step transitions to it with self.next(), Metaflow's graph validation rejects the flow before it runs, because the target is not a known step; if nothing transitions to it, it never runs as a step at all. :::

:::warning Storing Very Large Artifacts Metaflow serializes and stores every self.x assignment in the datastore (S3 in cloud deployments). If you store a 5 GB DataFrame as a step artifact, Metaflow must serialize and upload it when the step completes, and a step running on another machine must download and deserialize it again before using it. For large datasets, store only the S3 path as an artifact and read the data inside each step that needs it. :::


Interview Questions and Answers

Q1: What is the Metaflow flow-step model and how does it differ from DAG-based orchestrators?

In Metaflow, a workflow is defined as a Python class inheriting from FlowSpec. Each step is a method decorated with @step, and transitions are declared by calling self.next() as the last statement of each step. Metaflow builds the graph by statically parsing these self.next() calls before the run starts, so the shape of the graph is validated up front, as in DAG-based orchestrators. The differences lie elsewhere. First, fan-out width is dynamic: a foreach creates one task per item of a list computed at runtime, whereas classic Airflow (before dynamic task mapping arrived in Airflow 2.3) required every task to be defined when the DAG file was parsed. Second, the programming model is plain Python: steps share state through attributes on self, and Metaflow persists and moves that state between steps, rather than operators being wired together in a separate DAG definition. Metaflow's foreach still requires the list of items to be known before branching starts, i.e. computed in the step that calls self.next(..., foreach=...).

Q2: How does Metaflow handle artifact persistence between steps?

Metaflow automatically serializes any attribute assigned to self inside a step, using Python's pickle protocol by default. The serialized artifact is stored in the datastore - S3 (or another cloud object store) in cloud deployments, or the local filesystem for local runs. Each task records its artifacts under its pathspec (flow name, run ID, step name, and task ID), while the serialized blobs themselves are stored content-addressed, so an artifact whose value has not changed is not stored twice. When the next step starts, Metaflow makes the previous step's artifacts available as attributes, fetching and deserializing them from the datastore. This works transparently across machine boundaries - if step A runs on Machine X and step B runs on Machine Y (via AWS Batch), Metaflow handles the S3 transfer with no user code required.

Q3: What is the purpose of foreach in Metaflow and how do you aggregate results?

foreach in Metaflow creates dynamic parallelism: instead of calling self.next(self.step_name), you call self.next(self.step_name, foreach="list_attribute"). Metaflow creates one parallel task of step_name for each item in self.list_attribute. Inside the foreach step, self.input contains the current item. To aggregate results from all parallel branches, you define a join step that accepts an inputs argument. inputs holds one object per branch, and you read each branch's artifacts via inp.attribute_name. Because artifacts are not propagated through a join automatically, the join step assigns whatever it wants to keep (aggregates, or values copied from one branch) to self, or calls self.merge_artifacts(inputs) to carry forward artifacts that are identical across branches.

Q4: How do you run a Metaflow flow on AWS Batch instead of locally?

There are three ways. First, pass the --with batch flag when running the flow: python flow.py run --with batch. This applies @batch to every step. Second, apply the @batch decorator to specific steps in the code. This lets some steps run locally and others on AWS Batch, which is useful when, say, data ingestion should run locally but training needs a high-memory Batch instance. Third, use --with batch:queue=my-queue,image=my-image:tag to set Batch parameters from the command line. The @resources decorator sets the CPU, memory, and GPU request for the Batch job. Prerequisites: the AWS Batch job queue and compute environment must be configured, Metaflow must use S3 as its datastore so that workers can exchange artifacts, and the Metaflow metadata service, if you use one, must be reachable from the Batch workers.

Q5: What are Metaflow Cards and why are they useful for ML teams?

Metaflow Cards are a documentation and reporting system built into the framework. Applying @card to a step causes Metaflow to generate an HTML report for that step's execution. Inside the step, you use current.card.append() to add elements - Markdown text, Tables, images, Vega charts. Cards are stored alongside run artifacts and viewable in the Metaflow GUI or via python flow.py card view <run_id>/<step_name>. For ML teams, Cards solve the documentation problem: every training run automatically generates a human-readable report showing the metrics, data statistics, and validation results from that run. Unlike external experiment tracking tools, Cards are part of the flow definition and require no separate integration.

Q6: How do namespaces work in Metaflow and why are they important for teams?

Metaflow namespaces are scopes that control which runs the Client API shows. Every run is tagged with the user who launched it (user:<username>), and by default you are in your own namespace, so Flow("MyFlow").latest_run returns your latest run rather than a colleague's. A namespace is just a tag: flows deployed to a production scheduler (for example with step-functions create or argo-workflows create) run in a production namespace, and you can also tag runs yourself with --tag. Code that selects the production namespace therefore never sees a researcher's experimental run as latest_run. Namespaces are a visibility filter, not access control: namespace(None) switches to the global namespace, where every run is visible. Teams typically deploy production flows through CI/CD and have serving code select the production namespace with namespace() so that it always retrieves the latest production model.

© 2026 EngineersOfAI. All rights reserved.