Synchronous vs Asynchronous Inference
The Production Scenario
Your content moderation model takes 800ms to process a video thumbnail. Your product team wants every uploaded image reviewed before it goes live. At 500 uploads per second, that's roughly 400 requests in flight at any moment (500 per second × 0.8 seconds), each holding a connection open for 800ms. Your serving infrastructure handles it fine - until upload volume spikes 10x during a product launch and you have around 4,000 concurrent long-running connections. Your load balancer starts dropping connections. Your error rate spikes. Users get upload failures. The content moderation team is confused because their model service shows fine utilization - the problem is the thousands of threads all sitting idle, waiting 800ms each for a result that most callers don't need immediately anyway.
The engineer who built this system made a natural but costly assumption: every caller needs the result before they can proceed. In reality, the upload service only needs to know that the job has been accepted. The actual moderation result can arrive 2 seconds later via a webhook callback. The user doesn't even see the image until after moderation completes - they just see a "processing" spinner. The bottleneck was not the model. It was the assumption that inference must be synchronous.
This is the core insight that separates synchronous and asynchronous inference. Synchronous inference means: the caller blocks and waits. Asynchronous inference means: the caller submits a job and continues, and the result arrives separately. The right choice depends on whether the caller actually needs the result before proceeding - and in many ML use cases, they do not.
Understanding when to use each pattern, and how to build each correctly, is one of the most practically important skills in ML systems engineering. It affects your infrastructure cost, your system's maximum throughput, and how you handle model latency when it spikes or degrades.
Why This Exists - The Two Failure Modes
Synchronous inference without a queue fails under load spikes. Every concurrent request holds a thread (or async coroutine) and a connection. At high concurrency, you exhaust thread pools, connection pools, or file descriptors. The system does not degrade gracefully - it falls off a cliff.
Asynchronous inference without careful design fails on latency visibility. Callers submit jobs and hope for results. Without timeouts, dead-letter queues, and status polling, jobs silently disappear. Users wait indefinitely. Operations teams cannot tell if the system is working.
Both patterns have failure modes. The skill is knowing which failure mode you can tolerate, and building the safeguards for the one you choose.
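The synchronous failure mode comes down to how many requests are in flight at once. A rough estimate is Little's law: average concurrency ≈ arrival rate × latency. The sketch below uses hypothetical numbers, including an assumed pool limit of 1,000 connections; none of them come from a real deployment.

```python
def in_flight_requests(arrival_rate_per_s: float, latency_s: float) -> float:
    """Little's law: average concurrency = arrival rate x time in system."""
    return arrival_rate_per_s * latency_s


# Hypothetical figures for illustration only.
POOL_LIMIT = 1_000   # assumed maximum number of concurrent connections
LATENCY_S = 0.8      # 800 ms model latency

for rate in (500, 5_000):  # normal load vs. a 10x spike, in requests per second
    concurrent = in_flight_requests(rate, LATENCY_S)
    status = "ok" if concurrent <= POOL_LIMIT else "pool exhausted"
    print(f"{rate:>5} req/s -> {concurrent:,.0f} in flight ({status})")
```

The same formula works in reverse: dividing the pool limit by the latency gives the highest arrival rate a synchronous endpoint can sustain before it hits the cliff.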
Historical Context
Synchronous request-response has been the default since the earliest HTTP servers. A client sends a request, the server processes it, the client waits for the response. Simple, predictable, easy to reason about. This works for operations that complete in under 100ms - the latency threshold beyond which users notice waiting.
Asynchronous job queues have existed since the 1970s in batch computing systems. The modern pattern for web services emerged from early message queue systems like ActiveMQ (2004) and then RabbitMQ (2007). Celery, the Python distributed task queue, popularized the async worker pattern for web backends around 2009. Kafka (2011) brought the concept of a durable, high-throughput distributed log that could decouple producers from consumers at massive scale.
The application of these patterns to ML inference became necessary as model latencies grew. A logistic regression model runs in microseconds - synchronous is fine. A BERT-based classifier runs in 50ms - synchronous is acceptable. A large vision transformer runs in 800ms - synchronous starts to hurt. A large language model generating 500 tokens runs in 30 seconds - synchronous is simply wrong.
Core Concepts
Synchronous Inference: When Every Millisecond Is Visible
In synchronous inference, the caller sends a request and blocks until the model returns a prediction:
Client → [request] → Model Service → [inference] → [response] → Client
         ^ blocks here                             ^ unblocks here
This pattern works when:
- Latency is low (under 200ms) and predictable
- The caller cannot proceed without the result (e.g., fraud check before transaction approval)
- The use case is user-facing and latency is directly visible to the user
- Load is predictable and well within serving capacity
It breaks down when:
- Model latency is high or variable (GPU queue backup, cold starts)
- Traffic spikes unpredictably (viral content upload, flash sales)
- The result is not needed immediately (email classification, content tagging)
- You need to process more requests than serving capacity allows at peak
Asynchronous Inference: Decoupling Request Acceptance from Processing
In asynchronous inference, the caller submits a job to a queue and immediately receives a job ID. The model service processes the queue. The caller retrieves results via polling or callback:
Client → [submit job] → Queue → [job ID] → Client continues
                          ↓
                     Worker Pool
                          ↓
                    Model Service
                          ↓
                    Result Store
                          ↓
          Client polls / receives callback
This pattern provides natural buffering. The queue absorbs traffic spikes. Workers process at their own rate. If traffic spikes 10x, the queue grows: jobs take longer, but nothing fails as long as the queue has room to hold them. If you add more workers, throughput increases without any changes to the client.
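The buffering effect can be seen without any infrastructure. The following is a minimal sketch that uses an in-process `asyncio.Queue` as a stand-in for the real queue; the function names, job counts, and service time are illustrative assumptions.

```python
import asyncio
import time


async def worker(queue: asyncio.Queue, results: dict, service_time_s: float) -> None:
    """Drain jobs at a fixed rate, regardless of how fast they arrive."""
    while True:
        job_id = await queue.get()
        await asyncio.sleep(service_time_s)  # stand-in for model inference
        results[job_id] = "safe"
        queue.task_done()


async def run_burst(num_jobs: int, num_workers: int, service_time_s: float):
    queue: asyncio.Queue = asyncio.Queue()
    results: dict = {}
    workers = [
        asyncio.create_task(worker(queue, results, service_time_s))
        for _ in range(num_workers)
    ]

    # Submission is only an enqueue, so the whole burst is accepted at once.
    start = time.perf_counter()
    for job_id in range(num_jobs):
        queue.put_nowait(job_id)
    submit_s = time.perf_counter() - start
    peak_depth = queue.qsize()

    await queue.join()  # wait until the workers have drained the queue
    drain_s = time.perf_counter() - start

    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return len(results), peak_depth, submit_s, drain_s


if __name__ == "__main__":
    done, depth, submit_s, drain_s = asyncio.run(run_burst(50, 5, 0.01))
    print(f"accepted {depth} jobs in {submit_s * 1000:.2f} ms, "
          f"all {done} completed after {drain_s:.2f} s")
```

Raising `num_workers` shortens the drain time while the submission side stays exactly the same, which is the decoupling the pattern is meant to provide.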
Latency Profile Comparison
| Aspect | Synchronous | Asynchronous |
|---|---|---|
| Response time (client) | Model latency | Near-zero (job accepted) |
| Result availability | Immediately | After processing (seconds to minutes) |
| Peak throughput | Limited by concurrent connections | Limited only by queue depth + worker throughput |
| Failure visibility | Immediate (connection error or timeout) | Delayed (job stays in queue or goes to DLQ) |
| Infrastructure cost at peak | Scales with peak load | Scales with average load |
| Code complexity | Low | High (job tracking, polling/callback, DLQ) |
Implementation: Synchronous Inference with FastAPI
# sync_inference_server.py
import asyncio
import time
from typing import Optional

import numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

app = FastAPI()


class HeavyModel:
    """Simulates a slow model - e.g., a large vision transformer."""

    def __init__(self, inference_time_ms: float = 800):
        self.inference_time_ms = inference_time_ms

    async def predict_async(self, features: np.ndarray) -> dict:
        # Non-blocking sleep simulates GPU work
        await asyncio.sleep(self.inference_time_ms / 1000)
        return {
            "label": "safe",
            "confidence": float(np.random.uniform(0.7, 1.0)),
        }


model = HeavyModel(inference_time_ms=800)


class InferenceRequest(BaseModel):
    image_url: str
    features: list[float]


class InferenceResponse(BaseModel):
    label: str
    confidence: float
    latency_ms: float


@app.post("/predict", response_model=InferenceResponse)
async def predict(request: InferenceRequest):
    """Synchronous inference - caller blocks until model completes."""
    start = time.perf_counter()
    features = np.array(request.features, dtype=np.float32)
    # With 5000 concurrent requests, all 5000 coroutines are waiting here.
    # Unlike threads, async coroutines are cheap - but the downstream
    # GPU is still the bottleneck. Requests queue in the event loop.
    result = await model.predict_async(features)
    latency_ms = (time.perf_counter() - start) * 1000
    return InferenceResponse(
        label=result["label"],
        confidence=result["confidence"],
        latency_ms=latency_ms,
    )


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
Implementation: Asynchronous Inference with Redis Queue
# async_inference_server.py
import asyncio
import json
import time
import uuid
from enum import Enum
from typing import Optional

import numpy as np
import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

# Redis connection - shared across all requests, created at startup
redis_client: Optional[aioredis.Redis] = None

QUEUE_KEY = "inference:queue"
RESULT_PREFIX = "inference:result:"
RESULT_TTL_SECONDS = 300  # Results expire after 5 minutes


class JobStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class SubmitRequest(BaseModel):
    image_url: str
    features: list[float]
    callback_url: Optional[str] = None  # For push notification on completion


class SubmitResponse(BaseModel):
    job_id: str
    status: JobStatus
    estimated_wait_seconds: float


class JobResult(BaseModel):
    job_id: str
    status: JobStatus
    label: Optional[str] = None
    confidence: Optional[float] = None
    error: Optional[str] = None
    created_at: float
    completed_at: Optional[float] = None


@app.on_event("startup")
async def startup():
    global redis_client
    redis_client = aioredis.from_url(
        "redis://localhost:6379",
        encoding="utf-8",
        decode_responses=True,
    )


@app.post("/predict/submit", response_model=SubmitResponse)
async def submit_inference_job(request: SubmitRequest):
    """Accept job immediately, return job ID. Does NOT wait for model."""
    job_id = str(uuid.uuid4())
    job_payload = {
        "job_id": job_id,
        "image_url": request.image_url,
        "features": request.features,
        "callback_url": request.callback_url,
        "created_at": time.time(),
        # Store the plain string so the payload round-trips through JSON
        "status": JobStatus.PENDING.value,
    }
    # Store initial job status in Redis
    await redis_client.setex(
        f"{RESULT_PREFIX}{job_id}",
        RESULT_TTL_SECONDS,
        json.dumps(job_payload),
    )
    # Push job to queue
    await redis_client.rpush(QUEUE_KEY, json.dumps(job_payload))
    # Estimate wait time based on current queue depth
    queue_depth = await redis_client.llen(QUEUE_KEY)
    # Assume 800ms per job, single worker
    estimated_wait = queue_depth * 0.8
    return SubmitResponse(
        job_id=job_id,
        status=JobStatus.PENDING,
        estimated_wait_seconds=estimated_wait,
    )


@app.get("/predict/result/{job_id}", response_model=JobResult)
async def get_inference_result(job_id: str):
    """Poll for result. Returns current status if still processing."""
    result_json = await redis_client.get(f"{RESULT_PREFIX}{job_id}")
    if result_json is None:
        raise HTTPException(
            status_code=404,
            detail=f"Job {job_id} not found - may have expired or never existed",
        )
    data = json.loads(result_json)
    return JobResult(**data)
# async_inference_worker.py - runs as a separate process
import asyncio
import json
import time
from typing import Optional

import httpx
import numpy as np
import redis.asyncio as aioredis

QUEUE_KEY = "inference:queue"
RESULT_PREFIX = "inference:result:"
DLQ_KEY = "inference:dlq"  # Dead letter queue for failed jobs
RESULT_TTL_SECONDS = 300
MAX_RETRIES = 3


class ModelWorker:
    """The actual model inference worker."""

    def __init__(self):
        self.redis: Optional[aioredis.Redis] = None
        self.http_client: Optional[httpx.AsyncClient] = None

    async def setup(self):
        self.redis = aioredis.from_url(
            "redis://localhost:6379",
            encoding="utf-8",
            decode_responses=True,
        )
        self.http_client = httpx.AsyncClient(timeout=5.0)

    async def run_inference(self, features: np.ndarray) -> dict:
        """Simulate GPU inference."""
        await asyncio.sleep(0.8)  # 800ms inference
        return {
            "label": "safe",
            "confidence": float(np.random.uniform(0.7, 1.0)),
        }

    async def process_job(self, job_payload: dict):
        job_id = job_payload["job_id"]
        retries = job_payload.get("retries", 0)
        # Mark as processing
        job_payload["status"] = "processing"
        await self.redis.setex(
            f"{RESULT_PREFIX}{job_id}",
            RESULT_TTL_SECONDS,
            json.dumps(job_payload),
        )
        try:
            features = np.array(job_payload["features"], dtype=np.float32)
            result = await self.run_inference(features)
            # Mark as completed
            job_payload["status"] = "completed"
            job_payload["label"] = result["label"]
            job_payload["confidence"] = result["confidence"]
            job_payload["completed_at"] = time.time()
            await self.redis.setex(
                f"{RESULT_PREFIX}{job_id}",
                RESULT_TTL_SECONDS,
                json.dumps(job_payload),
            )
            # Fire callback if provided
            if job_payload.get("callback_url"):
                await self._fire_callback(
                    job_payload["callback_url"], job_payload
                )
        except Exception as e:
            if retries < MAX_RETRIES:
                # Re-queue with incremented retry count
                job_payload["retries"] = retries + 1
                await self.redis.rpush(QUEUE_KEY, json.dumps(job_payload))
                print(f"Job {job_id} failed, retry {retries + 1}/{MAX_RETRIES}")
            else:
                # Move to dead letter queue
                job_payload["status"] = "failed"
                job_payload["error"] = str(e)
                await self.redis.rpush(DLQ_KEY, json.dumps(job_payload))
                await self.redis.setex(
                    f"{RESULT_PREFIX}{job_id}",
                    RESULT_TTL_SECONDS,
                    json.dumps(job_payload),
                )
                print(f"Job {job_id} exhausted retries, moved to DLQ")

    async def _fire_callback(self, callback_url: str, payload: dict):
        """Notify caller via webhook when job completes."""
        try:
            await self.http_client.post(callback_url, json=payload, timeout=5.0)
        except Exception as e:
            print(f"Callback failed for {payload['job_id']}: {e}")

    async def run_forever(self):
        """Main worker loop - blocks on queue and processes jobs."""
        await self.setup()
        print("Worker started, listening on queue...")
        while True:
            # BLPOP blocks until a job is available (up to 1 second timeout).
            # Note: BLPOP removes the job from the queue immediately, so a
            # worker crash during process_job loses that job.
            result = await self.redis.blpop(QUEUE_KEY, timeout=1)
            if result is None:
                continue  # Timeout, loop again
            _, job_json = result
            job_payload = json.loads(job_json)
            await self.process_job(job_payload)


if __name__ == "__main__":
    worker = ModelWorker()
    asyncio.run(worker.run_forever())
Streaming Inference for LLMs
LLM token generation is a third pattern: the model produces output incrementally, and users want to see each token as it is generated rather than waiting for the complete response.
# streaming_inference_server.py
import asyncio
import json
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def generate_tokens(
    prompt: str,
    max_tokens: int = 100,
) -> AsyncGenerator[str, None]:
    """Simulate LLM token generation with SSE format."""
    words = ["The", "model", "generates", "tokens", "one", "at", "a", "time",
             "which", "gives", "the", "impression", "of", "real-time", "typing"]
    emitted = words[:max_tokens]
    for i, word in enumerate(emitted):
        await asyncio.sleep(0.05)  # 50ms per token
        # Server-Sent Events format
        data = json.dumps({"token": word + " ", "index": i, "done": False})
        yield f"data: {data}\n\n"
    # Signal end of stream; index is the number of tokens actually emitted
    final = json.dumps({"token": "", "index": len(emitted), "done": True})
    yield f"data: {final}\n\n"


# GET, not POST: the browser EventSource API can only issue GET requests.
@app.get("/generate/stream")
async def stream_generation(prompt: str, max_tokens: int = 100):
    """Stream tokens as Server-Sent Events.

    Client JavaScript:
        const es = new EventSource('/generate/stream?prompt=...');
        es.onmessage = (e) => {
            const data = JSON.parse(e.data);
            // textContent avoids injecting model output as HTML
            document.getElementById('output').textContent += data.token;
            if (data.done) es.close();
        };
    """
    return StreamingResponse(
        generate_tokens(prompt, max_tokens),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering
        },
    )


@app.post("/generate/complete")
async def complete_generation(prompt: str, max_tokens: int = 100):
    """Non-streaming: wait for all tokens, return complete response.
    Only use this for short completions or server-to-server calls."""
    tokens = []
    async for sse_event in generate_tokens(prompt, max_tokens):
        if sse_event.startswith("data: "):
            data = json.loads(sse_event[6:])
            tokens.append(data["token"])
    return {"text": "".join(tokens)}
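On the consuming side, a server-to-server client has to split the stream back into events. The following is a minimal sketch of parsing the `data:` framing used by the simulated generator. The helper name `iter_sse_data` is hypothetical, and it handles only `data:` fields; `event:`, `id:`, and `retry:` fields are ignored.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the JSON payload of each SSE event from an iterable of text lines."""
    data_parts: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_parts:
                yield json.loads("\n".join(data_parts))
                data_parts = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # At most one leading space after the colon is stripped.
            data_parts.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) and comments are ignored here.


if __name__ == "__main__":
    stream = (
        'data: {"token": "The ", "index": 0, "done": false}\n\n'
        'data: {"token": "model ", "index": 1, "done": false}\n\n'
        'data: {"token": "", "index": 2, "done": true}\n\n'
    )
    text = "".join(event["token"] for event in iter_sse_data(stream.splitlines()))
    print(text)
```

In practice the lines would come from an HTTP client that exposes the response body line by line rather than from a string.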
Architecture: The Async Inference Pipeline
Kafka for High-Throughput Async Inference
Redis queue works well at moderate scale. For very high throughput or when you need replay, durability, and multiple consumers, Kafka is the right choice:
# kafka_inference_producer.py
import json
import uuid
import time

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})


def submit_inference_job(features: list[float], image_url: str) -> str:
    job_id = str(uuid.uuid4())
    job = {
        "job_id": job_id,
        "image_url": image_url,
        "features": features,
        "submitted_at": time.time(),
    }
    producer.produce(
        topic="inference-jobs",
        key=job_id.encode(),
        value=json.dumps(job).encode(),
    )
    producer.flush()
    return job_id
# kafka_inference_consumer.py
import json
import time

import numpy as np
from confluent_kafka import Consumer, KafkaException, Producer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "inference-workers",
    "auto.offset.reset": "earliest",
    # Disable auto-commit: commit only after successful processing
    "enable.auto.commit": False,
})
consumer.subscribe(["inference-jobs"])

# Separate producer for publishing results (Producer must be imported above)
result_producer = Producer({"bootstrap.servers": "localhost:9092"})


def run_inference(features: list[float]) -> dict:
    time.sleep(0.8)  # Simulate model
    return {"label": "safe", "confidence": 0.92}


try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        job = json.loads(msg.value().decode())
        features = job["features"]
        result = run_inference(features)
        # Publish result to results topic
        result_msg = {
            "job_id": job["job_id"],
            "label": result["label"],
            "confidence": result["confidence"],
            "completed_at": time.time(),
        }
        result_producer.produce(
            topic="inference-results",
            key=job["job_id"].encode(),
            value=json.dumps(result_msg).encode(),
        )
        result_producer.flush()
        # Commit only after result is published (at-least-once semantics)
        consumer.commit(msg)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
How OpenAI's Streaming API Works
OpenAI's streaming API is a good example of production-quality SSE streaming. When you call the API with stream=True, each chunk of one or more tokens arrives as a separate data: event:
import openai
import sys

client = openai.OpenAI()

# Streaming: tokens arrive as they are generated
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain batch inference in 3 sentences."}],
    stream=True,
)

# Each chunk is one or a few tokens
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
print()  # Final newline
A server-side implementation of this pattern typically uses a generator that yields tokens from the model and formats them as SSE events. The key infrastructure detail is that Nginx or the CDN layer must be configured to disable response buffering for SSE endpoints - otherwise tokens accumulate in a buffer and are delivered in bursts rather than one at a time.
When to Use Each Pattern
Synchronous: Fraud scoring (must complete before transaction proceeds), real-time search ranking (user is waiting for results), autocomplete (user is typing and expects instant feedback).
Asynchronous: Content moderation (user doesn't see content immediately anyway), document processing (OCR, summarization), recommendation pre-computation, A/B test offline evaluation, batch model evaluation.
Streaming: LLM generation (users expect to see tokens as they are generated), long document summarization (show progress), real-time transcription.
Production Engineering Notes
Set job TTLs: Uncompleted jobs should expire from your result store. Without TTLs, your Redis grows without bound.
Dead letter queues are mandatory: Any job that fails more than N times should be moved to a DLQ, not silently discarded. You need visibility into why jobs are failing and the ability to replay them after fixing the bug.
Monitor queue depth: Queue depth is the primary health metric for async systems. Alert when it grows faster than workers can drain it - this means you need more workers or the model is slower than expected.
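Whether the queue is draining or growing can be computed from three numbers. The sketch below is one way to express that check; the function names and the example figures are illustrative assumptions rather than values from a real system.

```python
import math


def net_drain_rate(workers: int, jobs_per_worker_per_s: float,
                   arrival_rate_per_s: float) -> float:
    """Jobs removed from the queue per second; negative means it is growing."""
    return workers * jobs_per_worker_per_s - arrival_rate_per_s


def seconds_to_drain(queue_depth: int, workers: int,
                     jobs_per_worker_per_s: float,
                     arrival_rate_per_s: float) -> float:
    """Time to empty the queue at current rates, or infinity if it never drains."""
    net = net_drain_rate(workers, jobs_per_worker_per_s, arrival_rate_per_s)
    if net <= 0:
        return math.inf
    return queue_depth / net


if __name__ == "__main__":
    # Hypothetical: 10 workers at 1.25 jobs/s each (800 ms per job),
    # 10 jobs/s arriving, and 1,000 jobs already queued.
    print(seconds_to_drain(1_000, 10, 1.25, 10.0))
    # Same workers, but arrivals now exceed capacity: alert condition.
    print(seconds_to_drain(1_000, 10, 1.25, 15.0))
```

An infinite or steadily rising drain time is the signal to add workers or start applying backpressure.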
Idempotency: If a worker processes a job and crashes before committing the offset (Kafka) or deleting the message (SQS), the job will be processed again. Design your result writing to be idempotent: writing the same result twice should have no adverse effects.
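A minimal sketch of idempotent result writing follows. It assumes results are keyed by job ID and uses a hypothetical in-memory `ResultStore` as a stand-in for the real store. With Redis, the set-if-absent step would correspond to a `SET` with the `NX` option.

```python
from typing import Callable, Optional


class ResultStore:
    """In-memory stand-in for a key-value result store (hypothetical)."""

    def __init__(self) -> None:
        self._data: dict[str, dict] = {}

    def get(self, key: str) -> Optional[dict]:
        return self._data.get(key)

    def set_if_absent(self, key: str, value: dict) -> bool:
        """Write only if the key does not exist; return True if written."""
        if key in self._data:
            return False
        self._data[key] = value
        return True


def process_job(store: ResultStore, job: dict,
                run_inference: Callable[[list], dict]) -> dict:
    key = f"inference:result:{job['job_id']}"
    existing = store.get(key)
    if existing is not None:
        # Duplicate delivery: skip inference and reuse the stored result.
        return existing
    result = run_inference(job["features"])
    # If another worker finished first, its result wins and ours is dropped.
    store.set_if_absent(key, result)
    return store.get(key)
```

Delivering the same job twice then produces one inference call and one stored result, so a redelivery after a crash is harmless.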
:::warning The Polling Anti-Pattern
Having the client poll /result/{job_id} every 100ms is wasteful. The right interval for most async ML jobs is 1-2 seconds. For latency-sensitive cases, use WebSockets or SSE to push the result to the client when it is ready. Avoid tight polling loops - they increase load on your Redis and API layer with zero benefit to the user experience.
:::
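A polling client that respects this advice might look like the sketch below. The backoff schedule (start at 1 second, double up to a cap) and the function name `poll_for_result` are assumptions for illustration; `fetch_status` stands in for a call to the result endpoint.

```python
import time
from typing import Callable, Optional


def poll_for_result(
    fetch_status: Callable[[], dict],
    initial_interval_s: float = 1.0,
    max_interval_s: float = 8.0,
    timeout_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[dict]:
    """Poll until the job is completed or failed; return None on timeout."""
    interval = initial_interval_s
    waited = 0.0
    while True:
        job = fetch_status()
        if job["status"] in ("completed", "failed"):
            return job
        if waited + interval > timeout_s:
            return None  # give up rather than poll forever
        sleep(interval)
        waited += interval
        # Back off so long-running jobs generate fewer requests.
        interval = min(interval * 2, max_interval_s)


if __name__ == "__main__":
    statuses = iter(["pending", "processing", "completed"])
    # sleep is stubbed out so the example finishes instantly
    job = poll_for_result(lambda: {"status": next(statuses)}, sleep=lambda s: None)
    print(job)
```

Injecting `sleep` keeps the function easy to test; production code would leave the default in place.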
:::danger Silent Job Loss
The most dangerous async bug: a job is removed from the queue before the worker confirms it has been successfully processed. In Redis, popping with LPOP or BLPOP instead of a BRPOPLPUSH pattern means that if the worker crashes after popping the job but before writing the result, the job is gone forever. Use BRPOPLPUSH (or BLMOVE, which replaces it from Redis 6.2 onward) to move jobs to a "processing" list atomically, and only delete from the processing list after the result is written.
:::
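The processing-list idea can be illustrated without Redis. The sketch below is an in-memory model of the pattern, not a Redis client: the class `ReliableQueue` and its method names are hypothetical, and the `claim` step is what Redis performs atomically on the server.

```python
from collections import deque
from typing import Optional


class ReliableQueue:
    """In-memory illustration of the pending/processing list pattern."""

    def __init__(self) -> None:
        self.pending: deque = deque()
        self.processing: deque = deque()

    def push(self, job: str) -> None:
        self.pending.append(job)

    def claim(self) -> Optional[str]:
        """Move one job from pending to processing instead of deleting it."""
        if not self.pending:
            return None
        job = self.pending.popleft()
        self.processing.append(job)
        return job

    def ack(self, job: str) -> None:
        """Call only after the result has been written."""
        self.processing.remove(job)

    def recover(self) -> int:
        """Re-queue jobs left in processing by a crashed worker."""
        recovered = len(self.processing)
        while self.processing:
            self.pending.appendleft(self.processing.pop())
        return recovered


if __name__ == "__main__":
    q = ReliableQueue()
    q.push("job-1")
    q.push("job-2")
    claimed = q.claim()          # worker takes job-1 ...
    # ... and crashes before calling ack(), so job-1 stays in processing.
    print(q.recover(), list(q.pending))
```

A real deployment also has to decide when a job in the processing list counts as abandoned, for example with per-worker lists or a claim timestamp, which this sketch leaves out.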
:::danger Unbounded Queue Growth
If your workers are slower than your producers, your queue grows without bound. This is a production emergency - queue depth grows, job latency increases, memory usage grows. Implement backpressure: return HTTP 429 or delay acceptance when the queue exceeds a threshold. Better to reject jobs early with a clear error than to quietly accept them and have them wait hours for processing.
:::
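One way to express the admission check is sketched below. The threshold, the function name `admission_decision`, and the way the `Retry-After` value is derived are assumptions for illustration.

```python
import math


def admission_decision(queue_depth: int, max_depth: int,
                       seconds_per_job: float, workers: int) -> tuple[int, dict]:
    """Return (HTTP status, headers) for a new job submission."""
    if queue_depth >= max_depth:
        # Estimate how long the workers need to bring the queue back under
        # the threshold, and tell the client to retry after that.
        excess = queue_depth - max_depth + 1
        retry_after = max(1, math.ceil(excess * seconds_per_job / workers))
        return 429, {"Retry-After": str(retry_after)}
    return 202, {}


if __name__ == "__main__":
    print(admission_decision(10, 100, 2.0, 4))    # accepted
    print(admission_decision(149, 100, 2.0, 4))   # rejected with a retry hint
```

In the submit endpoint, the status and headers returned here would be applied before the job is pushed, so rejected jobs never enter the queue.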
Interview Q&A
Q: When would you choose asynchronous inference over synchronous, even if the model is fast?
Even with fast models, choose asynchronous when: the result is not needed immediately by the caller (email classification, content tagging), you want to decouple your traffic spikes from model capacity (absorb 10x bursts via queue), you need to batch requests for GPU efficiency (accumulate 32 requests, run one batched forward pass), or you want to retry failures transparently without the caller needing to implement retry logic. Asynchronous decoupling provides natural buffering that synchronous systems cannot offer.
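The batching case can be sketched with an in-process queue: wait for the first request, then keep collecting until the batch is full or a short deadline passes. The function name `collect_batch` and the parameter values are hypothetical.

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, max_batch: int,
                        max_wait_s: float) -> list:
    """Wait for one item, then gather more until full or the deadline passes."""
    batch = [await queue.get()]
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait_s
    while len(batch) < max_batch:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # deadline hit: run with a partial batch
    return batch


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for request_id in range(5):
        queue.put_nowait(request_id)
    full = await collect_batch(queue, max_batch=3, max_wait_s=0.01)
    partial = await collect_batch(queue, max_batch=3, max_wait_s=0.01)
    return [full, partial]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The deadline bounds the latency added to the first request in a batch, so `max_wait_s` trades a few milliseconds of delay for better GPU utilization.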
Q: How do you handle timeouts in a synchronous ML inference endpoint?
Set a tight server-side timeout at the model call level and a broader client-side timeout. If the model exceeds the server timeout, return an HTTP 504 with a Retry-After header. The client should have circuit breaker logic: after N consecutive timeouts, stop sending requests for a brief period and serve a cached fallback or default prediction. Never let a slow model hold connections indefinitely - this cascades into upstream service failures as connection pools exhaust.
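The timeout and circuit breaker logic can be sketched as follows. The class and function names, the thresholds, and the choice to return a fallback value (instead of building the HTTP 504 response) are illustrative assumptions.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class CircuitBreaker:
    """Opens after N consecutive failures, closes again after a cooldown."""

    def __init__(self, failure_threshold: int = 3, cooldown_s: float = 10.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        if self._clock() - self._opened_at >= self.cooldown_s:
            # Cooldown elapsed: let traffic through again.
            self._opened_at = None
            self._failures = 0
            return True
        return False

    def record_success(self) -> None:
        self._failures = 0

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


async def predict_with_timeout(call_model: Callable[[], Awaitable[dict]],
                               breaker: CircuitBreaker, timeout_s: float,
                               fallback: dict) -> dict:
    if not breaker.allow_request():
        return fallback  # breaker open: do not touch the model at all
    try:
        result = await asyncio.wait_for(call_model(), timeout=timeout_s)
    except asyncio.TimeoutError:
        breaker.record_failure()
        return fallback
    breaker.record_success()
    return result
```

While the breaker is open, callers get the fallback immediately, which gives the model service time to recover instead of piling more requests onto it.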
Q: What is the difference between Server-Sent Events and WebSockets for streaming ML responses?
SSE is one-directional: server pushes events to the client over HTTP. It is simpler, works through HTTP proxies and CDNs, and reconnects automatically on disconnection. WebSockets are bidirectional: both sides can send messages at any time. For LLM streaming where the client sends one request and the server streams back tokens, SSE is the right choice - simpler, proxy-friendly, and sufficient. Use WebSockets when you need bidirectional streaming, such as a voice assistant where audio flows both ways simultaneously.
Q: How do you ensure exactly-once processing in an async inference pipeline?
Exactly-once is very hard. Most production systems settle for at-least-once (jobs may be processed more than once) combined with idempotent result writing. The pattern: (1) Use Kafka with manual offset commit - commit only after the result is written to the result store. (2) Use the job ID as a result store key and write atomically. (3) Check if the result already exists before processing - if yes, skip inference and re-publish the existing result. This gives you at-least-once delivery with idempotent outcomes, which is effectively once from the caller's perspective.
Q: Describe the architecture of an async inference system that can handle a 10x traffic spike without dropping jobs.
The core is a queue (Kafka or SQS) with sufficient retention to hold the spike volume, a worker pool that can be auto-scaled based on queue depth, and backpressure to reject submissions when the queue exceeds a safe threshold. At 10x traffic, the queue grows, job latency increases proportionally, but no accepted jobs are lost. Auto-scaling adds more workers - Kubernetes KEDA can scale deployments based on Kafka consumer lag. Once the spike subsides, the queue drains and workers scale back down. The result store (Redis) must be sized to hold every result it retains at peak: peak arrival rate * result retention time (processing time plus result TTL) * average result size. Monitor three metrics: queue depth, worker count, and job age (time from submission to completion).
