
Scikit-Learn Pipelines

It is 2019. A data scientist at Spotify is three weeks into debugging a mysterious regression in their music recommendation model. The offline AUC is 0.92. The online A/B test shows no improvement over the baseline - the model might even be slightly worse. The data scientist checks feature drift, data freshness, serving infrastructure, label encoding. Everything looks correct.

On day twenty-two, a senior engineer finds the bug. In the original exploration notebook, someone ran scaler.fit(X) on the full dataset before the train/test split - a completely natural thing to do when you are just exploring. That notebook cell got copy-pasted into the training script months earlier. The StandardScaler was fit on the entire dataset, including the test rows. The test set's mean and standard deviation leaked into the scaler's parameters. Every CV evaluation used contaminated scaling statistics. The model was trained on "easy" data where the feature distributions were already adjusted by the test set's statistics. In production, with real unseen data, the scaler's parameters were slightly wrong.

Three weeks of investigation. The root cause: one line of code in the wrong order.

The Spotify team rewrote the training script using scikit-learn Pipelines. Not because Pipelines are faster or make the model more accurate - but because a Pipeline makes this class of bug very hard to write. When preprocessing lives inside the Pipeline, its statistics are learned only from the data passed to fit(), and tools like cross_val_score refit the whole Pipeline on each training fold. As long as you split before fitting, the API enforces correctness.

This is the real value of the Pipeline abstraction. It is not syntactic sugar. It is a contract: every statistic computed during preprocessing - means, standard deviations, imputation values, category encodings - comes exclusively from training data. Inference sees the same transformations applied with those training-set statistics. The gap between offline evaluation and production behavior, the notorious "training-serving skew," is eliminated by construction.

Why Training-Serving Skew Is So Hard to Debug

Training-serving skew happens when the data processing code in your training pipeline differs - even slightly - from the processing code in your production serving path. The model trains on transformed data that looks slightly different from what it sees in production.

Common causes:

  • Scaler fit on full dataset instead of only training data
  • Imputation strategy computed on production data at inference time (e.g., median of whatever data is currently in the database)
  • OneHotEncoder fit on training data but applied to production data that has a new category
  • Feature engineering applied differently in a Jupyter notebook versus a Flask API
  • Normalization computed in SQL for training but in Python for inference

A Pipeline does not solve all of these, but it solves the most common ones: ensuring preprocessing statistics come from training data and are serialized alongside the model so the same code runs at training and inference.

The Core Pipeline Pattern

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import numpy as np

# Without a pipeline (WRONG - scaling leak risk):
# scaler.fit(X) # uses ALL data
# X_scaled = scaler.transform(X)
# X_train, X_test = train_test_split(X_scaled) # test was used to fit scaler

# With a pipeline (CORRECT):
pipe = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('clf', LogisticRegression(max_iter=1000)),
])

# pipe.fit(X_train, y_train):
# 1. imputer.fit(X_train) → imputer.transform(X_train) → X_imputed
# 2. scaler.fit(X_imputed) → scaler.transform(X_imputed) → X_scaled
# 3. clf.fit(X_scaled, y_train)

# pipe.predict(X_test):
# 1. imputer.transform(X_test) ← uses training-set medians
# 2. scaler.transform(X_test) ← uses training-set mean/std
# 3. clf.predict(...)

from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

pipe.fit(X_train, y_train)
print(f"Test accuracy: {pipe.score(X_test, y_test):.4f}") # score() is mean accuracy for classifiers, not AUC

# Access an intermediate step
scaler = pipe.named_steps['scaler']
print(f"Feature means (from training data): {scaler.mean_[:5]}")

Pipeline Internals: What Actually Happens

Understanding the mechanics helps you debug problems and extend the system correctly.

fit() and transform() chaining

When you call pipe.fit(X_train, y_train), sklearn iterates over all steps except the last. For each intermediate step, it calls fit_transform(X, y) - which is equivalent to calling fit(X, y) followed by transform(X), but more efficient when the transformer implements fit_transform directly (e.g., PCA uses a single SVD decomposition for both). The transformed output becomes the input to the next step.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import Ridge

pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=5)),
('model', Ridge()),
])

# What pipe.fit(X_train, y_train) does internally:
# Step 1: scaler.fit_transform(X_train) → X_scaled (800, 20)
# Step 2: pca.fit_transform(X_scaled) → X_reduced (800, 5)
# Step 3: model.fit(X_reduced, y_train) → fitted model

# What pipe.predict(X_test) does internally:
# Step 1: scaler.transform(X_test) → X_scaled (200, 20)
# Step 2: pca.transform(X_scaled) → X_reduced (200, 5)
# Step 3: model.predict(X_reduced) → predictions

# Key insight: test data ONLY goes through transform(), never fit()

fit_transform() vs fit() + transform()

fit_transform() is not just a convenience method. For some transformers, it is computationally cheaper than calling fit() and transform() separately. For example, PCA.fit() computes the decomposition and PCA.transform() then projects the data with an extra matrix multiplication, while PCA.fit_transform() can reuse the factors from the decomposition to produce the projection directly (depending on the solver). The decomposition itself runs once in both cases. The Pipeline automatically calls fit_transform() when it is available.

import time
from sklearn.decomposition import PCA
import numpy as np

X = np.random.randn(10000, 500)

pca = PCA(n_components=50)

# fit + transform separately
t0 = time.time()
pca.fit(X)
X_t = pca.transform(X)
t1 = time.time()
print(f"fit + transform: {t1-t0:.3f}s")

pca2 = PCA(n_components=50)

# fit_transform (single pass)
t2 = time.time()
X_t2 = pca2.fit_transform(X)
t3 = time.time()
print(f"fit_transform: {t3-t2:.3f}s")
# fit_transform is often somewhat faster for PCA; the gap depends on the solver and data shape

Accessing pipeline steps with getitem

Pipelines support indexing and slicing to extract sub-pipelines or individual steps.

pipe = Pipeline([
('imputer', SimpleImputer()),
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('clf', LogisticRegression()),
])

# Named step access
scaler = pipe.named_steps['scaler'] # by name
scaler = pipe['scaler'] # dict-style (same as named_steps)
imputer = pipe[0] # by integer index

# Slicing creates a sub-pipeline (useful for inspecting intermediate transforms)
preprocessing_only = pipe[:-1] # all steps except the classifier
# preprocessing_only is a Pipeline with imputer, scaler, pca

# Transform data through only the preprocessing steps
X_preprocessed = preprocessing_only.fit_transform(X_train)
print(X_preprocessed.shape) # (800, 10)

ColumnTransformer: Different Preprocessing per Column Type

Real datasets have mixed types: numeric columns need scaling and imputation, categorical columns need encoding, text needs vectorization. ColumnTransformer applies different transformers to different column subsets.

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

# Example: customer churn dataset
df = pd.DataFrame({
'age': [25, np.nan, 34, 28, 52, 41],
'spend_30d': [120.5, 0.0, 450.2, 88.3, 310.0, np.nan],
'plan': ['free', 'paid', 'paid', 'free', 'paid', 'free'],
'country': ['US', 'UK', 'US', 'DE', 'US', 'UK'],
'churn': [0, 0, 1, 0, 1, 0],
})

NUMERIC_COLS = ['age', 'spend_30d']
CATEGORICAL_COLS = ['plan', 'country']
TARGET = 'churn'

X = df[NUMERIC_COLS + CATEGORICAL_COLS]
y = df[TARGET]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Numeric pipeline
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
])

# Categorical pipeline
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])

# Combine with ColumnTransformer
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, NUMERIC_COLS),
('cat', categorical_transformer, CATEGORICAL_COLS),
], remainder='drop') # drop any unlisted columns

# Full pipeline: preprocessing + model
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', GradientBoostingClassifier(n_estimators=100, random_state=42)),
])

full_pipeline.fit(X_train, y_train)
print(f"Test accuracy: {full_pipeline.score(X_test, y_test):.4f}")

remainder='passthrough' vs 'drop'

The remainder parameter controls what happens to columns not listed in any transformer.

# remainder='drop' (default): unlisted columns are discarded
preprocessor_drop = ColumnTransformer(transformers=[
('num', StandardScaler(), ['age', 'spend_30d']),
], remainder='drop')
# Only 'age' and 'spend_30d' survive; 'plan', 'country' are lost

# remainder='passthrough': unlisted columns pass through unchanged
preprocessor_pass = ColumnTransformer(transformers=[
('num', StandardScaler(), ['age', 'spend_30d']),
], remainder='passthrough')
# 'age' and 'spend_30d' are scaled; 'plan' and 'country' are concatenated as-is
# Useful when some columns need no transformation

# remainder can also be a transformer
preprocessor_encode = ColumnTransformer(transformers=[
('num', StandardScaler(), ['age', 'spend_30d']),
], remainder=OrdinalEncoder())
# Numeric columns are scaled; all other columns get ordinal-encoded

Getting feature names after transformation

After ColumnTransformer, the column names are lost (the output is a numpy array). Recovering them is important for feature importance analysis and debugging.

# After fitting the full pipeline, recover feature names from ColumnTransformer
ct = full_pipeline.named_steps['preprocessor']

# sklearn 1.0+ unified method - works across all transformers
feature_names = ct.get_feature_names_out()
print(feature_names)
# Example output when every category appears in the training split:
# ['num__age', 'num__spend_30d', 'cat__plan_free', 'cat__plan_paid',
# 'cat__country_DE', 'cat__country_UK', 'cat__country_US']

# The prefix is the transformer name given in the ColumnTransformer definition
# Strip the prefix for cleaner names
clean_names = [name.split('__', 1)[1] for name in feature_names]

# Use with feature importances from the model
model = full_pipeline.named_steps['classifier']
importances = model.feature_importances_
for name, imp in sorted(zip(clean_names, importances), key=lambda x: -x[1]):
    print(f"{name:20s}: {imp:.4f}")

[Figure: Pipeline flow, from raw features to predictions]

Custom Transformers

When built-in transformers don't cover your use case, write a custom one by subclassing BaseEstimator and TransformerMixin.

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class LogTransformer(BaseEstimator, TransformerMixin):
    """Apply log1p to specified columns."""

    def __init__(self, columns=None):
        self.columns = columns # list of column indices or None for all

    def fit(self, X, y=None):
        # No fitting required - stateless transformer
        return self

    def transform(self, X):
        X = X.copy()
        if self.columns is None:
            return np.log1p(X)
        X[:, self.columns] = np.log1p(X[:, self.columns])
        return X


class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clip values beyond n standard deviations (fit on training data only)."""

    def __init__(self, n_std=3.0):
        self.n_std = n_std

    def fit(self, X, y=None):
        self.mean_ = X.mean(axis=0)
        self.std_ = X.std(axis=0)
        self.lower_ = self.mean_ - self.n_std * self.std_
        self.upper_ = self.mean_ + self.n_std * self.std_
        return self

    def transform(self, X):
        return np.clip(X, self.lower_, self.upper_)


# Use in a pipeline
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Ridge

# Non-negative features: log1p returns NaN for values below -1, which would break Ridge
X_train = np.abs(np.random.randn(100, 5)) * 10
y_train = X_train[:, 0] * 2 + np.random.randn(100)

pipe = Pipeline([
('clipper', OutlierClipper(n_std=3)),
('log', LogTransformer()),
('model', Ridge(alpha=1.0)),
])

pipe.fit(X_train, y_train)

Why inherit from both BaseEstimator and TransformerMixin?

BaseEstimator gives you get_params() and set_params() automatically, which are required by GridSearchCV and clone(). The key rule: store every constructor argument as an attribute with the exact same name (do not modify it). This is what enables get_params() to introspect the object.

TransformerMixin gives you fit_transform() for free - it just calls self.fit(X, y).transform(X). You do not need to implement it yourself.

class GoodTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, alpha=1.0, method='median'):
        # CORRECT: attribute name matches parameter name exactly
        self.alpha = alpha
        self.method = method

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X


class BadTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, alpha=1.0):
        # WRONG: modifying the parameter in __init__ breaks get_params()
        self.alpha_ = alpha * 2 # no 'alpha' attribute exists, so get_params() fails
        # clone() and GridSearchCV cannot work with this transformer
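To see what the naming rule buys you, here is a minimal sketch with a hypothetical toy transformer (ScaleBy is not a scikit-learn class). It shows get_params(), clone(), and set_params() working without any extra code, which is roughly what GridSearchCV relies on when it tries each candidate.

```python
from sklearn.base import BaseEstimator, TransformerMixin, clone


class ScaleBy(BaseEstimator, TransformerMixin):
    """Hypothetical toy transformer that multiplies every value by `factor`."""

    def __init__(self, factor=1.0):
        self.factor = factor  # stored unmodified under the same name

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X * self.factor


t = ScaleBy(factor=2.0)
params = t.get_params()        # introspected from the __init__ signature
copy_t = clone(t)              # new unfitted object built from those params
copy_t.set_params(factor=5.0)  # change a hyperparameter on the copy only

print(params)
print(t.factor, copy_t.factor)
```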

Stateful Custom Transformer: Winsorizer

A Winsorizer clips extreme values at a percentile computed from training data - not a fixed number of standard deviations. This is stateful: fit() computes the percentiles from training data, transform() applies them.

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class Winsorizer(BaseEstimator, TransformerMixin):
    """
    Cap values at the lower and upper percentiles computed from training data.

    Parameters
    ----------
    lower : float, default=0.05
        Lower percentile (e.g., 0.05 = 5th percentile).
    upper : float, default=0.95
        Upper percentile (e.g., 0.95 = 95th percentile).
    """

    def __init__(self, lower=0.05, upper=0.95):
        self.lower = lower
        self.upper = upper

    def fit(self, X, y=None):
        # Compute percentile thresholds from training data only
        # These are stored as fitted attributes (convention: trailing underscore)
        self.lower_bounds_ = np.percentile(X, self.lower * 100, axis=0)
        self.upper_bounds_ = np.percentile(X, self.upper * 100, axis=0)
        return self

    def transform(self, X):
        # Apply training-set thresholds - never recompute on test data
        X = np.array(X, dtype=float)
        return np.clip(X, self.lower_bounds_, self.upper_bounds_)

    def get_feature_names_out(self, input_features=None):
        """Support get_feature_names_out() - required for ColumnTransformer integration."""
        if input_features is None:
            return np.array([f'x{i}' for i in range(len(self.lower_bounds_))])
        return np.array(input_features)


# Test it
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

np.random.seed(42)
X_train = np.random.randn(200, 3) * 10
X_train[0, 0] = 1000.0 # inject outlier into training data
X_test = np.random.randn(50, 3) * 10
X_test[0, 0] = 500.0 # inject outlier into test data

pipe = Pipeline([
('winsor', Winsorizer(lower=0.01, upper=0.99)),
('scaler', StandardScaler()),
('model', LinearRegression()),
])

y_train = X_train[:, 0] + np.random.randn(200)
pipe.fit(X_train, y_train)

# The winsorizer's bounds come from X_train's percentiles
print("Upper bounds:", pipe.named_steps['winsor'].upper_bounds_)
# The outlier in X_test is capped at the training 99th percentile, not X_test's 99th percentile

Stateless Custom Transformer: CyclicalEncoder

Cyclical features like hour-of-day (0–23) or day-of-week (0–6) should be encoded with sine and cosine so that the model understands that hour 23 is close to hour 0 - they are neighbors on a circle, not endpoints on a line.

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class CyclicalEncoder(BaseEstimator, TransformerMixin):
    """
    Encode cyclical features with sine and cosine.

    For a feature with period P, hour-of-day has P=24, day-of-week has P=7.
    Output has 2x the number of input columns: [sin(2π*x/P), cos(2π*x/P)] per column.

    Parameters
    ----------
    periods : dict
        Maps column index to its period. E.g., {0: 24, 1: 7} for
        column 0 = hour-of-day, column 1 = day-of-week.
    """

    def __init__(self, periods):
        self.periods = periods

    def fit(self, X, y=None):
        # Stateless - no statistics needed from training data
        return self

    def transform(self, X):
        X = np.array(X, dtype=float)
        outputs = []
        for col_idx, period in sorted(self.periods.items()):
            col = X[:, col_idx]
            outputs.append(np.sin(2 * np.pi * col / period))
            outputs.append(np.cos(2 * np.pi * col / period))
        return np.column_stack(outputs)

    def get_feature_names_out(self, input_features=None):
        names = []
        for col_idx, period in sorted(self.periods.items()):
            prefix = f'x{col_idx}' if input_features is None else input_features[col_idx]
            names.append(f'{prefix}_sin_{period}')
            names.append(f'{prefix}_cos_{period}')
        return np.array(names)


# Example: ride-sharing demand prediction
# Features: hour_of_day (0-23), day_of_week (0-6), temperature
import pandas as pd

df = pd.DataFrame({
'hour_of_day': [0, 6, 12, 18, 23],
'day_of_week': [0, 1, 3, 5, 6],
'temperature': [15.2, 18.1, 25.3, 22.7, 16.8],
})

# hour and day need cyclical encoding; temperature needs scaling
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler

cyclical_encoder = CyclicalEncoder(periods={0: 24, 1: 7})

preprocessor = ColumnTransformer(transformers=[
('cyclical', cyclical_encoder, ['hour_of_day', 'day_of_week']),
('numeric', StandardScaler(), ['temperature']),
])

X_enc = preprocessor.fit_transform(df)
print("Shape:", X_enc.shape) # (5, 5): 4 cyclical + 1 scaled
print("Features:", preprocessor.get_feature_names_out())
# ['cyclical__hour_of_day_sin_24', 'cyclical__hour_of_day_cos_24',
# 'cyclical__day_of_week_sin_7', 'cyclical__day_of_week_cos_7',
# 'numeric__temperature']
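The claim that hour 23 and hour 0 end up as neighbors can be checked with a few lines of arithmetic. This is a standalone sketch; encode_hour is a helper written for this illustration only.

```python
import numpy as np


def encode_hour(hour, period=24):
    """Map an hour to a point (sin, cos) on the unit circle."""
    angle = 2 * np.pi * hour / period
    return np.array([np.sin(angle), np.cos(angle)])


# Euclidean distance between encoded points
d_23_0 = np.linalg.norm(encode_hour(23) - encode_hour(0))
d_12_0 = np.linalg.norm(encode_hour(12) - encode_hour(0))

print(f"distance(23h, 0h) = {d_23_0:.3f}")
print(f"distance(12h, 0h) = {d_12_0:.3f}")
```

On the raw 0-23 scale, hour 23 is 23 units from hour 0 and hour 12 is only 12 units away. After encoding, the ordering flips: 23h sits next to 0h, and 12h is the farthest point on the circle.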

FunctionTransformer: Quick transformations without a class

For simple stateless transformations, FunctionTransformer wraps a plain Python function into a transformer object.

from sklearn.preprocessing import FunctionTransformer
import numpy as np

# Log transform
log_transformer = FunctionTransformer(np.log1p, validate=True)

# Clip outliers at fixed values
clip_transformer = FunctionTransformer(
lambda X: np.clip(X, -5, 5),
validate=True
)

# Square root transform (for count data)
sqrt_transformer = FunctionTransformer(np.sqrt, validate=True)

# Use in a pipeline
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Ridge

pipe = Pipeline([
('log', FunctionTransformer(np.log1p)),
('clip', FunctionTransformer(lambda X: np.clip(X, 0, 10))),
('model', Ridge()),
])

# FunctionTransformer.fit() does nothing (stateless)
# Good for: log, sqrt, power transforms, simple thresholds
# Bad for: anything that needs to learn statistics from training data

:::note When to use FunctionTransformer vs a custom class
Use FunctionTransformer when the transformation is stateless - it does not need to compute any statistics from training data. Use a custom BaseEstimator subclass when you need to compute statistics in fit() (like Winsorizer computing percentiles). A common mistake is using FunctionTransformer with a lambda that captures training-set statistics from outside the pipeline - this reintroduces the leakage problem you are trying to avoid.
:::
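The mistake described in the note can be made concrete with a small sketch on synthetic data. The variable names are illustrative. The lambda freezes a mean computed on all rows, and calling fit() on training rows cannot change it, whereas a stateful transformer learns the mean from whatever it is fit on.

```python
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import FunctionTransformer, StandardScaler

rng = np.random.default_rng(0)
X_all = rng.normal(loc=5.0, scale=2.0, size=(200, 3))  # synthetic data
X_tr, X_te = train_test_split(X_all, test_size=0.25, random_state=0)

# Leaky pattern: the statistic is computed outside the pipeline on ALL rows,
# then captured by a function that fit() can never update
full_mean = X_all.mean(axis=0)
leaky_center = FunctionTransformer(lambda X: X - full_mean)
leaky_center.fit(X_tr)  # no-op: full_mean still includes the test rows

# Stateful alternative: the mean is learned in fit() from the rows it is given
safe_center = StandardScaler(with_std=False).fit(X_tr)

print("mean baked into lambda:", full_mean)
print("mean learned in fit() :", safe_center.mean_)
```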

Custom transformer for Pandas DataFrames

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class FrequencyEncoder(BaseEstimator, TransformerMixin):
    """Replace categorical values with their training-set frequency."""

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        self.freq_maps_ = {}
        for col in self.columns:
            self.freq_maps_[col] = X[col].value_counts(normalize=True).to_dict()
        return self

    def transform(self, X):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X.copy()
        for col in self.columns:
            X[col] = X[col].map(self.freq_maps_.get(col, {})).fillna(0.0)
        return X

Caching Pipelines for Expensive Steps

If you have an expensive transformer (e.g., TF-IDF on 10M documents), caching avoids recomputing it during grid search. The memory parameter accepts a path to a directory where fitted transformers are cached using joblib.

from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
import tempfile

# Use a temporary directory as the cache
cache_dir = tempfile.mkdtemp()

pipe = Pipeline([
('tfidf', TfidfVectorizer(max_features=50_000)),
('clf', SGDClassifier(loss='log_loss')),
], memory=cache_dir)

# First fit: TF-IDF is computed and cached to disk
# pipe.fit(X_train_texts, y_train)

# Subsequent fits with different clf params: TF-IDF is loaded from cache
from sklearn.model_selection import GridSearchCV

param_grid = {'clf__alpha': [0.0001, 0.001, 0.01]}
gs = GridSearchCV(pipe, param_grid, cv=5)
# gs.fit(X_train_texts, y_train)
# Within each CV fold, TF-IDF is fit once and cached; the remaining alpha values reuse it

:::warning Cache invalidation
The cache is keyed on the transformer's hyperparameters and on the data passed to fit. If you change TfidfVectorizer(max_features=100_000), or fit on a different training fold, the cache is missed and TF-IDF is recomputed. If you change only clf__alpha on the same data, the cache is hit. This is the correct behavior - but make sure to clean the cache directory when you are done with an experiment, or it can consume significant disk space.
:::
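One way to handle the cleanup is to scope the cache to a temporary directory that is deleted automatically. This is a minimal sketch on synthetic data, using PCA as a stand-in for an expensive step.

```python
import os
import tempfile

from sklearn.datasets import make_classification
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

X_c, y_c = make_classification(n_samples=200, n_features=20, random_state=0)

# TemporaryDirectory removes the cache folder when the block exits
with tempfile.TemporaryDirectory() as cache_dir:
    cached_pipe = Pipeline(
        [("pca", PCA(n_components=5)), ("clf", LogisticRegression(max_iter=1000))],
        memory=cache_dir,
    )
    cached_pipe.fit(X_c, y_c)
    cache_entries = os.listdir(cache_dir)  # joblib writes its cache here
    train_acc = cached_pipe.score(X_c, y_c)

cache_removed = not os.path.exists(cache_dir)
print(cache_entries, cache_removed)
```

For long-running experiments you would more likely keep a named directory and delete it explicitly (for example with shutil.rmtree) once the search is finished.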

Pipeline + Cross-Validation: No Data Leakage

Using cross_val_score with a Pipeline is the correct way to do CV because the pipeline's fit is called only on the training fold, guaranteeing no leakage.

from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.datasets import make_classification
import numpy as np

X, y = make_classification(n_samples=500, n_features=20, random_state=42)

pipe = Pipeline([
('scaler', StandardScaler()),
('svm', SVC(kernel='rbf', probability=True)),
])

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(pipe, X, y, cv=cv, scoring='roc_auc', n_jobs=-1)

print(f"CV AUC: {scores.mean():.4f} ± {scores.std():.4f}")
# Each fold: scaler.fit on 400 samples, scaler.transform on 100 samples
# No test-fold statistics ever influence the scaler

:::danger The classic leakage mistake

# WRONG: fit preprocessor before cross-validation
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # uses ALL data including every CV test fold

# This CV score is optimistic - test folds already "seen" by the scaler
scores = cross_val_score(SVC(), X_scaled, y, cv=5)

# CORRECT: put the scaler inside the pipeline
pipe = Pipeline([('scaler', StandardScaler()), ('svc', SVC())])
scores = cross_val_score(pipe, X, y, cv=5)

The difference can be substantial for small datasets or high-variance features. The contaminated evaluation inflates the CV score, making you think your model is better than it actually is.
:::
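With a scaler the inflation is often small, so the sketch below swaps in supervised feature selection (SelectKBest), where the same mistake is easy to see. The data is pure noise with random labels, so an honest estimate should sit near chance level. This example is an illustration of the leakage pattern, not a result from the text above.

```python
import numpy as np
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline

rng = np.random.default_rng(0)
X_noise = rng.normal(size=(100, 5000))  # pure noise features
y_noise = rng.integers(0, 2, size=100)  # labels unrelated to X_noise

# Leaky: pick the 20 "best" features using all rows, then cross-validate
X_selected = SelectKBest(f_classif, k=20).fit_transform(X_noise, y_noise)
leaky_acc = cross_val_score(
    LogisticRegression(max_iter=1000), X_selected, y_noise, cv=5
).mean()

# Honest: selection lives inside the pipeline and is refit on each training fold
honest_pipe = Pipeline([
    ("select", SelectKBest(f_classif, k=20)),
    ("clf", LogisticRegression(max_iter=1000)),
])
honest_acc = cross_val_score(honest_pipe, X_noise, y_noise, cv=5).mean()

print(f"leaky CV accuracy:  {leaky_acc:.2f}")
print(f"honest CV accuracy: {honest_acc:.2f}")
```

The leaky score looks like real signal even though none exists; the pipeline version stays close to a coin flip.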

Grid Search and RandomizedSearchCV on Pipelines

Use double underscore __ to reference parameters inside nested pipeline steps. The pattern is stepname__parametername. For nested structures like ColumnTransformer inside a Pipeline, you chain them: preprocessor__num__scaler__with_mean.

from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import randint, uniform

pipe = Pipeline([
('scaler', StandardScaler()),
('rf', RandomForestClassifier(random_state=42)),
])

# Grid search - step name + __ + parameter name
param_grid = {
'rf__n_estimators': [50, 100, 200],
'rf__max_depth': [None, 5, 10],
'rf__min_samples_split': [2, 5, 10],
# You can also search over preprocessor parameters:
'scaler__with_mean': [True, False], # disable mean centering
}

gs = GridSearchCV(
pipe,
param_grid,
cv=5,
scoring='roc_auc',
n_jobs=-1,
verbose=1,
)
gs.fit(X_train, y_train)
print(f"Best params: {gs.best_params_}")
print(f"Best CV AUC: {gs.best_score_:.4f}")

# Randomized search for larger spaces
param_dist = {
'rf__n_estimators': randint(50, 500),
'rf__max_depth': [None, 3, 5, 10, 15],
'rf__max_features': ['sqrt', 'log2', 0.5],
'rf__min_samples_leaf': randint(1, 20),
}

rs = RandomizedSearchCV(
pipe,
param_dist,
n_iter=50,
cv=5,
scoring='roc_auc',
n_jobs=-1,
random_state=42,
)
rs.fit(X_train, y_train)
print(f"Best random search AUC: {rs.best_score_:.4f}")

Nested parameter paths for ColumnTransformer inside Pipeline

# Full pipeline: Pipeline → ColumnTransformer → sub-Pipeline → transformer
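full_pipeline = Pipeline([
    ('preprocessor', ColumnTransformer([
        ('num', Pipeline([
            ('imputer', SimpleImputer()),
            ('scaler', StandardScaler()),
        ]), NUMERIC_COLS),
        ('cat', Pipeline([
            # the default strategy='mean' fails on string columns
            ('imputer', SimpleImputer(strategy='most_frequent')),
            # ignore categories that are missing from a training fold
            ('encoder', OneHotEncoder(handle_unknown='ignore')),
        ]), CATEGORICAL_COLS),
    ])),
    ('classifier', GradientBoostingClassifier()),
])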

# Parameter path: pipeline_step__CT_step__sub_pipeline_step__param
param_grid = {
# preprocessor (ColumnTransformer) → num (sub-Pipeline) → scaler (StandardScaler) → with_mean
'preprocessor__num__scaler__with_mean': [True, False],
# preprocessor → cat → encoder → max_categories
'preprocessor__cat__encoder__max_categories': [10, 20, None],
# classifier params
'classifier__n_estimators': [50, 100],
'classifier__learning_rate': [0.05, 0.1, 0.2],
'classifier__max_depth': [3, 5],
}

gs = GridSearchCV(full_pipeline, param_grid, cv=3, n_jobs=-1)
# The double-underscore separator can be chained as deep as needed
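If you are unsure of the exact path for a nested parameter, get_params() lists every valid name. The sketch below uses a small stand-in pipeline with placeholder column names ('age', 'plan'); the same call works on any Pipeline.

```python
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Small stand-in pipeline; the column names are placeholders
demo_pipeline = Pipeline([
    ("preprocessor", ColumnTransformer([
        ("num", Pipeline([
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]), ["age"]),
        ("cat", OneHotEncoder(handle_unknown="ignore"), ["plan"]),
    ])),
    ("classifier", LogisticRegression()),
])

# Every key returned by get_params() is a valid name for param_grid or set_params()
param_names = sorted(demo_pipeline.get_params().keys())
scaler_params = [p for p in param_names if "__scaler__" in p]
print(scaler_params)
```

A typo in a param_grid key raises an error at fit time, so checking against this list first can save a failed search.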

:::tip Nested cross-validation for unbiased model selection
When you use GridSearchCV to select the best hyperparameters and then report gs.best_score_, that score is biased upward - the parameters were chosen to maximize it. For a truly unbiased estimate, use nested CV:

from sklearn.model_selection import cross_val_score, GridSearchCV, StratifiedKFold

inner_cv = StratifiedKFold(n_splits=5)
outer_cv = StratifiedKFold(n_splits=5)

gs = GridSearchCV(pipe, param_grid, cv=inner_cv)
# Outer loop evaluates the whole model selection procedure
nested_scores = cross_val_score(gs, X, y, cv=outer_cv, scoring='roc_auc')
print(f"Nested CV AUC: {nested_scores.mean():.4f} ± {nested_scores.std():.4f}")

This is slower ($n_{outer} \times n_{inner} \times n_{params}$ fits) but gives an estimate of how well your model selection procedure generalizes.
:::

Pipeline Visualization

Sklearn 1.0+ can render an interactive HTML diagram of your pipeline structure in Jupyter notebooks.

from sklearn import set_config

# Enable HTML diagram rendering in notebooks
set_config(display='diagram')

# Now just display the pipeline object - it renders as an interactive diagram
full_pipeline
# Click on each step to expand and see hyperparameters

# You can also get the HTML as a string
from sklearn.utils import estimator_html_repr
html = estimator_html_repr(full_pipeline)

# For text-based display (useful in scripts/logs)
set_config(display='text')
print(full_pipeline)
# Check the current config
from sklearn import get_config
print(get_config())
# {'assume_finite': False, 'working_memory': 1024, 'print_changed_only': True,
# 'display': 'text', 'pairwise_dist_chunk_size': 256, ...}

Production Deployment: Pickle the Pipeline

A Pipeline serializes the entire object graph - imputer statistics, scaler mean/std, encoder categories, and model weights - into a single file. This is what you deploy.

import joblib
from sklearn.base import clone

# Fit the final pipeline on ALL training data (after CV-based model selection).
# gs is the search over full_pipeline, so gs.best_params_ holds pipeline-level keys
# such as 'classifier__n_estimators'. Apply them with set_params() instead of
# unpacking them into the classifier constructor.
final_pipe = clone(full_pipeline).set_params(**gs.best_params_)
final_pipe.fit(X_train, y_train)

# Save with joblib (better than pickle for numpy arrays)
joblib.dump(final_pipe, 'churn_model_v1.joblib', compress=3)

# Load and serve
loaded_pipe = joblib.load('churn_model_v1.joblib')

# Inference: same API as training
new_data = pd.DataFrame({
'age': [32],
'spend_30d': [245.0],
'plan': ['paid'],
'country': ['US'],
})
proba = loaded_pipe.predict_proba(new_data)[:, 1]
print(f"Churn probability: {proba[0]:.4f}")

joblib vs pickle: the real differences

# pickle - standard library, works for everything
import pickle

with open('model.pkl', 'wb') as f:
    pickle.dump(final_pipe, f)

with open('model.pkl', 'rb') as f:
    model = pickle.load(f)

# joblib - preferred for sklearn models
import joblib

joblib.dump(final_pipe, 'model.joblib', compress=3)
# compress=3 → good balance of speed and size (0=none, 9=maximum)

model = joblib.load('model.joblib')

The practical differences:

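| Aspect | pickle | joblib |
|---|---|---|
| Large numpy arrays | Works, but can be slower and produce larger files | Optimized for objects carrying large arrays |
| Compression | No built-in | Built-in (compress=0-9) |
| Memory-mapped loading | No | Yes (mmap_mode, uncompressed files only) |
| Standard library | Yes | No (installed as a scikit-learn dependency) |
| sklearn recommendation | Works | Preferred |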

:::danger sklearn version pinning
A sklearn Pipeline serialized with sklearn 1.3 may not load correctly on sklearn 1.0. The __init__ signatures of transformers change between versions. Always record the sklearn version alongside your model artifact, and pin sklearn in your serving environment to the same version used for training.

import sklearn
import json

metadata = {
'sklearn_version': sklearn.__version__,
'model_path': 'churn_model_v1.joblib',
'trained_at': '2024-01-15T10:23:44',
}

# At serving time, assert versions match
assert sklearn.__version__ == metadata['sklearn_version'], \
f"sklearn version mismatch: {sklearn.__version__} != {metadata['sklearn_version']}"

:::

Pipeline versioning with metadata

import json
from datetime import datetime, timezone

import sklearn

metadata = {
    'model_version': 'v1.2.0',
    # timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    'trained_at': datetime.now(timezone.utc).isoformat(),
    'cv_roc_auc': float(gs.best_score_),
    'best_params': gs.best_params_,
    'n_training_samples': len(X_train),
    'feature_columns': NUMERIC_COLS + CATEGORICAL_COLS,
    'sklearn_version': sklearn.__version__,
}

with open('churn_model_v1_metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)

:::tip Pipeline versioning
Save the pipeline with a version tag in the filename and alongside a metadata JSON with the training date, dataset version, CV score, and git commit hash. When a model regresses in production, you need to be able to roll back to the exact previous artifact.
:::
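The version check and the metadata file can be combined into a load-time guard. The sketch below is one possible shape for it: load_pipeline_checked and the demo file names are hypothetical, and the model is a throwaway pipeline fit on synthetic data.

```python
import json
import os
import tempfile

import joblib
import sklearn
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def load_pipeline_checked(model_path, metadata_path):
    """Hypothetical helper: refuse to load if the sklearn version differs."""
    with open(metadata_path) as f:
        meta = json.load(f)
    if meta["sklearn_version"] != sklearn.__version__:
        raise RuntimeError(
            f"trained with sklearn {meta['sklearn_version']}, "
            f"running {sklearn.__version__}"
        )
    return joblib.load(model_path)


X_v, y_v = make_classification(n_samples=100, n_features=5, random_state=0)
pipe_v = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression())])
pipe_v.fit(X_v, y_v)

with tempfile.TemporaryDirectory() as tmp:
    model_path = os.path.join(tmp, "demo_model_v1.joblib")
    metadata_path = os.path.join(tmp, "demo_model_v1_metadata.json")
    joblib.dump(pipe_v, model_path)
    with open(metadata_path, "w") as f:
        json.dump({"sklearn_version": sklearn.__version__}, f)

    restored = load_pipeline_checked(model_path, metadata_path)
    same_predictions = bool((restored.predict(X_v) == pipe_v.predict(X_v)).all())

print(same_predictions)
```

Raising an explicit error at load time is a design choice; a softer variant could log a warning when only the patch version differs.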

MLflow for model tracking

For teams running many experiments, joblib files alone are hard to manage. MLflow wraps sklearn pipelines with experiment tracking, artifact storage, and a model registry.

import mlflow
import mlflow.sklearn
from sklearn.metrics import roc_auc_score

with mlflow.start_run():
    # Log hyperparameters
    mlflow.log_params(gs.best_params_)
    mlflow.log_metric('cv_roc_auc', gs.best_score_)
    mlflow.log_metric('test_roc_auc', roc_auc_score(y_test, final_pipe.predict_proba(X_test)[:, 1]))

    # Log the entire sklearn pipeline as an artifact
    mlflow.sklearn.log_model(
        final_pipe,
        artifact_path='churn_model',
        registered_model_name='ChurnPipeline',
    )

# Later: load from MLflow registry for serving
model_uri = 'models:/ChurnPipeline/Production'
loaded_pipe = mlflow.sklearn.load_model(model_uri)
# X_new: incoming rows with the same columns that were used in training
predictions = loaded_pipe.predict(X_new)

Common Mistakes

:::danger Fitting the preprocessor outside the pipeline

# WRONG: scaler is fit on all data before the split
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)

# CORRECT: split first, then fit everything inside the pipeline
X_train, X_test, y_train, y_test = train_test_split(X, y)
pipe = Pipeline([('scaler', StandardScaler()), ('model', LogisticRegression())])
pipe.fit(X_train, y_train)

:::

:::danger Mutating X inside transform()

# WRONG: modifies the input array in-place
def transform(self, X):
    X[:, 0] = np.log1p(X[:, 0])  # mutates the caller's data
    return X

# CORRECT: always copy first
def transform(self, X):
    X = X.copy()
    X[:, 0] = np.log1p(X[:, 0])
    return X

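If you mutate X in-place, the caller's data changes underneath it: calling the pipeline twice on the same array (fit followed by predict, or repeated evaluation during cross-validation) can apply the transformation twice. This is very hard to debug.
:::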
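To make the failure mode concrete, here is a minimal sketch using plain functions in place of transformer methods. The function names and the toy array are illustrative only; the point is that the in-place version changes the caller's array, so a second call transforms values that were already transformed.

```python
import numpy as np


def transform_in_place(X):
    # Overwrites column 0 of the caller's array.
    X[:, 0] = np.log1p(X[:, 0])
    return X


def transform_with_copy(X):
    # Works on a private copy, so the caller's array is untouched.
    X = X.copy()
    X[:, 0] = np.log1p(X[:, 0])
    return X


X_original = np.array([[1.0, 10.0], [3.0, 20.0]])

# Copying version: the input is identical before and after the call.
X_safe_input = X_original.copy()
transform_with_copy(X_safe_input)
safe_input_unchanged = np.array_equal(X_safe_input, X_original)

# In-place version: the input changes, and a second call applies log1p again.
X_unsafe_input = X_original.copy()
first = transform_in_place(X_unsafe_input).copy()
second = transform_in_place(X_unsafe_input).copy()
unsafe_input_changed = not np.array_equal(X_unsafe_input, X_original)
double_applied = not np.allclose(first, second)
```

The two results from the in-place version differ even though the same variable was passed both times, which is exactly the kind of drift that shows up later as an unexplained metric change.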

:::warning __init__ parameter naming in custom transformers

# WRONG: stored under different name than the parameter
def __init__(self, n_std=3):
    self.threshold = n_std  # get_params() cannot find self.n_std - clone() and GridSearchCV break

# CORRECT: same name
def __init__(self, n_std=3):
    self.n_std = n_std  # get_params() returns {'n_std': 3}

:::
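The naming rule can be checked directly with get_params() and clone(). The sketch below uses two hypothetical classes, NamedClipper and MisnamedClipper, with fit and transform omitted for brevity. The helper catches AttributeError because recent sklearn versions raise it when the attribute is missing, while older versions returned None instead.

```python
from sklearn.base import BaseEstimator, clone


class MisnamedClipper(BaseEstimator):
    def __init__(self, n_std=3):
        self.threshold = n_std  # attribute name differs from the argument name


class NamedClipper(BaseEstimator):
    def __init__(self, n_std=3):
        self.n_std = n_std  # attribute name matches the argument name


def recoverable_n_std(estimator, expected):
    """Return True if get_params() reports the n_std the object was built with."""
    try:
        return estimator.get_params().get("n_std") == expected
    except AttributeError:
        return False


good_ok = recoverable_n_std(NamedClipper(n_std=5), expected=5)
bad_ok = recoverable_n_std(MisnamedClipper(n_std=5), expected=5)

# clone() rebuilds the estimator from get_params(), so it only works
# when every constructor argument can be read back.
cloned = clone(NamedClipper(n_std=5))
```

GridSearchCV clones the estimator for every candidate and fold, which is why a transformer that fails this check cannot be tuned.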

YouTube Resources

| Video | Channel | What it covers |
| --- | --- | --- |
| Scikit-Learn Pipelines | DataSchool | Complete pipeline walkthrough |
| ColumnTransformer Explained | sentdex | Mixed-type preprocessing |
| GridSearchCV with Pipelines | Corey Schafer | Hyperparameter tuning |
| Custom sklearn Transformers | Python Engineer | Building your own transformers |

Interview Q&A

Q1: What problem does a scikit-learn Pipeline solve, and why is it essential for production ML?

A Pipeline prevents data leakage by ensuring that every preprocessing step is fit only on training data. Without a pipeline, it is easy to accidentally fit a scaler or imputer on the combined train+test dataset before splitting - meaning the model "sees" test statistics during training, artificially inflating offline metrics. A Pipeline also bundles preprocessing and model into a single serializable object, so the exact same transformations applied during training are applied at inference time. This eliminates the "training-serving skew" class of bugs where the production API applies different preprocessing than what the model was trained on. The Spotify example is real: a scaler fit on the full dataset before splitting went undetected for months because the offline/online gap looked like an A/B test power issue, not a data leakage bug.

Q2: How does ColumnTransformer work, and when would you use it?

ColumnTransformer applies different transformers to different subsets of columns. This is necessary for mixed-type datasets: numeric columns need StandardScaler and SimpleImputer, categorical columns need OneHotEncoder or OrdinalEncoder, and text columns need TfidfVectorizer. Each transformer is specified as a tuple of (name, transformer, columns). The remainder parameter controls what happens to columns not listed: 'drop' discards them, 'passthrough' includes them unchanged, or you can pass another transformer. The output is the horizontal concatenation of all transformer outputs. After fitting, call get_feature_names_out() to recover the column names, which are prefixed with the transformer name (e.g., num__age, cat__plan_free).

Q3: How do you implement a stateful custom transformer in scikit-learn, and what conventions must you follow?

Subclass both BaseEstimator and TransformerMixin. Implement fit(X, y=None) which computes statistics from training data and stores them as attributes with a trailing underscore (e.g., self.mean_, self.upper_bounds_). Return self from fit. Implement transform(X) which applies the transformation using the fitted attributes - never recompute statistics from X inside transform. Copy X before modifying it (never mutate in-place). In __init__, store every constructor argument as an attribute with the exact same name - BaseEstimator.get_params() introspects __init__'s signature and matches argument names to attribute names. Breaking this convention breaks get_params() and clone(), which GridSearchCV relies on: recent sklearn versions raise an AttributeError, and the parameter cannot be tuned.
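The conventions above fit in a few lines. This is a sketch of one possible Winsorizer, not necessarily the implementation used elsewhere in this lesson: it clips each column to the training mean plus or minus n_std standard deviations.

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin


class Winsorizer(BaseEstimator, TransformerMixin):
    """Clip each column to mean +/- n_std standard deviations learned in fit."""

    def __init__(self, n_std=3.0):
        self.n_std = n_std  # same name as the constructor argument

    def fit(self, X, y=None):
        X = np.asarray(X, dtype=float)
        mean = X.mean(axis=0)
        std = X.std(axis=0)
        # Learned state is stored with a trailing underscore.
        self.lower_bounds_ = mean - self.n_std * std
        self.upper_bounds_ = mean + self.n_std * std
        return self

    def transform(self, X):
        # np.clip returns a new array, so the caller's data is not mutated,
        # and the bounds come from fit, never from the X passed here.
        X = np.asarray(X, dtype=float)
        return np.clip(X, self.lower_bounds_, self.upper_bounds_)


X_train = np.array([[1.0], [2.0], [3.0], [4.0], [5.0]])
X_new = np.array([[100.0], [3.0]])

winsorizer = Winsorizer(n_std=1.0).fit(X_train)
X_clipped = winsorizer.transform(X_new)
```

The outlier in X_new is clipped to the upper bound learned from X_train, the in-range value passes through unchanged, and X_new itself is left as it was.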

Q4: How does parameter naming work in GridSearchCV when the pipeline has nested steps?

Use double underscore __ to chain through the hierarchy. For a Pipeline with step 'preprocessor' (a ColumnTransformer) containing a sub-step 'num' (another Pipeline) containing 'scaler' (a StandardScaler), the parameter with_mean is addressed as preprocessor__num__scaler__with_mean. Each __ goes one level deeper. The rule: use the name you gave the step in the parent container's list of transformers, then __, then the next level. This design is what allows GridSearchCV to call set_params() to inject different hyperparameter values into the nested structure without knowing the pipeline's architecture in advance.
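The nesting described above can be reproduced on an unfitted pipeline. The step names preprocessor, num, scaler, and model mirror the example in the answer; the column names age and plan are placeholders.

```python
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

pipe = Pipeline([
    ("preprocessor", ColumnTransformer([
        ("num", Pipeline([("scaler", StandardScaler())]), ["age"]),
        ("cat", OneHotEncoder(handle_unknown="ignore"), ["plan"]),
    ])),
    ("model", LogisticRegression()),
])

# get_params() lists every addressable name, including the nested ones.
available = pipe.get_params()

# set_params() walks the same path that GridSearchCV uses for each candidate.
pipe.set_params(preprocessor__num__scaler__with_mean=False, model__C=0.1)

# Reach into the structure to confirm the value landed on the inner scaler.
inner_pipeline = pipe.named_steps["preprocessor"].transformers[0][1]
scaler = inner_pipeline.named_steps["scaler"]

# The same keys go into a search grid unchanged.
param_grid = {
    "preprocessor__num__scaler__with_mean": [True, False],
    "model__C": [0.1, 1.0, 10.0],
}
```

When a key is rejected, printing sorted(pipe.get_params()) is usually the quickest way to find the correct spelling.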

Q5: How do you handle a transformer that needs to know column names (DataFrame column access inside transform)?

If the transformer needs specific columns, have it accept the column names (or indices) in its constructor and use them inside transform, checking whether its input is a DataFrame or a NumPy array and branching accordingly. Be aware of what it will actually receive: ColumnTransformer selects the columns first and passes that subset on, as a DataFrame when the pipeline input is a DataFrame, but most built-in transformers return NumPy arrays by default, so any step that runs after one of them sees an array with no column names. If you need column-aware behavior inside a transformer, either: (1) accept column indices in the constructor (works with NumPy arrays), or (2) use set_output(transform='pandas') in sklearn 1.2+, which makes the built-in transformers return DataFrames, or (3) write your pipeline to work at the DataFrame level without ColumnTransformer (using pandas pipelines or feature-engine).

Q6: What are the risks of Pipeline serialization and how do you mitigate them?

Three main risks. First, sklearn version mismatch: a pipeline serialized with sklearn 1.3 may fail to load, or load and behave differently, on sklearn 1.0 because the constructor parameters and internal attributes of estimators change between versions. Pin sklearn in your serving environment to the same version used for training, and record the version in metadata. Second, custom transformer classes must be importable at deserialization time: joblib/pickle stores classes by reference (module path and class name), not by value. If you rename or move your Winsorizer class, loading the old model fails with an ImportError or AttributeError. Keep a stable module path for production transformers. Third, large models: joblib is optimized for objects containing large NumPy arrays, and an uncompressed dump can be memory-mapped at load time with mmap_mode, while compress=3 trades that for a smaller file. Plain pickle can be prohibitively slow for large TF-IDF matrices or KNN models. Use joblib for everything containing large NumPy arrays.
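One way to act on the first risk is to write the sklearn version next to the artifact and refuse to load when it differs. The helpers save_with_metadata and load_checked below are illustrative names, not part of joblib or sklearn, and the toy data exists only to give the pipeline something to fit.

```python
import json
import tempfile
from pathlib import Path

import joblib
import numpy as np
import sklearn
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def save_with_metadata(pipe, directory, name):
    """Write <name>.joblib and <name>_metadata.json side by side."""
    directory = Path(directory)
    joblib.dump(pipe, directory / f"{name}.joblib", compress=3)
    metadata = {"sklearn_version": sklearn.__version__}
    (directory / f"{name}_metadata.json").write_text(json.dumps(metadata))


def load_checked(directory, name):
    """Load the pipeline only if the running sklearn matches the recorded one."""
    directory = Path(directory)
    metadata = json.loads((directory / f"{name}_metadata.json").read_text())
    if metadata["sklearn_version"] != sklearn.__version__:
        raise RuntimeError(
            f"sklearn version mismatch: {sklearn.__version__} "
            f"!= {metadata['sklearn_version']}"
        )
    return joblib.load(directory / f"{name}.joblib")


X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])
pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LogisticRegression()),
]).fit(X, y)

with tempfile.TemporaryDirectory() as tmp:
    save_with_metadata(pipe, tmp, "churn_model_v1")
    restored = load_checked(tmp, "churn_model_v1")
    same_predictions = bool((restored.predict(X) == pipe.predict(X)).all())

    # Simulate an artifact that was trained under a different version.
    Path(tmp, "churn_model_v1_metadata.json").write_text(
        json.dumps({"sklearn_version": "0.0.0"})
    )
    try:
        load_checked(tmp, "churn_model_v1")
        mismatch_detected = False
    except RuntimeError:
        mismatch_detected = True
```

Raising an exception rather than using assert keeps the check active even when Python runs with optimizations enabled.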

Q7: When and how would you use pipeline caching, and what are the caveats?

Use memory=cache_dir when you have an expensive transformer early in the pipeline that does not change across grid search iterations - TF-IDF vectorization, image feature extraction, or embedding generation. The cache is backed by joblib.Memory, which hashes the transformer, its hyperparameters, and the data it is fit on. When only downstream parameters change (e.g., the classifier's regularization strength), the fitted transformer for a given training fold is reused. The main caveats: (1) because the input data is part of the key, a changed dataset or a different CV split produces new entries rather than reusing old ones, and hashing very large inputs has its own cost; (2) the cache directory can grow large with many experiments, since stale entries are not removed automatically; (3) caching does not help if the expensive transformer's hyperparameters are also being searched (every new max_features value in TF-IDF is a cache miss). Clean up with shutil.rmtree(cache_dir) after the experiment.
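A small sketch of the pattern, with PCA standing in for the expensive early step and a synthetic dataset from make_classification. Only the classifier's C is searched, so the PCA fit for each training fold can be reused across candidates.

```python
import os
import shutil
import tempfile

from sklearn.datasets import make_classification
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline

X, y = make_classification(n_samples=200, n_features=20, random_state=0)

cache_dir = tempfile.mkdtemp()
try:
    pipe = Pipeline(
        [
            ("pca", PCA(n_components=5)),  # stand-in for an expensive step
            ("model", LogisticRegression(max_iter=1000)),
        ],
        memory=cache_dir,  # fitted transformers are cached in this directory
    )

    # Only the downstream parameter varies, so the cached PCA fits are reusable.
    gs = GridSearchCV(pipe, {"model__C": [0.1, 1.0, 10.0]}, cv=3)
    gs.fit(X, y)
    best_C = gs.best_params_["model__C"]
finally:
    # Remove the cache once the experiment is finished.
    shutil.rmtree(cache_dir)

cache_exists_after = os.path.exists(cache_dir)
```

On a step as cheap as PCA over 200 rows the cache overhead can outweigh the saving; the benefit shows up when the cached step dominates the fit time.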

