Skip to main content

Predictive Maintenance with AI

Reading time: ~45 min · Interview relevance: Very High · Target roles: ML Engineer, Industrial AI Engineer, Data Scientist


The 3 AM Wake-Up Call

It is 3:17 AM on a Tuesday in a semiconductor fabrication plant in Taiwan. A lithography machine - one of only four in the facility, each costing $150 million - begins to vibrate slightly differently than it did the day before. The vibration amplitude at 47 Hz increases by 11%. The bearing temperature climbs 2 degrees Celsius above its 30-day average. Neither change is dramatic. Neither triggers any existing alarm. The operators on the night shift see nothing unusual.

At 6:41 AM, a bearing in the main spindle assembly fails catastrophically. The machine goes offline. The fab loses 2.3millioninproductionthatdayandanother2.3 million in production that day and another 1.8 million over the 17 days it takes to source the replacement bearing from Germany, fly in a Zeiss field engineer, recalibrate the machine, and re-qualify the process. The total cost: 4.1million.Thebearingitselfcosts4.1 million. The bearing itself costs 340.

What is maddening about this story - and it happens across manufacturing every single day - is that the data was there. The sensors recorded everything. The signal was present for at least 72 hours before the failure. A well-trained ML model, watching the same sensor streams, would have flagged the anomaly at hour 1 of the 72-hour window and scheduled a maintenance technician to inspect the bearing during a planned downtime window that same week. The $340 bearing gets replaced. The fab loses nothing.

This is the promise of predictive maintenance, and it is not theoretical. GE Aviation reduced unplanned engine removals by 25% using sensor-based prognostics. Caterpillar's Product Link system has prevented thousands of equipment failures for construction customers. Rolls-Royce's TotalCare program monitors 4,000+ jet engines in real time, shifting the business model from selling engines to selling thrust-hours. The pattern repeats across industries: the data was always there, the sensors were already installed, the ROI is 10x to 100x. The bottleneck has been the ML system to make sense of it all.

This lesson teaches you to build that system. You will learn why vibration data is different from tabular data, how to extract features that actually correlate with failure, which anomaly detection algorithms work at scale in production, and how to integrate your model with the SCADA and CMMS systems that maintenance teams actually use.


Why This Exists

The Three Eras of Maintenance

Before predictive maintenance, there were two approaches, both deeply unsatisfying.

Reactive maintenance - fix it when it breaks - was the default for most of industrial history. The logic is simple: run the machine until failure, then repair it. The problem is that failures are never convenient. They happen at maximum production load (because that is when stress is highest). They cascade - one failed bearing puts lateral load on adjacent components, destroying two more parts. They require emergency parts procurement at premium prices. And in industries like chemical processing, semiconductor fab, or food manufacturing, an unplanned failure can contaminate a batch, violate safety regulations, or injure workers.

Preventive maintenance was the response - replace components on a fixed schedule regardless of actual condition. Change the oil every 3,000 miles. Replace the pump impeller every six months. This is vastly better than reactive, but it has its own pathology: you replace parts that are fine (waste) and you miss parts that were degrading faster than expected (still fails). Studies consistently show that 30-40% of preventive maintenance tasks are performed too early, and another 10-15% are performed too late.

Predictive maintenance - maintain based on actual equipment condition, predicted from sensor data - solves both problems. Replace the part when it actually needs replacement, not before and not after. The catch is that you need a model that can watch sensor streams, understand normal versus abnormal, and issue a reliable warning with enough lead time to schedule maintenance. This is an ML problem.

The Economic Equation

The numbers make the case better than any argument. According to Deloitte, unplanned downtime costs industrial manufacturers 50billionperyearintheUnitedStatesalone.AMcKinseystudyfoundpredictivemaintenancereducesmaintenancecostsby102550 billion per year in the United States alone. A McKinsey study found predictive maintenance reduces maintenance costs by 10-25%, reduces breakdowns by 70-75%, and reduces downtime by 35-45%. For a mid-size automotive plant with 500M in annual revenue, a 35% reduction in downtime is worth tens of millions per year.

The ROI calculation that matters for any specific deployment: (cost of one unplanned failure) x (failures per year) x (fraction prevented by the model) - (cost of implementing and running the system). For most industrial equipment, the cost side of this equation is dominated by unplanned downtime - not sensor hardware, not cloud compute, not ML engineer time. The economics almost always work.


Historical Context

The field traces back to 1967 when the U.S. Navy initiated research into condition-based maintenance for ship machinery. The early methods were simple - measure vibration with a handheld accelerometer, compare to a threshold, schedule maintenance if exceeded. The threshold was set by experience, not data.

The real breakthrough came in the 1990s when microprocessor-based data acquisition systems made continuous, high-frequency sensor monitoring economically feasible. Pratt & Whitney and GE Aviation pioneered engine health monitoring for commercial aircraft in this era. The ACARS system on Boeing 737s was transmitting engine performance data to ground stations long before "IoT" was a term.

The machine learning era began in earnest with a pivotal 2008 paper by Abhinav Saxena and colleagues at NASA's Ames Research Center introducing the CMAPSS (Commercial Modular Aero-Propulsion System Simulation) dataset. CMAPSS provided simulated turbofan engine degradation data with ground-truth remaining useful life labels - a benchmark that unified the research community and produced a wave of papers on RUL prediction. The dataset remains the standard benchmark for prognostics research.

The current era - deep learning on sensor sequences - was catalyzed around 2016 when several groups demonstrated that LSTM networks could predict RUL on CMAPSS with RMSE 20-30% lower than traditional methods. Since then, the architecture has evolved: temporal convolutional networks (2018), transformer-based approaches (2021), and now foundation models pre-trained on large corpora of industrial sensor data.


Core Concepts

Maintenance Strategy Taxonomy

