Logging for ML Systems

The Bug Nobody Could Reproduce

A user files a complaint: "The fraud detection system flagged my legitimate purchase three times last week. This is the third time this year." The ML team wants to investigate. They need to see what features the model received for those three transactions, what score it produced, and whether something was anomalous about the feature values.

They can't. The model server logs only {"level": "INFO", "message": "Prediction complete", "score": 0.87}. No feature values. No request context. No transaction ID. No feature freshness timestamps. The logs tell you that a prediction happened, not what the prediction was based on.

Without detailed prediction logs, you cannot:

  • Debug individual wrong predictions
  • Build datasets for model evaluation from production data
  • Detect data quality issues that affect specific users
  • Comply with explainability regulations (GDPR Article 22, EU AI Act)
  • Perform post-incident analysis ("what did the model see?")

This lesson shows you how to design an ML logging system that supports all of these needs.

The Three Types of Logs for ML Systems

Traditional software logging serves developers during debugging. ML systems need three distinct log types (operational logs, prediction logs, and audit logs), each with different consumers, retention policies, and storage backends.

Structured Operational Logs

Operational logs should always be structured JSON. Free-text logs are not queryable, not parseable by log aggregation tools, and not consistent across services.

import logging
import sys

import structlog

# Route stdlib logging to stdout at INFO. Without this, filter_by_level drops
# info-level events because the root logger defaults to WARNING.
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

# Configure structlog for JSON output
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),  # JSON output
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger("fraud-model")


# Good structured logging
logger.info(
    "prediction_complete",
    request_id="req-abc123",
    model_version="v2.1.0",
    score=0.87,
    decision="declined",
    feature_fetch_ms=23,
    inference_ms=44,
    total_ms=71,
    feature_cache_hit=True,
    user_segment="high_value",
)

# Produces (key order may differ):
# {"event": "prediction_complete", "request_id": "req-abc123", "model_version": "v2.1.0",
# "score": 0.87, "decision": "declined", "feature_fetch_ms": 23, "inference_ms": 44,
# "total_ms": 71, "feature_cache_hit": true, "user_segment": "high_value",
# "level": "info", "timestamp": "2026-03-14T03:17:22.341Z", "logger": "fraud-model"}

# Bad unstructured logging - avoid this
logging.info("Prediction complete: score=0.87, decision=declined for user=user123")

Log Levels for ML Systems

# Use log levels consistently

logger.debug("Feature values computed", features=features) # only in DEBUG mode
logger.info("Prediction complete", request_id=rid, score=score) # normal operation
logger.warning("Feature cache miss - falling back to DB", user_id=uid, feature="txn_history")
logger.warning("Prediction score near threshold", score=0.501, threshold=0.5) # edge case
logger.error("Feature fetch failed - returning fallback", error=str(e), request_id=rid)
logger.critical("Model failed to load - all predictions failing", model_path=path)

Prediction Logging - The Core ML Log

The prediction log is the most important log in any ML system. It records enough information to:

  1. Reproduce the exact prediction for debugging
  2. Join with ground truth later for model evaluation
  3. Run drift analysis on feature distributions
  4. Investigate individual user complaints

import uuid
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
import json

@dataclass(frozen=True)  # frozen=True makes instances immutable, as the docstring promises
class PredictionLogEntry:
    """Immutable record of a single model prediction."""
    # Identification
    prediction_id: str  # globally unique
    request_id: str  # from HTTP header (client-provided)
    session_id: str  # from cookie/session
    timestamp_utc: str  # ISO 8601

    # Model metadata
    model_name: str
    model_version: str
    model_framework: str  # pytorch, sklearn, etc.

    # Input features (key subset - full features may be too large)
    features_summary: dict  # store key features; full features to separate store
    feature_store_snapshot_id: str  # pointer to full feature snapshot in object store

    # Output
    raw_score: float
    decision: str  # approved / declined / review
    decision_threshold: float

    # Quality signals
    feature_freshness_max_age_seconds: int  # age of the stalest feature used
    feature_null_count: int  # how many features were null/imputed
    model_confidence: str  # high/medium/low based on score distance from threshold

    # Context (for cohort analysis)
    user_segment: str
    channel: str  # mobile / web / api
    geographic_region: str

def create_prediction_log(
    request_id: str,
    model_name: str,
    model_version: str,
    features: dict,
    score: float,
    decision: str,
    feature_metadata: dict,
) -> PredictionLogEntry:
    """Create a structured prediction log entry."""
    threshold = 0.5

    # Summarize key features (don't log all 200 features to operational DB)
    features_summary = {
        k: v for k, v in features.items()
        if k in ["credit_score", "debt_to_income", "account_age_months",
                 "monthly_income_bucket", "transaction_count_7d"]
    }

    # Confidence is the distance of the score from the decision threshold
    distance = abs(score - threshold)
    confidence = "high" if distance > 0.3 else "medium" if distance > 0.1 else "low"

    return PredictionLogEntry(
        prediction_id=str(uuid.uuid4()),
        request_id=request_id,
        session_id=request_id,  # substitute if no separate session ID
        timestamp_utc=datetime.now(timezone.utc).isoformat(),
        model_name=model_name,
        model_version=model_version,
        model_framework="pytorch",
        features_summary=features_summary,
        feature_store_snapshot_id=feature_metadata.get("snapshot_id", ""),
        raw_score=score,
        decision=decision,
        decision_threshold=threshold,
        feature_freshness_max_age_seconds=feature_metadata.get("max_age_seconds", 0),
        feature_null_count=sum(1 for v in features.values() if v is None),
        model_confidence=confidence,
        user_segment=feature_metadata.get("user_segment", "unknown"),
        channel=feature_metadata.get("channel", "unknown"),
        geographic_region=feature_metadata.get("region", "unknown"),
    )

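import asyncio

async def log_prediction_async(entry: PredictionLogEntry):
    """
    Write prediction log to both operational logs (Loki) and
    data warehouse (BigQuery) without blocking the event loop.
    """
    log_dict = asdict(entry)

    # 1. Operational log (low latency, 30-day retention)
    logger.info("prediction_logged", **log_dict)

    # 2. Data warehouse (for evaluation, drift analysis).
    # Assumes bigquery_client is a module-level google.cloud.bigquery.Client.
    # Its insert_rows_json call is synchronous, so run it in a worker thread
    # instead of awaiting it directly.
    errors = await asyncio.to_thread(
        bigquery_client.insert_rows_json,
        "ml_predictions.fraud_model_predictions",
        [log_dict],
    )
    if errors:
        logger.error("prediction_log_write_failed", errors=errors)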

Joining Prediction Logs with Ground Truth

# Delayed evaluation pipeline (runs daily)
from datetime import datetime, timedelta, timezone

import pandas as pd
from google.cloud import bigquery
from sklearn.metrics import roc_auc_score

def evaluate_predictions_from_30_days_ago():
    """Join 30-day-old predictions with now-available ground truth."""
    bq = bigquery.Client()

    query = """
        SELECT
            p.prediction_id,
            p.model_version,
            p.raw_score,
            p.decision,
            p.user_segment,
            p.channel,
            p.geographic_region,
            p.timestamp_utc AS prediction_time,
            g.outcome,  -- fraud/not_fraud (from chargeback processing)
            g.outcome_timestamp
        FROM
            ml_predictions.fraud_model_predictions p
        INNER JOIN
            business.fraud_outcomes g
        ON
            p.request_id = g.transaction_id
        WHERE
            DATE(p.timestamp_utc) = DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
            AND g.outcome IS NOT NULL  -- ground truth is available
    """

    df = bq.query(query).to_dataframe()

    # Compute overall and per-cohort AUC
    overall_auc = roc_auc_score(df["outcome"] == "fraud", df["raw_score"])

    # Skip small segments and segments with a single outcome class,
    # for which roc_auc_score raises ValueError.
    by_segment = df.groupby("user_segment").apply(
        lambda g: roc_auc_score(g["outcome"] == "fraud", g["raw_score"])
        if len(g) > 100 and g["outcome"].nunique() > 1 else None
    ).dropna()

    return {
        "evaluation_date": (datetime.now(timezone.utc) - timedelta(days=30)).strftime("%Y-%m-%d"),
        "model_version": df["model_version"].mode()[0],
        "n_predictions": len(df),
        "overall_auc": overall_auc,
        "auc_by_segment": by_segment.to_dict(),
    }
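
The join-then-score logic above can also be sketched without a warehouse, which is handy for unit-testing the evaluation step. This is a minimal illustration, not the pipeline itself: it assumes predictions and outcomes arrive as in-memory lists of dicts with the same field names as the query, and it replaces `roc_auc_score` with a simple pairwise AUC that is only practical for small samples. The function names `auc` and `evaluate` are hypothetical.

```python
from collections import defaultdict

def auc(labels, scores):
    """Probability that a random positive outscores a random negative (ties count 0.5)."""
    pos = [s for y, s in zip(labels, scores) if y]
    neg = [s for y, s in zip(labels, scores) if not y]
    if not pos or not neg:
        return None  # AUC is undefined when only one class is present
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

def evaluate(predictions, outcomes, min_rows=100):
    """Join predictions to outcomes on the transaction ID, then score overall and per segment."""
    truth = {o["transaction_id"]: o["outcome"] for o in outcomes}
    # Inner join: predictions without ground truth yet are left out
    joined = [(p, truth[p["request_id"]] == "fraud")
              for p in predictions if p["request_id"] in truth]

    by_segment = defaultdict(list)
    for p, is_fraud in joined:
        by_segment[p["user_segment"]].append((is_fraud, p["raw_score"]))

    segment_auc = {}
    for segment, rows in by_segment.items():
        if len(rows) < min_rows:
            continue  # too few rows for a stable estimate
        value = auc([y for y, _ in rows], [s for _, s in rows])
        if value is not None:
            segment_auc[segment] = value

    return {
        "n_predictions": len(joined),
        "overall_auc": auc([y for _, y in joined], [p["raw_score"] for p, _ in joined]),
        "auc_by_segment": segment_auc,
    }
```

Returning `None` for single-class data mirrors the reason small or one-sided segments are skipped in the warehouse version: AUC is not defined for them.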

Audit Logs for Regulated ML Models

For models making high-stakes decisions (credit, insurance, hiring, medical), audit logs are legally required in many jurisdictions. They differ from prediction logs in their immutability, retention period, and access controls.

import hashlib
import json
import uuid
from datetime import datetime, timedelta, timezone

class AuditLog:
    """
    Immutable audit log for regulated ML decisions.
    Written to append-only, tamper-evident storage (S3 Object Lock).
    """

    @staticmethod
    def create_audit_entry(
        prediction_entry: PredictionLogEntry,
        model_explanation: dict,  # SHAP values or Anchors for the decision
        regulatory_context: dict,  # relevant regulatory category, geographic jurisdiction
    ) -> dict:
        entry = {
            "audit_id": str(uuid.uuid4()),
            "prediction_id": prediction_entry.prediction_id,
            "timestamp_utc": datetime.now(timezone.utc).isoformat(),

            # Decision details
            "model_name": prediction_entry.model_name,
            "model_version": prediction_entry.model_version,
            "raw_score": prediction_entry.raw_score,
            "decision": prediction_entry.decision,
            "decision_threshold": prediction_entry.decision_threshold,

            # Explanation (required by GDPR Art. 22 for automated decisions)
            "top_features_positive": model_explanation.get("top_positive_features", []),
            "top_features_negative": model_explanation.get("top_negative_features", []),
            "explanation_method": model_explanation.get("method", "shap"),

            # Regulatory context
            "jurisdiction": regulatory_context.get("jurisdiction", "US"),
            "regulatory_category": regulatory_context.get("category", "credit"),
            "adverse_action_reasons": None,  # populated if decision is declined

            # Integrity (tamper detection)
            "entry_hash": None,  # computed below
        }

        # Adverse action reasons (required by US Fair Credit Reporting Act)
        if prediction_entry.decision == "declined":
            entry["adverse_action_reasons"] = [
                f["feature_name"] for f in
                model_explanation.get("top_negative_features", [])[:3]
            ]

        # Hash the entry for tamper detection. The hash covers the entry with
        # entry_hash still set to None, so verification must reset it first.
        entry_str = json.dumps(entry, sort_keys=True, default=str)
        entry["entry_hash"] = hashlib.sha256(entry_str.encode()).hexdigest()

        return entry

    @staticmethod
    async def write_audit_log(entry: dict):
        """Write to S3 with Object Lock (WORM - Write Once, Read Many)."""
        s3_key = (
            f"audit-logs/fraud-model/"
            f"{entry['timestamp_utc'][:10]}/"
            f"{entry['audit_id']}.json"
        )
        # Assumes s3_client is an async S3 client (for example from aiobotocore);
        # a plain boto3 client is synchronous and cannot be awaited.
        await s3_client.put_object(
            Bucket="company-ml-audit-logs",
            Key=s3_key,
            Body=json.dumps(entry),
            ContentType="application/json",
            # Object Lock: cannot be deleted or overwritten for 7 years
            ObjectLockMode="COMPLIANCE",
            ObjectLockRetainUntilDate=datetime.now(timezone.utc) + timedelta(days=365 * 7),
        )
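
The entry hash is only useful if something checks it on read. A minimal verification sketch follows, assuming entries were hashed as in create_audit_entry (SHA-256 over the sorted-key JSON with entry_hash set to None). The helper names `compute_entry_hash` and `verify_audit_entry` are hypothetical.

```python
import hashlib
import hmac
import json

def compute_entry_hash(entry: dict) -> str:
    """Recompute the hash over the entry with entry_hash blanked out."""
    unsigned = dict(entry, entry_hash=None)
    payload = json.dumps(unsigned, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

def verify_audit_entry(entry: dict) -> bool:
    """Return True if the stored hash matches the entry's current content."""
    stored = entry.get("entry_hash")
    if not isinstance(stored, str):
        return False  # missing or malformed hash counts as a failure
    return hmac.compare_digest(stored, compute_entry_hash(entry))

# Example: a stored entry verifies until any field is changed
example = {"audit_id": "a-1", "decision": "declined", "raw_score": 0.87, "entry_hash": None}
example["entry_hash"] = compute_entry_hash(example)
tampered = dict(example, decision="approved")
```

An unkeyed hash catches accidental corruption and casual edits, but anyone who can rewrite the object can also recompute the hash. The Object Lock setting is what actually prevents rewrites; the hash is a secondary check.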

Log Aggregation with Loki

Grafana Loki is a common choice for ML log aggregation - it uses the same label model as Prometheus and integrates natively with Grafana for unified dashboards.

# Loki is installed via helm (grafana/loki-stack)
# Promtail agent runs as DaemonSet and ships pod logs to Loki

# Query logs in Grafana using LogQL:
# All prediction logs for a specific model version:
{namespace="ml-prod", pod=~"fraud-model-.*"} |= "prediction_logged" | json
| model_version = "v2.1.0"

# Error rate over time (filter on the parsed level field rather than any line
# that happens to contain the substring "error"):
sum(rate({namespace="ml-prod", pod=~"fraud-model-.*"} | json | level = "error" [5m]))

# Find predictions for a specific request_id (user complaint investigation):
{namespace="ml-prod"} | json | request_id = "req-abc123"

# Feature cache miss rate (assumes misses are logged with a "feature_cache_miss"
# event; sum() aggregates across streams so the division yields a single ratio):
sum(rate({namespace="ml-prod"} |= "feature_cache_miss" [5m])) /
sum(rate({namespace="ml-prod"} |= "prediction_complete" [5m]))

Promtail Configuration for ML Pods

# promtail configmap - configured to add useful labels from pod metadata
scrape_configs:
  - job_name: ml-pods
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        target_label: app
      - source_labels: [__meta_kubernetes_namespace]
        target_label: namespace
      - source_labels: [__meta_kubernetes_pod_label_version]
        target_label: model_version  # label each log line with model version
    pipeline_stages:
      - json:
          expressions:
            level: level
            event: event
            model_version: model_version
      - labels:
          level:
          event:
          model_version:

Production Notes

Log sampling for high-throughput services: at 10,000 predictions per second, logging every prediction to the data warehouse creates 864 million rows per day. For most models, a 10% sample is sufficient for drift analysis and performance monitoring. Log 100% for a 24-hour window after each new model deployment.

import os
import random

PREDICTION_LOG_SAMPLE_RATE = float(os.environ.get("PREDICTION_LOG_SAMPLE_RATE", "0.10"))

async def maybe_log_prediction(entry: PredictionLogEntry):
    """Log prediction to warehouse at the configured sample rate."""
    # Always log operational events (INFO level to Loki)
    logger.info(
        "prediction_complete",
        request_id=entry.request_id,
        score=entry.raw_score,
    )  # plus any other operational fields

    # Always log uncertain (near-threshold) and high-score predictions;
    # sample the rest. A single combined check avoids writing an entry twice.
    always_log = entry.model_confidence == "low" or entry.raw_score > 0.9
    if always_log or random.random() < PREDICTION_LOG_SAMPLE_RATE:
        await write_to_warehouse(entry)
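
The sampler above does not yet cover the "log 100% for 24 hours after a deployment" rule. One way to add it is sketched below. Note that this sketch swaps random sampling for hash-based sampling on the prediction ID, so the keep/drop decision is reproducible across retries and services; that substitution, the function names, and the `deployed_at` parameter are assumptions for illustration.

```python
import hashlib
from datetime import datetime, timedelta, timezone

FULL_LOGGING_WINDOW = timedelta(hours=24)

def sample_bucket(prediction_id: str) -> float:
    """Map an ID to a stable value in [0, 1) so the same ID always gets the same decision."""
    digest = hashlib.sha256(prediction_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") / 2**32

def should_log_to_warehouse(
    prediction_id: str,
    model_confidence: str,
    raw_score: float,
    deployed_at: datetime,
    now: datetime,
    sample_rate: float = 0.10,
) -> bool:
    # 1. Log everything during the first 24 hours of a new model version
    if now - deployed_at < FULL_LOGGING_WINDOW:
        return True
    # 2. Always keep uncertain and high-score predictions
    if model_confidence == "low" or raw_score > 0.9:
        return True
    # 3. Deterministic sample of the rest
    return sample_bucket(prediction_id) < sample_rate

# Example timestamps: one inside the post-deploy window, one well after it
deployed = datetime(2026, 3, 1, tzinfo=timezone.utc)
during_window = deployed + timedelta(hours=3)
after_window = deployed + timedelta(days=5)
```

Because the always-keep cases are over-represented in the warehouse, storing the reason an entry was kept (window, rule, or sample) makes it possible to reweight later.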

Correlation IDs across service boundaries: for tracing a prediction through multiple services (API → feature server → inference server → result aggregator), use a single request_id passed through all service calls as an HTTP header (X-Request-ID). Log the same request_id in every service's logs for easy cross-service filtering.
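
A minimal sketch of request ID propagation using only the standard library is shown below. It assumes incoming headers arrive as a plain dict keyed exactly as X-Request-ID (real frameworks usually offer case-insensitive header access), and the names `bind_request_id`, `outbound_headers`, and `RequestIdFilter` are hypothetical. A context variable keeps the ID isolated per request, including across asyncio tasks.

```python
import contextvars
import io
import logging
import uuid

request_id_var = contextvars.ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    """Attach the current request ID to every log record."""
    def filter(self, record):
        record.request_id = request_id_var.get()
        return True

def bind_request_id(headers: dict) -> str:
    """Reuse the caller's ID if present, otherwise mint one at the edge."""
    rid = headers.get("X-Request-ID") or f"req-{uuid.uuid4().hex[:12]}"
    request_id_var.set(rid)
    return rid

def outbound_headers() -> dict:
    """Headers to add to every downstream call (feature server, inference server)."""
    return {"X-Request-ID": request_id_var.get()}

# Demo wiring: log to an in-memory stream so the output can be inspected
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(RequestIdFilter())
handler.setFormatter(logging.Formatter("%(request_id)s %(levelname)s %(message)s"))
log = logging.getLogger("feature-server-demo")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

bind_request_id({"X-Request-ID": "req-abc123"})
log.info("features_fetched")
```

Each service repeats the same two steps: bind the ID on the way in, forward it on the way out. The log call sites never have to pass the ID explicitly.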

Common Mistakes

:::danger Not Logging Feature Values
Logging only score: 0.87 without the input features makes debugging impossible. When a user complains about a wrong decision, you need to know what the model received. When investigating model degradation, you need to know what features looked like at the time. The most common objection is PII (Personally Identifiable Information) - address it with feature hashing or pseudonymization, not by eliminating feature logging entirely.

Log a summary of key features (the 5–10 most important ones by SHAP value) to operational logs. Log the full feature vector to the data warehouse with appropriate access controls.
:::
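
Pseudonymization of PII features before logging might look like the sketch below. It assumes a keyed hash (HMAC-SHA256) with a secret held outside the logging system; the field list in `PII_FIELDS` and the function names are hypothetical and would come from your own data classification.

```python
import hashlib
import hmac

# Hypothetical set of feature names classified as PII
PII_FIELDS = {"email", "device_id", "ip_address"}

def pseudonymize(value, key: bytes) -> str:
    """Replace a value with a stable keyed token; equal inputs map to equal tokens."""
    return hmac.new(key, str(value).encode(), hashlib.sha256).hexdigest()[:16]

def scrub_features(features: dict, key: bytes) -> dict:
    """Return a copy of the features that is safe to log: PII tokenized, the rest untouched."""
    return {
        name: pseudonymize(value, key) if name in PII_FIELDS and value is not None else value
        for name, value in features.items()
    }

# Example (the key would normally come from a secrets manager, not source code)
demo_key = b"demo-key-do-not-use-in-production"
scrubbed = scrub_features(
    {"email": "a@example.com", "transaction_count_7d": 4, "device_id": None},
    demo_key,
)
```

Tokens stay joinable across log entries (the same email always yields the same token), which keeps per-user debugging and drift analysis possible. A keyed hash is used rather than a bare hash because low-entropy values such as emails are easy to recover from unkeyed hashes by guessing.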

:::warning Using print() Instead of a Logging Framework
print() statements do not include timestamps, log levels, or structured fields. They cannot be filtered by level. They cannot be parsed by log aggregation tools. They cannot be suppressed in production without code changes. Use structlog or the standard logging module with JSON formatting. Switching from print() to structured logging is a one-time effort of roughly 30 minutes; the debugging time saved over the lifetime of the model can run to hundreds of hours.
:::
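
For teams that prefer not to add a dependency, the standard logging module can emit JSON with a small custom formatter. The sketch below is one possible approach, not a drop-in replacement for structlog; the `JsonFormatter` class is hypothetical, and structured fields are passed through the `extra` argument.

```python
import io
import json
import logging
from datetime import datetime, timezone

# Attribute names every LogRecord carries; anything else came in through `extra`
_RESERVED = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""
    def format(self, record):
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname.lower(),
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Copy structured fields supplied via extra={...}
        payload.update({k: v for k, v in record.__dict__.items() if k not in _RESERVED})
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)

# Demo: write to an in-memory stream instead of stdout
json_stream = io.StringIO()
json_handler = logging.StreamHandler(json_stream)
json_handler.setFormatter(JsonFormatter())
json_log = logging.getLogger("fraud-model-stdlib-demo")
json_log.addHandler(json_handler)
json_log.setLevel(logging.INFO)
json_log.propagate = False

json_log.info("prediction_complete", extra={"request_id": "req-abc123", "score": 0.87})
```

The `extra` keys must not collide with built-in record attributes such as `name` or `message`; the logging module raises an error in that case.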

:::warning Not Implementing Prediction Logging Before Deployment
Prediction logging is most valuable from day 1. If you launch a model without prediction logging, the first 30–60 days of production data are lost for model evaluation purposes. You can never go back and reconstruct "what did the model receive on day 3 post-launch." Treat prediction logging as a pre-deployment requirement, not an afterthought.
:::

Interview Q&A

Q1: What are the three types of logs in an ML system and what is each used for?

(1) Operational logs: real-time debugging for engineers during incidents. Structured JSON with event type, latency, error codes, and request context. 30-day retention. Stored in Loki or CloudWatch. Consumer: on-call engineers. (2) Prediction logs: historical record of every prediction (or a sample) including input features, output scores, metadata, and context. Used for delayed model evaluation (join with ground truth 30–60 days later), drift analysis (query feature distributions over time), and debugging individual user complaints. 1–3 year retention. Stored in BigQuery or Snowflake. Consumer: data scientists and monitoring systems. (3) Audit logs: legally required record for regulated models (credit, medical, insurance). Immutable, tamper-evident, 5–7 year retention. Must include the decision, input features, model explanation (why the decision was made), and regulatory context. Stored in WORM (Write Once, Read Many) object storage. Consumer: compliance and legal teams.

Q2: What should a prediction log entry contain, and why does each field matter?

Required fields: prediction_id (unique identifier for this prediction, enables tracing), request_id (client-provided, enables cross-service correlation), timestamp (when the prediction was made, enables temporal analysis), model_name and model_version (which model made the prediction, enables version-specific analysis), input features (what the model received - essential for debugging and drift analysis), raw output score (the actual probability, not just the decision), decision (the actionable output, e.g., approved/declined), decision_threshold (the cutoff used - may change over time), feature freshness metrics (were any features stale?), feature null count (data quality signal), model confidence (how far from the decision boundary?), and cohort context (user segment, channel, geography - for cohort-based performance analysis). Optional but valuable: SHAP values for the top features (for debugging and compliance), feature store snapshot ID (pointer to the complete feature set stored separately).

Q3: How do you implement audit logging for a credit decision model that complies with the US Fair Credit Reporting Act (FCRA)?

The FCRA requires that adverse action notices explain why credit was denied in terms the applicant can understand. Implementation: (1) for declined predictions, extract the top 3 most impactful negative features from the SHAP explanation and convert them to human-readable adverse action reasons (e.g., SHAP shows monthly_debt_payments as most negative → "High monthly debt obligations relative to income"). (2) Store the adverse action reasons in the audit log along with the full SHAP explanation, the model version, the input features, and the decision. (3) Make audit logs immutable (S3 Object Lock, 7-year retention minimum). (4) Ensure the feature names used for adverse action notices are mapped to legally acceptable plain-English descriptions - maintain a feature name → adverse action reason mapping. (5) Log the jurisdiction and regulatory category so compliance teams can filter by applicable law.
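
The feature name → adverse action reason mapping from step (4) can be sketched as below. Only the first mapping entry comes from the example above; the other entries, the sign convention (negative contributions count against the applicant), and the function name are assumptions for illustration. Actual reason wording needs compliance review.

```python
# Hypothetical mapping; only the first entry is taken from the example above
ADVERSE_ACTION_REASONS = {
    "monthly_debt_payments": "High monthly debt obligations relative to income",
    "account_age_months": "Limited length of account history",
    "recent_inquiries": "Too many recent credit inquiries",
}

def adverse_action_reasons(contributions: dict, max_reasons: int = 3) -> list:
    """Turn per-feature contributions into plain-English reasons, most negative first."""
    negative = sorted(
        (item for item in contributions.items() if item[1] < 0),
        key=lambda item: item[1],
    )[:max_reasons]

    # Fail loudly rather than leak a raw feature name into a customer notice
    unmapped = [name for name, _ in negative if name not in ADVERSE_ACTION_REASONS]
    if unmapped:
        raise ValueError(f"No adverse action wording for features: {unmapped}")

    return [ADVERSE_ACTION_REASONS[name] for name, _ in negative]

# Example contributions for one declined application
example_contributions = {
    "monthly_debt_payments": -0.31,
    "account_age_months": -0.12,
    "income": 0.20,
    "recent_inquiries": -0.05,
}
```

Raising on an unmapped feature turns a missing mapping into a deployment-time failure instead of a notice that shows an internal column name to the applicant.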

Q4: How do you design prediction logging for a high-throughput model that handles 50,000 requests per second without adding meaningful latency?

Two key techniques: (1) Asynchronous logging - write prediction logs to a bounded in-memory queue and flush to the data warehouse in background batches. Never block the prediction response on log writes. Use asyncio queues or a background thread pool. (2) Sampling - at 50,000 RPS, log 100% to the operational log system (Loki, lightweight) but sample 1–5% to the data warehouse (expensive). Use stratified sampling: always log uncertain predictions (score near the decision threshold), always log predictions for new user segments, and randomly sample the rest. Record which rule kept each row so that warehouse metrics can be reweighted, because the always-logged strata are over-represented. This gives complete operational observability and warehouse data that can be made statistically representative. Additionally, use Kafka or Kinesis as an intermediate buffer - the producer appends each prediction to a local buffer in microseconds and sends batches to a Kafka topic in the background, and a separate consumer reads from Kafka and writes to BigQuery (decoupled, can retry failed writes).
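
The asynchronous logging technique can be sketched with a bounded asyncio queue and a background flusher. This is a minimal illustration under stated assumptions: the `PredictionLogBuffer` class and its `sink` callback (standing in for the warehouse or Kafka writer) are hypothetical, and entries are dropped and counted when the queue is full so the request path never waits.

```python
import asyncio

class PredictionLogBuffer:
    """Bounded in-memory buffer that flushes prediction logs in batches."""

    def __init__(self, sink, batch_size=500, max_queue=10_000):
        self._queue = asyncio.Queue(maxsize=max_queue)
        self._sink = sink            # async callable that receives a list of entries
        self._batch_size = batch_size
        self.dropped = 0             # export as a metric so log loss is visible

    def submit(self, entry: dict) -> bool:
        """Called on the request path: never blocks, drops the entry if the buffer is full."""
        try:
            self._queue.put_nowait(entry)
            return True
        except asyncio.QueueFull:
            self.dropped += 1
            return False

    async def run(self):
        """Background task: wait for one entry, then drain up to a full batch."""
        while True:
            batch = [await self._queue.get()]
            while len(batch) < self._batch_size:
                try:
                    batch.append(self._queue.get_nowait())
                except asyncio.QueueEmpty:
                    break
            await self._sink(batch)
            for _ in batch:
                self._queue.task_done()

    async def join(self):
        """Wait until everything submitted so far has been flushed."""
        await self._queue.join()

async def demo():
    written = []

    async def sink(batch):
        written.append(list(batch))  # stand-in for a warehouse insert

    buffer = PredictionLogBuffer(sink, batch_size=2, max_queue=3)
    accepted = [buffer.submit({"prediction_id": i}) for i in range(4)]
    flusher = asyncio.create_task(buffer.run())
    await buffer.join()
    flusher.cancel()
    try:
        await flusher
    except asyncio.CancelledError:
        pass
    return accepted, written, buffer.dropped
```

Dropping on overflow is a deliberate trade-off: prediction latency is protected at the cost of occasional log loss, which is why the drop counter should be monitored. Run the demo with `asyncio.run(demo())`.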

Q5: Walk through how you would use prediction logs to investigate a user complaint that the fraud model incorrectly declined their payment.

Step 1: get the transaction ID from the user's complaint. Query the prediction log table: SELECT * FROM fraud_model_predictions WHERE request_id = 'txn-xyz789'. Step 2: examine the log entry - what score did the model produce (e.g., 0.73), what was the threshold (0.5), and what were the key input features? Step 3: check feature freshness - was any feature stale at prediction time? (feature_freshness_max_age_seconds > 3600 would indicate a potential issue). Step 4: look at the SHAP values or adverse action reasons - which features drove the high fraud score? Step 5: compare the user's feature values to their historical distribution - was this a statistically unusual transaction for this user? Step 6: if the prediction appears incorrect given correct features, add the case to a human review queue and consider retraining with this type of edge case. If features were wrong (stale, null, schema mismatch), that's a data pipeline bug - fix the pipeline, consider reprocessing the affected predictions.
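
Steps 2 and 3 of this walk-through lend themselves to a small triage helper that runs first on any complaint. The sketch below assumes the log entry is available as a dict with the PredictionLogEntry field names; the function name, the flag names, and the near-threshold margin of 0.1 are hypothetical, while the 3600-second staleness limit is the one mentioned in Step 3.

```python
def triage_prediction(entry: dict, max_feature_age_s: int = 3600, near_threshold: float = 0.1) -> list:
    """Return data-quality flags for one prediction log entry."""
    findings = []

    # Stale features point to a feature pipeline problem, not a model problem
    if entry["feature_freshness_max_age_seconds"] > max_feature_age_s:
        findings.append("stale_features")

    # Null or imputed inputs mean the model saw less than it was trained on
    if entry["feature_null_count"] > 0:
        findings.append("null_features")

    # Scores close to the cutoff are the most likely to flip with small input changes
    if abs(entry["raw_score"] - entry["decision_threshold"]) <= near_threshold:
        findings.append("near_threshold")

    return findings or ["no_data_quality_flags"]

# Example entry matching the scenario above: score 0.73 against a 0.5 threshold
complaint_entry = {
    "raw_score": 0.73,
    "decision_threshold": 0.5,
    "feature_freshness_max_age_seconds": 7200,
    "feature_null_count": 0,
}
```

An empty result ("no_data_quality_flags") suggests the features were sound and the investigation should move on to the explanation and the user's historical behavior (Steps 4 to 6).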

© 2026 EngineersOfAI. All rights reserved.