:::tip 🎮 Interactive Playground Visualize this concept: Try the Data Drift Detection demo on the EngineersOfAI Playground - no code required. :::
Custom Data Monitoring
The $50k Alternative
The engineering manager opened the procurement request. Three observability platforms. All required annual contracts. The cheapest: $180,000 per year.
The team's data stack was 200 tables in a single Redshift cluster. They had four engineers. The stack was well-understood - they had built most of it themselves. A six-figure observability contract felt out of proportion to the problem.
They decided to build. Three weeks of engineering time. The result: a Python monitoring service running on $25/month of infrastructure. Engineering cost: three weeks, amortized over two years of operation.
Two years later, the system was still running. It had caught 23 data quality incidents before they reached consumers. It had been extended by three more engineers who had never been involved in building it. The cost was never revisited.
Custom monitoring is not always the right answer. But for mid-size teams with a focused data stack and cost sensitivity, it can cover 100% of your critical tables at a fraction of the cost of any commercial platform. The key is building it with the right architecture from the start - one that is simple enough to maintain, extensible enough to grow with your data estate, and reliable enough to actually alert on real problems.
Why Build Custom
The commercial observability platforms are genuinely excellent. The question is not whether they work - they do. The question is whether the cost is justified for your specific situation.
Build custom monitoring when:
- Your data stack has fewer than 300 tables and you understand the failure modes of all of them
- Your budget is constrained and you have engineering capacity for a 2–3 week build
- Your monitoring needs are primarily rule-based, not anomaly-detection-based
- You have domain-specific checks that commercial tools cannot implement (business logic validation, cross-table consistency checks, custom ML feature distribution checks)
Buy a commercial platform when:
- You have 500+ tables and cannot afford to monitor them all manually
- You need coverage of unknown unknowns - anomalies in tables you never thought to check
- You have a dedicated data platform team whose time is better spent on higher-level work
- A single missed data incident (affecting revenue calculations or an ML model) would cost more than the annual platform cost
The hybrid approach that often makes the most sense: custom monitoring for known business rules on critical tables, plus a commercial platform for broad coverage of unknown anomalies.
The Custom Monitoring Architecture
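A well-designed custom monitoring system has four layers:
- Check runner: a Python class that runs freshness, volume, schema, and null rate checks as SQL queries against the warehouse.
- Metrics store: PostgreSQL tables that record every check result with a timestamp.
- Alert router: classifies each result as info, warning, or critical and sends warnings to Slack and criticals to PagerDuty.
- Dashboard: Grafana panels over the metrics store, with a health grid and trend charts.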
Building the Metrics Store
Before writing any monitoring logic, build the metrics store. This is the PostgreSQL table (or tables) that stores every check result with a timestamp. Having a persistent time series of check results is what enables trending, baselining, and retrospective investigation.
```sql
-- Create the metrics store schema
CREATE SCHEMA IF NOT EXISTS data_observability;

-- Core check results table
CREATE TABLE data_observability.check_results (
    id             BIGSERIAL PRIMARY KEY,
    checked_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    table_name     TEXT NOT NULL,
    pillar         TEXT NOT NULL,   -- freshness, volume, schema, distribution, lineage, business_logic
    check_name     TEXT NOT NULL,
    passed         BOOLEAN,         -- NULL when the check was skipped (e.g. insufficient history)
    metric_value   JSONB,           -- the actual measured value(s)
    threshold      JSONB,           -- the threshold(s) used
    notes          TEXT,
    alert_sent     BOOLEAN DEFAULT FALSE,
    alert_severity TEXT             -- critical, warning, info
);

-- Index for fast querying by table and time
CREATE INDEX idx_check_results_table_time
    ON data_observability.check_results (table_name, checked_at DESC);

-- Schema snapshots for drift detection
CREATE TABLE data_observability.schema_snapshots (
    id           BIGSERIAL PRIMARY KEY,
    captured_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    table_name   TEXT NOT NULL,
    schema_json  JSONB NOT NULL,    -- {col_name: {dtype, nullable, position}}
    column_count INT
);

-- Index for fast lookup of the latest snapshot per table
CREATE INDEX idx_schema_snapshots_table_time
    ON data_observability.schema_snapshots (table_name, captured_at DESC);

-- View: latest check status per table per pillar (for dashboard)
CREATE OR REPLACE VIEW data_observability.latest_check_status AS
SELECT DISTINCT ON (table_name, pillar)
    table_name,
    pillar,
    check_name,
    passed,
    metric_value,
    threshold,
    notes,
    checked_at,
    alert_severity
FROM data_observability.check_results
ORDER BY table_name, pillar, checked_at DESC;

-- View: daily health summary per table (traffic light for dashboard)
CREATE OR REPLACE VIEW data_observability.daily_health_summary AS
SELECT
    DATE(checked_at) AS check_date,
    table_name,
    COUNT(*) AS total_checks,
    SUM(CASE WHEN passed = TRUE THEN 1 ELSE 0 END) AS passed_checks,
    SUM(CASE WHEN passed = FALSE AND alert_severity = 'critical' THEN 1 ELSE 0 END) AS critical_failures,
    SUM(CASE WHEN passed = FALSE AND alert_severity = 'warning' THEN 1 ELSE 0 END) AS warning_failures,
    ROUND(
        100.0 * SUM(CASE WHEN passed = TRUE THEN 1 ELSE 0 END) / COUNT(*),
        1
    ) AS pass_rate_pct
FROM data_observability.check_results
WHERE checked_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE(checked_at), table_name
ORDER BY check_date DESC, table_name;
```
The Core Monitoring Class
The CustomDataMonitor class is the central component. It connects to the warehouse, runs all configured checks, stores results, and routes alerts.
```python
import json
import logging
import time
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Dict, Iterator, List, Optional

import numpy as np
import psycopg2
import requests

logger = logging.getLogger("data_monitor")


@contextmanager
def _connect(dsn: str) -> Iterator["psycopg2.extensions.connection"]:
    """Yield a connection that commits on success, rolls back on error, and is always closed.

    psycopg2's `with conn:` block only ends the transaction; it does not close
    the connection, so closing it here avoids leaking one connection per check.
    """
    conn = psycopg2.connect(dsn)
    try:
        with conn:
            yield conn
    finally:
        conn.close()


class CustomDataMonitor:
    """
    Production-grade custom data monitoring system.
    Covers freshness, volume, schema, and distribution (null rate) checks in SQL;
    lineage is not implemented here.
    Stores all results in a PostgreSQL metrics store.
    Routes alerts to Slack (warnings) and PagerDuty (critical).

    Table and column names are interpolated into SQL with f-strings, so the
    check config must come from a trusted source, never from user input.
    """

    def __init__(
        self,
        warehouse_conn_string: str,
        metrics_conn_string: str,
        slack_webhook_url: Optional[str] = None,
        pagerduty_routing_key: Optional[str] = None,
    ):
        self.warehouse_dsn = warehouse_conn_string
        self.metrics_dsn = metrics_conn_string
        self.slack_webhook = slack_webhook_url
        self.pd_key = pagerduty_routing_key

    def _warehouse(self):
        return _connect(self.warehouse_dsn)

    def _metrics(self):
        return _connect(self.metrics_dsn)

    def _store(self, result: Dict) -> None:
        """Persist a check result to the metrics store (committed on exit)."""
        with self._metrics() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO data_observability.check_results
                        (table_name, pillar, check_name, passed,
                         metric_value, threshold, notes, alert_severity)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """, (
                    result["table"],
                    result["pillar"],
                    result.get("check_name", result["pillar"]),
                    result["passed"],
                    # default=str serializes datetimes and Decimals in metrics
                    json.dumps(result.get("metrics", {}), default=str),
                    json.dumps(result.get("thresholds", {}), default=str),
                    result.get("reason"),
                    result.get("severity", "info"),
                ))

    # ── Freshness ───────────────────────────────────────────────────────────
    def check_freshness(self, table: str, timestamp_col: str, sla_hours: float) -> Dict:
        with self._warehouse() as conn:
            with conn.cursor() as cur:
                cur.execute(f"SELECT MAX({timestamp_col}) FROM {table}")
                last_updated = cur.fetchone()[0]

        if last_updated is None:
            result = {
                "table": table, "pillar": "freshness",
                "passed": False, "severity": "critical",
                "reason": "Table appears to be empty - no rows found",
            }
        else:
            # Naive timestamps are assumed to be stored in UTC
            if last_updated.tzinfo is None:
                last_updated = last_updated.replace(tzinfo=timezone.utc)
            lag_hours = (datetime.now(timezone.utc) - last_updated).total_seconds() / 3600
            passed = lag_hours <= sla_hours
            result = {
                "table": table, "pillar": "freshness",
                "passed": passed,
                "severity": "critical" if not passed else "info",
                "metrics": {"lag_hours": round(lag_hours, 2), "last_updated": str(last_updated)},
                "thresholds": {"sla_hours": sla_hours},
                "reason": f"Lag {lag_hours:.1f}h > SLA {sla_hours}h" if not passed else None,
            }
        self._store(result)
        return result

    # ── Volume ───────────────────────────────────────────────────────────────
    def check_volume(
        self,
        table: str,
        date_col: str,
        alert_pct: float = 0.25,
        lookback_days: int = 14,
    ) -> Dict:
        with self._warehouse() as conn:
            with conn.cursor() as cur:
                cur.execute(f"""
                    SELECT DATE({date_col}) AS dt, COUNT(*) AS cnt
                    FROM {table}
                    WHERE {date_col} >= CURRENT_DATE - INTERVAL '{int(lookback_days)} days'
                    GROUP BY DATE({date_col})
                    ORDER BY dt
                """)
                rows = cur.fetchall()

        if len(rows) < 3:
            result = {"table": table, "pillar": "volume", "passed": None,
                      "reason": "Insufficient history"}
            self._store(result)
            return result

        # The latest date returned is the day under test, so run this after the
        # day's load completes. Dates with zero rows are absent from `rows`.
        counts = [r[1] for r in rows]
        today = counts[-1]
        baseline = counts[:-1]
        # float() keeps numpy scalars out of the result: psycopg2 cannot adapt numpy.bool_
        mean = float(np.mean(baseline))
        std = float(np.std(baseline))
        pct_dev = abs(today - mean) / mean if mean > 0 else 0.0
        z = (today - mean) / std if std > 0 else 0.0
        passed = pct_dev <= alert_pct

        result = {
            "table": table, "pillar": "volume",
            "passed": passed,
            "severity": "critical" if not passed else "info",
            "metrics": {
                "today_count": today,
                "baseline_mean": round(mean, 1),
                "pct_deviation": round(pct_dev * 100, 1),
                "z_score": round(z, 2),
            },
            "thresholds": {"alert_pct": alert_pct * 100},
            "reason": f"Volume {pct_dev*100:.1f}% from baseline (z={z:.2f})" if not passed else None,
        }
        self._store(result)
        return result

    # ── Schema ───────────────────────────────────────────────────────────────
    def check_schema(self, table: str) -> Dict:
        """
        Capture the current schema and compare it to the most recent snapshot.
        On first run, captures a baseline and returns passed=True.
        `table` must be schema-qualified, e.g. "mart.orders".
        """
        schema_name, table_name = table.split(".", 1)
        with self._warehouse() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT column_name, data_type, is_nullable, ordinal_position
                    FROM information_schema.columns
                    WHERE table_schema = %s AND table_name = %s
                    ORDER BY ordinal_position
                """, (schema_name, table_name))
                current_schema = {
                    r[0]: {"dtype": r[1], "nullable": r[2], "position": r[3]}
                    for r in cur.fetchall()
                }

        with self._metrics() as conn:
            with conn.cursor() as cur:
                # Previous snapshot. psycopg2 already decodes JSONB into a dict,
                # so calling json.loads() on it would raise TypeError.
                cur.execute("""
                    SELECT schema_json FROM data_observability.schema_snapshots
                    WHERE table_name = %s
                    ORDER BY captured_at DESC
                    LIMIT 1
                """, (table,))
                row = cur.fetchone()
                prev_schema = row[0] if row else None

                # Store the current snapshot for the next comparison
                cur.execute("""
                    INSERT INTO data_observability.schema_snapshots
                        (table_name, schema_json, column_count)
                    VALUES (%s, %s, %s)
                """, (table, json.dumps(current_schema), len(current_schema)))

        if prev_schema is None:
            result = {
                "table": table, "pillar": "schema",
                "passed": True, "severity": "info",
                "reason": "Baseline schema captured",
                "metrics": {"column_count": len(current_schema)},
            }
            self._store(result)
            return result

        # Diff schemas: removed columns and type changes are critical, additions are info
        changes = []
        for col in prev_schema:
            if col not in current_schema:
                changes.append({"type": "REMOVED", "column": col, "severity": "critical"})
        for col in current_schema:
            if col not in prev_schema:
                changes.append({"type": "ADDED", "column": col, "severity": "info"})
            elif current_schema[col]["dtype"] != prev_schema[col]["dtype"]:
                changes.append({
                    "type": "TYPE_CHANGED", "column": col, "severity": "critical",
                    "old_type": prev_schema[col]["dtype"],
                    "new_type": current_schema[col]["dtype"],
                })

        critical = [c for c in changes if c["severity"] == "critical"]
        passed = len(critical) == 0
        result = {
            "table": table, "pillar": "schema",
            "passed": passed,
            "severity": "critical" if critical else ("warning" if changes else "info"),
            "metrics": {"changes": changes, "change_count": len(changes)},
            "reason": f"{len(critical)} critical schema changes" if critical else None,
        }
        self._store(result)
        return result

    # ── Distribution ─────────────────────────────────────────────────────────
    def check_null_rates(
        self,
        table: str,
        columns: List[str],
        max_null_rate_pct: float = 5.0,
        alert_on_increase_pct: float = 10.0,
    ) -> Dict:
        """
        Check null rates for the specified columns.
        Critical if a column's null rate exceeds max_null_rate_pct.
        Warning if a column's null rate rose by more than alert_on_increase_pct
        percent, relative to the previous stored run of this check.
        """
        # NULLIF turns an empty table into NULL rates instead of a division-by-zero error
        null_checks = ",\n".join(
            f"SUM(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END)::float"
            f" / NULLIF(COUNT(*), 0) AS null_rate_{col}"
            for col in columns
        )
        with self._warehouse() as conn:
            with conn.cursor() as cur:
                cur.execute(f"SELECT {null_checks}, COUNT(*) AS total_rows FROM {table}")
                row = cur.fetchone()  # an aggregate without GROUP BY returns exactly one row

        # Per-column rates from the previous run (JSONB arrives as a dict)
        with self._metrics() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT metric_value FROM data_observability.check_results
                    WHERE table_name = %s AND check_name = 'null_rates'
                    ORDER BY checked_at DESC
                    LIMIT 1
                """, (table,))
                prev_row = cur.fetchone()
        prev_metrics = prev_row[0] if prev_row and prev_row[0] else {}

        results_by_col = {}
        for i, col in enumerate(columns):
            null_rate = row[i] * 100 if row[i] is not None else 0.0
            prev_rate = prev_metrics.get(col, {}).get("null_rate_pct")
            # Relative increase; skipped when there is no previous rate or it was 0
            increased = bool(
                prev_rate
                and (null_rate - prev_rate) / prev_rate * 100 > alert_on_increase_pct
            )
            results_by_col[col] = {
                "null_rate_pct": round(null_rate, 2),
                "exceeded_threshold": null_rate > max_null_rate_pct,
                "increased": increased,
            }

        failed_cols = [c for c, v in results_by_col.items() if v["exceeded_threshold"]]
        rising_cols = [c for c, v in results_by_col.items()
                       if v["increased"] and not v["exceeded_threshold"]]
        passed = not failed_cols and not rising_cols
        reasons = []
        if failed_cols:
            reasons.append(f"High null rates in: {', '.join(failed_cols)}")
        if rising_cols:
            reasons.append(f"Null rate rising in: {', '.join(rising_cols)}")
        result = {
            "table": table, "pillar": "distribution",
            "check_name": "null_rates",
            "passed": passed,
            "severity": "critical" if failed_cols else ("warning" if rising_cols else "info"),
            "metrics": results_by_col,
            "thresholds": {
                "max_null_rate_pct": max_null_rate_pct,
                "alert_on_increase_pct": alert_on_increase_pct,
            },
            "reason": "; ".join(reasons) or None,
        }
        self._store(result)
        return result

    # ── Alert Routing ─────────────────────────────────────────────────────────
    def route_alert(self, result: Dict) -> None:
        """Route check failures to the appropriate channel based on severity."""
        if result.get("passed") is True:
            return  # no alert needed for passing checks
        severity = result.get("severity", "info")
        if severity == "info":
            return  # stored in the metrics store only, no alert
        table = result.get("table", "unknown")
        pillar = result.get("pillar", "unknown")
        reason = result.get("reason") or "No details"

        if severity == "warning" and self.slack_webhook:
            self._send_slack(
                level="warning",
                title=f"Data Quality Warning: {table}",
                message=f"*Pillar*: {pillar}\n*Issue*: {reason}",
                color="#f59e0b",
            )
        elif severity == "critical":
            if self.slack_webhook:
                self._send_slack(
                    level="critical",
                    title=f":red_circle: Data Quality CRITICAL: {table}",
                    message=f"*Pillar*: {pillar}\n*Issue*: {reason}",
                    color="#dc2626",
                )
            if self.pd_key:
                self._send_pagerduty(
                    summary=f"Data quality critical: {table} / {pillar}",
                    details=result,
                    severity="critical",
                )

    def _send_slack(self, level: str, title: str, message: str, color: str) -> None:
        payload = {
            "attachments": [{
                "color": color,
                "title": title,
                "text": message,
                "footer": "data-observability-monitor",
                # time.time() is a true Unix timestamp; utcnow().timestamp()
                # is shifted by the host's UTC offset
                "ts": int(time.time()),
            }]
        }
        try:
            resp = requests.post(self.slack_webhook, json=payload, timeout=5)
            resp.raise_for_status()
        except Exception as e:
            logger.error(f"Slack alert failed: {e}")

    def _send_pagerduty(self, summary: str, details: Dict, severity: str = "critical") -> None:
        payload = {
            "routing_key": self.pd_key,
            "event_action": "trigger",
            "payload": {
                "summary": summary,
                "severity": severity,
                "source": "data-observability-monitor",
                "custom_details": details,
            },
        }
        try:
            resp = requests.post(
                "https://events.pagerduty.com/v2/enqueue",
                json=payload, timeout=5,
            )
            resp.raise_for_status()
        except Exception as e:
            logger.error(f"PagerDuty alert failed: {e}")

    def run_checks(self, config: List[Dict]) -> List[Dict]:
        """Run all configured checks and route alerts for failures."""
        results = []
        for check in config:
            try:
                pillar = check["pillar"]
                if pillar == "freshness":
                    r = self.check_freshness(
                        check["table"], check["timestamp_col"], check["sla_hours"]
                    )
                elif pillar == "volume":
                    r = self.check_volume(
                        check["table"], check["date_col"],
                        check.get("alert_pct", 0.25),
                        check.get("lookback_days", 14),
                    )
                elif pillar == "schema":
                    r = self.check_schema(check["table"])
                elif pillar == "null_rates":
                    r = self.check_null_rates(
                        check["table"], check["columns"],
                        check.get("max_null_rate_pct", 5.0),
                        check.get("alert_on_increase_pct", 10.0),
                    )
                else:
                    logger.warning(f"Unknown pillar {pillar!r} for {check.get('table')}, skipped")
                    continue
                self.route_alert(r)
                results.append(r)
                passed = r.get("passed")
                status = "PASS" if passed else ("SKIP" if passed is None else "FAIL")
                logger.info(f"[{status}] {pillar} - {check['table']}")
            except Exception:
                logger.exception(f"Check error for {check.get('table')}")
        return results
```
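`run_checks` only finds out about a mistyped pillar or a missing config key while the checks are running. A small validator, run once at startup, surfaces those mistakes before the first scheduled run. This is a hypothetical helper, not part of the class above; the required-key sets simply mirror the keys `run_checks` reads for each pillar.

```python
from typing import Dict, List, Set

# Keys that run_checks() reads for each pillar (mirrors its dispatcher)
REQUIRED_KEYS: Dict[str, Set[str]] = {
    "freshness": {"table", "timestamp_col", "sla_hours"},
    "volume": {"table", "date_col"},
    "schema": {"table"},
    "null_rates": {"table", "columns"},
}


def validate_check_config(config: List[Dict]) -> List[str]:
    """Return human-readable problems in a check config; an empty list means valid."""
    problems = []
    for i, check in enumerate(config):
        pillar = check.get("pillar")
        if pillar not in REQUIRED_KEYS:
            problems.append(f"check {i}: unknown pillar {pillar!r}")
            continue
        missing = REQUIRED_KEYS[pillar] - set(check)
        if missing:
            problems.append(f"check {i} ({pillar}): missing {sorted(missing)}")
        # check_schema() splits the name on "." and needs "schema.table"
        if pillar == "schema" and "table" in check and "." not in check["table"]:
            problems.append(f"check {i}: schema checks need a schema-qualified table name")
    return problems
```

Failing fast here (for example, refusing to start the scheduler when the list is non-empty) is cheaper than discovering a typo when an incident goes unalerted.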
Statistical Baselines Without ML
Commercial platforms use ML models to set baselines. For custom monitoring, you can get 80% of the value with three statistical techniques that require no ML infrastructure:
1. Rolling Mean ± Sigma
The simplest baseline: compute the rolling mean and standard deviation over the past N days. Alert when the current value falls outside mean ± k*sigma, where k is typically 2 or 3.
This works well for metrics with roughly normal daily variation. It does not handle seasonality (tables with consistent weekly patterns will generate false positives on weekends).
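A minimal sketch of the rolling mean ± k*sigma rule in plain Python, using the standard `statistics` module. The function name and the use of the population standard deviation (matching `np.std` in `check_volume`) are illustrative choices, not part of the class above.

```python
import statistics
from typing import Dict, List


def rolling_sigma_check(history: List[float], today: float, k: float = 3.0) -> Dict:
    """Flag `today` if it falls outside mean ± k * sigma of `history`.

    history: the past N daily values (oldest first), excluding today.
    """
    if len(history) < 2:
        return {"passed": None, "reason": "Insufficient history"}
    mean = statistics.fmean(history)
    sigma = statistics.pstdev(history)  # population std, like np.std
    lower, upper = mean - k * sigma, mean + k * sigma
    passed = lower <= today <= upper
    return {
        "passed": passed,
        "mean": mean,
        "sigma": sigma,
        "bounds": (lower, upper),
        "reason": None if passed else f"{today} outside [{lower:.1f}, {upper:.1f}]",
    }
```

A perfectly constant history gives sigma = 0, so any change at all is flagged; a percentage-deviation threshold like the one in `check_volume` avoids that edge case.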
2. Week-Over-Week Comparison
For tables with weekly seasonality, compare today's value to the same day of the week in the previous N weeks. This naturally handles weekday/weekend variation.
```python
from datetime import date
from typing import Dict

import numpy as np


def check_volume_wow(
    conn,
    table: str,
    date_col: str,
    lookback_weeks: int = 4,
    alert_pct: float = 0.20,
) -> Dict:
    """
    Week-over-week volume check.
    Compares today's count to the same weekday in prior N weeks.
    Handles weekly seasonality that rolling-mean methods miss.
    """
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT DATE({date_col}) AS dt, EXTRACT(DOW FROM {date_col}) AS dow, COUNT(*)
            FROM {table}
            WHERE {date_col} >= CURRENT_DATE - INTERVAL '{lookback_weeks * 7 + 1} days'
            GROUP BY DATE({date_col}), EXTRACT(DOW FROM {date_col})
            ORDER BY 1
        """)
        rows = cur.fetchall()

    # PostgreSQL/Redshift DOW numbers Sunday as 0 ... Saturday as 6, while Python's
    # weekday() uses Monday = 0. isoweekday() % 7 matches the DOW numbering.
    # Assumes the monitor host and the warehouse agree on the current date.
    today = date.today()
    today_dow = today.isoweekday() % 7
    today_count = None
    same_dow_counts = []
    for dt, dow, count in rows:
        if dt == today:
            today_count = count
        elif int(dow) == today_dow:
            same_dow_counts.append(count)

    if today_count is None or len(same_dow_counts) < 2:
        return {"table": table, "pillar": "volume", "check_name": "volume_wow",
                "passed": None, "reason": "Insufficient history for WoW comparison"}

    baseline_mean = float(np.mean(same_dow_counts[-lookback_weeks:]))
    pct_dev = abs(today_count - baseline_mean) / baseline_mean if baseline_mean > 0 else 0.0
    passed = pct_dev <= alert_pct
    return {
        "table": table, "pillar": "volume", "check_name": "volume_wow",
        "passed": passed,
        "metrics": {
            "today_count": today_count,
            "wow_baseline_mean": round(baseline_mean, 1),
            "pct_deviation": round(pct_dev * 100, 1),
        },
        "reason": f"WoW volume deviation {pct_dev*100:.1f}%" if not passed else None,
    }
```
3. CUSUM for Slow Drift Detection
The rolling mean and WoW checks detect sudden changes. CUSUM (Cumulative Sum Control Chart) detects slow, gradual drift - the kind of change that is too small to alert on any single day but represents a real trend over weeks.
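$$
S_t^{+} = \max\left(0,\; S_{t-1}^{+} + (x_t - \mu_0) - k\right), \qquad
S_t^{-} = \max\left(0,\; S_{t-1}^{-} - (x_t - \mu_0) - k\right)
$$

where $\mu_0$ is the target mean and $k$ is the allowable slack, with $S_0^{+} = S_0^{-} = 0$. Alert when $S_t^{+} > h$ or $S_t^{-} > h$, where $h$ is the decision threshold.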
```python
from typing import List, Tuple


def cusum_drift_detect(
    values: List[float],
    target_mean: float,
    slack_k: float,
    decision_h: float,
) -> Tuple[bool, float]:
    """
    Two-sided CUSUM detection for slow drift.
    Returns (drift_detected, current_cusum_value).

    Parameters:
        values: time series of metric values (oldest first)
        target_mean: expected mean value (mu_0)
        slack_k: allowable deviation per step (usually 0.5 * sigma)
        decision_h: alert threshold (usually 4-5 * sigma)
    """
    s_pos = 0.0  # cumulative sum for upward drift
    s_neg = 0.0  # cumulative sum for downward drift
    drift_detected = False
    for x in values:
        s_pos = max(0.0, s_pos + (x - target_mean) - slack_k)
        s_neg = max(0.0, s_neg - (x - target_mean) - slack_k)
        # CUSUM signals as soon as either sum crosses h, even if it later falls back
        if s_pos > decision_h or s_neg > decision_h:
            drift_detected = True
    return drift_detected, max(s_pos, s_neg)
```
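The docstring's rules of thumb (k around 0.5 sigma, h around 4 to 5 sigma) need a baseline before they become numbers. A minimal sketch, assuming you estimate the target mean and sigma from a window of known-good history; the helper name and default multipliers are illustrative.

```python
import statistics
from typing import List, Tuple


def cusum_params_from_baseline(
    baseline: List[float], k_sigma: float = 0.5, h_sigma: float = 5.0
) -> Tuple[float, float, float]:
    """Return (target_mean, slack_k, decision_h) derived from known-good history."""
    if len(baseline) < 2:
        raise ValueError("need at least two baseline values")
    target_mean = statistics.fmean(baseline)
    sigma = statistics.stdev(baseline)  # sample standard deviation
    return target_mean, k_sigma * sigma, h_sigma * sigma
```

The three values plug straight into `cusum_drift_detect` alongside the recent series. Keeping the baseline window separate from the window being tested matters: estimating sigma from data that already contains the drift inflates sigma and hides the drift.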
Business Logic Checks
The checks described so far are generic - they apply to any table. Business logic checks are domain-specific assertions about the semantics of your data. These are the checks that commercial platforms cannot implement for you because they require business knowledge.
```python
import operator
from typing import Dict, List

BUSINESS_LOGIC_CHECKS = [
    # Financial data constraints (revenue may be zero, never negative)
    {
        "name": "revenue_always_positive",
        "table": "mart.revenue_metrics",
        "sql": """
            SELECT COUNT(*) AS violation_count
            FROM mart.revenue_metrics
            WHERE DATE(created_at) = CURRENT_DATE - 1
              AND revenue_usd < 0
        """,
        "pass_condition": "violation_count = 0",
        "severity": "critical",
    },
    # Temporal consistency (1-hour tolerance for clock skew)
    {
        "name": "no_future_event_timestamps",
        "table": "events.purchase_events",
        "sql": """
            SELECT COUNT(*) AS future_rows
            FROM events.purchase_events
            WHERE created_at > NOW() + INTERVAL '1 hour'
        """,
        "pass_condition": "future_rows = 0",
        "severity": "critical",
    },
    # Referential integrity (the warehouse schema has no FK constraints)
    {
        "name": "all_orders_have_valid_users",
        "table": "mart.orders",
        "sql": """
            SELECT COUNT(*) AS orphaned_orders
            FROM mart.orders o
            LEFT JOIN mart.users u ON o.user_id = u.user_id
            WHERE u.user_id IS NULL
              AND DATE(o.created_at) = CURRENT_DATE - 1
        """,
        "pass_condition": "orphaned_orders = 0",
        "severity": "critical",
    },
    # ML-specific: feature value sanity. MIN/MAX run over all rows so the
    # metrics show the observed range, not just the violating rows.
    {
        "name": "churn_score_in_range",
        "table": "ml.user_features",
        "sql": """
            SELECT
                COALESCE(SUM(CASE WHEN churn_score < 0 OR churn_score > 1
                                  THEN 1 ELSE 0 END), 0) AS out_of_range_count,
                MIN(churn_score) AS min_score,
                MAX(churn_score) AS max_score
            FROM ml.user_features
        """,
        "pass_condition": "out_of_range_count = 0",
        "severity": "critical",
    },
    # Monotonicity check (cumulative user count should never decrease).
    # Window functions are not allowed in WHERE, so LAG gets its own CTE, and
    # Redshift needs an explicit frame clause on the ordered SUM.
    # Caveat: a running SUM of non-negative daily counts cannot decrease by
    # construction; to catch real regressions, point this at a stored
    # cumulative total (e.g. a daily snapshot table) instead.
    {
        "name": "cumulative_users_monotone",
        "table": "events.user_registrations",
        "sql": """
            WITH daily AS (
                SELECT DATE(created_at) AS dt, COUNT(DISTINCT user_id) AS daily_users
                FROM events.user_registrations
                WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
                GROUP BY DATE(created_at)
            ),
            with_cumsum AS (
                SELECT dt,
                       SUM(daily_users) OVER (ORDER BY dt ROWS UNBOUNDED PRECEDING)
                           AS cumulative_users
                FROM daily
            ),
            with_prev AS (
                SELECT dt, cumulative_users,
                       LAG(cumulative_users) OVER (ORDER BY dt) AS prev_cumulative
                FROM with_cumsum
            )
            SELECT COUNT(*) AS violations
            FROM with_prev
            WHERE cumulative_users < prev_cumulative
        """,
        "pass_condition": "violations = 0",
        "severity": "warning",
    },
]

# Comparison operators allowed in "pass_condition" strings
_OPERATORS = {
    "=": operator.eq, "==": operator.eq, "!=": operator.ne,
    "<": operator.lt, "<=": operator.le, ">": operator.gt, ">=": operator.ge,
}


def run_business_logic_checks(conn, checks: List[Dict]) -> List[Dict]:
    """Run all business logic SQL checks and return results."""
    results = []
    for check in checks:
        try:
            with conn.cursor() as cur:
                cur.execute(check["sql"])
                row = cur.fetchone()
                col_names = [d[0] for d in cur.description]
            result_row = dict(zip(col_names, row))

            # Pass condition has the form "<column> <operator> <number>"
            col, op, val = check["pass_condition"].split()
            if op not in _OPERATORS:
                # Fail loudly on a config typo instead of silently passing
                raise ValueError(f"Unknown operator {op!r} in pass_condition")
            actual_val = result_row.get(col)
            passed = None if actual_val is None else _OPERATORS[op](float(actual_val), float(val))

            results.append({
                "table": check.get("table", "unknown"),
                "check_name": check["name"],
                "pillar": "business_logic",
                "passed": passed,
                "metrics": result_row,
                "severity": check.get("severity", "warning"),
                "reason": f"Business logic violation: {check['name']}" if passed is False else None,
            })
        except Exception as e:
            # A failed statement aborts the transaction; roll back so the
            # remaining checks can still run on this connection
            conn.rollback()
            results.append({
                "table": check.get("table", "unknown"),
                "check_name": check["name"],
                "pillar": "business_logic",
                "passed": False,
                "severity": "critical",
                "reason": f"Check error: {e}",
            })
    return results
```
Grafana Dashboard Design
The Grafana dashboard is where the metrics store becomes actionable. A well-designed data health dashboard has three sections:
Section 1: Overall health - traffic light grid. One cell per table, green/yellow/red based on whether all checks passed, some checks warned, or any check failed today. This is the overview that the on-call engineer checks first thing each morning.
Section 2: Trend charts - one panel per pillar. Row count over 30 days (volume pillar), freshness lag over 30 days, null rate over 30 days per critical column. Trends show you whether a problem is new or has been developing gradually.
Section 3: Active failures - table of failing checks. Sorted by severity, then by detection time. Links to the relevant alert in Slack.
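Section 1's traffic-light rule can be written as a small function over one table's results for the day. A sketch, assuming each result carries the `passed` and `severity` fields that `CustomDataMonitor` produces; treating skipped checks (`passed` is None) as neutral is a choice made here, not something the dashboard requires.

```python
from typing import Dict, Iterable


def traffic_light(results: Iterable[Dict]) -> str:
    """Map one table's check results for the day to 'green', 'yellow', or 'red'."""
    color = "green"
    for r in results:
        failed = r.get("passed") is False
        severity = r.get("severity", "info")
        if failed and severity == "critical":
            return "red"  # any critical failure decides the cell
        if failed or severity == "warning":
            color = "yellow"  # non-critical failure or a warning-level result
    return color
```

Because red short-circuits, a single critical failure outranks any number of warnings, which matches the "any check failed" wording above.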
Grafana table-panel query for the traffic-light grid. It reads the latest result per table and pillar and colors `overall_health` with thresholds (0 = critical failure, 1 = warning failure, 2 = all passing):

```json
{
  "targets": [{
    "rawSql": "SELECT table_name, MAX(CASE WHEN pillar = 'freshness' THEN CASE WHEN passed THEN 1 ELSE 0 END END) AS freshness_ok, MAX(CASE WHEN pillar = 'volume' THEN CASE WHEN passed THEN 1 ELSE 0 END END) AS volume_ok, MAX(CASE WHEN pillar = 'schema' THEN CASE WHEN passed THEN 1 ELSE 0 END END) AS schema_ok, MIN(CASE WHEN passed = FALSE AND alert_severity = 'critical' THEN 0 WHEN passed = FALSE THEN 1 ELSE 2 END) AS overall_health FROM data_observability.latest_check_status WHERE checked_at >= NOW() - INTERVAL '24 hours' GROUP BY table_name ORDER BY overall_health, table_name",
    "format": "table"
  }],
  "fieldConfig": {
    "overrides": [{
      "matcher": {"id": "byName", "options": "overall_health"},
      "properties": [{
        "id": "thresholds",
        "value": {
          "mode": "absolute",
          "steps": [
            {"value": null, "color": "red"},
            {"value": 1, "color": "yellow"},
            {"value": 2, "color": "green"}
          ]
        }
      }]
    }]
  }
}
```

Thresholds only color the cell when the column's cell display is set to a colored background or colored text; the exact option name differs between Grafana versions.
:::danger Do not skip the metrics store - it is not optional Many teams build monitoring scripts that send Slack alerts but store no historical data. When an incident occurs, they cannot answer "when did this start?" or "has this happened before?" or "is this getting better or worse?" A metrics store with 30 days of history turns every alert into an investigatable event. Build the metrics store first, before writing any checks. :::
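Once results are persisted, "when did this start?" becomes a query over the series. A minimal sketch in plain Python, assuming rows shaped like `(checked_at, passed)` for one table and check, as you might fetch from `data_observability.check_results`; the function name is hypothetical.

```python
from datetime import datetime
from typing import List, Optional, Tuple


def current_failure_start(
    history: List[Tuple[datetime, Optional[bool]]],
) -> Optional[datetime]:
    """Return when the current run of failures began, or None if the last decisive check passed.

    history: (checked_at, passed) pairs for one table/check, in any order.
    Skipped results (passed is None) neither start nor end a failure streak.
    """
    start = None
    for checked_at, passed in sorted(history, key=lambda row: row[0]):
        if passed is False:
            if start is None:
                start = checked_at  # first failure after a pass (or ever)
        elif passed is True:
            start = None  # a pass ends any streak
    return start
```

The same idea answers "has this happened before?": count how many times a pass is followed by a failure over the retained window.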
:::warning Alert routing must be designed before launch, not added later
A monitoring system where all alerts go to a single #data-alerts channel is a monitoring system that nobody reads. Before your first alert fires in production, define: which tables belong to which team, which severity levels go to Slack vs. PagerDuty, how to suppress duplicate alerts for an ongoing incident. Alert routing that is not designed upfront becomes chaos the first time three critical alerts fire simultaneously.
:::
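One way to write these routing decisions down before launch is a plain ownership map keyed by warehouse schema, plus a severity-to-destination rule. A sketch; the team names, channel names, and the schema-prefix ownership convention are hypothetical placeholders.

```python
from typing import Dict, List

# Hypothetical ownership map: warehouse schema -> owning team
TEAM_BY_SCHEMA: Dict[str, str] = {"mart": "analytics", "events": "platform", "ml": "ml-eng"}
DEFAULT_TEAM = "data-platform"


def route_for(table: str, severity: str) -> List[str]:
    """Return the destinations for an alert on `table` with `severity`."""
    schema = table.split(".", 1)[0]
    team = TEAM_BY_SCHEMA.get(schema, DEFAULT_TEAM)
    if severity == "critical":
        return [f"slack:#{team}-data-alerts", f"pagerduty:{team}"]
    if severity == "warning":
        return [f"slack:#{team}-data-alerts"]
    return []  # info: stored in the metrics store, no notification
```

Keeping this as data rather than branching logic makes ownership changes a one-line edit and gives unowned tables an explicit fallback instead of a shared channel nobody reads.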
Interview Q&A
Q: How would you design a custom data monitoring system for a 200-table Redshift warehouse with a 3-person data team?
A: I would build a four-layer architecture. Layer one is the SQL check runner - a Python class that executes freshness, volume, schema, and null rate checks as SQL queries against Redshift. Checks run as Airflow tasks at the end of each pipeline DAG. Layer two is a PostgreSQL metrics store - a check_results table with timestamps that stores every check result, making it a time series you can trend over time and retrospectively investigate. Layer three is an alert router that classifies results as info/warning/critical and routes warnings to a team-specific Slack channel and criticals to PagerDuty. Layer four is a Grafana dashboard connected to the metrics store showing a traffic light health grid and 30-day trend charts. The whole system costs roughly $30/month in infrastructure and takes about 3 weeks to build well. The critical design choice is to build the metrics store first - without historical data, alerts have no context.
Q: What is the difference between a rolling mean baseline and a week-over-week baseline for volume monitoring, and when do you use each?
A: A rolling mean baseline computes the average row count over the past N days and alerts when today's count deviates by more than a percentage or sigma from that average. It works well for tables with consistent daily behavior. The problem is weekly seasonality: a B2B SaaS platform might have 5x more events on weekdays than weekends. A rolling mean baseline that mixes weekdays and weekends averages the two, so every weekday count looks high and every weekend count looks low. A week-over-week baseline solves this by comparing today's count to the same weekday in prior weeks: Monday compared to previous Mondays, Sunday to previous Sundays. I use rolling mean for tables with no weekly pattern (real-time event tables, IoT data) and week-over-week for any table that follows business hours or business days.
Q: What are business logic checks and why can't commercial observability platforms replace them?
A: Business logic checks are SQL assertions that enforce domain-specific invariants about your data - rules that derive from the semantics of your business, not just statistical properties. Examples: revenue must never be negative, event timestamps must never be in the future, every order must reference a valid user, cumulative user count must be monotonically non-decreasing. These checks cannot be automated by ML-based platforms like Monte Carlo because they require business knowledge - understanding what the columns mean and what values are semantically valid. A column named revenue_usd that contains a -500 value is not a statistical anomaly if the system has seen negative values before; it is a semantic violation of your domain. Business logic checks and statistical anomaly detection are complementary: statistical checks catch unexpected distributional changes, business logic checks enforce invariants that must always hold regardless of distribution.
Q: How do you handle alert deduplication to prevent alert fatigue?
A: Alert fatigue is the silent killer of monitoring programs. If the same failed check fires an alert every 30 minutes for 6 hours, engineers stop reading alerts. The key design patterns are: first, track whether an alert has already been sent for an ongoing failure. Store an alert_sent_at timestamp in the check results table and suppress repeat alerts for the same table/pillar combination until the check passes and then fails again. Second, implement an incident lifecycle: when a check fails, create an incident record. Send the initial alert. Send a follow-up only if the severity escalates (warning becomes critical) or if the incident has been open for longer than the SLA without acknowledgment. Third, implement a cooldown period - do not re-alert for the same issue within N hours. In practice, I use a 4-hour cooldown for warnings and 1-hour escalation triggers for criticals.
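The suppression and cooldown rules from this answer can be sketched as one decision function. It assumes you keep, per table/pillar, the time of the last alert, its severity, and whether the check has passed since; the 4-hour and 1-hour values come from the answer, with the 1-hour figure read as the re-alert interval for an unresolved critical. Names and the state shape are illustrative.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

COOLDOWN = {"warning": timedelta(hours=4), "critical": timedelta(hours=1)}
RANK = {"info": 0, "warning": 1, "critical": 2}


@dataclass
class AlertState:
    """Last alert sent for one table/pillar (hypothetical bookkeeping record)."""
    last_sent_at: datetime
    last_severity: str
    recovered_since: bool = False  # has the check passed since the last alert?


def should_alert(state: Optional[AlertState], severity: str, now: datetime) -> bool:
    """Decide whether a failing check should notify anyone."""
    if severity not in COOLDOWN:
        return False  # info results are never sent
    if state is None or state.recovered_since:
        return True  # new incident: first failure, or failed again after passing
    if RANK[severity] > RANK[state.last_severity]:
        return True  # escalation, e.g. warning -> critical
    # Same incident at the same or lower severity: wait out the cooldown
    return now - state.last_sent_at >= COOLDOWN[severity]
```

The state maps naturally onto the metrics store: the last row with `alert_sent = TRUE` for the table/pillar supplies `last_sent_at` and `last_severity`, and any later passing row sets `recovered_since`.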
Q: How do you monitor a table that has no timestamp column?
A: Tables without timestamp columns require a different approach for freshness monitoring. Several strategies work depending on the table type. For tables loaded by an ETL pipeline, use the Airflow task completion time as the proxy - if the load task completed but the table has fewer rows than expected, that is a freshness/volume failure even without a timestamp. For slowly changing dimension tables, use the last _updated_at metadata timestamp if available, or query the warehouse's table metadata (e.g., SVV_TABLE_INFO in Redshift, INFORMATION_SCHEMA.TABLE_STORAGE in BigQuery) for last modification time. For complete-reload tables (every run drops and recreates the table), check that each reload's row count falls within an expected range of the previous run's count. The fallback when none of these work is process-level monitoring: monitor the Airflow task completion time and row count delta between runs, not the data timestamps directly.
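For complete-reload tables, the row-count comparison reduces to checking each run's count against the previous run's. A minimal sketch; the function name and the 20% tolerance are assumptions, and in practice `current_count` would come from a `SELECT COUNT(*)` after the reload while `previous_count` is read back from the metrics store.

```python
from typing import Dict, Optional


def check_reload_delta(
    current_count: int, previous_count: Optional[int], max_change_pct: float = 20.0
) -> Dict:
    """Compare a full-reload table's row count with the previous run's count."""
    if current_count == 0:
        return {"passed": False, "severity": "critical",
                "reason": "Reload produced an empty table"}
    if previous_count is None or previous_count == 0:
        return {"passed": None, "reason": "No previous count to compare against"}
    change_pct = (current_count - previous_count) / previous_count * 100
    passed = abs(change_pct) <= max_change_pct
    return {
        "passed": passed,
        "severity": "info" if passed else "warning",
        "metrics": {"current": current_count, "previous": previous_count,
                    "change_pct": round(change_pct, 1)},
        "reason": None if passed else f"Row count changed {change_pct:+.1f}% since last reload",
    }
```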
Q: How do you extend a custom monitoring system to cover ML feature tables specifically?
A: ML feature tables need additional checks beyond the standard five pillars. The key additions are: feature value range checks (each feature must be within its expected range, derived from training data statistics), feature distribution drift (compare the current distribution of each feature to the distribution at training time using KS tests), missing feature checks (if a feature is expected for every user, check for gaps), and feature correlation stability (for features that are historically highly correlated, sudden decorrelation is a signal of a data quality issue). I store the training data distribution as a JSON snapshot in the metrics store at model training time, then compare each inference batch's feature distributions against that snapshot. If any feature shows a KS test p-value below 0.05 on the comparison, I alert - this means the feature distribution has shifted significantly from training, which often precedes model performance degradation.
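The training-versus-inference comparison can be sketched with the two-sample Kolmogorov-Smirnov statistic. To stay dependency-free, this sketch computes the statistic directly and compares it to the standard large-sample critical value instead of computing a p-value; in production, `scipy.stats.ks_2samp` returns both the statistic and a p-value. The function names are hypothetical.

```python
import bisect
import math
from typing import List


def ks_statistic(a: List[float], b: List[float]) -> float:
    """Two-sample KS statistic: the largest gap between the two empirical CDFs."""
    a_sorted, b_sorted = sorted(a), sorted(b)
    n, m = len(a_sorted), len(b_sorted)
    d = 0.0
    # The ECDF difference only changes at observed values, so checking them suffices
    for x in a_sorted + b_sorted:
        cdf_a = bisect.bisect_right(a_sorted, x) / n
        cdf_b = bisect.bisect_right(b_sorted, x) / m
        d = max(d, abs(cdf_a - cdf_b))
    return d


def ks_drift(train: List[float], current: List[float], alpha: float = 0.05) -> bool:
    """True if the samples differ at level alpha (asymptotic critical value)."""
    n, m = len(train), len(current)
    c_alpha = math.sqrt(-math.log(alpha / 2) / 2)  # about 1.358 for alpha = 0.05
    critical = c_alpha * math.sqrt((n + m) / (n * m))
    return ks_statistic(train, current) > critical
```

With batches of tens of thousands of rows the critical value becomes very small, so even harmless shifts cross it. Gating alerts on a minimum statistic as well (for example, only alerting when D also exceeds a chosen floor such as 0.1) keeps the signal meaningful.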