Reactive: Run until failure → Lowest upfront cost, highest total cost
Preventive: Schedule-based → Predictable but wasteful
Predictive: Condition-based → Optimal but requires ML infrastructure
Prescriptive: AI-recommended action → Next frontier, closes the loop

The four strategies form a maturity ladder. Most large manufacturers are somewhere between preventive and early predictive. Truly prescriptive maintenance - where the AI not only predicts but recommends the specific maintenance action and schedules the work order - is rare and represents the frontier.

Sensor Data Types

Industrial equipment generates several types of sensor data, each with different characteristics and information content.

Vibration signals are the richest source of mechanical degradation information. Captured by accelerometers at 1-20 kHz sampling rates, vibration signals encode the kinematic signatures of every rotating component. A bearing with a worn outer race produces a characteristic impact frequency at fBPFO=n2fr(1dDcosθ)f_{BPFO} = \frac{n}{2} f_r (1 - \frac{d}{D}\cos\theta) where nn is the number of rolling elements, frf_r is shaft rotation frequency, dd is ball diameter, DD is pitch diameter, and θ\theta is contact angle. When you know these parameters, you can look for this exact frequency in the vibration spectrum - and its presence indicates outer race wear.

Temperature signals evolve slowly (seconds to minutes) and indicate thermal stress, lubrication failure, and overloading. They are low-dimensional but reliable. A bearing temperature rising 10°C above baseline with no change in ambient or load conditions is almost always a sign of lubrication degradation.

Current signatures (Motor Current Signature Analysis, MCSA) are especially valuable for induction motors. Electrical faults, eccentricity, and broken rotor bars each produce characteristic sidebands around the line frequency. The advantage: you are measuring through the electrical supply, so no physical sensor needs to be attached to the motor.

Pressure signals matter for hydraulic and pneumatic systems. A hydraulic cylinder that takes 0.3 seconds longer to reach target pressure than it did six months ago is experiencing seal wear.

Acoustic emission sensors detect stress wave events - micro-cracking, plastic deformation - at ultrasonic frequencies (100 kHz - 1 MHz). They catch very early stage damage that vibration sensors miss, but generate enormous data volumes.

Feature Engineering for Time Series

Raw sensor data is not what you feed to a model. You engineer features that summarize the statistical and frequency-domain properties of signal windows.

Time-domain statistical features - computed over a rolling window of, say, 1 second of vibration data at 10 kHz (10,000 samples):

  • Root Mean Square (RMS): xrms=1Ni=1Nxi2x_{rms} = \sqrt{\frac{1}{N}\sum_{i=1}^{N} x_i^2} - overall vibration intensity
  • Kurtosis: K=1N(xixˉ)4(1N(xixˉ)2)2K = \frac{\frac{1}{N}\sum(x_i - \bar{x})^4}{(\frac{1}{N}\sum(x_i - \bar{x})^2)^2} - sensitivity to impulsive events (spikes from bearing impacts)
  • Crest Factor: CF=xpeakxrmsCF = \frac{x_{peak}}{x_{rms}} - ratio of peak to RMS, elevated by impulsive faults
  • Skewness: asymmetry of the amplitude distribution

Frequency-domain features via FFT - the Fast Fourier Transform converts a time-domain signal x(t)x(t) into its frequency components X(f)X(f). For a vibration signal, this reveals:

  • Spectral amplitude at known fault frequencies (BPFO, BPFI, BSF, FTF)
  • Dominant frequency and its harmonics
  • Spectral entropy - a flat spectrum means noise, peaks mean periodicity

Envelope analysis - critical for bearing fault detection. The process: bandpass filter around a resonance frequency, compute the envelope (absolute value of Hilbert transform), take FFT of the envelope. This demodulates the bearing fault impacts that are amplitude-modulating the resonance. The resulting "envelope spectrum" shows fault frequencies clearly even when the raw vibration spectrum is dominated by unrelated noise.

Wavelet features - Continuous Wavelet Transform (CWT) or Discrete Wavelet Transform (DWT) provide time-frequency localization that FFT cannot: you can see a fault frequency emerge at a specific time window, useful for transient faults.

Anomaly Detection Approaches

Isolation Forest works by randomly partitioning the feature space and observing that anomalies - which are different from the bulk of normal points - require fewer partitions to isolate. The anomaly score is the average path length across an ensemble of isolation trees. It is fast, scales well, and does not require labeled failures. The weakness: it does not model temporal structure, so you typically run it on a window of features rather than raw time series.

Autoencoder-based anomaly detection trains a neural network to compress and reconstruct normal sensor data. Normal patterns are reconstructed accurately. Anomalous patterns - anything the model was not trained on - produce high reconstruction error. The reconstruction error itself is the anomaly score. This approach is powerful for multivariate sensor data because the autoencoder learns the correlations between channels.

LSTM-based anomaly detection uses a sequence model to predict the next timestep given the preceding window. Anomalies cause prediction error to spike. This is more appropriate for sensors with strong temporal autocorrelation.

Remaining Useful Life Prediction

RUL is the number of time units until failure. Predicting RUL from a sensor sequence is a regression problem: given a sequence of sensor readings {x1,x2,...,xt}\{x_1, x_2, ..., x_t\}, predict RULtRUL_t - how many cycles or hours until failure.

The CMAPSS dataset frames this precisely. Each engine starts in normal condition and runs until failure. You are given sensor readings for the run and must predict, at each timestep, how many cycles remain. The key challenge: early in the run, sensors look identical across all engines (they all start healthy), so you cannot distinguish RUL=300 from RUL=200 in early cycles. You need the model to learn that degradation typically follows certain patterns.

A practical framing: instead of predicting exact RUL, predict a health index (1.0 = healthy, 0.0 = at failure threshold) and then predict whether the equipment will fail within a planning horizon (e.g., 48 hours). The binary prediction is more useful for maintenance scheduling.


Code Examples

