

Feature Monitoring

The Silent Degradation

The fraud model had been running for nine months without incident: precision of 0.87, recall of 0.72, and fraud caught per week trending upward. The operations team had set up model-level monitoring: they tracked precision and recall weekly and had a PagerDuty alert configured to fire if precision dropped below 0.75. That alert stayed silent for the first seven weeks of the degradation.

Precision had started dropping in week one: slowly at first, then faster. By week four it was 0.81; by week six, 0.77. The 0.75 threshold was crossed in week eight, and the team got a PagerDuty alert on a Tuesday morning, investigated for half a day, could not find an obvious model issue, and closed the incident. By week nine, when an analyst noticed something was off while preparing a quarterly report, precision was 0.71.

The actual failure had nothing to do with the model. One of the twelve input features, device_risk_score, had silently started returning the default value of 0.0 for 34% of users. A vendor API change had modified the response format for certain device types, and the feature pipeline's parser did not handle the new format. Instead of failing loudly, it fell back to the default value. For a third of its users, the model was now receiving a value distribution it had never seen in training. In the training data, 0.0 was rare and associated with very low-risk devices, so the model had learned strong weights linking 0.0 to safe users. It stopped flagging them.

A coverage check on device_risk_score, comparing the fraction of values equal to the 0.0 default (the pipeline's stand-in for a missing value) against its historical range, would have fired within the first hour of the vendor API change. The fraud operations team would have investigated the feature pipeline, not the model, and the precision loss would have been caught before it caused measurable business damage.
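Because the broken parser wrote 0.0 instead of null, a plain `isnull()` count would not have seen this failure; the check has to count the fallback value itself. A minimal sketch, assuming numeric values in a NumPy array, a baseline rate taken from historical data, and hypothetical `sentinel_rate` / `check_sentinel_rate` helpers:

```python
import numpy as np


def sentinel_rate(values, sentinel: float = 0.0) -> float:
    """Fraction of values exactly equal to the pipeline's fallback value."""
    arr = np.asarray(values, dtype=float)
    if arr.size == 0:
        return 0.0
    return float(np.mean(arr == sentinel))


def check_sentinel_rate(values, baseline_rate: float,
                        max_increase: float = 0.05,
                        sentinel: float = 0.0) -> dict:
    """Hypothetical helper: alert when the fallback share rises above its baseline."""
    rate = sentinel_rate(values, sentinel)
    delta = rate - baseline_rate
    return {
        "rate": round(rate, 4),
        "baseline_rate": baseline_rate,
        "delta": round(delta, 4),
        # Absolute increase in the fallback share (0.05 = 5 percentage points)
        "alert": delta > max_increase,
    }


# Illustrative numbers: 1% of values were 0.0 historically, 34% today
today = np.array([0.0] * 34 + [0.42] * 66)
print(check_sentinel_rate(today, baseline_rate=0.01))
```

The same idea extends to any sentinel a pipeline uses for "unknown" (-1, empty string, epoch timestamps); the sentinel has to be listed explicitly per feature.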

This is the gap that feature monitoring fills. Models do not fail in obvious ways. They silently degrade because their inputs change while they continue to produce outputs that look plausible. By the time model-level metrics degrade visibly, the feature problem has often been running for days or weeks.


Why This Exists: The Observability Gap

Production ML systems have two fundamental monitoring surfaces: model outputs and model inputs. The industry has invested heavily in monitoring model outputs: prediction confidence distributions, outcome metrics, A/B test guardrail metrics. Feature monitoring, which observes the inputs, has historically been neglected.

The reason for this gap is historical. Before feature stores and centralized feature management, features were computed inline within model serving code. Monitoring them required instrumenting many different codepaths. Feature stores changed this: when all feature computation flows through a central platform, adding monitoring at that layer becomes tractable.

The second reason is that model output monitoring seems sufficient - if the model is wrong, outputs will degrade, and you will catch it. This intuition is wrong for two reasons:

  1. Detection lag: model output degradation is downstream of feature degradation. By the time outputs degrade visibly, the feature problem has already caused harm.
  2. Ambiguity: output degradation can have many causes - data drift, model staleness, concept drift, infrastructure issues. Feature monitoring narrows the root cause immediately.

Feature monitoring answers three questions continuously: Are features arriving on time? Are they covering the right entities? Are their values consistent with what the model was trained on?


Historical Context: From Ad-Hoc Checks to Systematic Observability

Before 2018, ML monitoring in most organizations meant "check the model accuracy once a week." The concept of monitoring individual features systematically - with statistical tests, SLA tracking, and automated alerts - emerged alongside the feature store movement.

The finance industry had analogous tooling in data quality frameworks for risk models, where regulators required documentation of data quality checks. These frameworks measured completeness (null rates), consistency (value range checks), and timeliness (freshness checks). The ML industry adopted and extended these concepts.

Tecton, Feast, and similar feature stores began incorporating monitoring primitives around 2021–2022. Tools like Evidently AI (open source, founded 2021) and Arize AI (commercial, founded 2020) emerged specifically to address the gap in ML observability, with feature drift detection as a core capability. By 2023, feature monitoring had become a standard requirement for production ML systems in regulated industries.


What Feature Monitoring Measures

Feature monitoring is the continuous measurement of feature health across three dimensions, each of which captures a different failure mode:

| Dimension | Failure mode | Example |
|---|---|---|
| Staleness | Pipeline failed or delayed | Batch job crashed at 2 AM; features not updated |
| Coverage | Null rate increased | Vendor API change causes missing values for 34% of users |
| Distribution | Feature values shifted | New user cohort has different behavior patterns |

Staleness Monitoring

A stale feature is one that has not been updated within its expected refresh interval - its SLA (Service Level Agreement). A daily feature pipeline that has not run in 36 hours is stale. A streaming feature that has not received events in 10 minutes when events normally arrive every 30 seconds is stale.

Staleness monitoring requires:

  1. Knowing the expected refresh interval for each feature
  2. Querying the last materialization timestamp
  3. Alerting when now - last_updated > ttl_threshold

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class FeatureSLA:
    """SLA definition for a feature view."""
    feature_view_name: str
    max_staleness_hours: float       # alert if older than this
    critical_staleness_hours: float  # page on-call if older than this
    entity_table: str                # table to query for last_updated


@dataclass
class StalenessCheckResult:
    feature_view: str
    last_updated: Optional[datetime]
    age_hours: Optional[float]
    is_stale: bool
    is_critical: bool
    message: str


class FreshnessMonitor:
    """
    Monitors feature freshness against defined SLAs.

    Queries the feature metadata layer (or offline store)
    for the last materialization timestamp of each feature view.
    """

    def __init__(self, slas: list[FeatureSLA],
                 get_last_updated_fn: Callable[[str], Optional[datetime]]):
        """
        Args:
            slas: List of SLA definitions for each feature view
            get_last_updated_fn: Callable that takes feature_view_name
                and returns the last materialization time.
                Wraps your feature store's metadata API.
        """
        self.slas = {sla.feature_view_name: sla for sla in slas}
        self.get_last_updated = get_last_updated_fn

    def check_all(self) -> list[StalenessCheckResult]:
        """Check freshness for all registered feature views."""
        results = []
        now = datetime.now(timezone.utc)

        for feature_view_name, sla in self.slas.items():
            last_updated = self.get_last_updated(feature_view_name)

            if last_updated is None:
                results.append(StalenessCheckResult(
                    feature_view=feature_view_name,
                    last_updated=None,
                    age_hours=None,
                    is_stale=True,
                    is_critical=True,
                    message=f"No materialization timestamp found for {feature_view_name}",
                ))
                continue

            # Treat naive timestamps as UTC so the subtraction below is valid
            if last_updated.tzinfo is None:
                last_updated = last_updated.replace(tzinfo=timezone.utc)

            age_hours = (now - last_updated).total_seconds() / 3600
            is_stale = age_hours > sla.max_staleness_hours
            is_critical = age_hours > sla.critical_staleness_hours

            if is_critical:
                message = (f"CRITICAL: {feature_view_name} is {age_hours:.1f}h old "
                           f"(threshold: {sla.critical_staleness_hours}h)")
            elif is_stale:
                message = (f"STALE: {feature_view_name} is {age_hours:.1f}h old "
                           f"(threshold: {sla.max_staleness_hours}h)")
            else:
                message = f"OK: {feature_view_name} is {age_hours:.1f}h old"

            results.append(StalenessCheckResult(
                feature_view=feature_view_name,
                last_updated=last_updated,
                age_hours=round(age_hours, 2),
                is_stale=is_stale,
                is_critical=is_critical,
                message=message,
            ))

        return results

    def check_and_alert(self, alert_fn: Callable[[StalenessCheckResult], None]):
        """Check freshness and fire alerts for stale features."""
        results = self.check_all()
        for result in results:
            if result.is_stale:
                logger.warning(result.message)
                alert_fn(result)
            else:
                logger.info(result.message)
        return results


# Integration with Feast
def make_feast_freshness_checker(store) -> Callable[[str], Optional[datetime]]:
    """
    Returns a function that queries the Feast registry for the last
    materialization time of a feature view.
    """
    def get_last_updated(feature_view_name: str) -> Optional[datetime]:
        try:
            fv = store.get_feature_view(feature_view_name)
            # Feast records materialization_intervals as (start, end) tuples
            if fv.materialization_intervals:
                return max(end for _, end in fv.materialization_intervals)
            return None
        except Exception as e:
            logger.error(f"Failed to get last updated for {feature_view_name}: {e}")
            return None
    return get_last_updated


# Airflow sensor pattern: downstream model training waits for fresh features.
# A PythonSensor re-pokes while its callable returns False and succeeds once it
# returns True; raising an exception inside the callable fails the task at once.
#
# from airflow.sensors.python import PythonSensor
#
# def features_are_fresh(**context) -> bool:
#     stale = [r for r in monitor.check_all() if r.is_stale]
#     if stale:
#         logger.info(f"Waiting on stale features: {[r.feature_view for r in stale]}")
#     return not stale
#
# wait_for_features = PythonSensor(
#     task_id="wait_for_features_freshness",
#     python_callable=features_are_fresh,
#     poke_interval=300,  # check every 5 minutes
#     timeout=7200,       # fail after 2 hours
# )
# wait_for_features >> train_model
```

Coverage Monitoring

Coverage monitoring measures whether features are populated - whether entity-feature pairs that should have values are returning non-null results. Coverage failures are often subtle: not all entities become null, only a specific segment does.

```python
from dataclasses import dataclass

import pandas as pd


@dataclass
class CoverageMetrics:
    feature_name: str
    total_entities: int
    null_count: int
    null_rate: float
    baseline_null_rate: float  # historical average
    null_rate_delta: float     # current - baseline
    alert: bool
    severity: str              # "ok", "warning", "critical"


class CoverageMonitor:
    """
    Monitors null rates and coverage gaps per feature.

    Compares current null rates to a reference distribution
    (typically computed from the training dataset or a recent
    stable production window).
    """

    def __init__(self,
                 baseline_null_rates: dict[str, float],
                 warning_threshold: float = 0.05,    # +5 percentage points
                 critical_threshold: float = 0.15):  # +15 percentage points
        self.baseline = baseline_null_rates
        self.warning_threshold = warning_threshold
        self.critical_threshold = critical_threshold

    def check_coverage(self, df: pd.DataFrame,
                       feature_cols: list[str]) -> list[CoverageMetrics]:
        """
        Compute coverage metrics for a batch of feature values.

        Args:
            df: Feature value DataFrame with one row per entity
            feature_cols: Features to check

        Returns:
            Coverage metrics for each feature
        """
        results = []
        n = len(df)

        for col in feature_cols:
            null_count = int(df[col].isnull().sum())
            null_rate = null_count / n if n > 0 else 0.0
            baseline = self.baseline.get(col, 0.0)
            delta = null_rate - baseline

            # abs(): alert on sudden drops in null rate as well as increases
            if abs(delta) >= self.critical_threshold:
                severity, alert = "critical", True
            elif abs(delta) >= self.warning_threshold:
                severity, alert = "warning", True
            else:
                severity, alert = "ok", False

            results.append(CoverageMetrics(
                feature_name=col,
                total_entities=n,
                null_count=null_count,
                null_rate=round(null_rate, 4),
                baseline_null_rate=round(baseline, 4),
                null_rate_delta=round(delta, 4),
                alert=alert,
                severity=severity,
            ))

        return results

    def check_segment_coverage(self, df: pd.DataFrame,
                               feature_col: str,
                               segment_col: str) -> pd.DataFrame:
        """
        Check coverage broken down by entity segment.

        Catches platform-specific or cohort-specific coverage failures
        that would be hidden in aggregate metrics.
        """
        segment_stats = (
            df.groupby(segment_col)[feature_col]
            .agg(
                total='size',  # all rows; 'count' would exclude the nulls
                null_count=lambda x: x.isnull().sum(),
                null_rate=lambda x: x.isnull().mean(),
            )
            .reset_index()
        )
        segment_stats['baseline_null_rate'] = self.baseline.get(feature_col, 0.0)
        segment_stats['delta'] = (
            segment_stats['null_rate'] - segment_stats['baseline_null_rate']
        )
        segment_stats['alert'] = segment_stats['delta'].abs() >= self.warning_threshold
        return segment_stats


# Row count anomaly detection
class RowCountMonitor:
    """
    Detects when today's feature table has significantly fewer
    entities than expected (signals an upstream pipeline failure).
    """

    def __init__(self, expected_count: int, min_fraction: float = 0.90):
        if expected_count <= 0:
            raise ValueError("expected_count must be positive")
        self.expected_count = expected_count
        self.min_fraction = min_fraction

    def check(self, actual_count: int) -> dict:
        fraction = actual_count / self.expected_count
        is_anomalous = fraction < self.min_fraction
        return {
            'expected': self.expected_count,
            'actual': actual_count,
            'fraction': round(fraction, 4),
            'is_anomalous': is_anomalous,
            'message': (
                f"ANOMALY: only {fraction:.1%} of expected rows present "
                f"({actual_count:,} of {self.expected_count:,})"
                if is_anomalous else
                f"OK: {fraction:.1%} of expected rows present"
            ),
        }
```
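`RowCountMonitor` takes `expected_count` as a fixed number, which drifts out of date as the entity population grows. One option, an assumption rather than part of the monitor above, is to derive the expectation from a trailing median of recent daily row counts. The helper names and the counts below are hypothetical:

```python
import statistics
from typing import Sequence


def expected_row_count(history: Sequence[int], window: int = 7) -> float:
    """Median of the most recent `window` daily row counts."""
    recent = list(history)[-window:]
    if not recent:
        raise ValueError("need at least one historical row count")
    # The median is robust to a single bad day inside the window
    return statistics.median(recent)


def is_row_count_anomalous(history: Sequence[int], actual: int,
                           min_fraction: float = 0.90) -> bool:
    """True when today's count is below min_fraction of the trailing median."""
    return actual < min_fraction * expected_row_count(history)


daily_counts = [100_000, 101_000, 99_500, 100_500, 102_000, 98_000, 100_200]
print(expected_row_count(daily_counts))
print(is_row_count_anomalous(daily_counts, 85_000))
```

The result can be passed as `expected_count` when constructing a fresh `RowCountMonitor` each day.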

Segment-Level Coverage: The Hidden Failure

Aggregate null rates can mask segment-level failures. If device_risk_score fails for every mobile user and mobile is 40% of your user base, the aggregate null rate rises by 40 percentage points: a large, easily detected signal. But if mobile is 8% of your user base, the aggregate rises by at most 8 percentage points, and a partial failure within that segment moves it even less, possibly staying inside your alert thresholds.
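The dilution is simple arithmetic: the aggregate null rate is the row-share-weighted average of the segment null rates. A small sketch with illustrative numbers (mobile at 8% of rows, a 1% null rate everywhere at baseline, and the 34% mobile failure from the opening example), compared against the `CoverageMonitor` default warning threshold of 0.05:

```python
def aggregate_null_rate(segment_share: dict[str, float],
                        segment_null_rate: dict[str, float]) -> float:
    """Aggregate null rate = sum over segments of (row share x segment null rate)."""
    return sum(share * segment_null_rate[seg] for seg, share in segment_share.items())


share = {"mobile": 0.08, "desktop": 0.92}
before = aggregate_null_rate(share, {"mobile": 0.01, "desktop": 0.01})
after = aggregate_null_rate(share, {"mobile": 0.34, "desktop": 0.01})

aggregate_delta = after - before  # about 0.026: under the 0.05 warning threshold
mobile_delta = 0.34 - 0.01        # 0.33: far above it
print(f"aggregate +{aggregate_delta:.3f}, mobile +{mobile_delta:.3f}")
```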

Segment-level coverage monitoring catches failures that affect specific cohorts:

```python
# Example: check null rates by platform segment.
# `monitor` is a CoverageMonitor; `todays_features` has a 'platform' column.
segment_results = monitor.check_segment_coverage(
    df=todays_features,
    feature_col='device_risk_score',
    segment_col='platform',  # 'mobile', 'desktop', 'tablet'
)

# Output would show (baseline_null_rate column omitted):
# platform | total | null_count | null_rate | delta | alert
# mobile   |  8420 |       2863 |     0.340 | 0.334 | True   ← CAUGHT
# desktop  | 24381 |        244 |     0.010 | 0.004 | False
# tablet   |  1203 |         12 |     0.010 | 0.004 | False
```

Distribution Drift Monitoring

Even when features are fresh and fully populated, their values can shift. A model trained on historical data may receive production features with a different statistical distribution - not because anything broke, but because the world changed.

Population Stability Index (PSI)

PSI is a standard metric for measuring feature distribution drift in production ML. It was developed in the credit scoring industry and has since been widely adopted in ML monitoring.

$$
\text{PSI} = \sum_{i=1}^{n} \left( P_i - Q_i \right) \cdot \ln\left(\frac{P_i}{Q_i}\right)
$$

Where $P_i$ is the fraction of the training (reference) distribution in bin $i$, $Q_i$ is the fraction of the current production distribution in bin $i$, and $n$ is the number of bins.

PSI thresholds:

  • $\text{PSI} < 0.1$: stable - no action needed
  • $0.1 \leq \text{PSI} < 0.2$: moderate shift - monitor closely, investigate
  • $\text{PSI} \geq 0.2$: significant shift - consider retraining, escalate
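A two-bin worked example with hypothetical proportions makes the thresholds concrete. Suppose the reference splits evenly ($P_1 = P_2 = 0.5$) and production shifts to $Q_1 = 0.7$, $Q_2 = 0.3$. Every term is non-negative, because $P_i - Q_i$ and $\ln(P_i / Q_i)$ always share a sign:

$$
\text{PSI} = (0.5 - 0.7)\ln\frac{0.5}{0.7} + (0.5 - 0.3)\ln\frac{0.5}{0.3} \approx 0.067 + 0.102 = 0.169
$$

A 20-point swing between two bins therefore lands in the $0.1 \leq \text{PSI} < 0.2$ moderate-shift band.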

```python
import numpy as np
import pandas as pd
from scipy import stats


def compute_psi(
    reference: np.ndarray,
    current: np.ndarray,
    buckets: int = 10,
    epsilon: float = 1e-7,
) -> float:
    """
    Compute Population Stability Index between a reference
    distribution and a current production distribution.

    Args:
        reference: Array of feature values from training data
        current: Array of feature values from production (current period)
        buckets: Number of bins to use for discretization
        epsilon: Floor for empty bins, to avoid division by zero and log(0)

    Returns:
        PSI value. Interpretation:
            < 0.1: stable
            0.1 - 0.2: moderate shift
            >= 0.2: significant shift, consider retraining
    """
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    # Missing values are coverage monitoring's job; PSI compares non-null values
    reference = reference[~np.isnan(reference)]
    current = current[~np.isnan(current)]

    # Bin edges from reference quantiles keep every reference bin well populated.
    # np.unique drops duplicate edges (common with low-cardinality features).
    edges = np.unique(np.percentile(reference, np.linspace(0, 100, buckets + 1)))
    # Keep only the interior edges so the outer bins are open-ended: production
    # values outside the training range land in the first/last bin instead of
    # being silently dropped, as np.histogram would do.
    inner = edges[1:-1]
    n_bins = len(inner) + 1

    # searchsorted(side="right") yields half-open bins [a, b)
    ref_counts = np.bincount(np.searchsorted(inner, reference, side="right"), minlength=n_bins)
    cur_counts = np.bincount(np.searchsorted(inner, current, side="right"), minlength=n_bins)

    # Convert to proportions, floor empty bins at epsilon, renormalize to sum to 1
    ref_props = np.clip(ref_counts / ref_counts.sum(), epsilon, None)
    cur_props = np.clip(cur_counts / cur_counts.sum(), epsilon, None)
    ref_props /= ref_props.sum()
    cur_props /= cur_props.sum()

    psi = np.sum((cur_props - ref_props) * np.log(cur_props / ref_props))
    return float(psi)


def interpret_psi(psi: float) -> str:
    if psi < 0.1:
        return "stable"
    elif psi < 0.2:
        return "moderate_shift"
    else:
        return "significant_shift"


# Categorical features: Chi-squared goodness-of-fit test
def compute_categorical_drift(reference: pd.Series, current: pd.Series) -> dict:
    """
    Compare current category counts against the reference category
    proportions with a Chi-squared test.

    Returns p-value (low p-value = significant drift detected).
    """
    ref_props = reference.value_counts(normalize=True)
    cur_counts = current.value_counts()

    # Align categories in a deterministic order
    all_cats = sorted(set(ref_props.index) | set(cur_counts.index), key=str)
    expected = np.array([ref_props.get(c, 0.0) for c in all_cats]) * len(current)
    observed = np.array([cur_counts.get(c, 0) for c in all_cats], dtype=float)

    # Chi-squared is unreliable for expected counts < 5, so those categories
    # are excluded here; brand-new categories are reported separately below
    mask = expected >= 5
    if mask.sum() < 2 or observed[mask].sum() == 0:
        return {'test': 'chi2', 'result': 'insufficient_data', 'p_value': None}

    obs, exp = observed[mask], expected[mask]
    # scipy requires observed and expected totals to match, so rescale after masking
    exp = exp * obs.sum() / exp.sum()

    chi2, p_value = stats.chisquare(obs, f_exp=exp)

    return {
        'test': 'chi2',
        'chi2_stat': float(chi2),
        'p_value': float(p_value),
        'drift_detected': bool(p_value < 0.05),
        'new_categories': list(set(current.dropna().unique()) - set(reference.dropna().unique())),
    }


# Continuous features: two-sample Kolmogorov-Smirnov test
def compute_ks_drift(reference: np.ndarray, current: np.ndarray) -> dict:
    """
    Kolmogorov-Smirnov test for continuous feature drift.
    Tests whether two samples come from the same distribution.
    """
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    ks_stat, p_value = stats.ks_2samp(reference[~np.isnan(reference)],
                                      current[~np.isnan(current)])
    return {
        'test': 'ks',
        'ks_statistic': float(ks_stat),
        'p_value': float(p_value),
        'drift_detected': bool(p_value < 0.05),
    }
```

Evidently AI Integration

For teams wanting a higher-level toolkit, Evidently AI provides a report-based framework with prebuilt drift detectors:

```python
# Uses Evidently's legacy Report / metrics API; recent Evidently releases
# reorganized these imports, so pin a compatible version.
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.metrics import (
    ColumnDriftMetric,
    ColumnSummaryMetric,
    DatasetMissingValuesMetric,
)
import pandas as pd


def generate_feature_drift_report(
    reference_df: pd.DataFrame,
    current_df: pd.DataFrame,
    feature_cols: list[str],
    output_path: str = "feature_drift_report.html",
) -> dict:
    """
    Generate a comprehensive feature drift report using Evidently AI.

    Args:
        reference_df: Training data features (reference distribution)
        current_df: Current production features (window of recent predictions)
        feature_cols: Feature columns to monitor
        output_path: Where to save the HTML report

    Returns:
        Dictionary of drift metrics per feature
    """
    report = Report(metrics=[
        DataDriftPreset(),
        DataQualityPreset(),
        DatasetMissingValuesMetric(),
        *[ColumnDriftMetric(column_name=col) for col in feature_cols],
        *[ColumnSummaryMetric(column_name=col) for col in feature_cols],
    ])

    report.run(
        reference_data=reference_df[feature_cols],
        current_data=current_df[feature_cols],
    )
    report.save_html(output_path)

    # Extract per-column drift results for programmatic use
    result_dict = report.as_dict()
    drift_summary = {}
    for metric in result_dict.get('metrics', []):
        if metric.get('metric') == 'ColumnDriftMetric':
            res = metric['result']
            drift_summary[res['column_name']] = {
                'drift_detected': res['drift_detected'],
                'stattest_name': res.get('stattest_name'),
                'p_value': res.get('p_value'),
                'drift_score': res.get('drift_score'),
            }

    return drift_summary
```

Feature Importance-Weighted Alerting

Not all features deserve equal alert urgency. A null-rate spike in one of the top five features by SHAP importance can sharply degrade model performance; a spike in one of the 20 least important features may have no measurable effect. Treating all alerts equally produces alert fatigue and teaches teams to ignore critical signals.

```python
import logging
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger(__name__)


class AlertSeverity(Enum):
    P1_PAGE = "P1_PAGE"    # PagerDuty, wake someone up
    P2_SLACK = "P2_SLACK"  # Slack #ml-alerts, investigate within 4h
    P3_LOG = "P3_LOG"      # Log only, review at next business day standup


@dataclass
class FeatureImportance:
    feature_name: str
    shap_importance: float  # mean |SHAP value| from latest model
    importance_rank: int    # 1 = most important


class ImportanceWeightedAlerter:
    """
    Routes feature monitoring alerts to different channels
    based on feature importance.

    Rules:
    - Top 10% features (by SHAP importance): any coverage/drift alert → P1
    - Top 10-30% features: coverage/drift alert → P2
    - Bottom 70% features: alert → P3 (log only, review at standup)

    Override: staleness alerts are always at least P2 regardless of importance
    (stale features affect all downstream predictions, not just this model)
    """

    def __init__(self,
                 feature_importances: list[FeatureImportance],
                 p1_threshold: float = 0.10,   # top 10% → P1
                 p2_threshold: float = 0.30):  # top 30% → P2
        self.importances = {fi.feature_name: fi for fi in feature_importances}
        n = len(feature_importances)
        self.p1_rank_cutoff = max(1, int(n * p1_threshold))
        self.p2_rank_cutoff = max(1, int(n * p2_threshold))

    def get_severity(self, feature_name: str,
                     alert_type: str,  # "staleness", "coverage", "drift"
                     is_critical: bool = False) -> AlertSeverity:
        """Determine alert severity based on feature importance and alert type."""
        # Staleness is always at least P2, including features with no
        # importance entry, so check it before the importance lookup
        if alert_type == "staleness":
            return AlertSeverity.P1_PAGE if is_critical else AlertSeverity.P2_SLACK

        fi = self.importances.get(feature_name)
        # Unknown feature - treat as P2 to be safe
        if fi is None:
            return AlertSeverity.P2_SLACK

        # Coverage and drift: severity based on importance rank
        if fi.importance_rank <= self.p1_rank_cutoff:
            return AlertSeverity.P1_PAGE
        elif fi.importance_rank <= self.p2_rank_cutoff:
            return AlertSeverity.P2_SLACK
        else:
            return AlertSeverity.P3_LOG

    def route_alert(self, feature_name: str,
                    alert_type: str,
                    message: str,
                    is_critical: bool = False):
        """Route an alert to the appropriate channel."""
        severity = self.get_severity(feature_name, alert_type, is_critical)
        fi = self.importances.get(feature_name)
        rank_info = f"rank #{fi.importance_rank}" if fi else "unknown rank"

        enriched_message = (
            f"[{severity.value}] Feature: {feature_name} ({rank_info})\n"
            f"Type: {alert_type}\n"
            f"Message: {message}"
        )

        if severity == AlertSeverity.P1_PAGE:
            self._page_oncall(enriched_message)
        elif severity == AlertSeverity.P2_SLACK:
            self._notify_slack(enriched_message)
        else:
            logger.info(enriched_message)

    def _page_oncall(self, message: str):
        # Integration point: PagerDuty, OpsGenie, etc.
        print(f"[PAGERDUTY] {message}")

    def _notify_slack(self, message: str):
        # Integration point: Slack webhook
        print(f"[SLACK #ml-alerts] {message}")
```

Feature Store Consistency Monitoring

The online store and offline store can diverge. This happens when:

  • Materialization fails partway through (some entities in online, not offline)
  • Clock skew between pipeline runs produces slightly different values
  • A bug in the online serving path transforms values differently than the offline pipeline

Periodic consistency checks sample entities and compare online vs. offline values:

```python
import datetime
import random

import pandas as pd
from feast import FeatureStore


def check_online_offline_consistency(
    store: FeatureStore,
    feature_view_name: str,
    feature_names: list[str],
    entity_col: str,
    entity_ids: list,
    sample_size: int = 1000,
    tolerance: float = 1e-6,
    max_mismatch_rate: float = 0.01,  # alert if >1% of sampled rows mismatch
) -> dict:
    """
    Sample entities and compare online store values to offline store values.

    A mismatch indicates training-serving skew - the features used during
    training (offline) differ from the features served at prediction time (online).

    entity_ids: candidate entity keys, e.g. from your entity registry or a
    recent feature table. Features are assumed to be numeric.
    """
    sample_entities = random.sample(list(entity_ids), min(sample_size, len(entity_ids)))
    refs = [f"{feature_view_name}:{f}" for f in feature_names]

    # Online store lookup
    online = store.get_online_features(
        features=refs,
        entity_rows=[{entity_col: e} for e in sample_entities],
    ).to_df()

    # Offline store lookup (point-in-time join as of now)
    entity_df = pd.DataFrame({
        entity_col: sample_entities,
        "event_timestamp": datetime.datetime.now(datetime.timezone.utc),
    })
    offline = store.get_historical_features(entity_df=entity_df, features=refs).to_df()

    # The offline result is not guaranteed to keep row order, so join on the key
    merged = online.merge(offline, on=entity_col, suffixes=("_online", "_offline"))
    n = len(merged)

    mismatch_counts = {}
    for feat in feature_names:
        on = pd.to_numeric(merged[f"{feat}_online"], errors="coerce")
        off = pd.to_numeric(merged[f"{feat}_offline"], errors="coerce")
        on_nan, off_nan = on.isna(), off.isna()

        # Mismatch: exactly one side is null, or both present and differ by > tolerance
        nan_mismatch = on_nan != off_nan
        value_mismatch = ~on_nan & ~off_nan & ((on - off).abs() > tolerance)

        total_mismatch = int((nan_mismatch | value_mismatch).sum())
        rate = total_mismatch / n if n else 0.0
        mismatch_counts[feat] = {
            'mismatches': total_mismatch,
            'mismatch_rate': round(rate, 4),
            'alert': rate > max_mismatch_rate,
        }

    return {
        'sample_size': n,
        'feature_view': feature_view_name,
        'results': mismatch_counts,
        'any_alerts': any(v['alert'] for v in mismatch_counts.values()),
    }
```

The Complete FeatureMonitor Class

Pulling all three monitoring dimensions together:

```python
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable

import numpy as np
import pandas as pd

# compute_psi() is the function defined in the PSI section above.


@dataclass
class MonitoringConfig:
    feature_name: str
    max_staleness_hours: float = 25.0
    critical_staleness_hours: float = 48.0
    baseline_null_rate: float = 0.0
    null_rate_warning_delta: float = 0.05
    null_rate_critical_delta: float = 0.15
    psi_warning_threshold: float = 0.1
    psi_critical_threshold: float = 0.2
    shap_importance_rank: int = 999  # lower = more important


class FeatureMonitor:
    """
    Unified feature health monitor covering staleness, coverage,
    and distribution drift with importance-weighted alerting.
    """

    def __init__(self,
                 configs: list[MonitoringConfig],
                 reference_data: dict[str, np.ndarray],
                 alert_p1: Callable[[str], None],
                 alert_p2: Callable[[str], None]):
        self.configs = {c.feature_name: c for c in configs}
        self.reference = reference_data
        self.alert_p1 = alert_p1
        self.alert_p2 = alert_p2
        self.logger = logging.getLogger(self.__class__.__name__)

    def run_full_check(self,
                       current_df: pd.DataFrame,
                       last_updated: dict[str, datetime]) -> pd.DataFrame:
        """
        Run all three monitoring checks for all features.

        Args:
            current_df: Current feature values (one row per entity)
            last_updated: Dict mapping feature_name to last materialization time

        Returns:
            DataFrame with check results for all features
        """
        results = []
        now = datetime.now(timezone.utc)

        for feat_name, config in self.configs.items():
            row = {'feature': feat_name,
                   'importance_rank': config.shap_importance_rank}

            # 1. Staleness check
            lu = last_updated.get(feat_name)
            if lu is None:
                row['staleness_hours'] = None
                row['staleness_status'] = 'missing_metadata'
                self._route_alert(config, 'P1',
                                  f"{feat_name}: no last_updated timestamp found")
            else:
                if lu.tzinfo is None:
                    lu = lu.replace(tzinfo=timezone.utc)
                age_h = (now - lu).total_seconds() / 3600
                row['staleness_hours'] = round(age_h, 2)
                if age_h > config.critical_staleness_hours:
                    row['staleness_status'] = 'critical'
                    self._route_alert(config, 'P1',
                                      f"{feat_name}: {age_h:.1f}h old (critical threshold "
                                      f"{config.critical_staleness_hours}h)")
                elif age_h > config.max_staleness_hours:
                    row['staleness_status'] = 'stale'
                    self._route_alert(config, 'P2',
                                      f"{feat_name}: {age_h:.1f}h old (warning threshold "
                                      f"{config.max_staleness_hours}h)")
                else:
                    row['staleness_status'] = 'ok'

            # 2. Coverage check (alerts on null-rate increases)
            if feat_name in current_df.columns:
                null_rate = current_df[feat_name].isnull().mean()
                row['null_rate'] = round(null_rate, 4)
                delta = null_rate - config.baseline_null_rate
                row['null_rate_delta'] = round(delta, 4)

                if delta >= config.null_rate_critical_delta:
                    row['coverage_status'] = 'critical'
                    self._route_alert(config, 'P1',
                                      f"{feat_name}: null rate {null_rate:.1%} "
                                      f"(+{delta:.1%} above baseline)")
                elif delta >= config.null_rate_warning_delta:
                    row['coverage_status'] = 'warning'
                    self._route_alert(config, 'P2',
                                      f"{feat_name}: null rate {null_rate:.1%} "
                                      f"(+{delta:.1%} above baseline)")
                else:
                    row['coverage_status'] = 'ok'
            else:
                row['null_rate'] = None
                row['coverage_status'] = 'missing_column'

            # 3. Distribution drift check (PSI)
            if feat_name in current_df.columns and feat_name in self.reference:
                ref_vals = self.reference[feat_name]
                cur_vals = current_df[feat_name].dropna().values
                if len(cur_vals) > 100:  # need sufficient data for PSI
                    psi = compute_psi(ref_vals, cur_vals)
                    row['psi'] = round(psi, 4)

                    if psi >= config.psi_critical_threshold:
                        row['drift_status'] = 'significant'
                        self._route_alert(config, 'P1',
                                          f"{feat_name}: PSI={psi:.3f} "
                                          f"(>={config.psi_critical_threshold} critical)")
                    elif psi >= config.psi_warning_threshold:
                        row['drift_status'] = 'moderate'
                        self._route_alert(config, 'P2',
                                          f"{feat_name}: PSI={psi:.3f} "
                                          f"(>={config.psi_warning_threshold} warning)")
                    else:
                        row['drift_status'] = 'stable'
                else:
                    row['psi'] = None
                    row['drift_status'] = 'insufficient_data'
            else:
                row['psi'] = None
                row['drift_status'] = 'no_reference'

            results.append(row)

        return pd.DataFrame(results)

    def _route_alert(self, config: MonitoringConfig,
                     baseline_severity: str, message: str):
        """Route alert, downgrading severity for low-importance features."""
        n_features = len(self.configs)
        p1_cutoff = max(1, int(n_features * 0.10))  # top 10% → P1
        p2_cutoff = max(1, int(n_features * 0.30))  # top 30% → P2

        rank = config.shap_importance_rank
        if baseline_severity == 'P1':
            if rank <= p1_cutoff:
                self.alert_p1(message)
            elif rank <= p2_cutoff:
                self.alert_p2(message)
            else:
                self.logger.warning(message)  # P3: log only
        elif baseline_severity == 'P2':
            if rank <= p2_cutoff:
                self.alert_p2(message)
            else:
                self.logger.warning(message)
```

Feature Monitoring Pipeline Architecture

A typical pipeline has five components: a metrics collector, a metrics store, an alerting engine, a dashboard, and an audit log. The metrics collector is the heartbeat of the system. It runs on a schedule (every 5 minutes for streaming features, every hour for batch features) and publishes freshness, null-rate, and PSI metrics to the metrics store. The alerting engine queries the store for real-time threshold checks, and the dashboard queries it for visualization. The audit log preserves all historical metrics for post-mortem analysis.
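A minimal sketch of the collector-to-store-to-alert flow, assuming an in-memory list as the metrics store (which doubles as the audit log) and features given as plain Python lists with `None` for missing values. Every name here is hypothetical; PSI would be published the same way as the two metrics shown, and a scheduler such as cron or Airflow would call `collect_once` on the chosen cadence:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class MetricPoint:
    ts: datetime
    feature: str
    metric: str  # e.g. "freshness_hours", "null_rate"
    value: float


@dataclass
class InMemoryMetricsStore:
    """Stand-in for a real time-series store; keeps every point (the audit log)."""
    points: list[MetricPoint] = field(default_factory=list)

    def publish(self, point: MetricPoint) -> None:
        self.points.append(point)

    def latest(self, feature: str, metric: str) -> Optional[float]:
        """What the alerting engine or a dashboard panel would query."""
        matches = [p for p in self.points if p.feature == feature and p.metric == metric]
        return max(matches, key=lambda p: p.ts).value if matches else None


def collect_once(store: InMemoryMetricsStore,
                 features: dict[str, list[Optional[float]]],
                 last_updated: dict[str, datetime],
                 now: Optional[datetime] = None) -> None:
    """One collector tick: publish freshness and null rate for every feature."""
    now = now or datetime.now(timezone.utc)
    for name, values in features.items():
        lu = last_updated.get(name)
        if lu is not None:
            age_h = (now - lu).total_seconds() / 3600
            store.publish(MetricPoint(now, name, "freshness_hours", age_h))
        if values:
            null_rate = sum(v is None for v in values) / len(values)
            store.publish(MetricPoint(now, name, "null_rate", null_rate))


def breaches(store: InMemoryMetricsStore, feature: str, metric: str,
             threshold: float) -> bool:
    """Alerting-engine check against the most recently published value."""
    value = store.latest(feature, metric)
    return value is not None and value > threshold
```

Keeping the collector write-only and the alerting engine read-only means thresholds can change without touching the collection code.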


Production Engineering Notes

Setting Alert Thresholds Without Reference Data

When deploying a new feature for the first time, you do not yet have a stable baseline. The approach:

  1. Deployment phase (first 2 weeks): monitor and log, do not alert. Collect the baseline distribution.
  2. Baseline phase (week 3): compute baseline null rates and PSI reference distribution from weeks 1-2.
  3. Active monitoring phase (week 4+): enable alerting with thresholds set at 2 standard deviations from the observed baseline.

This prevents false alarms during initial deployment while ensuring monitoring is active once the feature is stable.
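The week-3 computation can be sketched as follows, assuming one null-rate observation per day from the first two weeks. The `min_margin` floor is an added assumption, not part of the procedure above: a feature that was never null during the window has zero variance, and a pure two-sigma rule would then alert on a single null value.

```python
import statistics
from typing import Sequence


def null_rate_thresholds(daily_null_rates: Sequence[float],
                         n_sigma: float = 2.0,
                         min_margin: float = 0.01) -> dict:
    """Baseline and alert threshold from the observation window (e.g. weeks 1-2).

    min_margin (assumed) keeps the threshold above the mean even when the
    observed null rate never varied.
    """
    if len(daily_null_rates) < 2:
        raise ValueError("need at least two daily observations")
    mean = statistics.fmean(daily_null_rates)
    std = statistics.stdev(daily_null_rates)  # sample standard deviation
    return {
        "baseline_null_rate": mean,
        "alert_above": mean + max(n_sigma * std, min_margin),
    }
```

The same pattern applies to PSI or row counts: collect during the quiet period, then freeze the baseline.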

Monitoring Cadence by Feature Type

| Feature type | Freshness check | Coverage check | Drift check |
|---|---|---|---|
| Real-time (streaming) | Every 5 min | Every hour | Daily |
| Near-real-time (micro-batch) | Every 15 min | Every 4 hours | Daily |
| Batch (daily) | Every 2 hours | Daily | Weekly |
| Static (rarely changes) | Daily | Weekly | Monthly |
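One way to encode the cadence table as scheduler configuration is sketched below. The dictionary keys and the `checks_due` helper are hypothetical, and "Monthly" is approximated as 30 days.

```python
from datetime import timedelta

# Check intervals per feature type, mirroring the cadence table
CADENCE = {
    "streaming":   {"freshness": timedelta(minutes=5),  "coverage": timedelta(hours=1), "drift": timedelta(days=1)},
    "micro_batch": {"freshness": timedelta(minutes=15), "coverage": timedelta(hours=4), "drift": timedelta(days=1)},
    "batch":       {"freshness": timedelta(hours=2),    "coverage": timedelta(days=1),  "drift": timedelta(weeks=1)},
    "static":      {"freshness": timedelta(days=1),     "coverage": timedelta(weeks=1), "drift": timedelta(days=30)},
}


def checks_due(feature_type: str, since_last_run: dict[str, timedelta]) -> list[str]:
    """Return the checks whose interval has elapsed since they last ran.
    A check missing from `since_last_run` has never run and is always due."""
    intervals = CADENCE[feature_type]
    return [check for check, interval in intervals.items()
            if since_last_run.get(check, timedelta.max) >= interval]
```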

Backpressure: When to Stop Serving Features

If a critical feature (P1 importance rank) has been stale for more than the critical threshold, serving predictions with that feature may be worse than falling back to a simpler model or rejecting the prediction. Build backpressure logic:

def should_serve_prediction(feature_freshness: dict[str, float],
                            model_critical_features: list[str],
                            max_critical_staleness_hours: float = 4.0) -> bool:
    """
    Returns False if any critical feature is too stale to trust.
    Caller should fall back to a default prediction or reject the request.

    feature_freshness maps feature name -> hours since last update.
    """
    for feat in model_critical_features:
        # A feature with no freshness record is treated as infinitely stale
        staleness = feature_freshness.get(feat, float('inf'))
        if staleness > max_critical_staleness_hours:
            return False
    return True
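`should_serve_prediction` expects staleness in hours. The sketch below produces that dict from last-materialization timestamps. It assumes a hypothetical mapping from feature name to the time its value was last written to the online store.

```python
from datetime import datetime


def staleness_hours(last_materialized: dict[str, datetime],
                    now: datetime) -> dict[str, float]:
    """Convert last-materialization timestamps into hours of staleness.

    `now` and all timestamps must be consistently naive or timezone-aware.
    Features without a timestamp are simply absent from the result, so
    should_serve_prediction treats them as infinitely stale.
    """
    return {feat: (now - ts).total_seconds() / 3600.0
            for feat, ts in last_materialized.items()}
```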

Common Mistakes

danger

Monitoring Only Model Output Metrics

"If the model is wrong, the precision/recall metrics will degrade and we'll know."

This is wrong for two reasons. First, model output metrics have detection lag - feature problems cause model degradation slowly, and by the time the metric crosses your alert threshold, the feature has been broken for days or weeks. Second, model output degradation is ambiguous - you cannot distinguish between a feature pipeline failure, concept drift, and model staleness from output metrics alone. Feature monitoring provides early, specific, actionable signals. Monitoring model outputs without monitoring model inputs is like monitoring a car's speedometer without monitoring fuel level, tire pressure, or engine temperature.

danger

Monitoring Without Feature Importance Weighting

If you monitor 200 features with equal alert thresholds, and each feature has a 1% probability of triggering a false alarm on any given day, you expect about 2 false alarms per day. Over a week, 14 false alarms. Alert fatigue sets in within days. Engineers start ignoring alerts. Then a real critical feature failure fires, it is ignored along with the noise, and the model silently degrades.
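The arithmetic can be made explicit under one added assumption: false alarms are independent across features. Beyond the expected count, the probability of at least one false alarm on any given day is $1 - 0.99^{200} \approx 0.87$.

```python
def false_alarm_stats(n_features: int, p_false_alarm: float, days: int = 7):
    """Expected false alarms per day and per period, plus the chance of at
    least one false alarm per day, assuming independence across features."""
    expected_per_day = n_features * p_false_alarm
    p_at_least_one_per_day = 1 - (1 - p_false_alarm) ** n_features
    return expected_per_day, expected_per_day * days, p_at_least_one_per_day


per_day, per_week, p_any = false_alarm_stats(200, 0.01)
print(f"{per_day:.1f}/day, {per_week:.0f}/week, P(>=1 per day) = {p_any:.2f}")
# prints: 2.0/day, 14/week, P(>=1 per day) = 0.87
```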

Weight alerts by SHAP feature importance. The top 10% of features by importance should have tight thresholds and P1/P2 routing. The bottom 50% can have wide thresholds and P3 (log-only) routing. You want 0-2 alerts per week that require action - not 14 alerts per week, most of which are noise.
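Importance ranks like `shap_importance_rank` in the routing code have to come from somewhere. This minimal sketch assumes you already have a SHAP value matrix of shape (n_samples, n_features) from whatever explainer you use. It ranks features by mean absolute SHAP value, then maps ranks to tiers using the same 10% / 30% cutoffs as `_route_alert`. Both function names are hypothetical.

```python
import numpy as np


def importance_ranks(shap_values: np.ndarray, feature_names: list[str]) -> dict[str, int]:
    """Rank features by mean absolute SHAP value (rank 1 = most important)."""
    mean_abs = np.abs(shap_values).mean(axis=0)
    order = np.argsort(-mean_abs, kind="stable")  # indices, most important first
    return {feature_names[i]: rank for rank, i in enumerate(order, start=1)}


def routing_tier(rank: int, n_features: int) -> str:
    """Top 10% of features -> P1, top 30% -> P2, everything else -> P3 (log only)."""
    if rank <= max(1, int(n_features * 0.10)):
        return "P1"
    if rank <= max(1, int(n_features * 0.30)):
        return "P2"
    return "P3"
```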

warning

Using the Wrong Reference Distribution

PSI measures drift relative to a reference. If your reference is stale (e.g., you computed the reference distribution in January, and it is now November), seasonal patterns will appear as "drift" even if the feature is healthy. Update your reference distribution periodically - typically when you retrain the model. The reference distribution should match the training data distribution that the current production model was trained on.
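One way to enforce "reference = training data of the serving model" is to key reference bins by model version. Below is a minimal sketch with a hypothetical `ReferenceRegistry` class. It stores equal-frequency cut points computed from each model's training sample, so a lookup for the wrong model version fails visibly instead of silently reusing an outdated reference.

```python
import numpy as np


class ReferenceRegistry:
    """Hypothetical store of PSI reference cut points, keyed by model version."""

    def __init__(self, n_bins: int = 10) -> None:
        self.n_bins = n_bins
        self._refs: dict[tuple[str, str], np.ndarray] = {}

    def register(self, model_version: str, feature: str, training_values) -> None:
        """Call at retrain time with the data the new model was trained on."""
        values = np.asarray(training_values, dtype=float)
        qs = np.linspace(0, 1, self.n_bins + 1)[1:-1]  # interior quantiles
        self._refs[(model_version, feature)] = np.unique(np.quantile(values, qs))

    def has(self, model_version: str, feature: str) -> bool:
        return (model_version, feature) in self._refs

    def edges_for(self, serving_model_version: str, feature: str) -> np.ndarray:
        # Raises KeyError rather than falling back to another model's reference
        return self._refs[(serving_model_version, feature)]
```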

warning

Not Monitoring Coverage by Segment

Aggregate null rates hide segment-specific failures. A pipeline that fails for all Android users will show a null rate increase equal to the Android fraction of your user base - which might be 20-30%, a detectable anomaly. But a pipeline that fails for users running a specific app version (5% of users) will raise the aggregate null rate by at most 5 percentage points, which may be within normal variance. Always monitor coverage segmented by the most relevant entity attributes: platform, device type, user cohort, geography.
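Segmented coverage is a one-line groupby in pandas. In this sketch, the column names, the per-segment baseline dict, and the 5-point tolerance are illustrative assumptions. The test case mirrors the app-version example: a 5% aggregate null rate that is 100% in one segment.

```python
import pandas as pd


def segment_null_rates(df: pd.DataFrame, feature: str, segment: str) -> pd.Series:
    """Null rate of `feature` within each value of `segment` (e.g. app_version)."""
    return df[feature].isna().groupby(df[segment]).mean()


def segments_breaching(current: pd.Series, baseline: dict, tolerance: float = 0.05) -> list:
    """Segments whose null rate exceeds their own baseline by more than `tolerance`.
    Segments with no recorded baseline are compared against 0.0."""
    return [seg for seg, rate in current.items()
            if rate - baseline.get(seg, 0.0) > tolerance]
```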


Interview Questions and Answers

Q1: What are the three dimensions of feature monitoring and what failure does each catch?

The three dimensions are staleness, coverage, and distribution drift. Staleness monitoring catches pipeline failures - the feature computation job crashed, was delayed, or silently produced empty output. Coverage monitoring catches data quality failures - the fraction of entities with null feature values increased, signaling a broken upstream data source, schema change, or vendor API failure. Distribution drift monitoring catches world-change failures - the statistical distribution of feature values shifted because user behavior, seasonality, or the underlying data-generating process changed. Each dimension catches a different root cause and requires different remediation: staleness requires fixing the pipeline, coverage requires fixing the data source, and drift requires investigating whether a model retrain is needed.
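The dimension-to-remediation mapping can be encoded directly. This is a minimal sketch with hypothetical names. It assumes staleness, null rate, and PSI have already been computed for the feature, and it uses 0.2 as the default drift cutoff.

```python
from dataclasses import dataclass


@dataclass
class Thresholds:
    max_staleness_hours: float
    max_null_rate: float
    max_psi: float = 0.2


# Remediation for each monitoring dimension
REMEDIATION = {
    "staleness": "fix the feature pipeline",
    "coverage": "fix the upstream data source",
    "drift": "evaluate whether a model retrain is needed",
}


def diagnose(staleness_hours: float, null_rate: float, psi: float,
             t: Thresholds) -> dict[str, str]:
    """Map each breached dimension to its remediation (empty dict = healthy)."""
    breached = {
        "staleness": staleness_hours > t.max_staleness_hours,
        "coverage": null_rate > t.max_null_rate,
        "drift": psi > t.max_psi,
    }
    return {dim: REMEDIATION[dim] for dim, bad in breached.items() if bad}
```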

Q2: Design a feature monitoring system for 200 features serving 15 models.

The design requires four components. First, a metrics collector that runs on a schedule (5 minutes for streaming features, hourly for batch features) and computes freshness, null rate, and PSI for all 200 features. Use a shared feature store API to pull both the last materialization timestamps and current feature values. Second, a metrics store - a time-series database such as InfluxDB, or a time-partitioned BigQuery table - that retains 90 days of metric history per feature. This enables anomaly detection against historical baselines and powers dashboards. Third, an importance-weighted alert engine. Compute SHAP importances for each feature in each of the 15 models, and give every feature its best (numerically lowest) rank across models: a feature in the top 10 of any model deserves high priority. Route P1 alerts (top 10% of features) to PagerDuty, P2 (top 30%) to Slack, and P3 to log-only. Target 0-2 actionable alerts per week. Fourth, a dashboard with a feature health grid: one cell per feature, colored by its worst-dimension status (green/yellow/red). Allow drilling into any feature to see a 30-day history of null rate and PSI. This architecture scales horizontally - adding more features requires updating the config, not the architecture.
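Two pieces of that design are small enough to sketch. The first combines per-model ranks by keeping each feature's best rank. The second picks the health-grid color as the worst status across dimensions. The input shapes and function names are assumptions for illustration.

```python
def combined_rank(ranks_by_model: dict[str, dict[str, int]]) -> dict[str, int]:
    """Best (numerically lowest) SHAP rank of each feature across all models.

    ranks_by_model: model name -> {feature name -> rank, 1 = most important}.
    """
    best: dict[str, int] = {}
    for ranks in ranks_by_model.values():
        for feat, rank in ranks.items():
            best[feat] = min(rank, best.get(feat, rank))
    return best


STATUS_ORDER = {"green": 0, "yellow": 1, "red": 2}


def grid_cell_color(dimension_status: dict[str, str]) -> str:
    """Health-grid color = worst status across freshness, coverage and drift."""
    return max(dimension_status.values(), key=STATUS_ORDER.__getitem__)
```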

Q3: How do you set alert thresholds without causing alert fatigue?

Four principles. First, use importance-weighted routing, not importance-weighted thresholds - keep thresholds consistent but route low-importance feature alerts to low-priority channels. This prevents alert fatigue without lowering the bar for critical features. Second, compute baseline distributions from a stable reference window (2-4 weeks of healthy operation). Then set null-rate thresholds at 2-3 standard deviations above the baseline mean, and use the conventional PSI rules of thumb (0.1 for warning, 0.2 for significant drift) for distribution drift. Third, test your thresholds before enabling them: replay 30 days of historical metrics through the alerting logic and count false alarms. If you would have received more than 5 actionable alerts per week historically, widen thresholds or downgrade severity. Fourth, schedule a quarterly threshold review. As features mature and stabilize, their normal variance narrows and thresholds can be tightened. As models are retrained and reference distributions updated, PSI thresholds need recalibration.
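The replay step can be sketched as follows. It assumes the history is a list of (timestamp, value) pairs for one metric. It counts only the start of each breach, on the assumption that the alerting engine does not re-fire while a breach persists. Function names are hypothetical.

```python
from collections import Counter
from datetime import datetime


def replay_alerts(history: list[tuple[datetime, float]], threshold: float) -> Counter:
    """Replay historical values through a static threshold and count, per
    ISO (year, week), how many alerts would have fired."""
    per_week: Counter = Counter()
    in_breach = False
    for ts, value in sorted(history):
        breached = value > threshold
        if breached and not in_breach:  # count only the start of each breach
            year, week, _ = ts.isocalendar()
            per_week[(year, week)] += 1
        in_breach = breached
    return per_week


def too_noisy(per_week: Counter, max_per_week: int = 5) -> list:
    """ISO weeks in which the replayed alert count exceeds the budget."""
    return sorted(w for w, n in per_week.items() if n > max_per_week)
```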

Q4: A fraud model's precision drops from 0.87 to 0.71 over nine weeks. Walk me through how feature monitoring would have caught this earlier.

With feature monitoring in place, the hourly coverage check would fire within the first hour of the vendor API change that made device_risk_score return 0.0 for 34% of users. The null rate for device_risk_score (treating 0.0 as a default/null indicator, or detecting the spike in 0.0 values via PSI) exceeds the warning threshold. Assuming device_risk_score ranks in the top 30% of the model's 12 features by SHAP importance (that is, among the top three), a P2 alert fires to Slack. The on-call data engineer investigates the feature pipeline within 4 hours, identifies the parser failure, deploys a fix, and the feature is restored. Total exposure: a few hours, not nine weeks. The model would never degrade below 0.84 because the bad feature values would serve only a few thousand predictions before the fix. No post-mortem needed - the root cause was caught before it had time to accumulate.
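Here is a minimal sketch of the check that would have caught this. It treats NaN and the pipeline's fallback default (0.0) as missing, and alerts when the current rate leaves the historically observed range. The function names and the 2-percentage-point margin are assumptions, not a prescribed rule.

```python
import numpy as np


def default_value_rate(values, default: float = 0.0) -> float:
    """Fraction of values that are NaN or equal to the pipeline's fallback default."""
    v = np.asarray(values, dtype=float)
    return float(np.mean(np.isnan(v) | (v == default)))


def outside_historical_range(current_rate: float, hist_rates, margin: float = 0.02) -> bool:
    """True if the current rate exceeds the highest historical rate by more than `margin`."""
    return current_rate > max(hist_rates) + margin
```

With 34% of values suddenly at 0.0 against a history of roughly 1%, the first run after the vendor change would fire.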

Q5: What is PSI and how do you interpret its values?

PSI (Population Stability Index) measures the shift between two distributions. Bin the reference sample into equal-frequency buckets (deciles are common) and apply the same bin edges to the current sample. Then compute the proportion of values in each bin for both distributions ($P_i$ for the reference, $Q_i$ for the current) and sum $\mathrm{PSI} = \sum_i (P_i - Q_i) \ln(P_i / Q_i)$. The standard interpretation: PSI below 0.1 means the distributions are stable - no action needed. Between 0.1 and 0.2 means moderate shift - investigate whether the shift is explained by a known factor (new user cohort, seasonal pattern) or is anomalous. Above 0.2 means significant shift - production data now differs substantially from the distribution the model was trained on, and retraining should be evaluated. For fixed bins the formula is symmetric in $P$ and $Q$ (it equals $D_{\mathrm{KL}}(P \,\|\, Q) + D_{\mathrm{KL}}(Q \,\|\, P)$). However, the bin edges are derived from the reference sample, so PSI of current vs. reference is generally not the same as PSI of reference vs. current. Always compute with training data as the reference ($P$) and production data as the current ($Q$).
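Here is a minimal NumPy sketch of that computation. It takes equal-frequency cut points from the reference sample, assigns both samples to those bins, and clips empty bins to a small epsilon to avoid $\ln 0$. The epsilon value and the handling of tied cut points are implementation choices, not part of the PSI definition.

```python
import numpy as np


def psi(reference, current, n_bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index with equal-frequency bins taken from `reference`.

    reference: training-time sample (P); current: production sample (Q).
    """
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    # Interior cut points at reference quantiles; heavy ties collapse duplicate
    # cut points, so fewer than n_bins bins may result
    cuts = np.unique(np.quantile(reference, np.linspace(0, 1, n_bins + 1)[1:-1]))
    n = len(cuts) + 1
    # Bin index per value; values beyond the outer cuts land in the end bins
    ref_bins = np.searchsorted(cuts, reference, side="right")
    cur_bins = np.searchsorted(cuts, current, side="right")
    p = np.bincount(ref_bins, minlength=n) / len(reference)
    q = np.bincount(cur_bins, minlength=n) / len(current)
    p, q = np.clip(p, eps, None), np.clip(q, eps, None)
    return float(np.sum((p - q) * np.log(p / q)))
```

Two independent samples from the same distribution should score well under 0.1. A one-standard-deviation mean shift lands far above 0.2.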

Q6: How do you handle the cold start problem in feature monitoring - when you have no historical baseline for a new feature?

Cold start monitoring has three phases. In the shadow phase (first two weeks), the new feature is computed and logged but not served to the model. Monitoring is active but alert thresholds are disabled. Collect the baseline distribution and null rate variance. In the baseline phase (week three), compute the reference distribution, mean null rate, and null rate standard deviation from the shadow period. Set initial alert thresholds conservatively (3 standard deviations for null rate, 0.2 PSI for drift). In the active phase (week four onwards), enable alerts. Refine thresholds after 30 days of active monitoring - if you received more false alarms than expected, widen thresholds slightly. Update the reference distribution whenever the model is retrained, because the retrained model may have been trained on data from a different time window with a different feature distribution.
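The baseline-phase computation can be sketched with the standard library. This assumes the shadow period produced a list of periodic (for example, hourly) null-rate observations. The function name is hypothetical, and `k=3.0` matches the conservative starting point above.

```python
import statistics


def baseline_thresholds(shadow_null_rates: list[float], k: float = 3.0,
                        psi_threshold: float = 0.2) -> dict[str, float]:
    """Initial alert thresholds derived from the shadow period.

    Null rate: mean + k sample standard deviations (needs >= 2 observations).
    Drift: a fixed PSI cutoff, since PSI has conventional thresholds.
    """
    mean = statistics.fmean(shadow_null_rates)
    std = statistics.stdev(shadow_null_rates)
    return {"null_rate_max": mean + k * std, "psi_max": psi_threshold}
```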

© 2026 EngineersOfAI. All rights reserved.