Kubeflow Pipelines
The Retraining Problem at Scale
Your team has deployed a recommendation model to production. It performs well at launch. Three months later, the business notices a steady decline in click-through rate. The model has drifted - user behavior changed after a product redesign, but the model was trained on pre-redesign data. The fix is obvious: retrain the model. The challenge is making that process reliable, repeatable, and automatic.
Your current setup: a collection of Python scripts on a shared VM. 1_preprocess.py, 2_train.py, 3_evaluate.py, 4_push_model.py. You run them manually in order. This worked when retraining happened monthly. Now the business wants weekly retraining, triggered automatically when data drift is detected. The shared VM cannot handle concurrent runs - if someone manually triggers a run while the weekly job is running, the intermediate files get overwritten. There is no audit trail. When a run fails at step 3, nobody knows why because the logs are on a VM that sometimes restarts.
You need a system that runs containerized steps, tracks every input and output as versioned metadata, handles retries and failure gracefully, and scales to run multiple experiments in parallel without file conflicts. You need something that runs on Kubernetes because that is what your infrastructure team supports. This is precisely the problem Kubeflow Pipelines was designed to solve.
Kubeflow Pipelines (KFP) is an ML workflow orchestration platform built on Kubernetes. Every step runs in its own container. Every input, output, and parameter is tracked in a metadata store. Pipelines are compiled to a portable YAML representation that can be version-controlled, audited, and rerun. The platform handles container scheduling, parallel execution, artifact lineage, and experiment tracking - letting your team focus on the ML logic rather than the infrastructure plumbing.
This lesson covers Kubeflow Pipelines v2 with the KFP SDK v2. We build a complete retraining pipeline that triggers on data drift detection, covering lightweight Python components, containerized components, pipeline compilation, experiment management, and metadata tracking with ML Metadata (MLMD).
Why Kubeflow Pipelines Exists
Before Kubeflow, ML teams at large companies ran their pipelines on bare-metal scripts, Airflow, or proprietary internal systems. These approaches shared a common weakness: they did not track what artifact was produced by what code with what parameters. When a model performed poorly, the team could not reliably answer "which dataset was used to train this model?" or "what were the exact hyperparameters?"
The other problem was portability. A script that worked on a data scientist's MacBook with a specific Conda environment would fail on a production Linux server with different library versions. Making ML code reproducible required rigorous environment management that most teams did not have.
Kubeflow was started at Google in 2018 as a way to port Google's internal ML workflow patterns (similar to TFX) to the open-source Kubernetes ecosystem. Kubeflow Pipelines was one of the first components - a Kubernetes-native way to define, compile, and run ML workflows where each step runs in an isolated, reproducible container.
:::note The Container Isolation Insight
The key insight behind KFP is that container isolation solves two problems simultaneously: reproducibility (the container has a fixed environment) and parallelism (containers can run concurrently without file conflicts). Every step gets its own filesystem, Python environment, and compute resources.
:::
Historical Context
Kubeflow Pipelines v1 (2018–2022) used a Python DSL to define pipelines. Components were Docker images invoked via Kubernetes Jobs. The Argo Workflows engine handled the scheduling. The system worked but had significant friction - the Python DSL was complex, components were verbose to write, and the v1/v2 transition was painful for teams that adopted early.
KFP SDK v2 (2022+) introduced a cleaner authoring experience. The pipeline compiler generates an intermediate representation (IR) YAML that is backend-agnostic, meaning the same pipeline definition can run on standard Kubeflow Pipelines, Vertex AI Pipelines, or other KFP-compatible backends. This was a major improvement for teams working across cloud providers.
The metadata story also improved significantly in v2. ML Metadata (MLMD) is now deeply integrated - every artifact (dataset, model, metrics) is automatically tracked with lineage information, enabling root-cause analysis and audit trails without extra instrumentation.
Architecture Overview
KFP API Server: The control plane. Accepts pipeline runs, manages experiments, provides the dashboard UI.
Argo Workflows: The execution engine. The KFP backend translates each submitted pipeline into an Argo Workflow spec and submits it to the cluster. Argo handles pod scheduling, retry logic, and parallel execution.
ML Metadata (MLMD): A metadata store that tracks every artifact (dataset, model, metrics file) and the executions that produced or consumed them. This is what gives KFP its lineage tracking capabilities.
Artifact Store: Object storage where large artifacts (datasets, model files) are stored. KFP tracks the URI in MLMD and the file content in the artifact store.
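To make the execution engine's role more concrete, here is a minimal sketch of how a DAG of steps can be grouped into "waves" that are safe to run in parallel. It uses only Python's standard-library `graphlib` and is an analogy, not Argo's actual scheduler. The step names and the `execution_waves` helper are assumptions that loosely mirror the retraining pipeline built later in this lesson.

```python
from graphlib import TopologicalSorter

# Hypothetical step graph: each key lists the steps it depends on.
dependencies = {
    "load_reference": set(),
    "load_current": set(),
    "detect_drift": {"load_reference", "load_current"},
    "preprocess": {"load_current", "detect_drift"},
    "train": {"preprocess"},
    "evaluate": {"train", "preprocess"},
    "register": {"train", "evaluate"},
}


def execution_waves(graph):
    """Group steps into waves; every step in a wave has all dependencies done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps whose inputs are all available
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


waves = execution_waves(dependencies)
for index, wave in enumerate(waves, start=1):
    print(f"wave {index}: {wave}")
```

The two load steps share the first wave because neither depends on the other, which is the same reason they run concurrently in a real pipeline.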
Core Concepts
The Two Types of Components
KFP v2 offers two ways to define pipeline components:
Lightweight Python Components - for simple logic that can run in a standard base image. No Dockerfile required.
Containerized Components - for steps that need custom Docker images with specific dependencies (GPU libraries, proprietary packages, etc.).
Lightweight Python Components
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.1.0", "scikit-learn==1.3.0", "pyarrow"],
)
def detect_data_drift(
    reference_data: Input[Dataset],
    current_data: Input[Dataset],
    drift_threshold: float,
    drift_report: Output[Dataset],
    is_drift_detected: dsl.OutputPath(bool),
):
    """
    Detect data drift between reference and current dataset.
    Uses Population Stability Index (PSI) on numerical features.
    """
    import json

    import numpy as np
    import pandas as pd

    ref_df = pd.read_parquet(reference_data.path)
    cur_df = pd.read_parquet(current_data.path)

    def compute_psi(expected: np.ndarray, actual: np.ndarray, n_bins: int = 10) -> float:
        """Compute Population Stability Index."""
        # Bin edges come from the reference distribution's percentiles
        breakpoints = np.percentile(expected, np.linspace(0, 100, n_bins + 1))
        breakpoints[0] = -np.inf
        breakpoints[-1] = np.inf
        expected_pct = np.histogram(expected, bins=breakpoints)[0] / len(expected)
        actual_pct = np.histogram(actual, bins=breakpoints)[0] / len(actual)
        # Avoid division by zero and log(0) for empty bins
        expected_pct = np.where(expected_pct == 0, 0.0001, expected_pct)
        actual_pct = np.where(actual_pct == 0, 0.0001, actual_pct)
        psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
        return float(psi)

    numeric_cols = ref_df.select_dtypes(include=[np.number]).columns.tolist()
    psi_scores = {}
    for col in numeric_cols:
        psi = compute_psi(ref_df[col].dropna().values, cur_df[col].dropna().values)
        psi_scores[col] = round(psi, 4)

    # default=0.0 keeps the component from crashing when there are no numeric columns
    max_psi = max(psi_scores.values(), default=0.0)
    drift_detected = max_psi > drift_threshold

    report = {
        "psi_scores": psi_scores,
        "max_psi": max_psi,
        "threshold": drift_threshold,
        "drift_detected": drift_detected,
    }

    # Write drift report as an artifact
    with open(drift_report.path, "w") as f:
        json.dump(report, f, indent=2)

    # Write scalar output ("true" / "false")
    with open(is_drift_detected, "w") as f:
        f.write(str(drift_detected).lower())
Notice several important patterns:
- Input[Dataset] and Output[Dataset] are KFP-typed artifacts - they carry metadata (URI, name, type) that MLMD tracks
- dsl.OutputPath(bool) writes a scalar value to a file path - KFP reads it and passes it to downstream components
- All imports happen inside the function body because the function body is serialized and executed in a container
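The PSI calculation inside the component is easier to follow on pre-binned proportions. The sketch below applies the same formula, the sum over bins of (actual - expected) * ln(actual / expected), using only the standard library. The function name `psi_from_proportions` and the two example distributions are illustrative assumptions, not part of the pipeline.

```python
import math


def psi_from_proportions(expected_pct, actual_pct, floor=0.0001):
    """PSI over pre-binned proportions; empty bins are replaced by a small floor."""
    total = 0.0
    for expected, actual in zip(expected_pct, actual_pct):
        expected = expected if expected > 0 else floor
        actual = actual if actual > 0 else floor
        total += (actual - expected) * math.log(actual / expected)
    return total


# Hypothetical four-bin distributions for a single feature
reference = [0.25, 0.25, 0.25, 0.25]
unchanged = [0.25, 0.25, 0.25, 0.25]
shifted = [0.10, 0.20, 0.30, 0.40]

psi_unchanged = psi_from_proportions(reference, unchanged)
psi_shifted = psi_from_proportions(reference, shifted)

print(f"unchanged: {psi_unchanged:.4f}")
print(f"shifted:   {psi_shifted:.4f}")
```

An unchanged distribution scores zero, while the shifted one lands a little above 0.2, so it would trip a drift_threshold of 0.2 in the component above.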
Containerized Components
For steps with heavy dependencies, use a pre-built Docker image:
# Dockerfile for the training component
# FROM nvidia/cuda:12.1-runtime-ubuntu22.04
# RUN pip install torch==2.1.0 transformers==4.35.0 datasets==2.14.0

@dsl.component(
    base_image="my-registry.io/ml-training:cuda12-torch2.1",
)
def fine_tune_model(
    train_dataset: Input[Dataset],
    val_dataset: Input[Dataset],
    base_model_name: str,
    num_epochs: int,
    learning_rate: float,
    trained_model: Output[Model],
    training_metrics: Output[Metrics],
):
    """Fine-tune a pre-trained model on the provided dataset."""
    import torch
    from transformers import (
        AutoModelForSequenceClassification,
        AutoTokenizer,
        Trainer,
        TrainingArguments,
    )
    from datasets import load_from_disk

    train_data = load_from_disk(train_dataset.path)
    val_data = load_from_disk(val_dataset.path)

    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    model = AutoModelForSequenceClassification.from_pretrained(base_model_name, num_labels=2)

    def tokenize(batch):
        return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=512)

    train_data = train_data.map(tokenize, batched=True)
    val_data = val_data.map(tokenize, batched=True)

    training_args = TrainingArguments(
        output_dir=trained_model.path,
        num_train_epochs=num_epochs,
        learning_rate=learning_rate,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_data,
        eval_dataset=val_data,
    )
    trainer.train()
    eval_results = trainer.evaluate()

    # Log metrics to MLMD via the Metrics artifact.
    # eval_accuracy is only present if a compute_metrics function is passed to
    # Trainer; without one, the fallback value 0.0 is logged.
    training_metrics.log_metric("eval_loss", eval_results["eval_loss"])
    training_metrics.log_metric("eval_accuracy", eval_results.get("eval_accuracy", 0.0))
    training_metrics.log_metric("num_epochs", num_epochs)
    training_metrics.log_metric("learning_rate", learning_rate)

    # output_dir only receives per-epoch checkpoints, so save the final
    # (best) model explicitly at the root of the artifact path.
    trainer.save_model(trained_model.path)
Complete Retraining Pipeline
Now we assemble all components into a pipeline with drift detection logic:
# pipeline.py
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics
from kfp import kubernetes  # Kubernetes-specific settings; provided by the separate kfp-kubernetes package

# ──────────────────────────────────────────────────────────────
# Component: Load Reference Data
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["google-cloud-storage"],
)
def load_dataset(
    gcs_uri: str,
    split: str,
    output_dataset: Output[Dataset],
):
    """Load a dataset from GCS and expose it as a KFP artifact."""
    import os

    from google.cloud import storage

    output_dataset.metadata["source_uri"] = gcs_uri
    output_dataset.metadata["split"] = split

    # python:3.11-slim does not ship gsutil, so download with the GCS client
    # library. Files are flattened into one directory by base name.
    bucket_name, _, prefix = gcs_uri.removeprefix("gs://").partition("/")
    os.makedirs(output_dataset.path, exist_ok=True)
    for blob in storage.Client().list_blobs(bucket_name, prefix=prefix):
        if blob.name.endswith("/"):
            continue  # skip directory placeholder objects
        blob.download_to_filename(
            os.path.join(output_dataset.path, os.path.basename(blob.name))
        )
# ──────────────────────────────────────────────────────────────
# Component: Preprocess and Feature Engineering
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.1.0", "scikit-learn==1.3.0", "pyarrow"],
)
def preprocess_data(
    raw_dataset: Input[Dataset],
    processed_dataset: Output[Dataset],
    scaler_artifact: Output[Model],
):
    import pandas as pd
    import pickle
    from sklearn.preprocessing import StandardScaler

    df = pd.read_parquet(raw_dataset.path)

    # Drop rows without a label, one-hot encode categoricals
    df = df.dropna(subset=["label"])
    cat_cols = df.select_dtypes(include=["object"]).columns
    df = pd.get_dummies(df, columns=cat_cols)

    feature_cols = [c for c in df.columns if c != "label"]
    scaler = StandardScaler()
    df[feature_cols] = scaler.fit_transform(df[feature_cols])

    df.to_parquet(processed_dataset.path, index=False)
    processed_dataset.metadata["num_rows"] = len(df)
    processed_dataset.metadata["num_features"] = len(feature_cols)

    with open(scaler_artifact.path, "wb") as f:
        pickle.dump(scaler, f)
    scaler_artifact.metadata["feature_columns"] = feature_cols
# ──────────────────────────────────────────────────────────────
# Component: Train Model
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "pyarrow"],
)
def train_classifier(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    trained_model: Output[Model],
    training_metrics: Output[Metrics],
):
    import pandas as pd
    import pickle
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import roc_auc_score

    df = pd.read_parquet(train_dataset.path)
    feature_cols = [c for c in df.columns if c != "label"]
    X, y = df[feature_cols], df["label"]

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        n_jobs=-1,
        random_state=42,
    )
    model.fit(X, y)

    train_preds = model.predict_proba(X)[:, 1]
    train_auc = roc_auc_score(y, train_preds)

    with open(trained_model.path, "wb") as f:
        pickle.dump(model, f)
    trained_model.metadata["n_estimators"] = n_estimators
    trained_model.metadata["max_depth"] = max_depth
    trained_model.metadata["train_auc"] = round(train_auc, 4)

    training_metrics.log_metric("train_auc", train_auc)
    training_metrics.log_metric("n_estimators", n_estimators)
    training_metrics.log_metric("max_depth", max_depth)
# ──────────────────────────────────────────────────────────────
# Component: Evaluate Model
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "scikit-learn", "pyarrow"],
)
def evaluate_model(
    model_artifact: Input[Model],
    val_dataset: Input[Dataset],
    eval_metrics: Output[Metrics],
    is_model_good: dsl.OutputPath(bool),
    auc_threshold: float = 0.80,
):
    import pandas as pd
    import pickle
    from sklearn.metrics import roc_auc_score, average_precision_score

    df = pd.read_parquet(val_dataset.path)
    feature_cols = [c for c in df.columns if c != "label"]
    X, y = df[feature_cols], df["label"]

    with open(model_artifact.path, "rb") as f:
        model = pickle.load(f)

    preds = model.predict_proba(X)[:, 1]
    auc = roc_auc_score(y, preds)
    ap = average_precision_score(y, preds)

    eval_metrics.log_metric("val_auc", auc)
    eval_metrics.log_metric("val_avg_precision", ap)
    eval_metrics.log_metric("threshold", auc_threshold)

    model_good = auc >= auc_threshold
    with open(is_model_good, "w") as f:
        f.write(str(model_good).lower())
# ──────────────────────────────────────────────────────────────
# Component: Register Model
# ──────────────────────────────────────────────────────────────
@dsl.component(
    base_image="python:3.11-slim",
    # scikit-learn is needed to unpickle the model and for mlflow.sklearn
    packages_to_install=["mlflow", "scikit-learn"],
)
def register_model(
    model_artifact: Input[Model],
    eval_metrics: Input[Metrics],
    model_name: str,
    model_version: dsl.OutputPath(str),
):
    import pickle

    import mlflow
    import mlflow.sklearn

    mlflow.set_tracking_uri("http://mlflow-server:5000")

    with mlflow.start_run(run_name=f"retrain-{model_name}"):
        with open(model_artifact.path, "rb") as f:
            model = pickle.load(f)

        # Log metrics from KFP Metrics artifact
        for key, value in eval_metrics.metadata.items():
            if isinstance(value, (int, float)):
                mlflow.log_metric(key, value)

        model_info = mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            registered_model_name=model_name,
        )

    # The output records the MLflow model URI of the logged model
    with open(model_version, "w") as f:
        f.write(model_info.model_uri)
# ──────────────────────────────────────────────────────────────
# Pipeline Definition
# ──────────────────────────────────────────────────────────────
@dsl.pipeline(
    name="fraud-detector-retraining",
    description="Automated retraining pipeline triggered by data drift detection",
)
def retraining_pipeline(
    reference_data_uri: str = "gs://ml-data/fraud/reference/",
    current_data_uri: str = "gs://ml-data/fraud/current/",
    drift_threshold: float = 0.2,
    n_estimators: int = 200,
    max_depth: int = 10,
    auc_threshold: float = 0.82,
    model_name: str = "fraud-detector-v2",
):
    # Load reference and current datasets in parallel
    ref_data_task = load_dataset(
        gcs_uri=reference_data_uri,
        split="reference",
    )
    cur_data_task = load_dataset(
        gcs_uri=current_data_uri,
        split="current",
    )

    # Detect drift before spending resources on retraining
    drift_task = detect_data_drift(
        reference_data=ref_data_task.outputs["output_dataset"],
        current_data=cur_data_task.outputs["output_dataset"],
        drift_threshold=drift_threshold,
    )

    # Conditional: only retrain if drift is detected
    with dsl.Condition(
        drift_task.outputs["is_drift_detected"] == "true",
        name="drift-detected",
    ):
        # Preprocess using current data
        preprocess_task = preprocess_data(
            raw_dataset=cur_data_task.outputs["output_dataset"],
        )

        # Train model
        train_task = train_classifier(
            train_dataset=preprocess_task.outputs["processed_dataset"],
            n_estimators=n_estimators,
            max_depth=max_depth,
        )

        # Set Kubernetes resource requests and limits on the training task.
        # In KFP SDK v2 these are methods on the task itself.
        train_task.set_cpu_request("4")
        train_task.set_memory_request("8G")
        train_task.set_cpu_limit("8")
        train_task.set_memory_limit("16G")

        # Evaluate model quality.
        # NOTE: this reuses the training data for evaluation, which overstates
        # quality; in practice pass a held-out validation split here.
        eval_task = evaluate_model(
            model_artifact=train_task.outputs["trained_model"],
            val_dataset=preprocess_task.outputs["processed_dataset"],
            auc_threshold=auc_threshold,
        )

        # Conditional: only register if model quality is acceptable
        with dsl.Condition(
            eval_task.outputs["is_model_good"] == "true",
            name="model-passes-threshold",
        ):
            register_model(
                model_artifact=train_task.outputs["trained_model"],
                eval_metrics=eval_task.outputs["eval_metrics"],
                model_name=model_name,
            )
Compiling and Running the Pipeline
# compile_and_run.py
from kfp import compiler
from kfp.client import Client

from pipeline import retraining_pipeline  # defined in pipeline.py above

KFP_HOST = "https://kubeflow.my-company.com"

# Compile to IR YAML - version control this file
compiler.Compiler().compile(
    pipeline_func=retraining_pipeline,
    package_path="retraining_pipeline.yaml",
)

# Connect to KFP API server
client = Client(host=KFP_HOST)

# Create or get experiment
experiment = client.create_experiment(name="fraud-detector-retraining")

# Submit a run with custom parameters
run = client.create_run_from_pipeline_func(
    pipeline_func=retraining_pipeline,
    experiment_name="fraud-detector-retraining",
    arguments={
        "reference_data_uri": "gs://ml-data/fraud/reference/2024-01/",
        "current_data_uri": "gs://ml-data/fraud/current/2024-03/",
        "drift_threshold": 0.15,
        "auc_threshold": 0.84,
    },
    enable_caching=True,  # Skip cached steps on re-runs
)
print(f"Run ID: {run.run_id}")
# The KFP UI serves run details under this path
print(f"Run URL: {KFP_HOST}/#/runs/details/{run.run_id}")

# Wait for completion and get results
run.wait_for_run_completion(timeout=3600)
run_response = client.get_run(run.run_id)
print(f"Run status: {run_response.state}")
Parameter Sweeps (Hyperparameter Search)
KFP's dsl.ParallelFor enables efficient hyperparameter search by running multiple training jobs in parallel:
@dsl.pipeline(name="hyperparam-search")
def hyperparameter_search_pipeline(
    train_data_uri: str,
    val_data_uri: str,
):
    load_train = load_dataset(gcs_uri=train_data_uri, split="train")
    load_val = load_dataset(gcs_uri=val_data_uri, split="val")

    # Preprocessing does not depend on the hyperparameters, so run it once
    # outside the loop instead of once per grid entry.
    preprocess_task = preprocess_data(
        raw_dataset=load_train.outputs["output_dataset"],
    )

    # Define the search grid
    hyperparams_grid = [
        {"n_estimators": 100, "max_depth": 5},
        {"n_estimators": 200, "max_depth": 10},
        {"n_estimators": 300, "max_depth": 15},
        {"n_estimators": 100, "max_depth": 10},
    ]

    with dsl.ParallelFor(
        items=hyperparams_grid,
        parallelism=2,  # Run at most 2 at a time
    ) as params:
        train_task = train_classifier(
            train_dataset=preprocess_task.outputs["processed_dataset"],
            n_estimators=params.n_estimators,
            max_depth=params.max_depth,
        )
        # NOTE: the validation data must go through the same preprocessing as
        # the training data before evaluation; that step is omitted here.
        eval_task = evaluate_model(
            model_artifact=train_task.outputs["trained_model"],
            val_dataset=load_val.outputs["output_dataset"],
        )

    # After all parallel runs: collect best model
    # (requires a custom aggregation component)
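Writing the grid by hand gets tedious as the search space grows. One possible way to generate it is a Cartesian product over per-parameter value lists, producing the same list-of-dicts shape that dsl.ParallelFor iterates over. The helper name `build_grid` and the search space below are illustrative assumptions.

```python
from itertools import product


def build_grid(search_space):
    """Expand {param: [values]} into a list of {param: value} combinations."""
    names = list(search_space)
    value_lists = [search_space[name] for name in names]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


# Hypothetical search space: 2 x 2 = 4 combinations
search_space = {
    "n_estimators": [100, 200],
    "max_depth": [5, 10],
}
hyperparams_grid = build_grid(search_space)

for params in hyperparams_grid:
    print(params)
```

Because the list is built in plain Python when the pipeline function runs, it is fixed at compile time, just like the hand-written grid above.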
MLMD Metadata Tracking
One of KFP's most powerful features is automatic lineage tracking via ML Metadata. Every artifact you pass between components is recorded in MLMD with its type, URI, and metadata.
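# Querying MLMD programmatically with the ml-metadata client
# (pip install ml-metadata). kfp.Client does not expose an artifact listing
# call, so this talks to the MLMD gRPC service directly. The host and port
# are assumptions matching a default in-cluster Kubeflow install.
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

config = metadata_store_pb2.MetadataStoreClientConfig()
config.host = "metadata-grpc-service.kubeflow"
config.port = 8080
store = metadata_store.MetadataStore(config)

# List every model artifact recorded in the store
model_artifacts = store.get_artifacts_by_type("system.Model")
for model in model_artifacts:
    print(f"ID: {model.id}")
    print(f"URI: {model.uri}")
    print(f"Metadata: {dict(model.custom_properties)}")

    # Lineage: find the executions that produced this artifact
    events = store.get_events_by_artifact_ids([model.id])
    producer_ids = [
        e.execution_id for e in events if e.type == metadata_store_pb2.Event.OUTPUT
    ]
    for execution in store.get_executions_by_id(producer_ids):
        print(f"Model {model.id} produced by execution {execution.id}")
    print("---")

# In the UI, you can visualize this as a full lineage graph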
Pipeline Caching
KFP caches component outputs based on the component's function body and its input values. On a re-run, if the code and inputs have not changed, KFP returns the cached output without re-executing the component:
# Enable caching for a run
run = client.create_run_from_pipeline_func(
    pipeline_func=retraining_pipeline,
    arguments={"reference_data_uri": "gs://...", "drift_threshold": 0.2},
    enable_caching=True,
)

# Disable caching for a specific component (force re-execution)
@dsl.component(base_image="python:3.11-slim")
def non_cacheable_component(output: Output[Dataset]):
    # Caching is switched off on the task in the pipeline, not in the decorator
    pass

# In the pipeline definition:
task = non_cacheable_component()
task.set_caching_options(enable_caching=False)
:::tip Caching and Data Freshness
KFP caching is based on component code + input values. If your input is a GCS URI and the underlying file changes but the URI stays the same, KFP will use the cached output. For data ingestion components that should always re-fetch, disable caching explicitly or include a timestamp in the component inputs.
:::
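The stale-data pitfall follows directly from what goes into the cache key. The sketch below is a simplified model of such a key (a hash over component source, image, and input values), not KFP's actual implementation. The `cache_key` function and its arguments are assumptions for illustration.

```python
import hashlib
import json


def cache_key(component_source, image, inputs):
    """Hash the things a cache lookup depends on: code, image, and input values."""
    payload = json.dumps(
        {"source": component_source, "image": image, "inputs": inputs},
        sort_keys=True,  # make the key independent of dict ordering
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


source = "def load_dataset(gcs_uri): ..."
image = "python:3.11-slim"

# Same URI on Monday and Tuesday: the key cannot see that the file changed.
key_monday = cache_key(source, image, {"gcs_uri": "gs://ml-data/fraud/current/"})
key_tuesday = cache_key(source, image, {"gcs_uri": "gs://ml-data/fraud/current/"})

# Adding a timestamp input changes the key and forces a fresh execution.
key_with_timestamp = cache_key(
    source,
    image,
    {"gcs_uri": "gs://ml-data/fraud/current/", "as_of": "2024-03-05"},
)

print(key_monday == key_tuesday)
print(key_monday == key_with_timestamp)
```

The first comparison is true and the second is false, which is why a timestamp parameter is a common way to invalidate the cache for ingestion steps.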
Pipeline Flow Diagram
KFP SDK v2 vs v1 Differences
| Feature | KFP SDK v1 | KFP SDK v2 |
|---|---|---|
| Component definition | func_to_container_op | @dsl.component decorator |
| Artifacts | Strings / custom types | Input[Dataset], Output[Model], etc. |
| Pipeline IR | Argo YAML directly | Backend-agnostic IR YAML |
| Scalar outputs | OutputPath(str) | dsl.OutputPath(str) (same, cleaner) |
| Conditional | dsl.Condition | dsl.Condition; newer 2.x SDK releases add dsl.If / dsl.Elif / dsl.Else and deprecate dsl.Condition |
| Parallel for | dsl.ParallelFor | Same, with parallelism limit |
| Metadata tracking | Manual | Automatic via typed artifacts |
| Cloud portability | KFP only | KFP + Vertex AI Pipelines |
Production Engineering Notes
Structuring Components for Reusability
Build components as reusable units that can appear in multiple pipelines:
# components/data_loading.py - reusable component library
from kfp import dsl
from kfp.dsl import Dataset, Output


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "pyarrow", "google-cloud-storage"],
)
def load_gcs_parquet(
    bucket: str,
    prefix: str,
    start_date: str,
    end_date: str,
    output_dataset: Output[Dataset],
):
    """Generic GCS parquet loader - reusable across pipelines."""
    # ...

# Import and use in multiple pipeline files
from components.data_loading import load_gcs_parquet
Scheduling Pipelines Externally
KFP has built-in recurring runs, but for event-driven triggers (e.g., "retrain when a new data file arrives"), use an external trigger:
# trigger_retraining.py - called by Cloud Pub/Sub or a monitoring service
from kfp.client import Client


def trigger_retraining(event_data: dict):
    """Triggered when data drift monitoring detects a significant shift."""
    client = Client(host="https://kubeflow.my-company.com")

    # Parameters not listed here fall back to the pipeline's defaults
    run = client.create_run_from_pipeline_package(
        pipeline_file="retraining_pipeline.yaml",
        arguments={
            "current_data_uri": event_data["data_uri"],
            "drift_threshold": 0.15,
        },
        run_name=f"triggered-retrain-{event_data['timestamp']}",
        experiment_name="automated-retraining",
    )
    return run.run_id
Common Mistakes
:::danger Importing at Module Level Instead of Inside Component Functions
Every module a @dsl.component function uses must be imported inside the function body. Only the function's own source is serialized and executed in the container, so imports at the top of the file are not available there.
# WRONG - import at module level
import pandas as pd

@dsl.component(packages_to_install=["pandas"])
def my_component(output: Output[Dataset]):
    df = pd.DataFrame(...)  # NameError: 'pd' not defined in container

# CORRECT - import inside function body
@dsl.component(packages_to_install=["pandas"])
def my_component(output: Output[Dataset]):
    import pandas as pd
    df = pd.DataFrame(...)
:::
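The failure mode can be reproduced without a cluster. The sketch below is a rough analogy, not how KFP actually launches components: it executes a function's source text in a fresh namespace, which is roughly the situation inside the container. The source strings and the `run_in_fresh_namespace` helper are assumptions, and the standard-library `json` module stands in for pandas.

```python
import json  # module-level import: visible in this file, not in the fresh namespace

BROKEN_SOURCE = '''
def summarize(values):
    return json.dumps({"count": len(values)})
'''

FIXED_SOURCE = '''
def summarize(values):
    import json
    return json.dumps({"count": len(values)})
'''


def run_in_fresh_namespace(source, *args):
    """Define the function from source alone, then call it."""
    namespace = {}
    exec(source, namespace)  # nothing from this module leaks in
    return namespace["summarize"](*args)


try:
    run_in_fresh_namespace(BROKEN_SOURCE, [1, 2, 3])
    broken_error = None
except NameError as exc:
    broken_error = str(exc)

fixed_result = run_in_fresh_namespace(FIXED_SOURCE, [1, 2, 3])

print(f"broken: {broken_error}")
print(f"fixed:  {fixed_result}")
```

The first call fails with a NameError because `json` was never imported in the namespace where the function runs; moving the import into the function body fixes it.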
:::danger Returning Arbitrary Python Objects from Components
Components cannot return arbitrary Python objects such as a fitted model. Simple parameter values (str, int, float, bool, dict, list) can be returned directly, but anything else must be written to an Output artifact or an OutputPath file. KFP passes data between components as files stored in the artifact store, not as in-memory Python objects.
# WRONG
@dsl.component
def train_model() -> object:  # Cannot return arbitrary objects
    return sklearn_model  # This will fail

# CORRECT - serialize to Output[Model]
@dsl.component
def train_model(trained_model: Output[Model]):
    import pickle
    # sklearn_model stands for a model fitted earlier in this function
    with open(trained_model.path, "wb") as f:
        pickle.dump(sklearn_model, f)
:::
:::warning Forgetting to Set Resource Requests on Heavy Components
By default, KFP components run with whatever resources Kubernetes assigns to pods that do not specify any - often very little. For training components, always set explicit resource requests and limits. Without them, a training job can be scheduled on a node with insufficient memory and be OOM-killed.
:::
:::warning Using KFP v1 Syntax with KFP SDK v2
The KFP SDK v2 is not backward-compatible with v1 component definitions. Do not mix func_to_container_op (v1) with @dsl.component (v2) in the same pipeline. Always check your kfp package version with pip show kfp and ensure all components use the same SDK version.
:::
Interview Questions and Answers
Q1: What is the role of Argo Workflows in Kubeflow Pipelines?
Kubeflow Pipelines uses Argo Workflows as its execution backend. When you compile a KFP pipeline and submit it to the KFP API server, the server translates the pipeline IR YAML into an Argo Workflow resource and submits it to Kubernetes. Argo is responsible for scheduling the individual steps as Kubernetes pods, managing parallel execution, handling retries, and tracking which pods have completed. KFP adds the ML-specific layer on top: the UI for visualizing experiments, the MLMD integration for artifact lineage, and the component SDK for authoring. You generally do not interact with Argo directly - you work through the KFP SDK and UI - but understanding that Argo handles actual pod scheduling helps debug issues like pod scheduling failures or resource quota errors.
Q2: How does ML Metadata (MLMD) provide lineage tracking in KFP?
MLMD is a metadata store that records three types of entities: Artifacts (datasets, models, metrics), Executions (component runs), and Events (which execution consumed or produced which artifact). Every time a KFP component runs, it records an Execution in MLMD. Every Input[Dataset] read and Output[Model] written creates an Event linking the execution to the artifact. This creates a complete lineage graph: you can query "which training run produced this model?" and trace back through the preprocessing and ingestion steps to the original data source. This lineage is queryable via the MLMD Python API and visualized in the KFP UI's lineage explorer.
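The three entity types are enough to answer lineage questions with a simple backward walk. The sketch below models events in memory with plain dataclasses rather than the real MLMD API. The `Event` class, the `trace_lineage` helper, and the artifact and execution names are illustrative assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    execution: str  # component run that touched the artifact
    artifact: str   # artifact that was read or written
    kind: str       # "INPUT" or "OUTPUT"


# Hypothetical events recorded for one training run
events = [
    Event("load_dataset", "raw_dataset", "OUTPUT"),
    Event("preprocess_data", "raw_dataset", "INPUT"),
    Event("preprocess_data", "processed_dataset", "OUTPUT"),
    Event("train_classifier", "processed_dataset", "INPUT"),
    Event("train_classifier", "trained_model", "OUTPUT"),
]


def trace_lineage(artifact, events):
    """Walk backwards from an artifact to every upstream (execution, artifact) pair."""
    lineage, frontier, seen = [], [artifact], set()
    while frontier:
        current = frontier.pop()
        if current in seen:
            continue
        seen.add(current)
        producers = [e.execution for e in events
                     if e.artifact == current and e.kind == "OUTPUT"]
        for producer in producers:
            lineage.append((producer, current))
            # Everything the producer consumed is one step further upstream
            frontier.extend(e.artifact for e in events
                            if e.execution == producer and e.kind == "INPUT")
    return lineage


lineage = trace_lineage("trained_model", events)
for execution, artifact in lineage:
    print(f"{artifact} <- {execution}")
```

Starting from the model, the walk reaches the training step, then preprocessing, then ingestion, which is the same question the lineage explorer answers visually.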
Q3: What is the KFP pipeline IR and why does it matter?
IR stands for Intermediate Representation. In KFP v2, compiling a pipeline produces a YAML file in an intermediate format that is backend-agnostic. The same IR YAML can be executed on standard Kubeflow Pipelines, Google Vertex AI Pipelines, or any other KFP-compatible backend. This matters because it allows teams to develop locally against a Kubeflow cluster and run in production on Vertex AI without changing the pipeline code. The IR contains the complete pipeline specification: component definitions, their container images, their input/output schemas, and the data flow between them.
Q4: How does KFP caching work, and when should you disable it?
KFP caching computes a fingerprint for each component execution based on the component's code (serialized function body or container image digest) and its input values. When a run is submitted with caching enabled, KFP checks whether a completed execution exists with the same fingerprint. If yes, it reuses the outputs without running the component again. This dramatically speeds up iterative development - you can change the training component without re-running data ingestion. Disable caching when: the component has external side effects that should always execute (e.g., fetching live data from an API), when the inputs are the same but the underlying data source has changed (e.g., same GCS URI but new file), or when you are debugging a specific component and need a fresh execution.
Q5: How do you trigger a KFP pipeline run when new data arrives?
KFP has built-in recurring runs for time-based scheduling. For event-driven triggers, you build an external trigger using the KFP Python client. A common pattern: a Cloud Function or Lambda is triggered by a storage event (e.g., "new file written to S3 bucket"). The function uses kfp.Client to call create_run_from_pipeline_package(), passing the new data URI as a parameter. The KFP client authenticates via a service account token and submits the run to the KFP API server. This decouples the trigger mechanism from the pipeline definition - you can change from time-based to event-based triggering without modifying the pipeline code.
Q6: What is the difference between a lightweight Python component and a containerized component? When do you use each?
A lightweight Python component is defined with @dsl.component and uses packages_to_install to install dependencies at runtime in a base image. KFP embeds the function's source code in the compiled pipeline spec; when the step starts, the container installs the listed packages with pip and then executes the function. No Dockerfile or image build is involved. This is fast to iterate - you change the function and recompile. Use lightweight components for steps with standard PyPI dependencies and no complex build requirements.
A containerized component, as the term is used in this lesson, runs on a pre-built Docker image you maintain. You point KFP to the image via base_image, and the image must already contain all dependencies. Use containerized components when you need: GPU libraries (CUDA, cuDNN), proprietary software that cannot be pip-installed, complex multi-stage build requirements, or maximum startup speed (pre-installed dependencies avoid runtime pip install time). For steps that run an arbitrary container command rather than a Python function, KFP v2 also provides @dsl.container_component. The tradeoff is that you must maintain a Docker build pipeline for every component image.
