
Edge AI in Manufacturing

Reading time: ~45 min · Interview relevance: High · Target roles: ML Engineer, Edge AI Engineer, Industrial AI Systems Engineer


When the Cloud Is Not Enough

It is 2:47 AM in an automotive stamping plant. A 2,500-ton press is forming body panels at 18 strokes per minute. Each stroke generates a brief, sharp vibration signature in the 200-400 Hz range that encodes the condition of the die set. A worn die produces subtly different vibration than a fresh one. Catching the difference before the die fails catastrophically - which would destroy the die, damage the press, and create a 4-hour production stoppage - requires vibration analysis at every single stroke.

The analysis window is 3.3 seconds per stroke. Within that window, you must acquire the vibration signal, extract features, run the anomaly detection model, and output a go/no-go decision. If the decision is "die condition degraded," you need to alert the operator in time to schedule a planned die change within the next shift rather than react to an unplanned failure.

Now consider what a cloud-based architecture looks like for this problem. You upload 200ms of vibration data at 10 kHz (2,000 samples) over the factory LAN, through the industrial firewall, to the cloud ML service. The cloud runs inference and sends back a result. Round-trip latency: 100-500ms on a good day, 2,000ms on a bad network day. At 18 strokes per minute, you cannot miss a single stroke. The cloud architecture fails on latency alone, before you even consider network reliability, data sovereignty requirements, or the bandwidth cost of streaming high-frequency sensor data from 200 presses simultaneously.

This is why edge AI exists in manufacturing: some decisions must be made at the source, in real time, with a guarantee of availability even when the network is down. The edge device - whether an NVIDIA Jetson, an Intel NUC, or an industrial PC with a GPU - lives next to the equipment, has direct sensor connections, and runs inference without network dependence. The cloud handles model training, fleet management, and analytics. The edge handles real-time decisions. This division of labor defines the industrial edge AI architecture.


Why This Exists

The Four Drivers of Edge AI

Latency: Some decisions cannot wait for a round-trip to the cloud. Visual inspection reject signals must reach the PLC in under 200ms. Vibration anomaly detection must keep pace with rotating equipment. Safety-critical interlocks must respond in milliseconds. Any ML application with a hard real-time requirement belongs at the edge.

Bandwidth: A single factory with 1,000 sensors sampled at 1 kHz produces on the order of 4 MB/s of raw data (1,000 sensors x 1,000 samples/s x 4 bytes per sample), which adds up to several hundred gigabytes per day. Transmitting this continuously to the cloud is technically feasible but economically irrational and practically challenging in most industrial network environments. Edge processing performs data reduction - raw signals become features, features become decisions, decisions become telemetry - reducing transmitted data by 100-1,000x.

Data sovereignty: Many manufacturers have contractual obligations (with automotive OEMs, aerospace primes, defense customers) not to transmit raw production data outside the facility. Some production data is trade-secret (proprietary alloy compositions, secret process parameters). Edge inference keeps sensitive data at the facility.

Reliability: Factory networks are not consumer internet. VLANs segregate OT (operational technology) from IT networks. Firewalls restrict external connectivity. Network maintenance windows can interrupt connectivity for hours. Edge devices must operate autonomously during connectivity gaps, buffering decisions and syncing with the cloud when connectivity resumes.
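
To make the autonomy requirement concrete, here is a minimal store-and-forward sketch: inference results are always written to a local SQLite queue first and flushed whenever a publish callable succeeds, so decisions made during an outage are not lost. This is an illustrative pattern under those assumptions, not a prescribed implementation.

```python
import json
import sqlite3
import time


class StoreAndForwardBuffer:
    """Buffer inference results locally; flush to the cloud when connectivity returns."""

    def __init__(self, db_path: str = "edge_buffer.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS outbox (id INTEGER PRIMARY KEY, ts REAL, payload TEXT)"
        )
        self.conn.commit()

    def enqueue(self, result: dict):
        """Always record locally first, regardless of network state."""
        self.conn.execute(
            "INSERT INTO outbox (ts, payload) VALUES (?, ?)",
            (time.time(), json.dumps(result)),
        )
        self.conn.commit()

    def flush(self, publish) -> int:
        """Publish queued rows oldest-first; stop at the first failure and retry later."""
        rows = self.conn.execute("SELECT id, payload FROM outbox ORDER BY id").fetchall()
        sent = 0
        for row_id, payload in rows:
            if not publish(json.loads(payload)):  # publish returns False on network error
                break
            self.conn.execute("DELETE FROM outbox WHERE id = ?", (row_id,))
            sent += 1
        self.conn.commit()
        return sent
```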

Edge Hardware Taxonomy

Industrial edge deployments use hardware spanning a wide performance and cost range:

| Device | GPU TOPS | Power | Use case | Cost |
|---|---|---|---|---|
| NVIDIA Jetson Nano | 0.5 | 5-10W | Simple inference, embedded | $150 |
| NVIDIA Jetson Xavier NX | 21 | 10-20W | Vision, moderate models | $500 |
| NVIDIA Jetson AGX Orin | 275 | 15-60W | Complex models, multi-camera | $1,000+ |
| Intel NUC (OpenVINO) | CPU only | 15-28W | INT8 CPU inference | $400-700 |
| Industrial IPC + T4 GPU | ~65 | 70-300W | Heavy inference workloads | $2,000-5,000 |
| Raspberry Pi CM4 | No GPU | 5-15W | Lightweight classification only | $55 |

The choice depends on model complexity, inference latency requirement, available power, and cost-per-device. For vision inspection (larger models, stricter latency), Jetson AGX Orin or industrial IPC with GPU is appropriate. For sensor anomaly detection (smaller models, looser latency), Jetson Xavier NX or Intel NUC is sufficient.


Historical Context

Edge computing as a concept predates "edge AI" by a decade. Content Delivery Networks (CDNs) in the 2000s moved computation toward users for latency reduction - the same principle applied to industrial data. The term "fog computing" (Cisco, 2012) described processing at network nodes between cloud and endpoint. "Edge AI" emerged around 2017 as mobile chipmakers (Qualcomm, Apple's A-series Neural Engine, Google's Edge TPU) demonstrated that meaningful neural network inference was feasible on battery-powered devices.

For industrial applications, the pivotal products were NVIDIA's Jetson TX2 (2017) and Jetson AGX Xavier (2018), which brought GPU-accelerated deep learning to an industrial form factor at a price point compatible with per-machine deployment. TensorRT (NVIDIA's inference optimizer) and ONNX (Open Neural Network Exchange format, 2017) provided the software stack. Intel's OpenVINO toolkit (2018) enabled efficient inference on Intel CPUs and integrated graphics, covering the large installed base of Intel-based industrial PCs.

Early edge deployments of AlexNet-class vision models (roughly 50ms per inference on an embedded GPU) gave way to ResNet-50 and MobileNet-V2 variants optimized for the edge (about 30ms on a Jetson Xavier NX with TensorRT INT8), and today to transformer-based vision models running at 100-200ms on a Jetson AGX Orin.


Core Concepts

Model Optimization for Edge

A PyTorch or TensorFlow model trained on a workstation GPU will run too slowly on edge hardware without optimization. Four main techniques:

Quantization reduces the numerical precision of model weights and activations from float32 (4 bytes) to INT8 (1 byte) or INT4 (0.5 bytes). The memory reduction (4x for INT8 vs FP32) directly reduces inference latency because memory bandwidth is often the bottleneck. Accuracy loss is typically 0.5-2% for INT8 with proper calibration, acceptable for industrial inspection. TensorRT handles INT8 calibration automatically given a calibration dataset. Post-Training Quantization (PTQ) is the most common approach - quantize after training, using a small representative dataset to calibrate.
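
As a concrete illustration of the arithmetic, here is a minimal NumPy sketch of symmetric per-tensor INT8 quantization - the same idea TensorRT applies per layer during calibration, stripped down for clarity (the helper names are illustrative):

```python
import numpy as np


def quantize_int8(x: np.ndarray):
    """Symmetric INT8 quantization: map [-max|x|, +max|x|] onto [-127, 127]."""
    scale = np.abs(x).max() / 127.0                      # one scale factor per tensor
    q = np.clip(np.round(x / scale), -127, 127).astype(np.int8)
    return q, scale


def dequantize(q: np.ndarray, scale: float) -> np.ndarray:
    """Recover an approximate float32 tensor from the INT8 representation."""
    return q.astype(np.float32) * scale


rng = np.random.default_rng(0)
activations = rng.normal(0.0, 0.5, size=10_000).astype(np.float32)
q, scale = quantize_int8(activations)
recovered = dequantize(q, scale)
print(f"scale={scale:.5f}, max abs error={np.abs(activations - recovered).max():.5f}")
```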

Pruning removes weights that contribute little to model output. Structured pruning removes entire channels or attention heads, producing a smaller, regular model that runs faster. Unstructured pruning zeros individual weights, which requires sparse computation support to actually speed up inference (available in some accelerators). Pruning to 50% sparsity typically costs 1-3% accuracy.
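
A minimal PyTorch sketch of the structured variant using the built-in torch.nn.utils.prune utilities; the layer here is a toy stand-in, and note that this only zeroes channels in place - physically shrinking the network requires rebuilding the layer afterwards:

```python
import torch.nn as nn
import torch.nn.utils.prune as prune

# Toy layer standing in for one convolution inside an inspection model.
conv = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3)

# Structured pruning: zero out 25% of the output channels (dim=0), ranked by L2 norm.
# Removing whole channels keeps the tensor dense, so the speedup does not
# depend on sparse-kernel support in the accelerator.
prune.ln_structured(conv, name="weight", amount=0.25, n=2, dim=0)

# Fold the pruning mask into the weight tensor permanently.
prune.remove(conv, "weight")

sparsity = float((conv.weight == 0).sum()) / conv.weight.numel()
print(f"weight sparsity after structured pruning: {sparsity:.1%}")
```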

Knowledge Distillation trains a small "student" model to mimic the behavior of a large "teacher" model. The student is trained to match the teacher's soft output probabilities (which carry more information than hard labels) rather than ground-truth labels. The result: a 5-10x smaller model with 2-5% accuracy penalty. For industrial inspection, a ResNet-50 teacher can produce a MobileNet student that runs 4x faster with acceptable accuracy.
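
A minimal sketch of the standard distillation loss - soft targets at temperature T blended with the hard-label loss; the T and alpha values here are illustrative, not tuned:

```python
import torch
import torch.nn.functional as F


def distillation_loss(student_logits, teacher_logits, labels, T=4.0, alpha=0.7):
    """Blend of soft-target loss (match the teacher) and hard-label cross-entropy."""
    soft = F.kl_div(
        F.log_softmax(student_logits / T, dim=1),
        F.softmax(teacher_logits / T, dim=1),
        reduction="batchmean",
    ) * (T * T)  # rescale so gradient magnitudes stay comparable across temperatures
    hard = F.cross_entropy(student_logits, labels)
    return alpha * soft + (1 - alpha) * hard


# Toy batch: 8 samples, 5 classes
student_logits = torch.randn(8, 5, requires_grad=True)
teacher_logits = torch.randn(8, 5)
labels = torch.randint(0, 5, (8,))
loss = distillation_loss(student_logits, teacher_logits, labels)
loss.backward()
print(f"distillation loss: {loss.item():.3f}")
```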

ONNX Export + TensorRT Compilation: The standard deployment pipeline for NVIDIA edge hardware:

  1. Train model in PyTorch
  2. Export to ONNX (standardized format)
  3. Compile ONNX with TensorRT (hardware-specific kernel fusion, quantization, memory optimization)
  4. Deploy TensorRT engine on Jetson or NVIDIA GPU

Code Examples

1. TensorRT INT8 Quantization for Edge Deployment

"""
TensorRT INT8 quantization pipeline for deploying PyTorch models to NVIDIA Jetson.

Workflow:
1. Train model in PyTorch (on workstation)
2. Export to ONNX
3. Compile to TensorRT engine with INT8 calibration (on target device)
4. Deploy TensorRT engine

Requires: tensorrt, pycuda, torch
On Jetson devices, TensorRT is pre-installed.
"""
import torch
import torch.nn as nn
import numpy as np
from pathlib import Path
import time


def export_to_onnx(
model: nn.Module,
input_shape: tuple,
output_path: str,
opset_version: int = 17
) -> str:
"""
Export PyTorch model to ONNX format.

input_shape: (batch, channels, height, width) for vision models
or (batch, seq_len, features) for sequence models
"""
model.eval()
dummy_input = torch.randn(*input_shape)

# Dynamic axes for variable batch size and sequence length
dynamic_axes = {"input": {0: "batch_size"}, "output": {0: "batch_size"}}

torch.onnx.export(
model,
dummy_input,
output_path,
opset_version=opset_version,
input_names=["input"],
output_names=["output"],
dynamic_axes=dynamic_axes,
do_constant_folding=True # Constant folding optimization
)

print(f"ONNX model exported to {output_path}")
# Verify the export
import onnx
onnx_model = onnx.load(output_path)
onnx.checker.check_model(onnx_model)
print("ONNX model validation passed")
return output_path


class CalibrationDataLoader:
"""
Provides calibration data for TensorRT INT8 calibration.
Uses a small representative dataset (typically 500-1000 samples).
"""

def __init__(self, data: np.ndarray, batch_size: int = 8):
self.data = data.astype(np.float32)
self.batch_size = batch_size
self.idx = 0
self.n_batches = len(data) // batch_size

def reset(self):
self.idx = 0

def next_batch(self) -> np.ndarray:
if self.idx >= len(self.data):
return None
batch = self.data[self.idx:self.idx + self.batch_size]
self.idx += self.batch_size
return batch


def build_tensorrt_engine(
onnx_path: str,
engine_path: str,
input_shape: tuple,
calibration_data: np.ndarray,
use_int8: bool = True,
workspace_gb: float = 4.0
) -> str:
"""
Build TensorRT engine from ONNX model with INT8 quantization.

This runs on the target device (Jetson or NVIDIA GPU).
The calibration step takes 5-15 minutes depending on dataset size.

Returns path to compiled TensorRT engine.
"""
try:
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit

logger = trt.Logger(trt.Logger.WARNING)
builder = trt.Builder(logger)
network = builder.create_network(
1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
)
parser = trt.OnnxParser(network, logger)

# Parse ONNX model
with open(onnx_path, "rb") as f:
if not parser.parse(f.read()):
for i in range(parser.num_errors):
print(f"ONNX parse error: {parser.get_error(i)}")
raise RuntimeError("Failed to parse ONNX model")

# Build configuration
config = builder.create_builder_config()
config.max_workspace_size = int(workspace_gb * 1024**3)

if use_int8 and builder.platform_has_fast_int8:
config.set_flag(trt.BuilderFlag.INT8)

class Int8Calibrator(trt.IInt8EntropyCalibrator2):
def __init__(self, data, batch_size, cache_file="calibration.cache"):
super().__init__()
self.loader = CalibrationDataLoader(data, batch_size)
self.cache_file = cache_file
self.device_input = cuda.mem_alloc(
data[0:batch_size].nbytes
)

def get_batch_size(self):
return self.loader.batch_size

def get_batch(self, names):
batch = self.loader.next_batch()
if batch is None:
return None
cuda.memcpy_htod(self.device_input, np.ascontiguousarray(batch))
return [int(self.device_input)]

def read_calibration_cache(self):
if Path(self.cache_file).exists():
print(f"Loading calibration cache: {self.cache_file}")
with open(self.cache_file, "rb") as f:
return f.read()
return None

def write_calibration_cache(self, cache):
with open(self.cache_file, "wb") as f:
f.write(cache)
print(f"Calibration cache saved to {self.cache_file}")

calibrator = Int8Calibrator(calibration_data, batch_size=8)
config.int8_calibrator = calibrator
print("INT8 calibration configured")
elif use_int8:
print("INT8 not available on this platform, using FP16")
config.set_flag(trt.BuilderFlag.FP16)

# Build engine
print(f"Building TensorRT engine (this may take 5-15 minutes)...")
t0 = time.time()
engine = builder.build_engine(network, config)
print(f"Engine built in {time.time() - t0:.1f}s")

# Serialize to file
with open(engine_path, "wb") as f:
f.write(engine.serialize())
print(f"TensorRT engine saved to {engine_path}")
return engine_path

except ImportError:
print("TensorRT not available. Install on target device (Jetson).")
raise


class TensorRTInference:
"""
Fast inference using a compiled TensorRT engine.
This is what runs on the Jetson device in production.
"""

def __init__(self, engine_path: str):
try:
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit

logger = trt.Logger(trt.Logger.ERROR)
runtime = trt.Runtime(logger)

with open(engine_path, "rb") as f:
engine_data = f.read()

self.engine = runtime.deserialize_cuda_engine(engine_data)
self.context = self.engine.create_execution_context()

# Pre-allocate GPU memory for inputs and outputs
self.inputs = []
self.outputs = []
self.bindings = []

for binding in self.engine:
size = (
trt.volume(self.engine.get_binding_shape(binding))
* self.engine.max_batch_size
)
dtype = trt.nptype(self.engine.get_binding_dtype(binding))
host_mem = cuda.pagelocked_empty(size, dtype)
device_mem = cuda.mem_alloc(host_mem.nbytes)
self.bindings.append(int(device_mem))

if self.engine.binding_is_input(binding):
self.inputs.append({"host": host_mem, "device": device_mem})
else:
self.outputs.append({"host": host_mem, "device": device_mem})

self.stream = cuda.Stream()
self._available = True

except ImportError:
print("TensorRT not available, falling back to ONNX Runtime")
self._available = False
self._onnx_session = None

def infer(self, input_data: np.ndarray) -> np.ndarray:
"""
Run inference. Returns model output as numpy array.

Typical latency (Jetson AGX Orin, INT8):
- ResNet-50 (224x224): ~5ms
- MobileNet-V2 (224x224): ~2ms
- LSTM sequence (batch=1, seq=60, features=14): ~1ms
"""
if not self._available:
# Fallback
return self._onnx_infer(input_data)

import pycuda.driver as cuda

np.copyto(self.inputs[0]["host"], input_data.ravel())
cuda.memcpy_htod_async(
self.inputs[0]["device"],
self.inputs[0]["host"],
self.stream
)

self.context.execute_async_v2(
bindings=self.bindings,
stream_handle=self.stream.handle
)

cuda.memcpy_dtoh_async(
self.outputs[0]["host"],
self.outputs[0]["device"],
self.stream
)
self.stream.synchronize()

return self.outputs[0]["host"].copy()

2. ONNX Runtime Inference on Jetson / CPU

"""
ONNX Runtime inference - works on Jetson, Intel NUC, x86 industrial PCs.
Good fallback when TensorRT is not available or when targeting Intel hardware.

ONNX Runtime auto-selects the best execution provider:
- TensorrtExecutionProvider: NVIDIA GPUs with TensorRT
- CUDAExecutionProvider: NVIDIA GPUs without TensorRT
- OpenVINOExecutionProvider: Intel CPUs/iGPUs
- CPUExecutionProvider: any CPU
"""
import onnxruntime as ort
import numpy as np
import time
from pathlib import Path
from typing import List, Optional


class ONNXRuntimeInference:
"""
ONNX Runtime wrapper for edge deployment.
Supports multiple execution providers with automatic fallback.
"""

def __init__(
self,
model_path: str,
preferred_providers: Optional[List[str]] = None
):
# Execution provider priority: fastest first
if preferred_providers is None:
preferred_providers = [
"TensorrtExecutionProvider",
"CUDAExecutionProvider",
"OpenVINOExecutionProvider",
"CPUExecutionProvider"
]

# Only keep providers that are actually available
available = ort.get_available_providers()
providers = [p for p in preferred_providers if p in available]
if not providers:
providers = ["CPUExecutionProvider"]

print(f"Available providers: {available}")
print(f"Using: {providers[0]}")

# Session options for production
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
sess_options.intra_op_num_threads = 4 # Tune for your hardware
sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL

# Enable TensorRT-specific options if using TensorRT provider
provider_options = []
for provider in providers:
if provider == "TensorrtExecutionProvider":
provider_options.append({
"trt_engine_cache_enable": True,
"trt_engine_cache_path": "/tmp/trt_cache",
"trt_int8_enable": True,
"trt_int8_calibration_table_name": "calibration.flatbuffers"
})
else:
provider_options.append({})

self.session = ort.InferenceSession(
model_path,
sess_options=sess_options,
providers=providers,
provider_options=provider_options
)

# Cache input/output metadata for fast inference
self.input_name = self.session.get_inputs()[0].name
self.output_name = self.session.get_outputs()[0].name
self.input_shape = self.session.get_inputs()[0].shape
print(f"Model loaded: input={self.input_name} {self.input_shape}")

def infer(self, input_data: np.ndarray) -> np.ndarray:
"""Run inference on a single batch."""
t0 = time.perf_counter()
outputs = self.session.run(
[self.output_name],
{self.input_name: input_data.astype(np.float32)}
)
elapsed_ms = (time.perf_counter() - t0) * 1000
return outputs[0], elapsed_ms

def warmup(self, n_warmup: int = 5):
"""
Run warmup inferences to initialize CUDA contexts and TensorRT engines.
Always warmup before measuring latency.
"""
dummy_input_shape = [
d if isinstance(d, int) and d > 0 else 1
for d in self.input_shape
]
dummy = np.zeros(dummy_input_shape, dtype=np.float32)
print(f"Warming up ({n_warmup} iterations)...")
for _ in range(n_warmup):
self.infer(dummy)
print("Warmup complete")

def benchmark(
self,
input_data: np.ndarray,
n_iterations: int = 100
) -> dict:
"""Benchmark inference latency."""
self.warmup()
latencies = []

for _ in range(n_iterations):
_, latency = self.infer(input_data)
latencies.append(latency)

latencies = np.array(latencies)
return {
"mean_ms": round(float(np.mean(latencies)), 2),
"p50_ms": round(float(np.percentile(latencies, 50)), 2),
"p95_ms": round(float(np.percentile(latencies, 95)), 2),
"p99_ms": round(float(np.percentile(latencies, 99)), 2),
"max_ms": round(float(np.max(latencies)), 2),
"throughput_fps": round(1000.0 / np.mean(latencies), 1)
}

3. MQTT Publisher/Subscriber for Edge-Cloud Communication

"""
MQTT-based edge-cloud communication pattern.

Edge device publishes:
- Inference results (anomaly scores, decisions)
- System health metrics (CPU, GPU utilization, temperature)
- Periodic feature summaries

Cloud subscribes to:
- Aggregate inference results for fleet analytics
- System health for fleet management
- Alerts for operator notification

MQTT is the standard protocol for industrial IoT edge-cloud:
- Lightweight (low overhead vs HTTP)
- Publish-subscribe (decoupled architecture)
- QoS levels (at-most-once / at-least-once / exactly-once)
- Retained messages (new subscribers get last known state immediately)
- LWT (Last Will and Testament) for device disconnect detection

Install: pip install paho-mqtt
"""
import json
import time
import threading
from datetime import datetime
from typing import Callable, Dict, Optional
import logging

logger = logging.getLogger(__name__)


class EdgePublisher:
"""
Edge device MQTT publisher.
Publishes inference results and device health to MQTT broker.
"""

# Topic structure: factory/line/asset/data_type
TOPIC_ALERTS = "factory/{factory_id}/line/{line_id}/asset/{asset_id}/alerts"
TOPIC_SCORES = "factory/{factory_id}/line/{line_id}/asset/{asset_id}/scores"
TOPIC_HEALTH = "factory/{factory_id}/line/{line_id}/asset/{asset_id}/health"
TOPIC_STATUS = "factory/{factory_id}/edge/{device_id}/status"

def __init__(
self,
broker_host: str,
broker_port: int,
factory_id: str,
line_id: str,
asset_id: str,
device_id: str,
username: Optional[str] = None,
password: Optional[str] = None,
tls_cert_path: Optional[str] = None
):
try:
import paho.mqtt.client as mqtt

self.factory_id = factory_id
self.line_id = line_id
self.asset_id = asset_id
self.device_id = device_id

self.client = mqtt.Client(
client_id=f"edge_{device_id}",
clean_session=True
)

if username and password:
self.client.username_pw_set(username, password)

if tls_cert_path:
self.client.tls_set(ca_certs=tls_cert_path)

# Last Will and Testament: publish offline status if client disconnects
lwt_topic = self.TOPIC_STATUS.format(
factory_id=factory_id, device_id=device_id
)
lwt_payload = json.dumps({
"device_id": device_id,
"status": "offline",
"timestamp": datetime.utcnow().isoformat()
})
self.client.will_set(lwt_topic, lwt_payload, qos=1, retain=True)

self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect

self.client.connect(broker_host, broker_port, keepalive=60)
self.client.loop_start()

self._connected = False
self._topic_base = {
"factory_id": factory_id,
"line_id": line_id,
"asset_id": asset_id
}

except ImportError:
logger.error("paho-mqtt not installed. Run: pip install paho-mqtt")
raise

def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self._connected = True
# Publish online status with retained flag
status_topic = self.TOPIC_STATUS.format(
factory_id=self.factory_id, device_id=self.device_id
)
self.client.publish(
status_topic,
json.dumps({
"device_id": self.device_id,
"status": "online",
"timestamp": datetime.utcnow().isoformat()
}),
qos=1, retain=True
)
logger.info(f"Connected to MQTT broker")
else:
logger.error(f"MQTT connection failed: rc={rc}")

def _on_disconnect(self, client, userdata, rc):
self._connected = False
logger.warning(f"MQTT disconnected: rc={rc}. Will auto-reconnect.")

def publish_anomaly_score(
self,
score: float,
is_alert: bool,
severity: str = "LOW",
top_sensors: list = None
):
"""Publish anomaly detection result."""
topic = self.TOPIC_SCORES.format(**self._topic_base)
payload = {
"timestamp": datetime.utcnow().isoformat(),
"asset_id": self.asset_id,
"anomaly_score": round(score, 4),
"is_alert": is_alert,
"severity": severity,
"top_sensors": top_sensors or []
}
self.client.publish(
topic,
json.dumps(payload),
qos=0 # QoS 0 for high-frequency scores (allow loss)
)

if is_alert:
alert_topic = self.TOPIC_ALERTS.format(**self._topic_base)
self.client.publish(
alert_topic,
json.dumps(payload),
qos=1 # QoS 1 for alerts (at-least-once delivery)
)

def publish_health(self, cpu_pct: float, gpu_pct: float, temp_c: float):
"""Publish edge device health metrics."""
topic = self.TOPIC_HEALTH.format(**self._topic_base)
self.client.publish(
topic,
json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"device_id": self.device_id,
"cpu_pct": round(cpu_pct, 1),
"gpu_pct": round(gpu_pct, 1),
"temp_c": round(temp_c, 1)
}),
qos=0
)

def disconnect(self):
self.client.loop_stop()
self.client.disconnect()


class CloudSubscriber:
"""
Cloud-side MQTT subscriber that receives and processes edge device messages.
"""

def __init__(
self,
broker_host: str,
broker_port: int,
alert_callback: Optional[Callable] = None,
score_callback: Optional[Callable] = None
):
try:
import paho.mqtt.client as mqtt

self.client = mqtt.Client(client_id="cloud_subscriber")
self.alert_callback = alert_callback
self.score_callback = score_callback

self.client.on_connect = self._on_connect
self.client.on_message = self._on_message

self.client.connect(broker_host, broker_port)
self.client.loop_start()

except ImportError:
raise

def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
# Subscribe to all alerts across all factories and assets
client.subscribe("factory/+/line/+/asset/+/alerts", qos=1)
# Subscribe to status updates for fleet monitoring
client.subscribe("factory/+/edge/+/status", qos=1)
logger.info("Cloud subscriber connected and subscribed")

def _on_message(self, client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
topic = msg.topic

if "/alerts" in topic:
if self.alert_callback:
self.alert_callback(topic, payload)

elif "/status" in topic:
device_id = payload.get("device_id")
status = payload.get("status")
logger.info(f"Device {device_id} is {status}")

except Exception as e:
logger.error(f"Message processing error: {e}")

4. OTA Model Update Mechanism

"""
Over-the-air (OTA) model update for edge device fleet.

When you retrain a model in the cloud (new training data, improved architecture),
you need to push the updated model to all edge devices without requiring
physical access to each device.

This is a critical operational capability for fleets of 100+ edge devices.

Architecture:
- Cloud: model registry, version management, deployment orchestration
- Edge: update agent that checks for new versions, validates, hot-swaps
"""
import hashlib
import json
import os
import shutil
import time
import threading
from pathlib import Path
from typing import Optional, Callable
import logging
import urllib.request

logger = logging.getLogger(__name__)


class ModelRegistry:
"""
Cloud-side model registry.
Manages model versions and deployment targets.

In production: backed by S3/GCS/Azure Blob + DynamoDB/Firestore
"""

def __init__(self, registry_base_url: str):
self.base_url = registry_base_url

def get_latest_version(self, model_name: str, device_profile: str) -> dict:
"""
Query registry for latest model version for a device profile.

device_profile: e.g., "jetson-agx-orin-int8" or "intel-nuc-openvino"
Returns: {version, url, checksum, metadata}
"""
url = f"{self.base_url}/models/{model_name}/latest?profile={device_profile}"
with urllib.request.urlopen(url) as response:
return json.loads(response.read())


class EdgeModelUpdater:
"""
Edge device update agent.
Runs as a background service, checks for model updates periodically.
Implements atomic update: download new model, validate, swap atomically.
"""

def __init__(
self,
model_name: str,
model_dir: str,
registry: ModelRegistry,
device_profile: str,
current_version: str,
check_interval_s: int = 3600, # Check every hour
on_update_callback: Optional[Callable] = None
):
self.model_name = model_name
self.model_dir = Path(model_dir)
self.registry = registry
self.device_profile = device_profile
self.current_version = current_version
self.check_interval = check_interval_s
self.on_update_callback = on_update_callback

self.model_dir.mkdir(parents=True, exist_ok=True)
self._running = False
self._update_thread = None

def _compute_checksum(self, file_path: str) -> str:
"""SHA256 checksum for model file integrity verification."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()

def _download_model(self, url: str, target_path: str) -> bool:
"""Download model with progress logging."""
try:
logger.info(f"Downloading model from {url}...")
urllib.request.urlretrieve(url, target_path)
logger.info(f"Download complete: {target_path}")
return True
except Exception as e:
logger.error(f"Download failed: {e}")
return False

def _validate_model(self, model_path: str, expected_checksum: str) -> bool:
"""Validate model file integrity."""
actual_checksum = self._compute_checksum(model_path)
if actual_checksum != expected_checksum:
logger.error(
f"Checksum mismatch: expected {expected_checksum[:16]}..., "
f"got {actual_checksum[:16]}..."
)
return False

# Optionally: run a quick inference to verify the model is functional
# This catches corruption that passes checksum validation
return True

def _atomic_swap(
self,
new_model_path: str,
current_model_path: str,
backup_path: str
) -> bool:
"""
Atomically swap old model with new model.
Keep backup so we can rollback if the new model fails.
"""
try:
# Back up current model
if Path(current_model_path).exists():
shutil.copy2(current_model_path, backup_path)
logger.info(f"Backup saved to {backup_path}")

# Atomic rename (same filesystem = atomic on Linux)
os.rename(new_model_path, current_model_path)
logger.info(f"Model swapped to new version")
return True

except Exception as e:
logger.error(f"Atomic swap failed: {e}")
# Attempt rollback
if Path(backup_path).exists():
shutil.copy2(backup_path, current_model_path)
logger.info("Rolled back to previous model")
return False

def check_and_update(self) -> bool:
"""
Check registry for updates. Download and apply if available.
Returns True if an update was applied.
"""
try:
latest = self.registry.get_latest_version(
self.model_name, self.device_profile
)
latest_version = latest["version"]

if latest_version == self.current_version:
logger.debug(f"Already on latest version: {latest_version}")
return False

logger.info(
f"Update available: {self.current_version} -> {latest_version}"
)

# Download to temporary path
temp_path = str(self.model_dir / f"{self.model_name}_{latest_version}.tmp")
current_path = str(self.model_dir / f"{self.model_name}.engine")
backup_path = str(self.model_dir / f"{self.model_name}.backup")

if not self._download_model(latest["url"], temp_path):
return False

if not self._validate_model(temp_path, latest["checksum"]):
os.remove(temp_path)
return False

if not self._atomic_swap(temp_path, current_path, backup_path):
return False

self.current_version = latest_version

# Notify inference system to reload model
if self.on_update_callback:
self.on_update_callback(current_path, latest_version)

logger.info(f"Successfully updated to version {latest_version}")
return True

except Exception as e:
logger.error(f"Update check failed: {e}")
return False

def start(self):
"""Start background update checker."""
self._running = True
self._update_thread = threading.Thread(
target=self._update_loop, daemon=True
)
self._update_thread.start()
logger.info(f"OTA updater started, check interval: {self.check_interval}s")

def _update_loop(self):
while self._running:
self.check_and_update()
time.sleep(self.check_interval)

def stop(self):
self._running = False
if self._update_thread:
self._update_thread.join(timeout=5.0)

5. Edge Fleet Monitoring with Prometheus

"""
Fleet monitoring for industrial edge device fleets.

Exports Prometheus metrics from each edge device.
Prometheus server scrapes all devices.
Grafana visualizes fleet health.

This is how you manage 100+ edge devices at scale.
"""
from prometheus_client import start_http_server, Gauge, Counter, Histogram
import time
import threading
import psutil
import os


class EdgeDeviceMetrics:
"""
Prometheus metrics exported by each edge device.
Scraped by central Prometheus server every 15 seconds.
"""

def __init__(
self,
device_id: str,
asset_id: str,
factory_id: str,
port: int = 8000
):
# Labels for all metrics
labels = {
"device_id": device_id,
"asset_id": asset_id,
"factory_id": factory_id
}
label_names = list(labels.keys())

# Model inference metrics
self.inference_latency = Histogram(
"edge_inference_latency_ms",
"Inference latency in milliseconds",
label_names,
buckets=[5, 10, 25, 50, 100, 200, 500, 1000]
)
self.inference_count = Counter(
"edge_inference_total",
"Total number of inferences",
label_names
)
self.alerts_generated = Counter(
"edge_anomaly_alerts_total",
"Total anomaly alerts generated",
label_names + ["severity"]
)
self.current_anomaly_score = Gauge(
"edge_anomaly_score",
"Current anomaly score (0-1)",
label_names
)

# System metrics
self.cpu_usage = Gauge(
"edge_cpu_usage_percent",
"CPU usage percentage",
label_names
)
self.memory_usage = Gauge(
"edge_memory_usage_percent",
"Memory usage percentage",
label_names
)
self.gpu_usage = Gauge(
"edge_gpu_usage_percent",
"GPU usage percentage",
label_names
)
self.device_temperature = Gauge(
"edge_device_temperature_c",
"Device temperature in Celsius",
label_names
)
self.model_version = Gauge(
"edge_model_version",
"Deployed model version (numeric)",
label_names + ["version_string"]
)

# Store labels for use in metric updates
self.labels = labels
self.label_names = label_names

# Start Prometheus HTTP server
start_http_server(port)
print(f"Prometheus metrics server started on port {port}")

# Start background system metrics collection
self._start_system_metrics_loop()

def _get_label_values(self, **extra_labels) -> list:
"""Return label values in the correct order."""
values = [self.labels[k] for k in self.label_names]
return values

def record_inference(self, latency_ms: float, anomaly_score: float, is_alert: bool, severity: str = "LOW"):
"""Record an inference event with all associated metrics."""
label_vals = self._get_label_values()
self.inference_latency.labels(*label_vals).observe(latency_ms)
self.inference_count.labels(*label_vals).inc()
self.current_anomaly_score.labels(*label_vals).set(anomaly_score)
if is_alert:
self.alerts_generated.labels(*label_vals, severity).inc()

def set_model_version(self, version_string: str):
"""Record the currently deployed model version."""
label_vals = self._get_label_values()
# Extract numeric part for alerting on stale versions
try:
version_numeric = float(version_string.replace("v", "").replace("-", ".").split(".")[0])
except ValueError:
version_numeric = 0
self.model_version.labels(*label_vals, version_string).set(version_numeric)

def _collect_system_metrics(self):
"""Collect system metrics periodically."""
label_vals = self._get_label_values()
self.cpu_usage.labels(*label_vals).set(psutil.cpu_percent(interval=None))
self.memory_usage.labels(*label_vals).set(psutil.virtual_memory().percent)

# GPU metrics (Jetson)
try:
with open("/sys/devices/gpu.0/load", "r") as f:
gpu_load = int(f.read().strip()) / 10.0 # Jetson format
self.gpu_usage.labels(*label_vals).set(gpu_load)
except FileNotFoundError:
pass

# Device temperature
try:
temps = psutil.sensors_temperatures()
if "coretemp" in temps:
avg_temp = sum(t.current for t in temps["coretemp"]) / len(temps["coretemp"])
self.device_temperature.labels(*label_vals).set(avg_temp)
except (AttributeError, Exception):
pass

def _start_system_metrics_loop(self):
def loop():
while True:
try:
self._collect_system_metrics()
except Exception:
pass
time.sleep(15)
threading.Thread(target=loop, daemon=True).start()

System Architecture

The division of labor described above, end to end: edge devices run the optimized inference engine next to the equipment and publish scores, alerts, and health metrics over MQTT; the cloud side hosts model training, the model registry, OTA orchestration, and fleet-wide analytics.

Production Engineering Notes

Model Optimization Reality Check

INT8 quantization nearly always reduces latency by 2-4x compared to FP32 on NVIDIA hardware, with 0.5-2% accuracy loss. These are good numbers for industrial inspection. However, not all models quantize cleanly. Architectures with many element-wise operations, layer normalizations before every layer, or operations that lack INT8 kernels may see larger accuracy drops. Always measure accuracy on your validation set after quantization, before deploying.

A practical quantization checklist: (1) Start with FP16 - most of the latency benefit of INT8 with near-zero accuracy cost. (2) Move to INT8 if FP16 is still too slow. (3) Use TensorRT's automatic INT8 calibration with 500-1000 representative real images. (4) Measure accuracy on your full validation set after quantization. (5) If accuracy drop exceeds 1.5%, try QAT (Quantization-Aware Training) instead of PTQ.
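
A sketch of step (4), comparing the FP32 and quantized models on the same validation arrays with ONNX Runtime; the file paths are placeholders for your own exported models and data:

```python
import numpy as np
import onnxruntime as ort


def accuracy(session: ort.InferenceSession, x: np.ndarray, y: np.ndarray) -> float:
    """Top-1 accuracy of a classification model over a validation array."""
    input_name = session.get_inputs()[0].name
    preds = []
    for i in range(len(x)):
        logits = session.run(None, {input_name: x[i:i + 1].astype(np.float32)})[0]
        preds.append(int(np.argmax(logits)))
    return float(np.mean(np.array(preds) == y))


# Placeholder file names - substitute your own exported models and validation data.
fp32_sess = ort.InferenceSession("inspection_fp32.onnx")
int8_sess = ort.InferenceSession("inspection_int8.onnx")
x_val = np.load("val_images.npy")   # (N, 3, 224, 224)
y_val = np.load("val_labels.npy")   # (N,)

acc_fp32 = accuracy(fp32_sess, x_val, y_val)
acc_int8 = accuracy(int8_sess, x_val, y_val)
print(f"FP32 accuracy: {acc_fp32:.4f}")
print(f"INT8 accuracy: {acc_int8:.4f}  (drop: {acc_fp32 - acc_int8:.4f})")
if acc_fp32 - acc_int8 > 0.015:
    print("Accuracy drop exceeds 1.5% - consider QAT instead of PTQ")
```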

Fleet Management at Scale

When you have 50+ edge devices in a single factory, manual management becomes untenable. You need automation for: model deployment (OTA as described above), configuration management (what threshold is each device using?), health monitoring (which devices are underperforming?), log collection (aggregating inference logs for model improvement), and certificate rotation (TLS certificates expire).

The recommended stack: Prometheus + Grafana for metrics (both open source, well-documented, support Kubernetes at cloud scale), MQTT for real-time telemetry (lightweight, already needed for sensor data), Ansible for configuration management (push configuration to edge devices), and a model registry (MLflow, DVC, or a custom S3-backed registry) for version management. This stack scales from a 10-device pilot to a 500-device factory deployment without architectural changes.

Security for Industrial Edge

Industrial edge devices live in the OT (Operational Technology) network, which should be air-gapped or strictly firewalled from the internet. MQTT connections from edge devices to the cloud should use TLS with client certificate authentication (not just username/password). Each edge device gets its own certificate, issued by your PKI. Certificate rotation should be automated - manually rotating 500 certificates is error-prone.

Model files transmitted via OTA must be checksummed and the checksum signed by your code signing key. The edge device verifies the signature before accepting the model. This prevents a compromised MQTT broker from pushing a malicious model to your fleet.
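
A minimal sketch of that signing and verification flow, assuming an Ed25519 code-signing key and the cryptography package; in a real fleet only the public key ships to the devices and the private key stays in an HSM or cloud KMS, and the file name here is illustrative:

```python
import hashlib

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey


def sha256_digest(path: str) -> bytes:
    """Stream the file through SHA-256 and return the raw digest."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.digest()


# Cloud side (once per release): sign the model digest with the code-signing key.
signing_key = Ed25519PrivateKey.generate()          # in practice: loaded from an HSM/KMS
public_key = signing_key.public_key()               # baked into the edge device image
signature = signing_key.sign(sha256_digest("inspection_model.engine"))

# Edge side (before the atomic swap): verify the signature over the local digest.
try:
    public_key.verify(signature, sha256_digest("inspection_model.engine"))
    print("Signature valid - safe to swap in the new model")
except InvalidSignature:
    print("Signature invalid - reject the update and alert the fleet team")
```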


:::warning Thermal Throttling on Jetson Devices
NVIDIA Jetson devices throttle CPU and GPU performance when they exceed thermal limits. A passively cooled Jetson Nano running at full GPU load in a 40°C factory ambient will thermal-throttle within minutes, increasing inference latency 2-3x. Always spec the power mode (Jetson has multiple power modes from 5W to 20W), ensure adequate airflow or active cooling for your ambient temperature, and monitor device temperature in production. A model that meets your 100ms latency target at 25°C ambient may fail at 45°C ambient if the device thermally throttles.
:::
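
A minimal temperature check along these lines, reading the standard Linux thermal zones that Jetson devices expose; the 70°C warning threshold is an illustrative value, so consult your device's thermal specifications:

```python
from pathlib import Path

THROTTLE_WARN_C = 70.0  # illustrative warning level; check your device's actual limits


def read_thermal_zones() -> dict:
    """Return {zone_name: temperature_celsius} for all Linux thermal zones."""
    zones = {}
    for zone in sorted(Path("/sys/class/thermal").glob("thermal_zone*")):
        try:
            name = (zone / "type").read_text().strip()
            temp_c = int((zone / "temp").read_text().strip()) / 1000.0  # reported in millidegrees
            zones[name] = temp_c
        except (OSError, ValueError):
            continue
    return zones


if __name__ == "__main__":
    for name, temp_c in read_thermal_zones().items():
        flag = "  <-- approaching throttle range" if temp_c >= THROTTLE_WARN_C else ""
        print(f"{name:20s} {temp_c:5.1f} C{flag}")
```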

:::danger Stale Models in Long-Running Deployments
An edge device that has been running without updates for 6 months may be running a model trained on data from a fundamentally different production state: different raw material supplier, different maintenance history, different product mix. The model's "normal" may no longer match the current production "normal," causing both false positives (new normal flagged as anomaly) and false negatives (new degradation patterns not recognized). Implement a maximum model age policy: if an edge device has not received a model update in N weeks, escalate to the fleet management team. Force model updates even if the version has not changed - recalibration of the same architecture on fresh data may be needed.
:::
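
One way to sketch such a maximum-age check, run alongside the OTA updater; the 12-week limit and the file-modification-time heuristic are illustrative assumptions:

```python
import time
from pathlib import Path

MAX_MODEL_AGE_WEEKS = 12  # illustrative policy: escalate if the deployed model is older


def model_age_weeks(model_path: str) -> float:
    """Age of the deployed model file, in weeks, based on its modification time."""
    mtime = Path(model_path).stat().st_mtime
    return (time.time() - mtime) / (7 * 24 * 3600)


def check_model_age(model_path: str) -> bool:
    """Return True if the model is within policy, False if it should be escalated."""
    age = model_age_weeks(model_path)
    if age > MAX_MODEL_AGE_WEEKS:
        print(f"Model is {age:.1f} weeks old (limit {MAX_MODEL_AGE_WEEKS}) - escalate to fleet team")
        return False
    print(f"Model age OK: {age:.1f} weeks")
    return True
```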


Interview Questions and Answers

Q1: When should you deploy ML inference at the edge vs in the cloud?

The decision framework has four dimensions. Latency: if the decision must be made in under 500ms, edge is required (cloud round-trip is typically 50-500ms on a good day, unbounded on a bad one). Data volume: if you are processing high-frequency sensor data or video streams, the bandwidth cost of sending raw data to the cloud is prohibitive - process at the edge and send only results. Reliability: if the application must work during network outages, edge is required. Data sovereignty: if the data cannot leave the facility due to contractual or regulatory requirements, edge is required. For manufacturing AI specifically: visual inspection, vibration anomaly detection, safety interlocks all belong at the edge. Model training, fleet analytics, and complex offline analysis belong in the cloud. The edge-cloud split should be thought of as: edge handles real-time decisions with guaranteed availability, cloud handles everything that can tolerate latency and network dependency.

Q2: What is the TensorRT optimization pipeline and why does it matter for edge inference?

TensorRT is NVIDIA's inference optimization library. Given a trained model (exported to ONNX), TensorRT applies several optimizations: kernel fusion (merging adjacent operations like convolution + batch norm + activation into a single CUDA kernel), memory optimization (scheduling when buffers are allocated and freed to minimize GPU memory usage), precision calibration (INT8 quantization with calibration dataset), and layer-specific algorithm selection (choosing the fastest CUDA kernel for each operation given the specific hardware). The result is typically 2-5x faster inference than running the original PyTorch model through the standard CUDA backend. This matters at the edge because the hardware is constrained: a Jetson Xavier NX has far less GPU compute than a data center GPU. TensorRT is what makes it possible to run a ResNet-50 at 30fps on a $500 edge device.

Q3: How does INT8 quantization work and when does it hurt accuracy?

INT8 quantization maps the floating-point range of each layer's activations to the 256 integer values [-128, 127]. The mapping is calibrated using a representative dataset: you run normal data through the model and observe the actual range of each activation. Given the range [min, max], you compute a scale factor $s = (max - min) / 255$ and quantize as $x_{\text{int8}} = \mathrm{round}(x / s)$. At inference, each INT8 multiply-accumulate is done in INT32 to prevent overflow, then requantized back to INT8 before the next layer. Accuracy degradation is minimal (under 1%) for most classification and detection models because the important information is in the relative magnitudes of activations, which is preserved even at INT8 precision. Models that tend to degrade more: (1) Transformers with softmax attention, because softmax produces very small values that are quantized to near-zero in INT8. (2) Models with very wide dynamic range in activations (some NLP models). (3) Regression models where the exact scale of the output matters (as opposed to classification where you just need the argmax). For industrial inspection models (classification and anomaly detection), INT8 is almost universally safe to use.

Q4: How do you handle model rollback when an OTA update causes degraded performance?

The OTA system should always maintain the previous model version as a backup. The rollback trigger can be: automated (if the model's validation metrics drop below a threshold after update, auto-rollback) or manual (operator sees increased false positive rate in the monitoring dashboard and initiates rollback through the fleet management UI). For automated rollback, you need the edge device to compute a quick performance check after loading the new model - run a set of reference images with known labels that are stored locally, measure accuracy. If accuracy drops below the threshold set during deployment, automatically revert to the backup model and send an alert to the ML team. The atomic swap pattern (rename rather than copy) ensures that the rollback can be done instantly by renaming the backup file. Never delete the backup until the new model has been running successfully for a minimum period (e.g., 24 hours).
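
A sketch of the automated variant: a local smoke test over stored reference samples, with rollback to the backup file if accuracy falls below a deployment-time threshold. The predict and reload_model callables are hypothetical hooks into the inference runtime, and the 0.95 threshold is illustrative:

```python
import shutil
from typing import Callable, List, Tuple


def smoke_test(predict: Callable, reference_set: List[Tuple[object, int]],
               min_accuracy: float = 0.95) -> bool:
    """Run the new model on locally stored reference samples with known labels."""
    correct = sum(1 for sample, label in reference_set if predict(sample) == label)
    accuracy = correct / max(len(reference_set), 1)
    print(f"Smoke test accuracy: {accuracy:.3f} (threshold {min_accuracy})")
    return accuracy >= min_accuracy


def apply_or_rollback(predict: Callable, reference_set, current_model: str,
                      backup_model: str, reload_model: Callable) -> bool:
    """Keep the new model only if the smoke test passes; otherwise restore the backup."""
    if smoke_test(predict, reference_set):
        return True
    shutil.copy2(backup_model, current_model)  # instant revert to the known-good model
    reload_model(current_model)                # hypothetical hook into the inference loop
    print("Rolled back to previous model and alerted ML team")
    return False
```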

Q5: How do you benchmark and monitor inference latency in production on edge devices?

Benchmarking before deployment: use a representative batch of inputs (same size and type as production data), warm up the model with 10-20 inferences (CUDA initialization and TensorRT context creation add latency to cold starts), then measure 100+ inferences and report P50, P95, P99, and P99.9 latency. The P95/P99 values matter more than the mean for real-time applications - you need to know the worst-case latency, not just the average. In production monitoring: instrument the inference loop with a Prometheus Histogram metric (as shown in the code above), which captures the full latency distribution. Set Grafana alerts for P95 latency exceeding 80% of your budget (e.g., if budget is 100ms, alert at 80ms P95). Common causes of latency spikes in production: thermal throttling (device temperature exceeds limit, GPU clock reduces), memory pressure (another process is using GPU memory), or model update artifacts (first inference after OTA update is slower due to TensorRT engine rebuild). The Prometheus metrics differentiate these because CPU usage, GPU usage, and device temperature are all reported alongside latency.


Key Takeaways

Edge AI in manufacturing is driven by latency, bandwidth, reliability, and data sovereignty requirements that cloud-only architectures cannot satisfy. The optimization stack - TensorRT INT8 for NVIDIA hardware, OpenVINO for Intel hardware, ONNX Runtime as the universal fallback - takes trained models and makes them fast enough for real-time industrial inference on constrained hardware. MQTT provides the lightweight, reliable edge-cloud communication layer. Fleet management at scale requires OTA model updates, Prometheus-based health monitoring, and automated anomaly detection on the edge devices themselves. The operational challenges - thermal management, security, model staleness - are as important as the ML challenges. A well-optimized model running on a thermally throttled Jetson with a 6-month-old training dataset is a production failure.