1. Vibration Signal Feature Extraction with FFT

import numpy as np
from scipy.fft import fft, fftfreq
from scipy.signal import butter, filtfilt, hilbert
from scipy.stats import kurtosis, skew
import pandas as pd
from typing import Dict, List, Optional

def extract_time_domain_features(signal: np.ndarray) -> Dict[str, float]:
"""
Extract time-domain statistical features from a vibration signal window.

Args:
signal: 1D numpy array of vibration amplitudes (e.g., 10,000 samples at 10 kHz)

Returns:
Dictionary of feature name -> feature value
"""
features = {}

# Basic statistics
features["mean"] = np.mean(signal)
features["std"] = np.std(signal)
features["rms"] = np.sqrt(np.mean(signal**2))
features["peak"] = np.max(np.abs(signal))
features["peak_to_peak"] = np.max(signal) - np.min(signal)

# Shape factors
features["crest_factor"] = features["peak"] / (features["rms"] + 1e-10)
features["kurtosis"] = kurtosis(signal)
features["skewness"] = skew(signal)

# Shape factor (RMS / mean absolute value)
mean_abs = np.mean(np.abs(signal))
features["shape_factor"] = features["rms"] / (mean_abs + 1e-10)

# Impulse factor
features["impulse_factor"] = features["peak"] / (mean_abs + 1e-10)

# Margin factor (peak / RMS of square root of absolute values)
sqrt_mean = np.mean(np.sqrt(np.abs(signal)))**2
features["margin_factor"] = features["peak"] / (sqrt_mean + 1e-10)

return features


