Anomaly Detection on Sensor Data
Reading time: ~45 min · Interview relevance: Very High · Target roles: ML Engineer, Data Scientist, Industrial AI Engineer
The Signal in the Noise
A power plant in Texas runs 47 gas turbines. Each turbine has 350 sensors. They sample at frequencies ranging from 1 Hz for temperatures to 1,000 Hz for vibration. At any given moment, the facility is generating 350 x 47 x average_frequency = several hundred thousand sensor readings per second. Every one of those readings is, in the normal case, boring. The turbine is running. Everything is fine.
But 0.001% of the time, one of those readings is not fine. A bearing is beginning to fail. A compressor blade is cracking. A seal is leaking. The abnormal reading is buried in millions of normal readings, and it looks, at first glance, like just another minor fluctuation in an inherently noisy signal. The question is: can you detect that 0.001% of readings that matter before they become 100% of your problem - a failure that costs $2 million in repairs and two weeks of lost generation capacity?
This is the anomaly detection problem for industrial sensor data. It is superficially similar to anomaly detection in cybersecurity or fraud detection, but it has several characteristics that make it distinctly harder. The data is high-frequency and multivariate: you have hundreds of correlated sensors, and anomalies often manifest as changes in correlation patterns rather than in any individual sensor's absolute value. The noise is structured, not random: sensor readings have diurnal cycles, seasonal trends, and load-dependent baselines that must be accounted for before "anomaly" has a meaningful definition. The labels are almost nonexistent: real anomalies are rare, and the historical record of labeled fault events is sparse, inconsistent, and often wrong.
The methods that work in this domain span statistics, classical ML, and deep learning. None of them is a magic solution. Each makes different assumptions about the data generating process and fails in different ways. This lesson teaches you how to reason about which method to deploy, how to combine them into a robust system, and how to build the real-time infrastructure that processes sensor streams at production scale.
Why This Exists
Why Thresholds Fail
The default approach to industrial anomaly detection is threshold-based alarming: if temperature exceeds 85C, alert. If vibration amplitude exceeds 2.5 mm/s, alert. These thresholds are set by engineers based on equipment specifications, adjusted over time based on experience.
Thresholds fail in two directions. First, they generate false positives: during a production surge, every temperature rises to within 3 degrees of the threshold. The alarm triggers on every peak load shift. Operators learn to dismiss it. Second, they miss gradual degradation: a bearing that is slowly failing may never exceed the static threshold until hours before catastrophic failure. The temperature rises from 70C to 75C to 78C over three weeks - each reading is below 85C. The threshold misses the trend entirely.
ML-based anomaly detection addresses both problems. By learning what "normal" looks like in context - including the normal temperature at high load, the normal diurnal cycle, the normal correlation between bearing temperature and ambient temperature - the model can flag deviations from expected behavior even when the absolute values are within range. A temperature of 78C at current load and ambient conditions, where the model expects 71C, is anomalous even though it is below the static threshold.
The Curse of Rare Labels
The most commonly cited challenge in industrial anomaly detection is the absence of labeled anomaly examples. This is partially accurate but somewhat overstated. Here is the more nuanced picture:
Truly labeled failure events are rare - perhaps 5-20 per year per large facility. These are the events that made it into the maintenance log, were serious enough to cause downtime, and had their sensor data preserved.
Operational degradation periods - where the equipment was running abnormally but had not yet failed - are never labeled systematically. The maintenance log says "bearing replaced on March 3" but does not tell you that the bearing was degraded from February 15 onward.
Normal data is abundant. Any industrial sensor system collects months or years of normal operation data.
This scarcity structure (abundant normal, rare anomaly) motivates unsupervised and semi-supervised approaches. You train on normal data only, then flag deviations. The challenge: "deviation from normal" must be carefully defined. The model must learn the full structure of normal behavior, not just mean values.
Historical Context
Statistical process control (SPC) is the foundation. Walter Shewhart at Bell Labs developed the control chart in 1924 - a visualization tool for distinguishing "common cause" variation (random noise within a stable process) from "special cause" variation (an actual change in the process). Shewhart's insight: not all variation is a problem. The control chart defines the normal variation range and flags readings outside it as requiring investigation.
W. Edwards Deming introduced SPC to Japanese manufacturing in the 1950s, laying the groundwork for Toyota's quality revolution and eventually Six Sigma. The fundamental concepts - control limits, process capability, common vs special cause - remain the conceptual foundation for modern sensor anomaly detection.
The CUSUM (Cumulative Sum) control chart (Page, 1954) improved on Shewhart charts for detecting small, sustained shifts - exactly the gradual degradation pattern that Shewhart charts miss. EWMA (Exponentially Weighted Moving Average) charts followed, providing smooth tracking of process mean with built-in forgetting.
The ML era for anomaly detection began seriously around 2003 with Liu et al.'s work that eventually became Isolation Forest (2008). The key insight: anomalies are "few and different" - they can be isolated with fewer random partitions than normal points. Isolation Forest remains one of the most deployment-ready anomaly detection algorithms.
The deep learning era brought LSTM autoencoders (2015-2016), which showed that sequence models could learn multivariate temporal patterns and detect anomalies as reconstruction errors. The Transformer era brought TranAD (2022), which uses dual-phase adversarial training with transformer encoders and has achieved state-of-the-art results on multiple industrial benchmark datasets.
Core Concepts
Types of Anomalies
Point anomalies: A single measurement that is unusual compared to the rest. Temperature reading of 200C in a context where normal is 70-80C. These are the easiest to detect with simple thresholding.
Contextual anomalies: A value that is unusual given the context but would be normal in a different context. Temperature of 82C at low motor load (anomalous) vs at full load (normal). Context can be temporal (time of day, season), operational (load level, production mode), or environmental (ambient conditions). Classical thresholds cannot handle contextual anomalies. Condition-aware models that incorporate load/operating mode as features can.
Collective anomalies: A sequence of observations that is anomalous as a whole, even if individual points are not. No single vibration reading exceeds the threshold, but the correlation structure of the vibration signal over the past 30 minutes has shifted - indicating a change in the rotating machinery's dynamic behavior. These are the hardest to detect and the most diagnostically valuable. LSTM and transformer-based methods capture collective anomalies through sequence modeling.
Statistical Baselines
Shewhart Control Charts (X-bar, R): Compute the process mean and standard deviation from historical data. Normal operation is within . Points outside this range are anomalies. Simple, interpretable, the basis for all manufacturing quality control.
CUSUM (Cumulative Sum): Accumulates deviations from the target mean. Detects small, sustained shifts that Shewhart charts miss. The CUSUM statistic:
where is the allowance (usually for detecting 1-sigma shifts). Alert when or (threshold , typically ). CUSUM has theoretically optimal detection properties for detecting a step change in mean.
EWMA (Exponentially Weighted Moving Average): Smoothed estimate of the current process mean, with weight on the current observation:
Control limits are where and are design parameters. EWMA is sensitive to gradual trends and is computationally simple - useful for real-time deployment on edge devices.
LSTM Autoencoder for Multivariate Anomaly Detection
The LSTM autoencoder architecture:
- Encoder: LSTM processes the input window (multivariate, T timesteps, D features each) and produces a compressed latent representation
- Decoder: Another LSTM reconstructs the input sequence from
- Training: Minimize reconstruction error on normal data only. The model learns to reconstruct normal patterns well.
- Inference: Feed new window through encoder-decoder. High reconstruction error = anomaly.
The reconstruction error is computed per sensor per timestep. This gives you an anomaly score that is localized - you can see not just "this window is anomalous" but "sensor 3 in timesteps 15-20 is the anomalous part." This diagnostic localization is essential for operators who need to know what to look at.
Code Examples
1. CUSUM Control Chart Implementation
"""
CUSUM and EWMA control charts for sensor data monitoring.
These statistical methods are the first line of defense - simple,
fast, and work well for detecting step changes and gradual drifts.
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Tuple, Optional
@dataclass
class CUSUMState:
"""Running state of a CUSUM control chart."""
c_plus: float = 0.0 # Upper CUSUM statistic
c_minus: float = 0.0 # Lower CUSUM statistic
alert_upper: bool = False
alert_lower: bool = False
class CUSUMChart:
"""
CUSUM control chart for detecting sustained shifts in sensor mean.
Parameters:
- k: allowance = sigma * delta/2 where delta is the shift to detect
k = sigma/2 is standard for detecting 1-sigma shifts
- h: decision threshold, typically 4-5 * sigma
- mu0: target mean (in-control mean)
- sigma: process standard deviation (estimated from normal data)
"""
def __init__(
self,
mu0: float,
sigma: float,
k: Optional[float] = None,
h: Optional[float] = None
):
self.mu0 = mu0
self.sigma = sigma
self.k = k if k is not None else sigma / 2 # Detect 1-sigma shifts
self.h = h if h is not None else 5 * sigma # 5-sigma threshold
self.state = CUSUMState()
def update(self, x: float) -> CUSUMState:
"""Process a single new observation, return current state."""
# Upper CUSUM: detects upward shifts
self.state.c_plus = max(
0,
self.state.c_plus + (x - self.mu0) - self.k
)
# Lower CUSUM: detects downward shifts
self.state.c_minus = max(
0,
self.state.c_minus - (x - self.mu0) - self.k
)
self.state.alert_upper = self.state.c_plus > self.h
self.state.alert_lower = self.state.c_minus > self.h
# Reset after alert (optional - depends on use case)
if self.state.alert_upper:
self.state.c_plus = 0.0
if self.state.alert_lower:
self.state.c_minus = 0.0
return CUSUMState(
c_plus=self.state.c_plus,
c_minus=self.state.c_minus,
alert_upper=self.state.alert_upper,
alert_lower=self.state.alert_lower
)
def fit(self, normal_data: np.ndarray) -> "CUSUMChart":
"""Estimate process parameters from historical normal data."""
self.mu0 = float(np.mean(normal_data))
self.sigma = float(np.std(normal_data))
self.k = self.sigma / 2
self.h = 5 * self.sigma
print(f"CUSUM fitted: mu0={self.mu0:.3f}, sigma={self.sigma:.3f}, "
f"k={self.k:.3f}, h={self.h:.3f}")
return self
def process_batch(
self, data: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Process a batch of observations.
Returns (c_plus_series, c_minus_series, alerts)
"""
c_plus_vals = []
c_minus_vals = []
alerts = []
for x in data:
state = self.update(float(x))
c_plus_vals.append(state.c_plus)
c_minus_vals.append(state.c_minus)
alerts.append(state.alert_upper or state.alert_lower)
return (
np.array(c_plus_vals),
np.array(c_minus_vals),
np.array(alerts)
)
class EWMAChart:
"""
EWMA control chart - exponentially weighted moving average.
More sensitive to gradual trends than Shewhart, simpler than CUSUM.
lambda_: smoothing factor. Small (0.05-0.1) = more smoothing = more sensitive to small shifts
L: control limit multiplier, typically 2.7-3.0
"""
def __init__(
self,
mu0: float,
sigma: float,
lambda_: float = 0.2,
L: float = 3.0
):
self.mu0 = mu0
self.sigma = sigma
self.lambda_ = lambda_
self.L = L
self.z_current = mu0 # Current EWMA value
# Asymptotic control limits
self.ucl = mu0 + L * sigma * np.sqrt(lambda_ / (2 - lambda_))
self.lcl = mu0 - L * sigma * np.sqrt(lambda_ / (2 - lambda_))
def update(self, x: float) -> Tuple[float, bool]:
"""
Update EWMA with new observation.
Returns (ewma_value, is_alert)
"""
self.z_current = self.lambda_ * x + (1 - self.lambda_) * self.z_current
is_alert = self.z_current > self.ucl or self.z_current < self.lcl
return self.z_current, is_alert
def fit(self, normal_data: np.ndarray) -> "EWMAChart":
"""Estimate control limits from normal data."""
self.mu0 = float(np.mean(normal_data))
self.sigma = float(np.std(normal_data))
self.z_current = self.mu0
self.ucl = self.mu0 + self.L * self.sigma * np.sqrt(
self.lambda_ / (2 - self.lambda_)
)
self.lcl = self.mu0 - self.L * self.sigma * np.sqrt(
self.lambda_ / (2 - self.lambda_)
)
return self
2. LSTM Autoencoder for Multivariate Anomaly Detection
"""
LSTM Autoencoder for multivariate sensor anomaly detection.
Trained on normal sensor data, detects anomalies as reconstruction errors.
Works with correlated multivariate signals where the anomaly is a
change in correlation structure, not just a change in individual sensors.
"""
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from typing import Tuple, List, Optional
import pandas as pd
class SensorWindowDataset(Dataset):
"""
Sliding window dataset for sensor time series.
Each sample is a window of length seq_len with n_sensors channels.
Target is the same as input (reconstruction objective).
"""
def __init__(
self,
data: np.ndarray, # (n_timesteps, n_sensors)
seq_len: int = 60,
stride: int = 1
):
self.seq_len = seq_len
windows = []
for i in range(0, len(data) - seq_len + 1, stride):
windows.append(data[i:i+seq_len])
self.windows = np.array(windows, dtype=np.float32)
def __len__(self):
return len(self.windows)
def __getitem__(self, idx):
x = torch.tensor(self.windows[idx])
return x, x # input = target for autoencoder
class LSTMEncoder(nn.Module):
"""Encodes input sequence to a fixed-size latent vector."""
def __init__(self, input_size: int, hidden_size: int, n_layers: int = 1):
super().__init__()
self.lstm = nn.LSTM(
input_size, hidden_size, n_layers,
batch_first=True, dropout=0.2 if n_layers > 1 else 0.0
)
self.hidden_size = hidden_size
self.n_layers = n_layers
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, tuple]:
# x: (batch, seq_len, input_size)
output, (h_n, c_n) = self.lstm(x)
# Return last hidden state as bottleneck representation
return h_n[-1], (h_n, c_n) # (batch, hidden_size)
class LSTMDecoder(nn.Module):
"""Reconstructs input sequence from latent vector."""
def __init__(self, hidden_size: int, output_size: int, seq_len: int, n_layers: int = 1):
super().__init__()
self.seq_len = seq_len
self.lstm = nn.LSTM(
hidden_size, hidden_size, n_layers,
batch_first=True, dropout=0.2 if n_layers > 1 else 0.0
)
self.output_layer = nn.Linear(hidden_size, output_size)
def forward(self, z: torch.Tensor) -> torch.Tensor:
# z: (batch, hidden_size) - repeat across seq_len
z_repeated = z.unsqueeze(1).repeat(1, self.seq_len, 1) # (batch, seq_len, hidden)
output, _ = self.lstm(z_repeated)
reconstruction = self.output_layer(output) # (batch, seq_len, output_size)
return reconstruction
class LSTMAnomalyDetector(nn.Module):
"""
LSTM Autoencoder for sensor anomaly detection.
Architecture:
- Encoder: LSTM compresses input window to latent vector
- Decoder: LSTM reconstructs input window from latent vector
- Anomaly score: per-timestep reconstruction error (MSE)
"""
def __init__(
self,
n_sensors: int,
seq_len: int = 60,
hidden_size: int = 64,
latent_size: int = 32,
n_layers: int = 1
):
super().__init__()
self.n_sensors = n_sensors
self.seq_len = seq_len
self.encoder = LSTMEncoder(n_sensors, hidden_size, n_layers)
# Compress further to latent_size
self.bottleneck = nn.Linear(hidden_size, latent_size)
self.expand = nn.Linear(latent_size, hidden_size)
self.decoder = LSTMDecoder(hidden_size, n_sensors, seq_len, n_layers)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
x: (batch, seq_len, n_sensors)
Returns: (batch, seq_len, n_sensors) reconstruction
"""
h, _ = self.encoder(x)
z = self.bottleneck(h)
h_expanded = self.expand(z)
reconstruction = self.decoder(h_expanded)
return reconstruction
def compute_anomaly_scores(
self, x: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Compute per-window and per-sensor anomaly scores.
Returns:
window_scores: (batch,) scalar anomaly score per window
sensor_scores: (batch, n_sensors) anomaly contribution per sensor
"""
with torch.no_grad():
reconstruction = self.forward(x)
# Per-timestep, per-sensor reconstruction error
errors = (x - reconstruction) ** 2 # (batch, seq_len, n_sensors)
# Aggregate over time for per-sensor score
sensor_scores = errors.mean(dim=1) # (batch, n_sensors)
# Window score: max over time and sensors (worst-case)
window_scores = errors.max(dim=2)[0].max(dim=1)[0] # (batch,)
return window_scores, sensor_scores
class SensorAnomalyPipeline:
"""
Complete pipeline: preprocessing, model, threshold, alerting.
"""
def __init__(
self,
n_sensors: int,
seq_len: int = 60,
hidden_size: int = 64
):
self.scaler = StandardScaler()
self.model = LSTMAnomalyDetector(n_sensors, seq_len, hidden_size)
self.threshold = None
self.seq_len = seq_len
self.n_sensors = n_sensors
def fit(
self,
normal_data: np.ndarray, # (n_timesteps, n_sensors)
n_epochs: int = 50,
batch_size: int = 64,
learning_rate: float = 1e-3,
device: str = "cpu"
) -> "SensorAnomalyPipeline":
"""Train autoencoder on normal sensor data."""
# Normalize
data_scaled = self.scaler.fit_transform(normal_data)
# Dataset and dataloader
dataset = SensorWindowDataset(data_scaled, self.seq_len)
# Reserve 10% for threshold estimation
n_val = int(0.1 * len(dataset))
n_train = len(dataset) - n_val
train_ds, val_ds = torch.utils.data.random_split(
dataset, [n_train, n_val]
)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size)
# Training
self.model.to(device)
optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
criterion = nn.MSELoss()
for epoch in range(n_epochs):
self.model.train()
train_loss = []
for x_batch, _ in train_loader:
x_batch = x_batch.to(device)
optimizer.zero_grad()
reconstruction = self.model(x_batch)
loss = criterion(reconstruction, x_batch)
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
optimizer.step()
train_loss.append(loss.item())
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}: loss={np.mean(train_loss):.6f}")
# Set threshold from validation reconstruction errors
self.model.eval()
val_scores = []
for x_batch, _ in val_loader:
x_batch = x_batch.to(device)
scores, _ = self.model.compute_anomaly_scores(x_batch)
val_scores.extend(scores.cpu().numpy())
# Threshold at 99th percentile of normal reconstruction errors
self.threshold = float(np.percentile(val_scores, 99))
print(f"Anomaly threshold: {self.threshold:.6f}")
return self
def predict(
self,
data: np.ndarray,
device: str = "cpu"
) -> pd.DataFrame:
"""
Run anomaly detection on new sensor data.
Returns DataFrame with window timestamps, scores, alerts, and
per-sensor contributions.
"""
data_scaled = self.scaler.transform(data)
dataset = SensorWindowDataset(data_scaled, self.seq_len, stride=self.seq_len)
loader = DataLoader(dataset, batch_size=32)
all_window_scores = []
all_sensor_scores = []
self.model.eval()
for x_batch, _ in loader:
x_batch = x_batch.to(device)
window_scores, sensor_scores = self.model.compute_anomaly_scores(x_batch)
all_window_scores.extend(window_scores.cpu().numpy())
all_sensor_scores.extend(sensor_scores.cpu().numpy())
results = pd.DataFrame({
"window_id": range(len(all_window_scores)),
"anomaly_score": all_window_scores,
"is_anomaly": np.array(all_window_scores) > self.threshold
})
# Add per-sensor scores
sensor_df = pd.DataFrame(
all_sensor_scores,
columns=[f"sensor_{i}_score" for i in range(self.n_sensors)]
)
results = pd.concat([results, sensor_df], axis=1)
results["top_contributing_sensor"] = np.argmax(all_sensor_scores, axis=1)
return results
3. Isolation Forest Ensemble for Sensor Features
"""
Isolation Forest ensemble for multivariate sensor anomaly detection.
Works on engineered features rather than raw time series.
Best suited for:
- Real-time inference (very fast)
- Moderate number of features (10-50)
- Initial deployment before deep learning models are ready
"""
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from typing import List, Optional, Dict
import joblib
class SensorFeatureAnomalyDetector:
"""
Isolation Forest on window-level engineered features.
Feature extraction happens before this model - see Lesson 1
for FFT and statistical feature extraction code.
"""
def __init__(
self,
n_estimators: int = 200,
contamination: float = 0.005, # Expect 0.5% anomalies in training
max_samples: int = 256,
random_state: int = 42
):
self.pipeline = Pipeline([
("scaler", StandardScaler()),
("iforest", IsolationForest(
n_estimators=n_estimators,
contamination=contamination,
max_samples=max_samples,
random_state=random_state,
n_jobs=-1
))
])
self.threshold = None
self.feature_names = None
def fit(
self,
X_normal: pd.DataFrame,
percentile_threshold: float = 1.0
) -> "SensorFeatureAnomalyDetector":
"""
Train on normal-operation feature windows.
percentile_threshold: set threshold at this percentile of training scores.
1.0 means 1% of training windows will be above threshold (false positive rate).
"""
self.feature_names = list(X_normal.columns)
X = X_normal.values
self.pipeline.fit(X)
# Compute scores on training data to set threshold
scores = self.pipeline.score_samples(X)
self.threshold = np.percentile(scores, percentile_threshold)
print(f"Isolation Forest trained on {len(X)} windows")
print(f"Threshold at {percentile_threshold}th percentile: {self.threshold:.4f}")
return self
def score(self, X: pd.DataFrame) -> np.ndarray:
"""
Anomaly scores: more negative = more anomalous.
Isolation Forest raw scores are in (-1, 0).
"""
return self.pipeline.score_samples(X[self.feature_names].values)
def predict(self, X: pd.DataFrame) -> np.ndarray:
"""1 = anomaly, 0 = normal."""
scores = self.score(X)
return (scores < self.threshold).astype(int)
def explain(self, X_single: pd.DataFrame, n_top: int = 5) -> Dict:
"""
Provide feature-level explanation for why a window was flagged.
Uses permutation approach: shuffle each feature and measure
how much the anomaly score changes. Features that matter most
when shuffled are the most important for this anomaly.
"""
base_score = float(self.score(X_single)[0])
feature_importance = {}
for feature in self.feature_names:
X_permuted = X_single.copy()
X_permuted[feature] = X_permuted[feature].sample(frac=1).values
permuted_score = float(self.score(X_permuted)[0])
# More negative delta = more important feature (its absence hurts the anomaly)
feature_importance[feature] = base_score - permuted_score
top_features = sorted(
feature_importance.items(), key=lambda x: x[1], reverse=True
)[:n_top]
return {
"anomaly_score": base_score,
"is_anomaly": base_score < self.threshold,
"top_contributing_features": top_features
}
def save(self, path: str):
joblib.dump({
"pipeline": self.pipeline,
"threshold": self.threshold,
"feature_names": self.feature_names
}, path)
@classmethod
def load(cls, path: str) -> "SensorFeatureAnomalyDetector":
obj = cls.__new__(cls)
data = joblib.load(path)
obj.pipeline = data["pipeline"]
obj.threshold = data["threshold"]
obj.feature_names = data["feature_names"]
return obj
4. Streaming Anomaly Detection with Sliding Window
"""
Real-time streaming anomaly detection.
Processes sensor data as it arrives, maintaining a sliding window,
extracting features, and running the anomaly detector.
Architecture:
- Kafka consumer reads sensor readings from topic
- Sliding window buffer maintains last N readings per sensor
- Feature extraction computes statistical and spectral features
- Anomaly detector scores each complete window
- Alerts published to separate Kafka topic
"""
import numpy as np
from collections import deque
from typing import Dict, List, Optional, Callable
import time
import logging
from dataclasses import dataclass
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class SensorReading:
"""Single sensor reading."""
sensor_id: str
timestamp: datetime
value: float
unit: str = ""
@dataclass
class AnomalyAlert:
"""Anomaly alert generated by the detector."""
sensor_group: str
timestamp: datetime
anomaly_score: float
threshold: float
severity: str # LOW, MEDIUM, HIGH
top_sensors: List[str]
class SlidingWindowBuffer:
"""
Thread-safe sliding window buffer for multi-sensor data.
Maintains a fixed-length window per sensor.
When all sensors have enough data, triggers feature extraction.
"""
def __init__(
self,
sensor_ids: List[str],
window_size: int,
min_sensors_required: int = None
):
self.sensor_ids = sensor_ids
self.window_size = window_size
self.min_sensors = min_sensors_required or len(sensor_ids)
self.buffers: Dict[str, deque] = {
sid: deque(maxlen=window_size) for sid in sensor_ids
}
def add_reading(self, sensor_id: str, value: float) -> bool:
"""
Add a reading to the buffer.
Returns True if the window is complete (ready for inference).
"""
if sensor_id not in self.buffers:
return False
self.buffers[sensor_id].append(value)
return self.is_ready()
def is_ready(self) -> bool:
"""True when all required sensors have full windows."""
n_full = sum(
1 for sid in self.sensor_ids
if len(self.buffers[sid]) == self.window_size
)
return n_full >= self.min_sensors
def get_window(self) -> Dict[str, np.ndarray]:
"""Return current window as dict of sensor_id -> array."""
return {
sid: np.array(list(buf))
for sid, buf in self.buffers.items()
if len(buf) == self.window_size
}
def get_matrix(self) -> np.ndarray:
"""Return window as (window_size, n_sensors) matrix."""
window = self.get_window()
return np.column_stack([window[sid] for sid in self.sensor_ids
if sid in window])
class StreamingAnomalyDetector:
"""
Streaming anomaly detection engine.
Processes individual sensor readings, accumulates windows,
and generates alerts when anomalies are detected.
"""
def __init__(
self,
sensor_ids: List[str],
detector, # Fitted anomaly detector (LSTMAnomalyDetector or IsolationForest)
window_size: int,
step_size: int, # Process every N readings (not on every new reading)
scaler, # Fitted StandardScaler
alert_callback: Optional[Callable] = None
):
self.sensor_ids = sensor_ids
self.detector = detector
self.window_size = window_size
self.step_size = step_size
self.scaler = scaler
self.alert_callback = alert_callback
self.buffer = SlidingWindowBuffer(sensor_ids, window_size)
self.readings_since_last_inference = 0
self.total_readings = 0
self.total_alerts = 0
# Recent scores for trend tracking
self.recent_scores = deque(maxlen=20)
def process_reading(self, reading: SensorReading) -> Optional[AnomalyAlert]:
"""
Process a single sensor reading.
Returns an AnomalyAlert if an anomaly is detected, None otherwise.
"""
window_ready = self.buffer.add_reading(reading.sensor_id, reading.value)
self.total_readings += 1
self.readings_since_last_inference += 1
if not window_ready:
return None
# Only run inference every step_size readings to control CPU load
if self.readings_since_last_inference < self.step_size:
return None
self.readings_since_last_inference = 0
# Get window matrix and scale
window_matrix = self.buffer.get_matrix()
if window_matrix.shape[0] < self.window_size:
return None
window_scaled = self.scaler.transform(window_matrix)
# Run inference
try:
import torch
window_tensor = torch.tensor(
window_scaled[np.newaxis, :, :], # (1, window_size, n_sensors)
dtype=torch.float32
)
scores, sensor_scores = self.detector.compute_anomaly_scores(window_tensor)
anomaly_score = float(scores[0])
sensor_score_array = sensor_scores[0].numpy()
except Exception as e:
logger.error(f"Inference error: {e}")
return None
self.recent_scores.append(anomaly_score)
# Check if anomalous
if anomaly_score <= self.detector.threshold:
return None
self.total_alerts += 1
# Severity based on score magnitude relative to threshold
severity_ratio = anomaly_score / self.detector.threshold
if severity_ratio > 3.0:
severity = "HIGH"
elif severity_ratio > 1.5:
severity = "MEDIUM"
else:
severity = "LOW"
# Identify top contributing sensors
top_sensor_indices = np.argsort(sensor_score_array)[-3:][::-1]
top_sensors = [self.sensor_ids[i] for i in top_sensor_indices]
alert = AnomalyAlert(
sensor_group=",".join(self.sensor_ids[:3]) + "...",
timestamp=reading.timestamp,
anomaly_score=round(anomaly_score, 4),
threshold=self.detector.threshold,
severity=severity,
top_sensors=top_sensors
)
if self.alert_callback:
try:
self.alert_callback(alert)
except Exception as e:
logger.error(f"Alert callback error: {e}")
return alert
def setup_kafka_pipeline(
broker_url: str,
sensor_topic: str,
alert_topic: str,
sensor_ids: List[str],
detector: StreamingAnomalyDetector
):
"""
Set up Kafka-based real-time sensor processing pipeline.
Requires: pip install confluent-kafka
"""
try:
from confluent_kafka import Consumer, Producer, KafkaError
import json
# Producer for alerts
producer = Producer({"bootstrap.servers": broker_url})
def send_alert(alert: AnomalyAlert):
alert_msg = {
"timestamp": alert.timestamp.isoformat(),
"sensor_group": alert.sensor_group,
"anomaly_score": alert.anomaly_score,
"severity": alert.severity,
"top_sensors": alert.top_sensors
}
producer.produce(
alert_topic,
value=json.dumps(alert_msg).encode("utf-8")
)
producer.poll(0) # Non-blocking flush
detector.alert_callback = send_alert
# Consumer for sensor readings
consumer = Consumer({
"bootstrap.servers": broker_url,
"group.id": "anomaly-detector",
"auto.offset.reset": "latest"
})
consumer.subscribe([sensor_topic])
logger.info(f"Kafka pipeline started. Consuming from {sensor_topic}")
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
logger.error(f"Kafka error: {msg.error()}")
continue
data = json.loads(msg.value().decode("utf-8"))
reading = SensorReading(
sensor_id=data["sensor_id"],
timestamp=datetime.fromisoformat(data["timestamp"]),
value=float(data["value"])
)
detector.process_reading(reading)
except ImportError:
logger.warning("confluent-kafka not installed. Kafka pipeline unavailable.")
5. Sensor Data Preprocessing
"""
Preprocessing pipeline for industrial sensor data.
Handles: normalization, imputation, drift detection, outlier rejection.
"""
import numpy as np
import pandas as pd
from sklearn.preprocessing import RobustScaler
from scipy import stats
from typing import Dict, List, Tuple
class IndustrialSensorPreprocessor:
"""
Preprocessing for industrial sensor time series.
Handles common data quality issues in manufacturing sensor data:
- Stuck sensors (same value for N consecutive readings)
- Out-of-range readings (sensor saturation or failure)
- High-frequency outliers (electrical interference spikes)
- Missing values (communication gaps)
- Slow sensor drift (baseline shift over weeks/months)
"""
def __init__(
self,
sensor_ranges: Dict[str, Tuple[float, float]], # {sensor_id: (min, max)}
stuck_detection_window: int = 20,
spike_sigma_threshold: float = 5.0
):
self.sensor_ranges = sensor_ranges
self.stuck_window = stuck_detection_window
self.spike_sigma = spike_sigma_threshold
self.scaler = RobustScaler() # Robust to outliers vs StandardScaler
self.baseline_stats: Dict[str, Dict] = {}
def detect_stuck_sensor(self, series: pd.Series) -> pd.Series:
"""
Detect stuck-sensor periods: consecutive identical values.
Returns boolean mask: True where sensor appears stuck.
"""
# Compute rolling unique count
def n_unique(window):
return len(set(window))
rolling_unique = series.rolling(
self.stuck_window, min_periods=self.stuck_window
).apply(n_unique, raw=True)
# Stuck if fewer than 2 unique values in window
return rolling_unique < 2
def reject_out_of_range(
self, series: pd.Series, sensor_id: str
) -> pd.Series:
"""Replace out-of-range values with NaN."""
if sensor_id not in self.sensor_ranges:
return series
lo, hi = self.sensor_ranges[sensor_id]
cleaned = series.copy()
cleaned[(cleaned < lo) | (cleaned > hi)] = np.nan
return cleaned
def despike(self, series: pd.Series) -> pd.Series:
"""
Remove electrical interference spikes using z-score.
Spikes are extreme outliers that are present for 1-3 samples.
"""
z_scores = np.abs(stats.zscore(series.dropna()))
cleaned = series.copy()
outlier_mask = pd.Series(False, index=series.index)
valid_idx = series.dropna().index
outlier_mask[valid_idx] = z_scores > self.spike_sigma
cleaned[outlier_mask] = np.nan
return cleaned
def impute_short_gaps(
self, series: pd.Series, max_gap_length: int = 5
) -> pd.Series:
"""
Fill short NaN gaps using linear interpolation.
Long gaps (> max_gap_length) are left as NaN (model should not impute large gaps).
"""
# Count consecutive NaN run lengths
is_nan = series.isna()
nan_groups = is_nan.groupby((~is_nan).cumsum())
long_nan_starts = []
for name, group in nan_groups:
if group.all() and len(group) > max_gap_length:
long_nan_starts.append(group.index[0])
# Interpolate all NaNs
imputed = series.interpolate(method="linear", limit=max_gap_length)
# Re-apply NaN for long gaps
for start_idx in long_nan_starts:
# Find the extent of the long gap
i = series.index.get_loc(start_idx)
end_i = i
while end_i < len(series) and pd.isna(series.iloc[end_i]):
end_i += 1
imputed.iloc[i:end_i] = np.nan
return imputed
def preprocess(
self,
df: pd.DataFrame, # (n_timesteps, n_sensors)
fit: bool = True
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Full preprocessing pipeline.
Returns: (cleaned_df, quality_report_df)
quality_report: per-sensor stats on stuck/out-of-range/spike rejection
"""
cleaned = df.copy()
quality_stats = {}
for col in df.columns:
orig = df[col]
n_total = len(orig)
# 1. Out-of-range rejection
cleaned[col] = self.reject_out_of_range(orig, col)
n_oor = int((orig != cleaned[col]).sum())
# 2. Stuck sensor detection
stuck_mask = self.detect_stuck_sensor(cleaned[col])
cleaned.loc[stuck_mask, col] = np.nan
n_stuck = int(stuck_mask.sum())
# 3. Despike
cleaned[col] = self.despike(cleaned[col])
n_spikes = int(cleaned[col].isna().sum()) - n_oor - n_stuck
# 4. Impute short gaps
cleaned[col] = self.impute_short_gaps(cleaned[col])
quality_stats[col] = {
"total_readings": n_total,
"out_of_range": n_oor,
"stuck_readings": n_stuck,
"spikes_removed": max(0, n_spikes),
"remaining_nan": int(cleaned[col].isna().sum()),
"data_quality_pct": round(
100 * (1 - cleaned[col].isna().sum() / n_total), 1
)
}
quality_df = pd.DataFrame(quality_stats).T
# Scale
if fit:
cleaned_values = self.scaler.fit_transform(
cleaned.fillna(cleaned.median())
)
else:
cleaned_values = self.scaler.transform(
cleaned.fillna(cleaned.median())
)
cleaned_scaled = pd.DataFrame(
cleaned_values, columns=df.columns, index=df.index
)
return cleaned_scaled, quality_df
System Architecture
Production Engineering Notes
Handling Sensor Drift in Long-Running Systems
Industrial sensors drift over time. Thermocouples age. Pressure transducers accumulate zero-drift. Vibration accelerometers change sensitivity as their mounting degrades. A model trained six months ago on "normal" data may flag current readings as anomalous simply because the sensor baseline has shifted - even though the physical asset is fine.
Two strategies. First, periodic baseline recalibration: retrain the anomaly model every month on the most recent 30-60 days of confirmed-normal data. The LSTM autoencoder and Isolation Forest both support retraining from scratch on new data. This keeps the model calibrated to the current sensor baselines. Second, adaptive normalization: instead of using a fixed scaler trained on historical data, use an EWMA-based adaptive scaler that slowly tracks the current sensor baseline. New readings are normalized relative to the recent rolling mean and standard deviation. This handles slow drift automatically without retraining.
For rapid shifts (a sensor is recalibrated, replaced, or remounted), the CUSUM chart will immediately detect the step change. This is actually useful: a sudden shift in a sensor's baseline is worth investigating even if it turns out to be a maintenance action rather than a fault.
Multivariate vs Univariate Detection
The question "should I detect anomalies univariately (one sensor at a time) or multivariately (all sensors together)" has a nuanced answer. Univariate methods (CUSUM, EWMA, per-sensor Isolation Forest) are simpler, faster, and easier to explain. They work well for point anomalies and contextual anomalies that manifest in a single sensor.
Multivariate methods (LSTM autoencoder, MTAD-GAT, TranAD) detect collective anomalies - changes in correlation structure between sensors. These are often the most informative early warnings: before a bearing temperature rises above threshold, the correlation between bearing temperature and motor current changes. Before a pump fails, the correlation between inlet pressure and flow rate degrades. These signals are invisible to univariate detectors.
In practice, run both. Use CUSUM/EWMA per sensor for fast, interpretable alerting on simple anomalies. Use LSTM autoencoder for multivariate collective anomalies. Combine the scores in an ensemble. The computational overhead of running both is small - the CUSUM runs in microseconds, the LSTM in milliseconds. The combination catches more anomaly types than either alone.
:::warning Non-Stationarity in Industrial Time Series Industrial sensor data is not stationary. Temperatures vary with ambient conditions and production load. Vibration baselines shift with product changeovers. A model trained on "normal" data from summer will flag normal winter operation as anomalous due to lower ambient temperatures. Always include operating context (load level, ambient conditions, production mode) as conditioning variables in your model, or normalize by operating regime. Failure to handle non-stationarity is the most common cause of high false positive rates in deployed anomaly detection systems. :::
:::danger Concept Drift After Equipment Maintenance After major maintenance (bearing replacement, motor rewind, impeller change), the sensor signatures change. The new bearing has slightly different fault frequencies. The rewound motor has different current harmonics. Your anomaly model, trained on data from the old equipment, will generate false positives on the "normal" operation of the maintained equipment. Implement a procedure: after maintenance, put the anomaly model in "learning mode" for 7-14 days, collecting new baseline data and retraining. Alert operators not to act on anomaly alerts during this period. This is a process problem, not just a technical one - you need the maintenance team to notify the ML system when equipment is serviced. :::
Interview Questions and Answers
Q1: What is the difference between point, contextual, and collective anomalies, and which methods detect each?
Point anomalies are individual readings that are globally extreme - a temperature of 500C in a system that normally runs at 80C. Simple thresholding, statistical control charts (Shewhart), and any normality-based method detect these. Contextual anomalies are readings that are anomalous given their context but would be normal otherwise - a temperature of 90C is normal at full load but anomalous at idle load. Detecting these requires incorporating context: conditional models that include load, ambient conditions, or operating mode as features. Isolation Forest and LSTM autoencoders handle contextual anomalies naturally if the context features are included as inputs. Collective anomalies are sequences or groups of readings that are anomalous as a whole even if individual points are not - the correlation between two sensors changes, or the autocorrelation structure of a single sensor shifts. LSTM autoencoders, which learn normal temporal patterns and flag deviations in sequence structure, are the natural tool for collective anomalies. Statistical methods like CUSUM can detect gradual mean shifts which are a simple form of collective anomaly.
Q2: How do you set the anomaly threshold when you have no labeled anomalies?
Several approaches. First, use a statistical percentile of normal data: train on confirmed-normal data, compute anomaly scores on a held-out set of normal data, set the threshold at the 99th or 99.5th percentile. This gives an expected false positive rate of 1% or 0.5% on normal data. Second, use domain knowledge: talk to maintenance engineers about acceptable alert rates. If they can investigate 2 alerts per week per machine, and you have 1,000 machines, you need a false positive rate of about 0.0002 per machine-hour. Work backward from this to set the threshold. Third, use shadow mode: deploy the model in logging-only mode for several weeks, observe the distribution of scores, and discuss the top-N scoring windows with maintenance engineers to understand whether they correspond to real anomalies. This produces an implicit labeled dataset for threshold setting. Fourth, use cost-based optimization: estimate the cost of a false positive (unnecessary inspection) and the cost of a false negative (missed failure leading to downtime), and optimize the threshold to minimize expected cost.
Q3: Why does the LSTM autoencoder work for anomaly detection if it was never trained on anomalies?
The LSTM autoencoder learns a compressed representation of normal temporal patterns. When it reconstructs a normal input sequence, the reconstruction is accurate because the encoder-decoder has learned the patterns well. When it encounters an anomalous sequence - one with patterns it was never trained on - the encoder must map it to the same latent space it learned from normal data. The resulting latent vector does not represent the anomalous sequence well, because the latent space only contains directions that explain normal variation. The decoder then reconstructs from this impoverished representation, producing an inaccurate reconstruction of the anomalous input. The high reconstruction error is the anomaly signal. Crucially, the autoencoder also provides spatial localization: by looking at which input dimensions have the highest reconstruction error, you can identify which sensors are driving the anomaly - something a single scalar anomaly score does not provide.
Q4: What are the failure modes of Isolation Forest on industrial sensor data?
Isolation Forest assumes that anomalies are "few and different" - they lie in sparse regions of the feature space and can be isolated with fewer random partitions. This fails in several ways for industrial data. First, masking: if multiple sensors are anomalous simultaneously (which happens in correlated multi-sensor faults), the anomalous point is not isolated from other anomalous points, so its anomaly score is reduced - the anomalies "hide" each other. Second, non-stationarity: if the training data contains multiple operating regimes (high load, medium load, low load), normal data in sparse operating regimes may score anomalous because the isolation tree does not see enough normal samples nearby. Third, temporal structure: Isolation Forest treats each feature vector as an independent sample and cannot detect collective anomalies that span multiple timesteps. It scores features at one instant, not sequences. Solutions: train separate models per operating regime, use temporal features (lags, rolling statistics) to encode temporal context into the feature vector, and use the autoencoder as a complement to catch temporal anomalies.
Q5: How would you architect a real-time anomaly detection system for 500 assets with 350 sensors each?
At this scale, centralized processing is unworkable - you have 500 x 350 = 175,000 sensor channels potentially generating millions of readings per second. The architecture: edge processing at the asset level, with aggregation at the cluster level, and ML at both levels. At the asset edge (industrial PC per asset): run CUSUM/EWMA for per-sensor monitoring, run feature extraction to compress raw signals to feature vectors every 5 seconds. Ship feature vectors to the cluster, not raw sensor data - this reduces data volume by 1,000x. At the cluster level (server per ~50 assets): run LSTM autoencoders on the feature vector time series, run Isolation Forest for quick scoring, manage thresholds and alert logic. At the cloud level: model training, fleet-level analytics, anomaly pattern library. The Kafka data pipeline is the right choice for the stream processing layer: sensor readings from OPC-UA/MQTT are published to per-asset Kafka topics, edge processors consume and produce feature vectors to a second set of topics, anomaly detectors consume feature topics and produce alerts to an alert topic. This architecture scales horizontally - add more edge PCs and cluster servers as the fleet grows.
Key Takeaways
Anomaly detection on industrial sensor data requires a layered approach: statistical methods (CUSUM, EWMA) for fast univariate monitoring, classical ML (Isolation Forest) for multivariate feature-based detection, and deep learning (LSTM autoencoders) for capturing collective anomalies in multivariate sequences. The hardest problems are non-stationarity (sensor baselines shift with operating conditions), concept drift (sensor signatures change after maintenance), and threshold setting without labeled anomalies. The streaming architecture - Kafka for transport, edge devices for feature extraction, GPU servers for model inference - is the production pattern that scales to industrial fleet sizes. Build the preprocessing carefully: stuck sensor detection, spike rejection, and gap imputation are not optional - garbage in guarantees garbage out regardless of model sophistication.
