
Industrial IoT and ML

Reading time: ~50 min · Interview relevance: High · Target roles: ML Engineer, Data Engineer, Industrial AI Systems Architect


The Data Plumbing Nobody Talks About

Every ML paper on manufacturing AI shows an architecture diagram with a box labeled "sensor data" on the left and a box labeled "ML model" on the right, connected by a neat arrow. The paper never explains what is in that arrow. In practice, that arrow represents 80% of the engineering work: extracting data from PLCs that speak protocols invented in 1979, handling 1,200 different sensor types with different time bases and precisions, storing petabytes of time-series data in databases designed for exactly this purpose, streaming millions of events per second through Kafka, and feeding all of this into ML inference systems that need low-latency access to both real-time and historical data.

This is Industrial IoT engineering. It is unglamorous, technically demanding, and absolutely essential. A beautiful ML model is worthless if it cannot receive reliable, clean, properly timestamped sensor data. A predictive maintenance system that generates alerts 30 minutes after the relevant sensor event occurred because of buffer delays in the data pipeline is not a real-time system - it is a delayed reporting system.

The challenge is that industrial data systems were designed in an era before ML existed. OPC-UA, the dominant industrial communication protocol, was standardized in 2009 - before the deep learning era. Modbus was invented in 1979 - before the internet. OSIsoft PI, the dominant industrial historian system, dates to the 1980s. These systems are excellent at what they were designed for: reliably collecting and storing operational data for SCADA displays and manual reporting. They were not designed to feed low-latency ML inference pipelines or to store the hundreds of millions of rows per day that modern manufacturing analytics requires.

This lesson teaches you to bridge the gap: how to extract data from industrial systems using their native protocols, how to store and query time-series sensor data efficiently, how to build streaming pipelines for real-time ML, and how to integrate ML inference with industrial data platforms in ways that actually work in production.


Why This Exists

The OT/IT Divide

Manufacturing facilities have two separate network domains: OT (Operational Technology) and IT (Information Technology). The OT network connects the physical machines - PLCs, SCADA servers, sensors, drives, robots. It runs protocols built for deterministic real-time control (Modbus, PROFINET, EtherNet/IP), alongside OPC-UA for supervisory data access. It is managed by automation engineers and controls specialists, not IT staff.

The IT network connects office systems, ERP, file servers, and increasingly cloud services. It runs standard TCP/IP protocols. IT manages it.

Historically, these networks were physically separated (air-gapped). An ML system living in the IT domain or cloud cannot see OT data directly. Data movement required manual export - a spreadsheet emailed weekly. This was acceptable for periodic reporting but is incompatible with real-time ML.

The modern approach: a demilitarized zone (DMZ) or data diode between OT and IT networks, through which data flows one-way (OT to IT) via a data broker. OPC-UA is the standard protocol for this cross-network data flow because it was designed with security (certificate-based authentication, encryption) and includes a hierarchical information model that makes OT data self-describing.

The Historian Gap

Most manufacturing facilities have a data historian - a specialized time-series database that records every sensor value at every time step. OSIsoft PI (now AVEVA PI) is the dominant system, used in over 19,000 sites globally. Wonderware Historian, GE Proficy, and Honeywell PHD are alternatives.

These historians are excellent at storing data. They are not designed for ML workloads. Querying 50 million rows of historical sensor data for a training dataset from a PI historian using PI OLEDB can take hours. Streaming current readings to a Kafka topic for real-time ML is a custom integration effort. The timestamp granularity is often limited (PI stores data at the historian's polling interval, which may be 1 second even if the sensor samples at 10ms). The data model (PI tags) does not align with ML feature vectors.

The solution: a modern time-series database (InfluxDB, TimescaleDB) alongside the historian. The historian serves its original purpose (SCADA, compliance, reporting). The modern TSDB serves ML workloads (fast queries, streaming integration, feature computation). The historian and TSDB are synchronized via a data bridge.


Historical Context

Industrial automation communication has evolved through distinct generations. In the 1970s-1980s: serial protocols (Modbus RTU over RS-485, HART over 4-20mA current loops) connecting individual instruments to DCS (Distributed Control Systems). In the 1990s: fieldbus protocols (PROFIBUS DP over RS-485, DeviceNet over CAN) enabling digital communication with field devices. In the 2000s: OPC (originally "OLE for Process Control") providing a Windows COM-based standard for data exchange between control hardware and SCADA systems. OPC DA (Data Access) became the universal bridge between PLC vendors and SCADA software.

The current standard, OPC-UA (Unified Architecture, 2009), replaced OPC DA with a platform-independent, secure, scalable protocol. OPC-UA adds the concept of an information model - the OPC-UA address space allows devices to describe their data semantically, not just expose raw numbers. A temperature sensor does not just expose tag "2.47.3" - it exposes a node with name "BearingTemperature", engineering unit "Celsius", metadata about the sensor type, and the current value. This semantic richness is what makes OPC-UA the right foundation for IIoT.

The IIoT platform era began around 2015 with Predix (GE), MindSphere (Siemens), ThingWorx (PTC), and the major cloud providers' IoT services (AWS IoT, Azure IoT Hub, Google IoT Core). These platforms provide managed MQTT brokers, device management, and some analytics capability. The ML integration layer is typically custom-built on top.

MQTT (originally "MQ Telemetry Transport") became the standard for IoT device-to-cloud communication because of its lightweight publish-subscribe model, small message overhead (as low as 2 bytes per message), and support for unreliable networks (QoS levels handle packet loss). It was originally designed at IBM in the 1990s for monitoring oil pipelines via satellite - the industrial IoT use case has come full circle.


Core Concepts

IIoT Architecture Layers

Physical Layer: Sensors, actuators, drives, robots
Field Layer: PLCs, DCS, local I/O systems
Industrial Network: PROFINET, EtherNet/IP, Modbus TCP (OT network)
Gateway Layer: OPC-UA servers, protocol converters, edge gateways
Transport Layer: MQTT, AMQP, HTTP (DMZ / IT network)
Platform Layer: MQTT broker, TSDB, Kafka, ML inference
Application Layer: Dashboards, ML models, ERP integration

Each layer has its own protocols, performance requirements, and engineering discipline. ML systems typically live at the Platform and Application layers - they are consumers of data produced by the layers below.
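
The hop from the Gateway layer to the Platform layer is typically an MQTT publish across the DMZ. A minimal sketch of that hop using the paho-mqtt client - the broker host, topic layout, and credentials below are illustrative assumptions:

"""
Minimal edge-gateway MQTT publish sketch.
Broker host, topic naming, and credentials are illustrative assumptions.
Uses the paho-mqtt 1.x constructor; 2.x additionally requires a
CallbackAPIVersion argument.

Install: pip install paho-mqtt
"""
import json
import ssl
from datetime import datetime, timezone

import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="edge-gateway-01")
client.username_pw_set("gateway", "secret")  # assumption: broker uses basic auth
client.tls_set(cert_reqs=ssl.CERT_REQUIRED)  # TLS is mandatory across the DMZ

client.connect("mqtt.plant.example.com", 8883)
client.loop_start()  # background thread handles reconnects and acknowledgements

reading = {
    "machine_id": "press-07",
    "sensor_id": "bearing_temp",
    "value": 71.3,
    "timestamp": datetime.now(tz=timezone.utc).isoformat(),
}
# QoS 1: at-least-once delivery - duplicates are possible, silent loss is not
client.publish(
    "factory/plant1/raw/press-07/bearing_temp",
    json.dumps(reading),
    qos=1
)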

Time-Series Database Selection

Manufacturing generates enormous volumes of time-series data. A single factory with 10,000 tags at 1-second intervals generates 10,000 rows per second = 864 million rows per day. At 30 bytes per row, that is 25 GB per day. Standard relational databases (PostgreSQL, MySQL) handle time-series poorly at this scale - they lack the columnar storage, time-partitioning, and automatic compression that make time-series workloads efficient.

InfluxDB is purpose-built for time-series. It uses a time-structured merge tree (TSM) storage engine that compresses time-series data 10-100x compared to row-oriented databases. Its Flux query language is optimized for time-series aggregations, and InfluxDB Cloud offers a fully managed deployment. For manufacturing: excellent for high-frequency sensor data, real-time dashboards, and feeding streaming ML pipelines. Limitation: complex multi-table joins are awkward.

TimescaleDB extends PostgreSQL with time-series capabilities: automatic time-based partitioning (hypertables), continuous aggregates (pre-computed rollups that stay updated as new data arrives), and columnar compression. The advantage: you get all of PostgreSQL's query flexibility plus time-series performance. For manufacturing: excellent when you need to join sensor data with relational data (production orders, maintenance records), or when you have existing PostgreSQL expertise.

Apache IoTDB is designed specifically for IIoT, with a native time-series data model that mirrors the OPC-UA information model (device hierarchy, multiple measurement series). It has the fastest write performance of the three for typical IIoT workloads.

For an ML-focused manufacturing system, TimescaleDB is often the best choice because: continuous aggregates pre-compute the rolling statistics that ML feature pipelines need, the PostgreSQL compatibility makes it easy to join sensor data with training labels from the operations database, and the mature ecosystem supports production deployment.

Streaming vs Batch for ML Workloads

Two patterns for delivering sensor data to ML models:

Batch pattern: At inference time, query the TSDB for the last N minutes of sensor data, compute features, run inference. Suitable for: predictive maintenance (run every 5-30 minutes), quality prediction (run at end-of-batch), scheduled analytics. Latency: 100ms to several seconds depending on query size.

Streaming pattern: Subscribe to real-time sensor updates via Kafka or MQTT. Maintain a sliding window buffer per asset. Compute features on each new window. Run inference continuously. Suitable for: real-time anomaly detection, process monitoring, inline inspection. Latency: <100ms end-to-end.

The choice is driven by the time-sensitivity of the decision. Predictive maintenance does not need to react in seconds - it needs to predict failure days in advance. A batch query every 5 minutes is perfectly adequate. Real-time anomaly detection that must react to a sensor spike in 10 seconds requires the streaming pattern.
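
To make the streaming pattern concrete, the sketch below maintains a per-asset sliding-window buffer in plain Python - the window length, feature set, and run_inference hook are illustrative assumptions:

"""
Minimal per-asset sliding-window buffer for the streaming pattern.
Window length, feature set, and the run_inference hook are
illustrative assumptions.
"""
from collections import defaultdict, deque
from statistics import mean, stdev

WINDOW_SECONDS = 60.0

# One buffer of (timestamp, value) pairs per (machine, sensor)
buffers = defaultdict(deque)

def on_reading(machine_id: str, sensor_id: str, value: float, ts: float):
    """Called for every arriving reading (e.g., from a Kafka consumer)."""
    buf = buffers[(machine_id, sensor_id)]
    buf.append((ts, value))
    # Evict readings that have fallen out of the window
    while buf and buf[0][0] < ts - WINDOW_SECONDS:
        buf.popleft()
    # Recompute window features on each update and hand off to the model
    values = [v for _, v in buf]
    if len(values) >= 2:
        features = {
            "mean": mean(values), "std": stdev(values),
            "min": min(values), "max": max(values),
        }
        run_inference(machine_id, sensor_id, features)

def run_inference(machine_id, sensor_id, features):
    pass  # placeholder: call the fitted anomaly detector here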


Code Examples

1. OPC-UA Python Client with asyncua

"""
OPC-UA client using the asyncua library.

OPC-UA is the universal standard for industrial data access.
This client handles:
- Connection with certificate-based authentication
- Reading current values (synchronous read)
- Subscription-based real-time updates (asynchronous push)
- Historical data access (HistoricalRead service)
- Browsing the OPC-UA address space

Install: pip install asyncua cryptography
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional, Callable, Any
import pandas as pd
from asyncua import Client, ua
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256

logger = logging.getLogger(__name__)


class IndustrialOPCUAClient:
"""
Production-grade OPC-UA client for industrial data collection.

Supports both anonymous and certificate-based authentication.
Certificate-based is required for production deployments.
"""

def __init__(
self,
url: str,
username: Optional[str] = None,
password: Optional[str] = None,
client_cert_path: Optional[str] = None,
client_key_path: Optional[str] = None,
server_cert_path: Optional[str] = None
):
self.url = url
self.username = username
self.password = password
self.client_cert = client_cert_path
self.client_key = client_key_path
self.server_cert = server_cert_path
self._client: Optional[Client] = None
self._subscriptions = {}

async def connect(self):
"""Establish connection to OPC-UA server."""
self._client = Client(url=self.url)

# Set up certificate security if certificates provided
if self.client_cert and self.client_key:
await self._client.set_security(
SecurityPolicyBasic256Sha256,
certificate=self.client_cert,
private_key=self.client_key,
server_certificate=self.server_cert,
mode=ua.MessageSecurityMode.SignAndEncrypt
)
logger.info("OPC-UA security configured with certificate authentication")

if self.username:
self._client.set_user(self.username)
self._client.set_password(self.password)

await self._client.connect()
logger.info(f"Connected to OPC-UA server: {self.url}")

async def disconnect(self):
"""Clean disconnect and cleanup subscriptions."""
for sub in self._subscriptions.values():
await sub.delete()
self._subscriptions.clear()

if self._client:
await self._client.disconnect()
logger.info("Disconnected from OPC-UA server")

async def browse_nodes(
self,
node_id: str = "i=85", # Objects folder
depth: int = 3
) -> List[Dict]:
"""
Browse the OPC-UA address space.
Returns a list of nodes with their IDs, names, and types.
Useful for discovering what data is available.
"""
root_node = self._client.get_node(node_id)
results = []
await self._browse_recursive(root_node, results, depth, 0)
return results

async def _browse_recursive(self, node, results, max_depth, current_depth):
if current_depth >= max_depth:
return
try:
children = await node.get_children()
for child in children:
browse_name = await child.read_browse_name()
node_class = await child.read_node_class()
results.append({
"node_id": str(child.nodeid),
"name": browse_name.Name,
"class": str(node_class),
"depth": current_depth
})
await self._browse_recursive(child, results, max_depth, current_depth + 1)
        except Exception:
            # Some servers restrict browsing parts of the address space;
            # skip nodes we are not allowed to read
            pass

async def read_values(
self,
node_ids: List[str]
) -> Dict[str, Dict]:
"""
Read current values of multiple nodes in a single request.

Returns dict of node_id -> {value, timestamp, quality}
"""
nodes = [self._client.get_node(nid) for nid in node_ids]

# Batch read - more efficient than individual reads
data_values = await self._client.read_values(nodes)

results = {}
for i, nid in enumerate(node_ids):
dv = data_values[i]
results[nid] = {
"value": dv.Value.Value if dv.Value else None,
"timestamp": dv.SourceTimestamp or datetime.now(tz=timezone.utc),
"status": str(dv.StatusCode) if dv.StatusCode else "Good"
}
return results

async def subscribe_to_changes(
self,
node_ids: List[str],
callback: Callable[[str, Any, datetime], None],
sampling_interval_ms: int = 100,
subscription_name: str = "default"
):
"""
Subscribe to value changes (server pushes updates to client).

More efficient than polling for high-frequency sensors.
        The server only sends a notification when the value changes
        (optionally filtered by a configurable deadband).
"""

        class SubscriptionHandler:
            def __init__(self, cb):
                self.cb = cb

            async def datachange_notification(self, node, val, data):
                node_id = str(node.nodeid)
                timestamp = (
                    data.monitored_item.Value.SourceTimestamp
                    or datetime.now(tz=timezone.utc)
                )
                self.cb(node_id, val, timestamp)

        handler = SubscriptionHandler(callback)
subscription = await self._client.create_subscription(
sampling_interval_ms, handler
)
nodes = [self._client.get_node(nid) for nid in node_ids]
await subscription.subscribe_data_change(nodes)

self._subscriptions[subscription_name] = subscription
logger.info(
f"Subscribed to {len(node_ids)} nodes "
f"at {sampling_interval_ms}ms interval"
)
return subscription

async def read_historical(
self,
node_id: str,
start_time: datetime,
end_time: datetime,
max_values: int = 100000
) -> pd.DataFrame:
"""
Read historical data using OPC-UA HistoricalRead service.

Many industrial SCADA systems and historians expose their
historical data through this service, enabling ML training
dataset extraction via standard OPC-UA.
"""
node = self._client.get_node(node_id)

        # read_raw_history returns a list of DataValues
        result = await node.read_raw_history(
            starttime=start_time,
            endtime=end_time,
            numvalues=max_values
        )

if not result:
return pd.DataFrame(columns=["timestamp", "value", "quality"])

records = []
for dv in result:
if dv.Value and dv.Value.Value is not None:
records.append({
"timestamp": dv.SourceTimestamp or dv.ServerTimestamp,
"value": dv.Value.Value,
"quality": "Good" if dv.StatusCode.is_good() else "Bad"
})

df = pd.DataFrame(records)
if not df.empty:
df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
df = df.sort_values("timestamp").reset_index(drop=True)

logger.info(f"Read {len(df)} historical values for node {node_id}")
return df


async def run_continuous_collection(
client: IndustrialOPCUAClient,
tag_config: Dict[str, str], # {tag_name: node_id}
kafka_producer, # Kafka producer for downstream processing
influx_writer # InfluxDB writer for storage
):
"""
Continuous data collection loop.
Subscribes to all configured tags and routes data to Kafka + InfluxDB.
"""
    collected = {"count": 0}
    # Build the reverse lookup once instead of on every callback
    node_to_tag = {v: k for k, v in tag_config.items()}

    def on_value_change(node_id: str, value: Any, timestamp: datetime):
        tag_name = node_to_tag.get(node_id, node_id)

# Route to Kafka for stream processing / ML inference
if kafka_producer:
import json
kafka_producer.produce(
"factory-sensor-data",
value=json.dumps({
"tag": tag_name,
"value": float(value) if value is not None else None,
"timestamp": timestamp.isoformat(),
"node_id": node_id
}).encode()
)

# Write to InfluxDB for storage and historical queries
if influx_writer:
from influxdb_client import Point
point = (
Point("sensor_reading")
.tag("tag_name", tag_name)
.field("value", float(value) if value is not None else 0.0)
.time(timestamp)
)
influx_writer.write(bucket="manufacturing", record=point)

collected["count"] += 1
if collected["count"] % 1000 == 0:
logger.info(f"Collected {collected['count']} readings")

await client.subscribe_to_changes(
list(tag_config.values()),
on_value_change,
sampling_interval_ms=100
)

# Keep running
while True:
await asyncio.sleep(1.0)

2. InfluxDB Time-Series Write and Query

"""
InfluxDB v2 Python client for industrial sensor data.

InfluxDB v2 key concepts:
- Bucket: named container for data (like a database)
- Measurement: like a table name (e.g., "vibration", "temperature")
- Tag: indexed metadata (sensor_id, line_id, machine_id)
- Field: actual data values (value, amplitude, frequency)
- Time: timestamp (nanosecond precision)

Install: pip install influxdb-client
"""
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import WriteOptions
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from typing import Dict, Optional


class InfluxDBSensorStore:
"""
InfluxDB store for industrial sensor time-series data.

Handles:
- High-throughput writes (batched, async)
- Feature-level queries for ML training
- Real-time queries for inference
- Continuous aggregate queries (pre-computed downsampling)
"""

def __init__(
self,
url: str,
token: str,
org: str,
bucket: str = "manufacturing"
):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.bucket = bucket
self.org = org

        # Batching write API - buffers points in the background and
        # flushes them in batches for throughput
        self.write_api = self.client.write_api(
            write_options=WriteOptions(
                flush_interval=1000,  # Flush every 1 second
                batch_size=5000       # Max 5000 points per HTTP request
            )
        )

self.query_api = self.client.query_api()

def write_sensor_reading(
self,
sensor_id: str,
machine_id: str,
measurement_type: str,
value: float,
timestamp: Optional[datetime] = None,
additional_tags: Optional[Dict] = None
):
"""
Write a single sensor reading to InfluxDB.

In production, prefer write_batch() for higher throughput.
"""
if timestamp is None:
timestamp = datetime.now(tz=timezone.utc)

point = (
Point(measurement_type)
.tag("sensor_id", sensor_id)
.tag("machine_id", machine_id)
.field("value", value)
            .time(timestamp, WritePrecision.MS)
)

if additional_tags:
for k, v in additional_tags.items():
point = point.tag(k, v)

self.write_api.write(bucket=self.bucket, record=point)

def write_feature_vector(
self,
machine_id: str,
features: Dict[str, float],
timestamp: Optional[datetime] = None,
window_seconds: float = 5.0
):
"""
Write a computed feature vector (e.g., from FFT analysis) to InfluxDB.
This is what the ML feature pipeline writes after processing raw signals.
"""
if timestamp is None:
timestamp = datetime.now(tz=timezone.utc)

point = Point("feature_vector").tag("machine_id", machine_id)
for feature_name, feature_value in features.items():
if np.isfinite(feature_value):
point = point.field(feature_name, float(feature_value))

point = (
point
.field("window_seconds", window_seconds)
            .time(timestamp, WritePrecision.MS)
)

self.write_api.write(bucket=self.bucket, record=point)

def query_for_training(
self,
machine_id: str,
start_time: datetime,
end_time: datetime,
measurement: str = "feature_vector",
resample_interval: str = "5s"
) -> pd.DataFrame:
"""
Query historical features for ML training dataset construction.

Returns resampled feature matrix suitable for model training.
"""
flux_query = f"""
from(bucket: "{self.bucket}")
|> range(start: {start_time.isoformat()}, stop: {end_time.isoformat()})
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["machine_id"] == "{machine_id}")
|> aggregateWindow(
every: {resample_interval},
fn: mean,
createEmpty: false
)
|> pivot(
rowKey: ["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
|> sort(columns: ["_time"])
"""

        result = self.query_api.query_data_frame(flux_query, org=self.org)

        # query_data_frame returns a list of DataFrames when the result
        # contains multiple tables - normalize to a single frame
        if isinstance(result, list):
            result = pd.concat(result, ignore_index=True) if result else pd.DataFrame()

        if result.empty:
            return pd.DataFrame()

        # Clean up InfluxDB metadata columns
        drop_cols = [
            c for c in result.columns
            if c in ("result", "table") or (c.startswith("_") and c != "_time")
        ]
        result = result.drop(columns=drop_cols, errors="ignore")
        result = result.rename(columns={"_time": "timestamp"})
        result["timestamp"] = pd.to_datetime(result["timestamp"], utc=True)

        return result.reset_index(drop=True)

def query_recent_window(
self,
machine_id: str,
window_minutes: int = 5,
measurement: str = "feature_vector"
) -> pd.DataFrame:
"""
Query the most recent N minutes of feature data for real-time inference.
"""
flux_query = f"""
from(bucket: "{self.bucket}")
|> range(start: -{window_minutes}m)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["machine_id"] == "{machine_id}")
|> pivot(
rowKey: ["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
|> sort(columns: ["_time"])
"""
        result = self.query_api.query_data_frame(flux_query, org=self.org)
        if isinstance(result, list):
            result = pd.concat(result, ignore_index=True) if result else pd.DataFrame()
        if result.empty:
            return pd.DataFrame()
        drop_cols = [
            c for c in result.columns
            if c in ("result", "table") or (c.startswith("_") and c != "_time")
        ]
        result = result.drop(columns=drop_cols, errors="ignore")
        return result.rename(columns={"_time": "timestamp"}).reset_index(drop=True)

def close(self):
self.write_api.close()
self.client.close()

3. TimescaleDB Continuous Aggregates for ML Features

"""
TimescaleDB (PostgreSQL + time-series extensions) for industrial sensor data.

TimescaleDB's "continuous aggregates" are materialized views that
automatically update as new data arrives. This pre-computes the
rolling statistics (mean, std, min, max over 5-minute windows) that
ML feature pipelines need, eliminating expensive on-the-fly aggregations.

Install: pip install psycopg2-binary
Requires: TimescaleDB extension on PostgreSQL
"""
import psycopg2
import pandas as pd
from datetime import datetime
from typing import List, Dict
from contextlib import contextmanager


class TimescaleDBSensorStore:
"""
TimescaleDB store for industrial sensor data with ML-optimized schema.
"""

def __init__(self, dsn: str):
"""
dsn: PostgreSQL connection string
Example: "host=localhost port=5432 dbname=manufacturing user=ml_user password=secret"
"""
self.dsn = dsn

@contextmanager
def _connection(self):
conn = psycopg2.connect(self.dsn)
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()

    def create_schema(self):
        """
        Create the sensor data schema with TimescaleDB hypertable and
        continuous aggregates for ML features.

        Continuous aggregate creation cannot run inside a transaction
        block, so this method uses a dedicated autocommit connection.
        """
        conn = psycopg2.connect(self.dsn)
        conn.autocommit = True
        try:
            cur = conn.cursor()

# Main sensor readings table
cur.execute("""
CREATE TABLE IF NOT EXISTS sensor_readings (
time TIMESTAMPTZ NOT NULL,
machine_id TEXT NOT NULL,
sensor_id TEXT NOT NULL,
measurement TEXT NOT NULL,
value DOUBLE PRECISION,
quality SMALLINT DEFAULT 1
);
""")

# Convert to hypertable (TimescaleDB magic for time-partitioning)
cur.execute("""
SELECT create_hypertable(
'sensor_readings', 'time',
if_not_exists => TRUE,
chunk_time_interval => INTERVAL '1 day'
);
""")

# Index for fast machine_id + time queries
cur.execute("""
CREATE INDEX IF NOT EXISTS idx_sensor_machine_time
ON sensor_readings (machine_id, sensor_id, time DESC);
""")

            # Continuous aggregate: 5-minute rolling statistics.
            # This is what ML feature pipelines query.
            # Ordered-set aggregates like PERCENTILE_CONT are not allowed
            # in continuous aggregates, so the p95 below uses the
            # timescaledb_toolkit percentile hyperfunctions instead; on
            # older setups, store percentile_agg(value) in the view and
            # apply approx_percentile at query time.
            cur.execute("""
                CREATE MATERIALIZED VIEW IF NOT EXISTS sensor_features_5min
                WITH (timescaledb.continuous) AS
                SELECT
                    time_bucket('5 minutes', time) AS bucket,
                    machine_id,
                    sensor_id,
                    measurement,
                    AVG(value) AS mean_value,
                    STDDEV(value) AS std_value,
                    MIN(value) AS min_value,
                    MAX(value) AS max_value,
                    COUNT(value) AS sample_count,
                    approx_percentile(0.95, percentile_agg(value)) AS p95_value
                FROM sensor_readings
                WHERE quality = 1  -- Only good quality readings
                GROUP BY bucket, machine_id, sensor_id, measurement
                WITH NO DATA;
            """)

# Set refresh policy: keep aggregate up to date
cur.execute("""
SELECT add_continuous_aggregate_policy(
'sensor_features_5min',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '5 minutes',
schedule_interval => INTERVAL '5 minutes',
if_not_exists => TRUE
);
""")

# Compression policy: compress data older than 7 days
# TimescaleDB compression achieves 90-95% size reduction
cur.execute("""
ALTER TABLE sensor_readings
SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'machine_id, sensor_id'
);
""")

cur.execute("""
SELECT add_compression_policy(
'sensor_readings',
INTERVAL '7 days',
if_not_exists => TRUE
);
""")

print("TimescaleDB schema created successfully")

def write_readings_batch(self, readings: List[Dict]):
"""
Bulk insert sensor readings using copy_expert for maximum throughput.
Can achieve 100,000+ rows/second on a reasonable server.
"""
import io

if not readings:
return

        # Build a tab-separated buffer for the COPY command.
        # COPY text format uses \N for NULL; an empty string would fail
        # to parse as DOUBLE PRECISION.
        buf = io.StringIO()
        for r in readings:
            ts = r["time"].isoformat() if isinstance(r["time"], datetime) else r["time"]
            value = r["value"] if r.get("value") is not None else "\\N"
            quality = r.get("quality", 1)
            buf.write(
                f"{ts}\t{r['machine_id']}\t{r['sensor_id']}\t"
                f"{r['measurement']}\t{value}\t{quality}\n"
            )
        buf.seek(0)

with self._connection() as conn:
cur = conn.cursor()
cur.copy_expert(
"""COPY sensor_readings (time, machine_id, sensor_id, measurement, value, quality)
FROM STDIN""",
buf
)

def query_features_for_training(
self,
machine_ids: List[str],
start_time: datetime,
end_time: datetime,
feature_sensors: List[str]
) -> pd.DataFrame:
"""
Query 5-minute feature aggregates for ML training dataset.

        Uses the continuous aggregate - queries return in milliseconds,
        where the same aggregation over the raw table could take minutes.
"""
with self._connection() as conn:
query = """
SELECT
bucket as timestamp,
machine_id,
sensor_id,
mean_value,
std_value,
min_value,
max_value,
p95_value,
sample_count
FROM sensor_features_5min
WHERE
machine_id = ANY(%s)
AND sensor_id = ANY(%s)
AND bucket >= %s
AND bucket < %s
ORDER BY bucket, machine_id, sensor_id
"""
df = pd.read_sql(
query, conn,
params=(
machine_ids,
feature_sensors,
start_time,
end_time
)
)

# Pivot to wide format: one row per (timestamp, machine_id)
# one column per (sensor_id, feature_type)
if df.empty:
return df

df_pivot = df.pivot_table(
index=["timestamp", "machine_id"],
columns="sensor_id",
values=["mean_value", "std_value", "max_value", "p95_value"]
)
df_pivot.columns = [f"{feat}_{sensor}" for feat, sensor in df_pivot.columns]
df_pivot = df_pivot.reset_index()

return df_pivot

def query_latest_features(
self,
machine_id: str,
lookback_windows: int = 12 # Last 12 x 5min = 60min of features
) -> pd.DataFrame:
"""
Query the most recent feature windows for real-time ML inference.
"""
with self._connection() as conn:
query = """
SELECT
bucket as timestamp,
sensor_id,
mean_value,
std_value,
min_value,
max_value,
p95_value
FROM sensor_features_5min
WHERE
machine_id = %s
AND bucket >= NOW() - INTERVAL '1 hour'
ORDER BY bucket DESC, sensor_id
LIMIT %s
"""
            df = pd.read_sql(
                query, conn,
                # LIMIT counts rows, not windows: budget for up to ~20
                # sensors per machine per 5-minute bucket
                params=(machine_id, lookback_windows * 20)
            )
return df

4. Kafka-Based IIoT Streaming Pipeline

"""
Apache Kafka-based streaming pipeline for IIoT sensor data.

Kafka is the backbone for real-time sensor data pipelines in large facilities:
- Multiple data sources (OPC-UA servers, MQTT gateways) publish to Kafka topics
- ML inference consumers subscribe to topics and process in real-time
- Kafka retains data for a configurable period (e.g., 7 days), enabling replay

Topic design for manufacturing:
factory.{factory_id}.raw.sensors - Raw sensor readings (high volume)
factory.{factory_id}.features - Computed feature vectors (lower volume)
factory.{factory_id}.anomaly.alerts - Anomaly alerts (very low volume)
factory.{factory_id}.ml.predictions - ML model outputs

Install: pip install confluent-kafka
"""
import json
import logging
import threading
from datetime import datetime, timezone
from typing import Dict, Optional

logger = logging.getLogger(__name__)


class SensorDataProducer:
"""
Kafka producer for industrial sensor data.
Handles serialization, partitioning, and error recovery.
"""

def __init__(
self,
bootstrap_servers: str,
factory_id: str,
client_id: str = "sensor-producer",
security_config: Optional[Dict] = None
):
try:
from confluent_kafka import Producer

config = {
"bootstrap.servers": bootstrap_servers,
"client.id": client_id,
# Reliability settings
"acks": "all", # Wait for all replicas
"retries": 10,
"retry.backoff.ms": 100,
# Throughput settings
"linger.ms": 5, # Batch messages for 5ms
"batch.size": 65536, # 64KB batches
"compression.type": "lz4", # Fast compression
}

if security_config:
config.update(security_config)

self.producer = Producer(config)
self.factory_id = factory_id
self.raw_topic = f"factory.{factory_id}.raw.sensors"
self.features_topic = f"factory.{factory_id}.features"

except ImportError:
raise RuntimeError("confluent-kafka not installed. pip install confluent-kafka")

def send_reading(
self,
machine_id: str,
sensor_id: str,
value: float,
timestamp: Optional[datetime] = None
):
"""Publish a single sensor reading."""
        if timestamp is None:
            timestamp = datetime.now(tz=timezone.utc)

payload = json.dumps({
"machine_id": machine_id,
"sensor_id": sensor_id,
"value": value,
"timestamp": timestamp.isoformat(),
"factory_id": self.factory_id
})

self.producer.produce(
topic=self.raw_topic,
key=machine_id.encode(), # Partition by machine_id for ordering
value=payload.encode(),
on_delivery=self._delivery_callback
)
# Poll occasionally to handle delivery reports
self.producer.poll(0)

def send_features(
self,
machine_id: str,
features: Dict[str, float],
timestamp: Optional[datetime] = None
):
"""Publish a computed feature vector."""
        if timestamp is None:
            timestamp = datetime.now(tz=timezone.utc)

payload = json.dumps({
"machine_id": machine_id,
"features": features,
"timestamp": timestamp.isoformat(),
"factory_id": self.factory_id
})

self.producer.produce(
topic=self.features_topic,
key=machine_id.encode(),
value=payload.encode()
)
self.producer.poll(0)

def _delivery_callback(self, err, msg):
if err:
logger.error(f"Message delivery failed: {err}")

def flush(self, timeout: float = 10.0):
self.producer.flush(timeout=timeout)


class MLInferenceConsumer:
"""
Kafka consumer that runs ML inference on feature vectors.

Each feature vector received triggers:
1. Anomaly detection inference
2. Result publication to alerts topic
3. Result writing to time-series database
"""

def __init__(
self,
bootstrap_servers: str,
factory_id: str,
group_id: str,
anomaly_detector, # Fitted anomaly detector
alert_producer: SensorDataProducer,
db_writer # InfluxDB or TimescaleDB writer
):
try:
from confluent_kafka import Consumer

self.consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"auto.offset.reset": "latest",
"enable.auto.commit": True,
"auto.commit.interval.ms": 5000
})

self.factory_id = factory_id
self.features_topic = f"factory.{factory_id}.features"
self.consumer.subscribe([self.features_topic])

self.anomaly_detector = anomaly_detector
self.alert_producer = alert_producer
self.db_writer = db_writer

self._running = False
self._processed = 0
self._alerts = 0

        except ImportError:
            raise RuntimeError("confluent-kafka not installed. pip install confluent-kafka")

def start(self):
"""Start consuming and processing messages in a background thread."""
self._running = True
self._thread = threading.Thread(target=self._consume_loop, daemon=True)
self._thread.start()
logger.info(f"ML inference consumer started for factory {self.factory_id}")

def _consume_loop(self):
while self._running:
msg = self.consumer.poll(timeout=1.0)

if msg is None:
continue
if msg.error():
from confluent_kafka import KafkaError
if msg.error().code() != KafkaError._PARTITION_EOF:
logger.error(f"Kafka error: {msg.error()}")
continue

try:
payload = json.loads(msg.value().decode())
machine_id = payload["machine_id"]
features = payload["features"]
timestamp = datetime.fromisoformat(payload["timestamp"])

                # Run inference. Sorting keys gives a deterministic feature
                # order, but it must match the column order the detector
                # was trained with.
                import numpy as np
                feature_array = np.array(
                    [features.get(k, 0.0) for k in sorted(features.keys())]
                ).reshape(1, -1)

anomaly_score = float(
self.anomaly_detector.score(feature_array)[0]
)
is_alert = anomaly_score < self.anomaly_detector.threshold

self._processed += 1

# Write result to DB
if self.db_writer:
self.db_writer.write_sensor_reading(
sensor_id="anomaly_score",
machine_id=machine_id,
measurement_type="ml_predictions",
value=anomaly_score,
timestamp=timestamp
)

# Publish alert
if is_alert:
self._alerts += 1
alert_topic = f"factory.{self.factory_id}.anomaly.alerts"
self.alert_producer.producer.produce(
topic=alert_topic,
key=machine_id.encode(),
value=json.dumps({
"machine_id": machine_id,
"timestamp": timestamp.isoformat(),
"anomaly_score": round(anomaly_score, 4),
"severity": "HIGH" if anomaly_score < 0.3 else "MEDIUM"
}).encode()
)
self.alert_producer.producer.poll(0)

if self._processed % 1000 == 0:
logger.info(
f"Processed {self._processed} feature vectors, "
f"{self._alerts} alerts generated"
)

except Exception as e:
logger.error(f"Processing error: {e}")

def stop(self):
self._running = False
if hasattr(self, "_thread"):
self._thread.join(timeout=10.0)
self.consumer.close()

5. Modbus TCP Data Reader

"""
Modbus TCP client for reading from legacy industrial equipment.

Modbus is the oldest widely-used industrial protocol (1979).
Despite its age, enormous amounts of industrial equipment still
speak Modbus: older PLCs, drives, meters, sensors.

Modbus register types:
- Coils (0x): Read/write, 1-bit digital outputs
- Discrete Inputs (1x): Read-only, 1-bit digital inputs
- Input Registers (3x): Read-only, 16-bit analog inputs
- Holding Registers (4x): Read/write, 16-bit values (most common for sensors)

Note: written against the pymodbus 3.x API; keyword and constant names
vary across versions (e.g., slave= vs unit=, Endian.Big vs Endian.BIG).

Install: pip install pymodbus
"""
from pymodbus.client import ModbusTcpClient
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadDecoder
import time
from typing import Dict, Optional


class ModbusSensorReader:
"""
Modbus TCP reader for industrial sensors.

Handles:
- 16-bit integer registers (common for simple sensors)
- 32-bit float registers (IEEE 754, stored in two registers)
- Scale factor and offset conversion (raw counts to engineering units)
"""

def __init__(
self,
host: str,
port: int = 502,
unit_id: int = 1, # Modbus slave address
timeout: float = 3.0,
retries: int = 3
):
        self.host = host
        self.port = port
        self.client = ModbusTcpClient(
            host=host,
            port=port,
            timeout=timeout,
            retries=retries
        )
        self.unit_id = unit_id
        self.connected = False

    def connect(self) -> bool:
        self.connected = self.client.connect()
        if self.connected:
            print(f"Connected to Modbus device at {self.host}:{self.port}")
        else:
            print(f"Failed to connect to Modbus device at {self.host}:{self.port}")
        return self.connected

def disconnect(self):
self.client.close()
self.connected = False

def read_float32(self, register_address: int) -> Optional[float]:
"""
Read a 32-bit IEEE 754 float stored in two consecutive 16-bit registers.
This is the most common format for sensor values in Modbus.
"""
result = self.client.read_holding_registers(
address=register_address,
count=2,
slave=self.unit_id
)
if result.isError():
return None

decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.Big,
wordorder=Endian.Big
)
return decoder.decode_32bit_float()

def read_uint16(
self,
register_address: int,
scale: float = 1.0,
offset: float = 0.0
) -> Optional[float]:
"""
Read a 16-bit unsigned integer register and convert to engineering units.
scale and offset: engineering_value = raw * scale + offset
Example: 0-4095 ADC count -> 0-100 bar pressure
scale = 100/4095, offset = 0
"""
result = self.client.read_holding_registers(
address=register_address,
count=1,
slave=self.unit_id
)
if result.isError():
return None
return result.registers[0] * scale + offset

def read_sensor_group(
self,
register_map: Dict[str, Dict]
) -> Dict[str, Optional[float]]:
"""
Read multiple sensors defined by a register map.

register_map format:
{
"pressure_bar": {"addr": 100, "type": "float32"},
"temperature_c": {"addr": 102, "type": "uint16", "scale": 0.1, "offset": -40},
"flow_lpm": {"addr": 104, "type": "float32"}
}
"""
values = {}
for sensor_name, config in register_map.items():
addr = config["addr"]
dtype = config.get("type", "float32")

if dtype == "float32":
values[sensor_name] = self.read_float32(addr)
elif dtype == "uint16":
scale = config.get("scale", 1.0)
offset = config.get("offset", 0.0)
values[sensor_name] = self.read_uint16(addr, scale, offset)

return values

def poll_loop(
self,
register_map: Dict[str, Dict],
callback, # Callable[[Dict], None]
interval_ms: int = 1000
):
"""
Continuous polling loop - reads all registers every interval_ms.
Call callback with each reading batch.
"""
print(f"Starting Modbus poll loop at {interval_ms}ms interval")
while True:
t0 = time.perf_counter()
values = self.read_sensor_group(register_map)
values["timestamp"] = time.time()
callback(values)

elapsed_ms = (time.perf_counter() - t0) * 1000
sleep_ms = max(0, interval_ms - elapsed_ms)
time.sleep(sleep_ms / 1000.0)
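
A short usage sketch tying this reader to the Kafka producer from example 4 - the device address, register map, and machine_id are illustrative assumptions:

"""
Usage sketch: poll a Modbus device and forward readings to Kafka.
Host, register addresses, and machine_id are illustrative assumptions;
SensorDataProducer is the class from example 4.
"""
from datetime import datetime, timezone

reader = ModbusSensorReader(host="192.168.10.50", port=502, unit_id=1)

register_map = {
    "pressure_bar": {"addr": 100, "type": "float32"},
    "temperature_c": {"addr": 102, "type": "uint16", "scale": 0.1, "offset": -40},
}

producer = SensorDataProducer(bootstrap_servers="kafka:9092", factory_id="plant1")

def forward_to_kafka(values: dict):
    ts = datetime.fromtimestamp(values.pop("timestamp"), tz=timezone.utc)
    for sensor_id, value in values.items():
        if value is not None:  # None = failed read; skip rather than send 0
            producer.send_reading(
                machine_id="press-07",
                sensor_id=sensor_id,
                value=value,
                timestamp=ts
            )

if reader.connect():
    reader.poll_loop(register_map, forward_to_kafka, interval_ms=1000)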



Production Engineering Notes

Time Synchronization Is Critical

In a manufacturing facility, sensor data arrives from hundreds of devices with their own internal clocks. A vibration sensor's clock may drift 5 minutes from the PLC's clock over a month. When you correlate vibration data with production events from the ERP system, a 5-minute timestamp mismatch can make a vibration spike appear to precede the event that caused it - corrupting any causal analysis.

The solution: PTP (Precision Time Protocol, IEEE 1588) provides sub-microsecond clock synchronization across Ethernet networks. All devices in the OT network should be PTP-synchronized. For devices that do not support PTP (older sensors, Modbus devices), the gateway that reads them should apply a calibrated offset based on periodic comparison with PTP time. Always store the source timestamp (device clock) and the receive timestamp (gateway clock) separately - this lets you diagnose synchronization issues after the fact.

For ML training: when building training datasets from multiple sensors on different devices, always join on timestamp with a tolerance window (e.g., within 500ms), not on exact timestamp equality. Exact timestamp equality will drop most rows in a real manufacturing dataset.
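
A minimal sketch of such a tolerance join with pandas.merge_asof - the column names, sample values, and the 500ms tolerance are illustrative:

"""
Tolerance-window join of two sensor streams with pandas.merge_asof.
Column names, sample values, and the 500ms tolerance are illustrative.
"""
import pandas as pd

vib = pd.DataFrame({
    "timestamp": pd.to_datetime(
        ["2026-01-01 00:00:00.120", "2026-01-01 00:00:01.080"], utc=True),
    "rms_velocity": [2.1, 2.4],
})
temp = pd.DataFrame({
    "timestamp": pd.to_datetime(
        ["2026-01-01 00:00:00.300", "2026-01-01 00:00:01.250"], utc=True),
    "bearing_temp": [71.2, 71.4],
})

# Both frames must be sorted by timestamp. Each vibration row is paired
# with the nearest temperature row within the tolerance; rows with no
# match inside 500ms get NaN instead of being dropped or mis-paired.
joined = pd.merge_asof(
    vib.sort_values("timestamp"),
    temp.sort_values("timestamp"),
    on="timestamp",
    tolerance=pd.Timedelta("500ms"),
    direction="nearest",
)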

Data Quality and Completeness

Real industrial sensor data has gaps, bad quality markers, and inconsistencies. InfluxDB and TimescaleDB can both carry quality annotations (an extra field in InfluxDB, a quality column in TimescaleDB), but upstream systems must populate them correctly. OPC-UA quality codes (Good, Uncertain, Bad) should be stored alongside values and used to filter data before ML training.

Common causes of data quality issues: communication errors (brief network disconnection = gap in data), sensor failure (constant or NaN values), engineering unit misconfiguration (temperature in Kelvin when the model expects Celsius), register address changes after PLC program update (data suddenly means something different but no one told the data team).

Build a data quality dashboard: per-sensor completeness (what fraction of expected samples actually arrived?), per-sensor value distribution (has the mean or variance shifted recently?), per-sensor bad quality rate. Review this weekly. A sensor with declining completeness is often the early warning of a failing communication link or sensor.
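
As a sketch of the completeness check - the table layout and the nominal 1 Hz sample rate are assumptions:

"""
Per-sensor completeness: fraction of expected samples that arrived.
Assumes a DataFrame of readings with timestamp and sensor_id columns
and a nominal 1 Hz sample rate for every sensor.
"""
import pandas as pd

def completeness(readings: pd.DataFrame, expected_hz: float = 1.0) -> pd.Series:
    span_s = (
        readings["timestamp"].max() - readings["timestamp"].min()
    ).total_seconds()
    expected = max(span_s * expected_hz, 1.0)
    actual = readings.groupby("sensor_id")["timestamp"].count()
    # Sensors at the bottom of this ranking are the ones to investigate:
    # declining completeness often precedes a failing link or sensor
    return (actual / expected).clip(upper=1.0).sort_values()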

Message Schema Versioning

When you change the format of sensor data messages in Kafka (add a new field, rename a sensor), downstream consumers that expect the old format break. In a manufacturing environment with multiple teams consuming the same topics, uncoordinated schema changes cause production incidents.

The solution: use a schema registry (Confluent Schema Registry is the standard, available as managed service). Producers register their schema; consumers validate received messages against the expected schema. Schema evolution rules (backward compatibility: new schema can read data written with old schema) prevent breaking changes. Enforce schema registration as part of the deployment process for any code that writes to production Kafka topics.
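
A sketch of the producer side with Confluent's Python schema registry client - the registry URL, topic, and schema are illustrative, and the optional line_id field shows a backward-compatible evolution:

"""
Registering and using an Avro schema with Confluent Schema Registry.
Registry URL, topic, and schema are illustrative assumptions.

Install: pip install "confluent-kafka[avro]"
"""
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_str = """
{
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "machine_id", "type": "string"},
    {"name": "sensor_id", "type": "string"},
    {"name": "value", "type": "double"},
    {"name": "timestamp", "type": "string"},
    {"name": "line_id", "type": ["null", "string"], "default": null}
  ]
}
"""

registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})
serializer = AvroSerializer(registry, schema_str)

# Serialization registers the schema (subject derived from the topic) and
# prefixes the Avro payload with the registry's schema ID, so consumers
# can validate and decode. line_id has a default, so adding it is a
# backward-compatible change.
payload = serializer(
    {"machine_id": "press-07", "sensor_id": "bearing_temp",
     "value": 71.3, "timestamp": "2026-01-01T00:00:00+00:00", "line_id": None},
    SerializationContext("factory.plant1.raw.sensors", MessageField.VALUE),
)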


:::warning Modbus Security Vulnerabilities Modbus was designed in 1979 with no security considerations. It has no authentication, no encryption, and no access control. Any device on the same Ethernet network can read or write any Modbus register on any other device. In a factory where the OT and IT networks are connected (as required for ML data collection), Modbus devices are exposed. Never directly expose Modbus devices on the same network as IT systems. Use a protocol gateway or firewall that provides OPC-UA or MQTT fronting for Modbus devices, with proper authentication at the gateway level. :::

:::danger Clock Drift Corrupts Correlation Analysis If sensor clocks are not synchronized, temporal correlations in your training data are corrupted by timestamp noise. A model trained on data where the vibration sensor clock is 45 seconds ahead of the temperature sensor clock learns spurious correlations - vibration changes appear to predict temperature changes 45 seconds later, when the actual relationship is simultaneous. Always verify timestamp synchronization before building training datasets. Run a clock drift audit: compare timestamps of events that should be simultaneous across different sensors (a press stroke triggers both a vibration spike and a current spike at the same moment - measure the timestamp difference) and flag any sensor with systematic drift above your tolerance. :::


Interview Questions and Answers

Q1: What is OPC-UA and why is it the preferred protocol for industrial ML data collection?

OPC-UA (Open Platform Communications Unified Architecture) is a machine-to-machine communication protocol for industrial automation, standardized by the OPC Foundation in 2009. It is preferred for ML data collection for several reasons. First, it is platform-independent - unlike its predecessor OPC DA, which was Windows COM-based, OPC-UA runs on any operating system and can be implemented in any language. Python clients such as asyncua (from the opcua-asyncio project) can connect to Siemens, Allen-Bradley, Beckhoff, or any other vendor's PLC. Second, it has a self-describing information model - OPC-UA nodes carry semantic metadata (engineering units, description, range limits) that makes data understandable without external documentation. Third, it includes both pull (synchronous read) and push (subscription-based) data access modes. Fourth, it has enterprise-grade security: certificate-based authentication, message signing, and encryption are built into the protocol. Fifth, the HistoricalRead service provides standardized access to historian data for training dataset extraction - you do not need vendor-specific historian APIs.

Q2: When would you use InfluxDB vs TimescaleDB for industrial sensor data?

Both are excellent for time-series data but have different strengths. InfluxDB is purpose-built for time-series: the Flux query language is optimized for time-series operations (windowing, downsampling, anomaly detection), writes are extremely fast (millions of points per second), and the data model (measurements, tags, fields) maps naturally to sensor data. Choose InfluxDB when: your primary workload is ingest and real-time query of sensor data, you need a managed cloud offering (InfluxDB Cloud), and your team is comfortable learning Flux. TimescaleDB is PostgreSQL with time-series extensions: all standard SQL works, joins between sensor data and relational tables (production orders, maintenance records, bills of material) are trivial, and continuous aggregates provide automatic feature pre-computation. Choose TimescaleDB when: you need to join sensor data with operational data from your manufacturing database, your team has PostgreSQL expertise, or you want the full flexibility of SQL for ad-hoc analysis. For ML-heavy workloads, TimescaleDB often wins because the training dataset construction almost always involves joining sensor data with label data (maintenance records, quality outcomes), and SQL is far better for this than Flux.

Q3: How do you handle late-arriving data in a Kafka-based sensor pipeline?

Late-arriving data is common in industrial environments: a sensor on a spotty wireless connection might deliver a reading 30 seconds late; a gateway that buffered 5 minutes of data during a brief network outage delivers it all at once when reconnected. The approaches depend on latency tolerance. For real-time inference (anomaly detection with 5-second windows): late arrivals are simply dropped - you cannot retroactively re-run the model on a window that has already been scored. Design the system to tolerate occasional gaps. For batch analytics and training dataset construction: use event-time processing rather than wall-clock time. Kafka Streams and Apache Flink support windowed aggregations based on the event timestamp in the message, not the wall-clock time when the message arrived. Late messages that arrive within a configurable grace period are included in the correct window; messages beyond the grace period are either dropped or routed to a dead-letter queue for manual investigation. Always store the event timestamp (sensor clock) and the ingest timestamp (Kafka receipt time) separately - the difference tells you the pipeline latency distribution, which is essential for diagnosing late arrival patterns.
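
A stripped-down illustration of event-time windowing with a grace period - window size and grace values are illustrative, and a real pipeline would use Kafka Streams or Flink as noted above:

"""
Event-time tumbling windows with a grace period, in plain Python.
Window size and grace period are illustrative; real pipelines would
use Kafka Streams or Flink for this.
"""
from collections import defaultdict

WINDOW_S = 60    # 1-minute tumbling windows
GRACE_S = 120    # accept events up to 2 minutes late

windows = defaultdict(list)  # window_start -> values
watermark = 0.0              # highest event time seen so far

def on_event(event_ts: float, value: float):
    """Route an event by its event time, not its arrival time."""
    global watermark
    watermark = max(watermark, event_ts)
    window_start = int(event_ts // WINDOW_S) * WINDOW_S
    if window_start + WINDOW_S + GRACE_S < watermark:
        return  # beyond the grace period: drop or route to a dead-letter queue
    windows[window_start].append(value)

def close_expired_windows():
    """Emit and discard windows whose grace period has passed."""
    expired = [s for s in windows if s + WINDOW_S + GRACE_S < watermark]
    for start in expired:
        values = windows.pop(start)
        emit(start, sum(values) / len(values))

def emit(window_start, aggregate):
    pass  # placeholder: publish the windowed aggregate downstream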

Q4: What is the digital thread concept and how does ML fit into it?

The digital thread is the connected flow of data across a product's entire lifecycle: design (CAD models, simulation results), manufacturing (process parameters, sensor data, quality measurements), field operation (usage data, maintenance records, failure history), and end-of-life (decommissioning). The term comes from aerospace, where Lockheed and Boeing use it to maintain a continuous traceable link from the original design intent through every manufacturing and service event. ML fits into the digital thread at multiple points: process optimization models that use digital thread data from previous similar parts to predict optimal parameters for the current part, predictive maintenance models that use field failure data to improve the manufacturing process (if parts fail in the field due to a manufacturing process variation, the digital thread connects that failure back to the specific process parameters of the failing batch), and quality prediction models that use in-process sensor data plus historical failure data from the digital thread. The practical challenge is data integration: design data lives in CAD/PLM systems (Siemens Teamcenter, PTC Windchill), manufacturing data in MES/ERP systems, and field data in CRM/service management systems. The digital thread requires standardized identifiers and APIs to link data across these systems.

Q5: How would you design a Kafka topic structure for a large manufacturing facility?

Topic design is an architectural decision that is very hard to change once production data is flowing. Key principles: (1) Topic per data type, not topic per device. One topic for "factory.{id}.raw.sensors" handles all sensors, with machine_id in the message key for partitioning. This is vastly more scalable than one topic per sensor. (2) Use message keys for partitioning by machine. Kafka partitions are the unit of ordering - messages with the same key go to the same partition and are ordered. Use machine_id as the key so all readings from one machine are ordered and processed by the same consumer. (3) Separate raw data from processed data. "factory.raw.sensors" contains raw values. "factory.features" contains computed feature vectors. "factory.anomaly.alerts" contains anomaly events. Consumers choose their starting point in the processing pipeline. (4) Set retention by data type. Raw sensor data: 7 days (re-processing window if a consumer fails). Feature vectors: 30 days (for retraining). Alerts: 1 year (compliance). (5) Configure replication factor = 3 for all production topics. In a factory, the Kafka cluster may have nodes on both the OT and IT sides of the DMZ - replicate across both for resilience.
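
The retention-by-type principle might be expressed with confluent-kafka's admin API roughly as follows - topic names, partition counts, and retention values are illustrative:

"""
Topic creation with per-type retention via confluent-kafka's AdminClient.
Topic names, partition counts, and retention values are illustrative.
"""
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "kafka:9092"})

DAY_MS = 24 * 3600 * 1000
topics = [
    # Raw sensors: high volume, 7-day replay window
    NewTopic("factory.plant1.raw.sensors", num_partitions=24,
             replication_factor=3, config={"retention.ms": str(7 * DAY_MS)}),
    # Feature vectors: lower volume, kept 30 days for retraining
    NewTopic("factory.plant1.features", num_partitions=12,
             replication_factor=3, config={"retention.ms": str(30 * DAY_MS)}),
    # Alerts: tiny volume, kept 1 year for compliance
    NewTopic("factory.plant1.anomaly.alerts", num_partitions=3,
             replication_factor=3, config={"retention.ms": str(365 * DAY_MS)}),
]

# create_topics is asynchronous: it returns a dict of topic -> future
for topic, future in admin.create_topics(topics).items():
    try:
        future.result()  # raises on failure (e.g., topic already exists)
    except Exception as e:
        print(f"{topic}: {e}")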


Key Takeaways

Industrial IoT data engineering is the unsexy but essential foundation for any manufacturing ML system. The stack: OPC-UA for secure, self-describing industrial data access; MQTT for lightweight edge-to-cloud transport; Kafka for scalable streaming with replay capability; InfluxDB or TimescaleDB for efficient time-series storage with ML-friendly query patterns; Modbus as the legacy protocol you will inevitably encounter. The operational challenges - clock synchronization, data quality annotation, schema versioning, OT/IT network segmentation - are as important as the technology choices. A well-designed IIoT data pipeline turns the manufacturing floor into a structured, queryable, ML-ready data source. Without this foundation, even the best ML models cannot reach their potential because they will be starved of reliable, timely, well-labeled data.