def extract_frequency_domain_features(
signal: np.ndarray,
sampling_rate: float,
fault_frequencies: Optional[Dict[str, float]] = None
) -> Dict[str, float]:
"""
Extract frequency-domain features via FFT.

Args:
signal: 1D vibration signal
sampling_rate: Sampling rate in Hz (e.g., 10000 for 10 kHz)
fault_frequencies: Optional dict of {'bpfo': 47.3, 'bpfi': 62.1, ...}
These are the theoretical bearing fault frequencies.

Returns:
Dictionary of frequency domain features
"""
N = len(signal)
# Compute FFT - use only positive frequencies (one-sided spectrum)
fft_vals = fft(signal)
fft_magnitude = np.abs(fft_vals[:N//2]) * 2 / N # Normalized one-sided
freqs = fftfreq(N, d=1.0/sampling_rate)[:N//2]

features = {}

# Dominant frequency and its magnitude
dominant_idx = np.argmax(fft_magnitude)
features["dominant_frequency"] = freqs[dominant_idx]
features["dominant_magnitude"] = fft_magnitude[dominant_idx]

# Spectral energy in frequency bands
# Low: 0-1kHz, Mid: 1-5kHz, High: 5-10kHz (adjust for your sensor range)
low_mask = freqs < 1000
mid_mask = (freqs >= 1000) & (freqs < 5000)
high_mask = freqs >= 5000

total_energy = np.sum(fft_magnitude**2) + 1e-10
features["spectral_energy_low"] = np.sum(fft_magnitude[low_mask]**2) / total_energy
features["spectral_energy_mid"] = np.sum(fft_magnitude[mid_mask]**2) / total_energy
features["spectral_energy_high"] = np.sum(fft_magnitude[high_mask]**2) / total_energy

# Spectral entropy - how spread out is the energy across frequencies?
# Low entropy = energy concentrated at few frequencies (often fault signature)
# High entropy = broadband noise
normalized_magnitude = fft_magnitude / (np.sum(fft_magnitude) + 1e-10)
features["spectral_entropy"] = -np.sum(
normalized_magnitude * np.log(normalized_magnitude + 1e-10)
)

# Spectral centroid (weighted mean frequency)
features["spectral_centroid"] = np.sum(freqs * fft_magnitude) / (np.sum(fft_magnitude) + 1e-10)

# Amplitudes at bearing fault frequencies (if provided)
if fault_frequencies:
freq_resolution = freqs[1] - freqs[0] # Hz per bin
for fault_name, fault_freq in fault_frequencies.items():
# Find nearest FFT bin to fault frequency
if fault_freq < freqs[-1]: # Check it's within range
bin_idx = int(fault_freq / freq_resolution)
# Average over +/- 2 bins to handle slight frequency variation
bin_range = slice(max(0, bin_idx-2), min(len(fft_magnitude), bin_idx+3))
features[f"fault_amp_{fault_name}"] = np.mean(fft_magnitude[bin_range])

return features


def compute_bearing_fault_frequencies(
shaft_rpm: float,
n_balls: int,
ball_diameter_mm: float,
pitch_diameter_mm: float,
contact_angle_deg: float = 0.0
) -> Dict[str, float]:
"""
Calculate theoretical bearing fault frequencies.

BPFO: Ball Pass Frequency Outer race
BPFI: Ball Pass Frequency Inner race
BSF: Ball Spin Frequency
FTF: Fundamental Train (cage) Frequency
"""
f_r = shaft_rpm / 60.0 # shaft rotation in Hz
d = ball_diameter_mm
D = pitch_diameter_mm
cos_theta = np.cos(np.radians(contact_angle_deg))

bpfo = (n_balls / 2) * f_r * (1 - (d/D) * cos_theta)
bpfi = (n_balls / 2) * f_r * (1 + (d/D) * cos_theta)
bsf = (D / (2*d)) * f_r * (1 - ((d/D) * cos_theta)**2)
ftf = (f_r / 2) * (1 - (d/D) * cos_theta)

return {"bpfo": bpfo, "bpfi": bpfi, "bsf": bsf, "ftf": ftf}


def extract_envelope_spectrum_features(
signal: np.ndarray,
sampling_rate: float,
center_freq: float,
bandwidth: float,
fault_frequencies: Dict[str, float]
) -> Dict[str, float]:
"""
Envelope analysis for bearing fault detection.

Process:
1. Bandpass filter around structural resonance
2. Rectify (absolute value)
3. Low-pass filter to get envelope
4. FFT of envelope - reveals fault frequencies
"""
nyq = sampling_rate / 2.0

# Bandpass filter around resonance frequency
low = (center_freq - bandwidth/2) / nyq
high = (center_freq + bandwidth/2) / nyq
low = max(low, 0.001)
high = min(high, 0.999)

b, a = butter(4, [low, high], btype='band')
filtered = filtfilt(b, a, signal)

# Compute envelope via Hilbert transform
analytic = hilbert(filtered)
envelope = np.abs(analytic)

# Subtract mean to center envelope
envelope -= np.mean(envelope)

# FFT of envelope
N = len(envelope)
env_fft = np.abs(fft(envelope)[:N//2]) * 2 / N
env_freqs = fftfreq(N, d=1.0/sampling_rate)[:N//2]

features = {}
freq_resolution = env_freqs[1] - env_freqs[0]

for fault_name, fault_freq in fault_frequencies.items():
if fault_freq < env_freqs[-1]:
bin_idx = int(fault_freq / freq_resolution)
bin_range = slice(max(0, bin_idx-2), min(len(env_fft), bin_idx+3))
features[f"envelope_{fault_name}"] = np.mean(env_fft[bin_range])
# Also check second harmonic
if 2*fault_freq < env_freqs[-1]:
bin2_idx = int(2*fault_freq / freq_resolution)
bin2_range = slice(max(0, bin2_idx-2), min(len(env_fft), bin2_idx+3))
features[f"envelope_{fault_name}_2x"] = np.mean(env_fft[bin2_range])

return features


def build_feature_matrix(
sensor_data: pd.DataFrame,
time_col: str,
signal_col: str,
window_seconds: float,
step_seconds: float,
sampling_rate: float,
fault_frequencies: Optional[Dict[str, float]] = None
) -> pd.DataFrame:
"""
Slide a window over continuous sensor data and extract features per window.

Args:
sensor_data: DataFrame with timestamp and signal columns
window_seconds: Window length in seconds
step_seconds: Step size (overlap = window - step)
sampling_rate: Sensor sampling rate in Hz
"""
window_samples = int(window_seconds * sampling_rate)
step_samples = int(step_seconds * sampling_rate)
signal = sensor_data[signal_col].values
timestamps = sensor_data[time_col].values

all_features = []

for start in range(0, len(signal) - window_samples, step_samples):
end = start + window_samples
window = signal[start:end]

features = {}
features["timestamp"] = timestamps[start]
features.update(extract_time_domain_features(window))
features.update(
extract_frequency_domain_features(window, sampling_rate, fault_frequencies)
)
all_features.append(features)

return pd.DataFrame(all_features)

2. LSTM-Based RUL Predictor on CMAPSS Data

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from typing import Tuple, List

# ------------------------------------------------------------------
# CMAPSS DATA LOADING
# The NASA CMAPSS dataset: 4 train/test sets (FD001-FD004).
# Each row: engine_id, cycle, 3 operational settings, 21 sensor readings
# Training: multiple run-to-failure trajectories
# Test: truncated sequences - predict RUL at the final timestep
# ------------------------------------------------------------------

CMAPSS_SENSOR_COLS = [f"s{i}" for i in range(1, 22)]
CMAPSS_SETTING_COLS = ["setting1", "setting2", "setting3"]
CMAPSS_COL_NAMES = (
["engine_id", "cycle"] + CMAPSS_SETTING_COLS + CMAPSS_SENSOR_COLS
)

# Sensors with near-zero variance in FD001 - usually dropped
SENSORS_TO_DROP = {"s1", "s5", "s6", "s10", "s16", "s18", "s19"}
USEFUL_SENSORS = [s for s in CMAPSS_SENSOR_COLS if s not in SENSORS_TO_DROP]


def load_cmapss(data_dir: str, subset: str = "FD001") -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Load CMAPSS train/test/RUL files."""
train_path = f"{data_dir}/train_{subset}.txt"
test_path = f"{data_dir}/test_{subset}.txt"
rul_path = f"{data_dir}/RUL_{subset}.txt"

train = pd.read_csv(train_path, sep=" ", header=None, names=CMAPSS_COL_NAMES)
test = pd.read_csv(test_path, sep=" ", header=None, names=CMAPSS_COL_NAMES)
rul = pd.read_csv(rul_path, header=None, names=["rul"])

# Drop columns that are all NaN (CMAPSS files have trailing spaces)
train = train.dropna(axis=1, how="all")
test = test.dropna(axis=1, how="all")

return train, test, rul


def add_rul_labels(df: pd.DataFrame, rul_cap: int = 130) -> pd.DataFrame:
"""
Add RUL column to training data.

The "piecewise linear" RUL target is standard practice:
- Cap RUL at rul_cap for early cycles (machine is healthy, RUL doesn't matter much)
- This prevents the model from trying to distinguish RUL=300 from RUL=250
when the machine looks identical at both timepoints.
"""
max_cycles = df.groupby("engine_id")["cycle"].max().rename("max_cycle")
df = df.join(max_cycles, on="engine_id")
df["rul"] = df["max_cycle"] - df["cycle"]
df["rul"] = df["rul"].clip(upper=rul_cap) # Piecewise linear cap
df = df.drop(columns=["max_cycle"])
return df


def normalize_cmapss(
train: pd.DataFrame,
test: pd.DataFrame,
feature_cols: List[str]
) -> Tuple[pd.DataFrame, pd.DataFrame, StandardScaler]:
"""Fit scaler on train, apply to both train and test."""
scaler = StandardScaler()
train = train.copy()
test = test.copy()
train[feature_cols] = scaler.fit_transform(train[feature_cols])
test[feature_cols] = scaler.transform(test[feature_cols])
return train, test, scaler


class CMAPSSDataset(Dataset):
"""
PyTorch Dataset for CMAPSS RUL prediction.

Each sample: (sequence of length seq_len, RUL at last timestep)
We slide a window over each engine's trajectory.
"""

def __init__(
self,
df: pd.DataFrame,
feature_cols: List[str],
seq_len: int = 30,
mode: str = "train" # "train" or "test"
):
self.seq_len = seq_len
self.mode = mode
self.sequences = []
self.labels = []

for engine_id, group in df.groupby("engine_id"):
group = group.sort_values("cycle")
features = group[feature_cols].values
ruls = group["rul"].values if "rul" in group.columns else None

if mode == "train":
# Slide window over training trajectory
for i in range(len(features) - seq_len + 1):
self.sequences.append(features[i:i+seq_len])
self.labels.append(ruls[i+seq_len-1])
else:
# Test mode: use only the last seq_len timesteps
if len(features) >= seq_len:
self.sequences.append(features[-seq_len:])
else:
# Pad with zeros at beginning if sequence is too short
pad = np.zeros((seq_len - len(features), len(feature_cols)))
self.sequences.append(np.vstack([pad, features]))
# RUL label comes from the separate RUL file - added externally
if ruls is not None:
self.labels.append(ruls[-1])

self.sequences = np.array(self.sequences, dtype=np.float32)
self.labels = np.array(self.labels, dtype=np.float32) if self.labels else None

def __len__(self):
return len(self.sequences)

def __getitem__(self, idx):
x = torch.tensor(self.sequences[idx])
if self.labels is not None:
y = torch.tensor(self.labels[idx])
return x, y
return x


class LSTMRULPredictor(nn.Module):
"""
Bidirectional LSTM for remaining useful life prediction.

Architecture:
- Bidirectional LSTM captures both forward and backward temporal patterns
- Attention over LSTM outputs weights important timesteps
- FC layers map to scalar RUL prediction
"""

def __init__(
self,
input_size: int,
hidden_size: int = 128,
num_layers: int = 2,
dropout: float = 0.2,
bidirectional: bool = True
):
super().__init__()

self.hidden_size = hidden_size
self.num_directions = 2 if bidirectional else 1

self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0.0,
bidirectional=bidirectional
)

lstm_output_size = hidden_size * self.num_directions

# Attention mechanism over time steps
self.attention = nn.Sequential(
nn.Linear(lstm_output_size, 64),
nn.Tanh(),
nn.Linear(64, 1)
)

# Regression head
self.fc = nn.Sequential(
nn.Linear(lstm_output_size, 64),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(64, 1)
)

def forward(self, x: torch.Tensor) -> torch.Tensor:
# x shape: (batch, seq_len, input_size)
lstm_out, _ = self.lstm(x)
# lstm_out shape: (batch, seq_len, hidden_size * num_directions)

# Attention weights over time steps
attn_weights = self.attention(lstm_out) # (batch, seq_len, 1)
attn_weights = torch.softmax(attn_weights, dim=1)

# Weighted sum over time steps
context = (lstm_out * attn_weights).sum(dim=1) # (batch, hidden * directions)

# Predict RUL
rul = self.fc(context).squeeze(-1) # (batch,)
return rul


def train_rul_model(
train_loader: DataLoader,
val_loader: DataLoader,
input_size: int,
num_epochs: int = 50,
learning_rate: float = 1e-3,
device: str = "cpu"
) -> LSTMRULPredictor:

model = LSTMRULPredictor(input_size=input_size).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5, verbose=True
)
criterion = nn.MSELoss()

best_val_rmse = float("inf")
best_state = None

for epoch in range(num_epochs):
# Training
model.train()
train_losses = []
for x_batch, y_batch in train_loader:
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)

optimizer.zero_grad()
pred = model(x_batch)
loss = criterion(pred, y_batch)
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), 1.0) # Gradient clipping
optimizer.step()
train_losses.append(loss.item())

# Validation
model.eval()
val_losses = []
with torch.no_grad():
for x_batch, y_batch in val_loader:
x_batch = x_batch.to(device)
y_batch = y_batch.to(device)
pred = model(x_batch)
loss = criterion(pred, y_batch)
val_losses.append(loss.item())

val_rmse = np.sqrt(np.mean(val_losses))
scheduler.step(val_rmse)

if val_rmse < best_val_rmse:
best_val_rmse = val_rmse
best_state = {k: v.clone() for k, v in model.state_dict().items()}

if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}: train_loss={np.mean(train_losses):.2f}, "
f"val_rmse={val_rmse:.2f}")

model.load_state_dict(best_state)
print(f"Best validation RMSE: {best_val_rmse:.2f} cycles")
return model

3. Isolation Forest for Anomaly Detection

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from typing import Tuple, Optional
import joblib


class SensorAnomalyDetector:
"""
Isolation Forest based anomaly detector for multivariate sensor data.

Design decisions:
- Train on a "golden window" of known-normal operation data
- Score new windows: negative scores = more anomalous
- Threshold tuned to control false alarm rate
"""

def __init__(
self,
contamination: float = 0.01, # Expected fraction of anomalies in training data
n_estimators: int = 200,
random_state: int = 42
):
self.scaler = StandardScaler()
self.model = IsolationForest(
contamination=contamination,
n_estimators=n_estimators,
random_state=random_state,
n_jobs=-1
)
self.threshold = None
self.feature_names = None

def fit(self, X: np.ndarray, feature_names: Optional[list] = None):
"""
Train on normal operating data.

X: Feature matrix of shape (n_windows, n_features)
Computed from "known good" periods of operation.
"""
self.feature_names = feature_names
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled)

# Compute anomaly scores on training data
# Isolation Forest scores: more negative = more anomalous
# score_samples returns raw scores, decision_function centers them
scores = self.model.score_samples(X_scaled)

# Set threshold at 1st percentile of training scores
# This means we expect < 1% of normal windows to trigger alerts
self.threshold = np.percentile(scores, 1.0)
print(f"Anomaly threshold set at: {self.threshold:.4f}")
print(f"Training score range: [{scores.min():.4f}, {scores.max():.4f}]")
return self

def score(self, X: np.ndarray) -> np.ndarray:
"""Return anomaly scores. More negative = more anomalous."""
X_scaled = self.scaler.transform(X)
return self.model.score_samples(X_scaled)

def predict(self, X: np.ndarray) -> np.ndarray:
"""
Return 1 for anomaly, 0 for normal.
Uses the threshold set during training.
"""
scores = self.score(X)
return (scores < self.threshold).astype(int)

def tune_threshold(
self,
X_val: np.ndarray,
y_val: np.ndarray,
target_precision: float = 0.8
) -> float:
"""
Tune threshold to achieve target precision on a labeled validation set.

In manufacturing, precision matters more than recall for anomaly alerts:
- False alarms (low precision) destroy operator trust
- Operators start ignoring all alerts
- The system becomes useless

Target precision of 0.8 means 80% of alerts are real anomalies.
"""
from sklearn.metrics import precision_recall_curve

scores = self.score(X_val)

# Note: we need to flip sign since higher scores = more normal
# precision_recall_curve expects higher scores to mean positive class
precision, recall, thresholds = precision_recall_curve(y_val, -scores)

# Find threshold where precision first exceeds target
valid_idx = np.where(precision >= target_precision)[0]
if len(valid_idx) > 0:
# Among thresholds with sufficient precision, pick best recall
best_idx = valid_idx[np.argmax(recall[valid_idx])]
# Convert back to score space (recall: -threshold -> threshold)
self.threshold = -thresholds[best_idx]
print(f"Tuned threshold: {self.threshold:.4f}")
print(f"Expected precision: {precision[best_idx]:.3f}, recall: {recall[best_idx]:.3f}")
else:
print("WARNING: Cannot achieve target precision with this model. Check data quality.")

return self.threshold

def save(self, path: str):
joblib.dump({"scaler": self.scaler, "model": self.model, "threshold": self.threshold}, path)

@classmethod
def load(cls, path: str) -> "SensorAnomalyDetector":
obj = cls.__new__(cls)
data = joblib.load(path)
obj.scaler = data["scaler"]
obj.model = data["model"]
obj.threshold = data["threshold"]
return obj

4. OPC-UA Data Reader for Industrial Integration

"""
OPC-UA industrial data reader using the asyncua library.

OPC-UA (Open Platform Communications Unified Architecture) is the standard
protocol for industrial equipment data access. Every modern PLC (Siemens,
Allen Bradley, Beckhoff) and SCADA system supports it.

Install: pip install asyncua
"""
import asyncio
import logging
from datetime import datetime
from typing import Dict, List, Optional, Callable, Any
import pandas as pd
from asyncua import Client, ua

logger = logging.getLogger(__name__)


class OPCUADataReader:
"""
Async OPC-UA client for reading sensor data from PLCs and SCADA systems.

Supports:
- Batch reading of multiple tags
- Subscription-based real-time updates (push model, not polling)
- Historical data access via HistoricalRead service
"""

def __init__(
self,
url: str,
username: Optional[str] = None,
password: Optional[str] = None,
security_policy: str = "Basic256Sha256", # Production: always use security
):
self.url = url
self.username = username
self.password = password
self.security_policy = security_policy
self.client: Optional[Client] = None
self._subscription = None
self._monitored_items = {}

async def connect(self):
"""Connect to OPC-UA server."""
self.client = Client(url=self.url)

if self.username and self.password:
self.client.set_user(self.username)
self.client.set_password(self.password)

await self.client.connect()
logger.info(f"Connected to OPC-UA server at {self.url}")

# Print server info
root = self.client.get_root_node()
server_info = await self.client.get_server_node().read_browse_name()
logger.info(f"Server: {server_info}")

async def disconnect(self):
"""Clean disconnect."""
if self._subscription:
await self._subscription.delete()
if self.client:
await self.client.disconnect()
logger.info("Disconnected from OPC-UA server")

async def read_tags(self, node_ids: List[str]) -> Dict[str, Any]:
"""
Read current values of multiple tags in one request.

node_ids: OPC-UA node identifiers, e.g.:
["ns=2;i=1001", "ns=2;s=Motor1.Temperature", ...]

Returns dict of node_id -> value
"""
nodes = [self.client.get_node(nid) for nid in node_ids]

# Batch read is more efficient than individual reads
values = await self.client.read_values(nodes)

return {
node_ids[i]: {
"value": values[i].Value.Value if values[i].Value else None,
"timestamp": values[i].SourceTimestamp,
"status": str(values[i].StatusCode)
}
for i in range(len(node_ids))
}

async def read_historical(
self,
node_id: str,
start_time: datetime,
end_time: datetime,
max_values: int = 10000
) -> pd.DataFrame:
"""
Read historical data from OPC-UA historian (if server supports it).
Many SCADA systems expose historian data via OPC-UA HistoricalRead.
"""
node = self.client.get_node(node_id)

result = await node.read_raw_history(
starttime=start_time,
endtime=end_time,
numvalues=max_values
)

records = []
for data_value in result:
records.append({
"timestamp": data_value.SourceTimestamp,
"value": data_value.Value.Value if data_value.Value else None,
"status": str(data_value.StatusCode)
})

return pd.DataFrame(records)

async def subscribe_to_changes(
self,
node_ids: List[str],
callback: Callable[[str, Any, datetime], None],
sampling_interval_ms: int = 100
):
"""
Subscribe to value changes (server pushes updates to client).
This is more efficient than polling for high-frequency sensors.

callback: Function called on each update with (node_id, value, timestamp)
"""

class DataChangeHandler:
def __init__(self, cb, nid_map):
self.cb = cb
self.nid_map = nid_map # handle -> node_id mapping

async def datachange_notification(self, node, val, data):
node_id = str(node.nodeid)
self.cb(node_id, val, data.monitored_item.Value.SourceTimestamp)

handler = DataChangeHandler(callback, {})
self._subscription = await self.client.create_subscription(
sampling_interval_ms, handler
)

nodes = [self.client.get_node(nid) for nid in node_ids]
await self._subscription.subscribe_data_change(nodes)
logger.info(f"Subscribed to {len(node_ids)} OPC-UA nodes")


# Example: SCADA integration pattern
async def scada_to_ml_pipeline_example():
"""
Example: continuous data collection loop that feeds ML anomaly detector.
In production this runs as a service, polling tags every few seconds
and accumulating windows for feature extraction.
"""
# These node IDs come from the PLC/SCADA configuration
MOTOR_TAGS = {
"ns=2;s=Line1.Motor1.TemperatureBearing": "temp_bearing",
"ns=2;s=Line1.Motor1.VibrationX": "vib_x",
"ns=2;s=Line1.Motor1.VibrationY": "vib_y",
"ns=2;s=Line1.Motor1.CurrentPhaseA": "current_a",
"ns=2;s=Line1.Motor1.SpeedRPM": "speed_rpm",
}

reader = OPCUADataReader(
url="opc.tcp://scada-server:4840/",
username="ml_service",
password="secure_password_from_vault"
)

buffer = []

async def on_data_change(node_id, value, timestamp):
tag_name = MOTOR_TAGS.get(node_id, node_id)
buffer.append({"tag": tag_name, "value": value, "ts": timestamp})

await reader.connect()

try:
await reader.subscribe_to_changes(
list(MOTOR_TAGS.keys()),
on_data_change,
sampling_interval_ms=50 # 20 Hz sampling
)

# Keep running, processing buffer every 5 seconds
while True:
await asyncio.sleep(5.0)
if len(buffer) >= 100:
df = pd.DataFrame(buffer[-500:]) # Last 500 readings
buffer.clear()
# Here: pivot df, extract features, run anomaly detector
# anomaly_score = detector.score(features)
# if anomaly_score < threshold: trigger_alert()
print(f"Processed batch of {len(df)} readings")

finally:
await reader.disconnect()

System Architecture


Production Engineering Notes

Alarm Management is a Product Problem, Not a Technical Problem

The biggest failure mode for predictive maintenance systems in production is not the model - it is alarm fatigue. A model with 90% recall and 50% precision will generate so many false positives that operators stop reading the alerts within two weeks. The alerts become background noise. Then the system fails to prevent the failure it was designed to prevent, and gets blamed for the outage it could have stopped.

The right framing: treat every alert as a product shipped to the maintenance team. If your false alert rate is 40%, you are shipping broken product 40% of the time. That destroys trust. Design for precision first, recall second. A system that misses 20% of failures but is right 90% of the time when it alerts will be used. A system that catches everything but cries wolf constantly will be ignored.

Practical alarm management:

  • Use a confirmation window: require the anomaly score to stay elevated for N consecutive windows before alerting (reduces spikes)
  • Implement alert suppression: if maintenance is in progress, suppress alerts for that asset
  • Send the score trend, not just the binary alert: "bearing anomaly score has been elevated for 6 hours, currently at 0.73/1.0, trending upward"
  • Track precision and recall by asset type in your monitoring dashboard
  • Build a feedback loop: maintenance team logs what they found (bearing worn, false alarm, other issue), use this to tune thresholds

Edge vs Cloud Deployment

For vibration data at 10 kHz, a single sensor generates 10,000 samples per second. At 4 bytes per sample, that is 40 KB/s per sensor. A machine with 20 sensors generates 800 KB/s continuously. Sending this raw to the cloud costs bandwidth, money, and latency. It also often violates the OT (Operational Technology) network security policy - raw sensor data on the IT network is a security concern in many plants.

The standard architecture: extract features at the edge (on an industrial PC or edge gateway co-located with the machine), send feature vectors (not raw waveforms) to the cloud. Feature vectors are 100-1000x smaller than raw data. You send a feature vector every 5-30 seconds instead of 40 KB/s.

When to run ML inference at the edge vs cloud:

  • Edge: anomaly detection that needs low latency (<1s response), operations where cloud connectivity is unreliable, sites with strict data sovereignty requirements
  • Cloud: model training and retraining, fleet-level analytics, complex models that won't fit on edge hardware, initial deployment before edge hardware is deployed

Handling Sensor Drift and Failures

Real industrial sensors drift. Thermocouples age and read 3 degrees low. Accelerometers accumulate grime on the mounting surface, changing their sensitivity. Sensors fail completely (stuck at zero, at max, or at some constant value).

Your ML pipeline must detect and handle these:

  • Stuck sensor detection: if a sensor returns the same value for N consecutive readings, flag it as failed. Normal sensors never have exactly identical consecutive readings due to noise.
  • Range check: if a value is outside the physical possible range for that sensor (e.g., temperature -50°C for an indoor motor), reject it.
  • Rate-of-change check: if a value changes faster than physically possible (e.g., bearing temperature jumps 50°C in 1 second), flag the reading.
  • Sensor drift detection: compare 7-day rolling mean to 90-day baseline. If the mean has shifted by more than 2 standard deviations with no corresponding shift in correlated sensors, suspect drift.

When a sensor is flagged as unreliable, do not feed its readings to the model. Use the last known good value, or impute using a model trained on correlated sensors. Always log the data quality issue for the asset team.


:::warning Data Leakage in Predictive Maintenance Models When training on run-to-failure data, never include future information in your training features. A common mistake: computing a rolling window feature where the window includes timesteps after the target timestep. Another common mistake: normalizing using statistics computed on the entire dataset (including test/future data). Always fit your scaler on training data only, and ensure your feature windows only look backward in time. :::

:::danger The "Cry Wolf" Failure Mode An anomaly detection system that generates too many false alerts will be disabled by plant operators within days of deployment. Before deploying, run your model in shadow mode (alerts are logged but not shown to operators) for at least 4 weeks. Measure the precision on confirmed faults. If precision is below 70%, do not go live. Operators forgive missed detections. They do not forgive wasted maintenance trips to healthy machines. :::


Interview Questions and Answers

Q1: How do you set the anomaly detection threshold for a maintenance alert system?

The threshold is a precision-recall tradeoff decision, not a technical one alone - it requires input from the maintenance team. Start by collecting a labeled validation dataset: some windows from periods when you know the equipment was degraded (confirmed by maintenance logs after the fact) and a large number of windows from confirmed-normal periods. Plot the precision-recall curve. Then have a conversation: what is the cost of a false alert (maintenance crew goes to inspect a healthy machine - maybe 2 hours of labor)? What is the cost of a missed failure (unplanned downtime - maybe $50K)? If the cost ratio is 25:1, you can accept precision as low as 1/25 = 4% before false alerts become economically worse than misses. In practice, you target higher precision (70-90%) because false alarms also destroy trust and cause operators to ignore the system entirely. Use the tuned threshold from the validation set, then monitor precision and recall in production weekly and recalibrate every quarter.

Q2: Why does kurtosis feature work well for detecting bearing faults?

Kurtosis measures the "tailedness" of the amplitude distribution - how often extreme values occur compared to a Gaussian distribution. A healthy bearing produces broadband vibration that is approximately Gaussian (kurtosis close to 3.0). A bearing with localized damage (e.g., a spall on the outer race) produces periodic impact events - short-duration spikes - that dramatically increase the frequency of extreme amplitude values, driving kurtosis to 10, 20, or higher. This happens even before the fault is visible in the frequency domain, making kurtosis one of the earliest indicators of bearing fault development. The limitation: kurtosis drops again in the late stages of failure when the damage is distributed across the surface and the signal becomes broadly elevated rather than impulsively spiky. So kurtosis is best as an early warning indicator, not a severity indicator.

Q3: Explain the piecewise linear RUL target and why it matters for training.

In a run-to-failure dataset, the true RUL at the start of a long run might be 300 cycles. At 50 cycles in, it might be 250. The sensor readings at both points are nearly identical - the machine is healthy in both cases. If you train a model to predict exact RUL (300 vs 250), you are asking it to learn a difference that is not present in the input features. The model cannot do this, and the loss on these early timesteps confuses the gradient updates. The piecewise linear target solves this by capping RUL at a maximum value (commonly 130 cycles for CMAPSS). Any timestep with more than 130 cycles remaining gets labeled as 130. The model now only needs to learn RUL in the range [0, 130], which corresponds to the degradation-visible part of the trajectory. This dramatically improves prediction accuracy in the region that matters for maintenance planning.

Q4: What is envelope analysis and why is it better than raw FFT for bearing fault detection?

Raw FFT shows all frequency content in the signal. For a bearing with a fault, the fault impacts (at frequency fBPFOf_{BPFO}) modulate the amplitude of structural resonances in the machine - typically at frequencies of 5-20 kHz. In the raw FFT, this appears as sidebands around the resonance frequency. But the resonance peak dominates and the sidebands are small, making them hard to detect against noise. Envelope analysis demodulates this amplitude modulation. You bandpass filter around the resonance, compute the envelope (using the Hilbert transform), and then take the FFT of the envelope. This envelope spectrum contains only the modulating frequencies (the bearing fault frequencies) without the carrier resonance. The fault frequencies appear as strong peaks in the envelope spectrum even at early fault stages. The key is choosing the right resonance frequency to filter around - this requires knowledge of the machine's structural resonances, typically from bump tests or manufacturer data.

Q5: How would you integrate a predictive maintenance model with a CMMS (Computerized Maintenance Management System)?

CMMS integration is primarily an API and data contract problem. Most industrial CMMS systems (Maximo, SAP PM, UpKeep, Fiix) expose REST or SOAP APIs for work order creation. The integration pattern: (1) ML model generates an alert with asset ID, timestamp, predicted failure mode, confidence score, and estimated time to failure; (2) Alert engine applies business rules (don't generate a work order if one is already open for this asset, aggregate alerts from multiple sensors for the same machine, check if the machine is already scheduled for maintenance within the predicted failure window); (3) If a work order is warranted, POST to CMMS API with structured payload including asset ID, work type (inspection vs replacement vs overhaul), priority (based on estimated time to failure), and recommended action text; (4) CMMS auto-assigns to available technician based on skills and location; (5) After maintenance, technician logs what was found - this feedback is captured and used to label the pre-alert data as true positive or false positive for model retraining. The feedback loop is critical and often neglected - without it, you cannot measure or improve your model's actual performance.


Key Takeaways

Predictive maintenance with AI is a high-ROI application that follows a consistent pattern: sensor data collection via industrial protocols (OPC-UA, Modbus), feature engineering in time and frequency domains (FFT, envelope analysis, statistical features), anomaly detection to flag degrading behavior (isolation forest, autoencoders, LSTM), remaining useful life regression for maintenance planning, and integration with operational systems (CMMS, SCADA) to close the loop. The hardest parts are not the ML - they are alarm management (precision vs recall tradeoff in a human trust system), data quality (sensor drift, failures, missing data), and the feedback loop (logging what maintenance found to retrain and validate the model). Get those three right, and the ML becomes almost straightforward.

© 2026 EngineersOfAI. All rights reserved.