
Structured Generation in Production

Opening Scenario: Six Months Later

Six months after launching the invoice processing system from Lesson 1, the engineering team runs a quarterly review. The metrics tell the story:

Before structured generation tooling:

  • Parse failure rate: 7.8%
  • Average latency (with retries): 4.2s
  • Silent corruption incidents: 23 (discovered via downstream reports)
  • Lines of error-handling code: 847
  • Engineer-hours per week on parsing bugs: 4-6 hours

After (Instructor + OpenAI Structured Outputs + monitoring):

  • Parse failure rate: 0.3% (only genuine refusals, not structural errors)
  • Average latency: 1.8s (no retry overhead for structural issues)
  • Silent corruption incidents: 2 (both from semantic errors, not structural)
  • Lines of error-handling code: 156
  • Engineer-hours per week on parsing bugs: 0.5 hours

The improvement is real but not magic. The 0.3% failures are not structure failures - they are cases where the model genuinely cannot extract the required data (scanned documents with poor image quality, non-invoice documents uploaded by mistake). These are the right failures: the system reports them explicitly rather than silently producing wrong data.

Getting here required not just the right library choice but the right architecture: a reliability stack, schema versioning, monitoring, and clear handling of the inevitable edge cases. This lesson builds that architecture.

The Production Reliability Stack

A production structured generation system has five layers:

Complete Production Pipeline: Document Processing

Here is a complete, production-quality document extraction pipeline combining all the lessons:

import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Generic, List, Optional, TypeVar, Union

import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field, validator, ValidationError
from prometheus_client import Counter, Histogram

logger = logging.getLogger(__name__)

# ========== Metrics ==========
EXTRACTION_REQUESTS = Counter(
    "extraction_requests_total",
    "Total extraction requests",
    # status: success, error, or a FailureType value (structural, semantic, refusal, unknown)
    ["schema_name", "status"],
)
EXTRACTION_LATENCY = Histogram(
    "extraction_latency_seconds",
    "Extraction latency distribution",
    ["schema_name"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0],
)
RETRY_COUNT = Counter(
    "extraction_retries_total",
    "Total retry attempts",
    ["schema_name"],
)


# ========== Schema Definitions ==========

class LineItem(BaseModel):
    description: str = Field(min_length=1, max_length=200)
    quantity: int = Field(ge=1, le=10000)
    unit_price: float = Field(ge=0.0)
    line_total: float = Field(ge=0.0)


class Invoice(BaseModel):
    vendor_name: str = Field(min_length=1, max_length=200)
    # Patterns are anchored with ^...$ because Pydantic v2 otherwise accepts
    # a match anywhere in the string (e.g. "2024-01-01 garbage").
    invoice_number: str = Field(pattern=r"^[A-Z0-9\-_]{2,30}$")
    invoice_date: str = Field(pattern=r"^\d{4}-\d{2}-\d{2}$")
    due_date: Optional[str] = Field(None, pattern=r"^\d{4}-\d{2}-\d{2}$")
    line_items: List[LineItem] = Field(min_length=1, max_length=100)
    subtotal: float = Field(ge=0.0)
    tax_amount: float = Field(ge=0.0)
    total: float = Field(ge=0.0)
    currency: str = Field(default="USD", pattern=r"^[A-Z]{3}$")

    # Note: `validator` is the Pydantic v1-style decorator. It still runs under
    # Pydantic v2 but is deprecated in favor of `field_validator`.
    @validator("total")
    def total_is_reasonable(cls, v, values):
        """Total should be within 1% of subtotal + tax."""
        if "subtotal" in values and "tax_amount" in values:
            expected = values["subtotal"] + values["tax_amount"]
            if expected > 0 and abs(v - expected) / expected > 0.01:
                raise ValueError(
                    f"Total {v} should be approximately subtotal({values['subtotal']}) "
                    f"+ tax({values['tax_amount']}) = {expected:.2f}. "
                    f"Check your arithmetic and try again."
                )
        return v

    @validator("invoice_date")
    def date_is_not_future(cls, v):
        """Invoice date should not be more than 30 days in the future."""
        # Parse first, so a malformed date and a future date produce distinct errors
        try:
            invoice_dt = datetime.strptime(v, "%Y-%m-%d")
        except ValueError:
            raise ValueError(f"Invalid date format. Use YYYY-MM-DD, got: {v}")
        if (invoice_dt - datetime.now()).days > 30:
            raise ValueError(
                f"Invoice date {v} is more than 30 days in the future. "
                f"Double-check the date format (YYYY-MM-DD)."
            )
        return v


# ========== Failure Classification ==========

class FailureType(str, Enum):
    STRUCTURAL = "structural"  # JSON parse error, schema structure
    SEMANTIC = "semantic"      # Valid structure, invalid business logic
    REFUSAL = "refusal"        # Model refused to extract
    TIMEOUT = "timeout"        # API timeout
    UNKNOWN = "unknown"


@dataclass
class ExtractionFailure:
    failure_type: FailureType
    error_message: str
    document_preview: str
    attempt_count: int
    last_model_response: Optional[str] = None

    def is_retryable(self) -> bool:
        """Semantic failures may be retryable with better prompting."""
        return self.failure_type in (FailureType.SEMANTIC, FailureType.STRUCTURAL)

    def should_route_to_human(self) -> bool:
        """Some failures require human review."""
        return self.failure_type in (FailureType.REFUSAL, FailureType.UNKNOWN)


T = TypeVar("T", bound=BaseModel)


@dataclass
class ExtractionResult(Generic[T]):
    success: bool
    data: Optional[T]
    failure: Optional[ExtractionFailure]
    latency_ms: float
    attempts: int


# ========== Core Extraction Engine ==========

class StructuredExtractionEngine:
    """
    Production extraction engine with full reliability stack.

    Features:
    - Instructor-based structured extraction with retry
    - Prometheus metrics for observability
    - Failure classification and routing
    - Async batch processing with rate limiting
    """

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        max_retries: int = 2,
        timeout_seconds: float = 30.0,
        max_concurrent: int = 20,
    ):
        self.model = model
        self.max_retries = max_retries
        self.timeout = timeout_seconds
        self.semaphore = asyncio.Semaphore(max_concurrent)

        # Patch client with Instructor
        self.client = instructor.from_openai(
            AsyncOpenAI(),
            mode=instructor.Mode.TOOLS,
        )

    async def extract(
        self,
        schema: type[T],
        document: str,
        system_prompt: Optional[str] = None,
    ) -> ExtractionResult[T]:
        """
        Extract structured data from document.

        Returns ExtractionResult - never raises exception.
        All failures are encapsulated in ExtractionResult.failure.
        """
        schema_name = schema.__name__
        start_time = time.perf_counter()

        async with self.semaphore:
            try:
                result = await asyncio.wait_for(
                    self._extract_with_retry(schema, document, system_prompt),
                    timeout=self.timeout,
                )
                latency_ms = (time.perf_counter() - start_time) * 1000
                EXTRACTION_REQUESTS.labels(schema_name=schema_name, status="success").inc()
                EXTRACTION_LATENCY.labels(schema_name=schema_name).observe(latency_ms / 1000)
                return ExtractionResult(
                    success=True,
                    data=result.data,
                    failure=None,
                    latency_ms=latency_ms,
                    attempts=result.attempts,
                )

            except asyncio.TimeoutError:
                latency_ms = (time.perf_counter() - start_time) * 1000
                EXTRACTION_REQUESTS.labels(schema_name=schema_name, status="error").inc()
                return ExtractionResult(
                    success=False,
                    data=None,
                    failure=ExtractionFailure(
                        failure_type=FailureType.TIMEOUT,
                        error_message=f"Extraction timed out after {self.timeout}s",
                        document_preview=document[:200],
                        attempt_count=0,
                    ),
                    latency_ms=latency_ms,
                    attempts=0,
                )

            except instructor.exceptions.InstructorRetryException as e:
                latency_ms = (time.perf_counter() - start_time) * 1000
                failure_type = self._classify_failure(str(e))
                EXTRACTION_REQUESTS.labels(schema_name=schema_name, status=failure_type.value).inc()
                return ExtractionResult(
                    success=False,
                    data=None,
                    failure=ExtractionFailure(
                        failure_type=failure_type,
                        error_message=str(e)[:500],
                        document_preview=document[:200],
                        attempt_count=e.n_attempts,
                        last_model_response=str(e.last_completion)[:500],
                    ),
                    latency_ms=latency_ms,
                    attempts=e.n_attempts,
                )

            except Exception as e:
                latency_ms = (time.perf_counter() - start_time) * 1000
                EXTRACTION_REQUESTS.labels(schema_name=schema_name, status="error").inc()
                logger.exception("Unexpected error in extraction for schema %s", schema_name)
                return ExtractionResult(
                    success=False,
                    data=None,
                    failure=ExtractionFailure(
                        failure_type=FailureType.UNKNOWN,
                        error_message=f"Unexpected error: {type(e).__name__}: {e}",
                        document_preview=document[:200],
                        attempt_count=0,
                    ),
                    latency_ms=latency_ms,
                    attempts=0,
                )

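    async def _extract_with_retry(self, schema, document, system_prompt):
        """Inner extraction with Instructor retry logic."""
        # Local imports keep this method self-contained
        from types import SimpleNamespace
        from tenacity import AsyncRetrying, stop_after_attempt

        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        else:
            messages.append({
                "role": "system",
                "content": f"Extract structured {schema.__name__} data from documents. "
                           f"Be precise with numbers and dates.",
            })
        messages.append({"role": "user", "content": document})

        failed_attempts = [0]  # mutable so the callback can update it

        # tenacity calls this after a failed attempt, before the next one starts
        def on_retry(retry_state):
            failed_attempts[0] = retry_state.attempt_number
            RETRY_COUNT.labels(schema_name=schema.__name__).inc()
            logger.warning(
                "Retry %d for %s extraction. Error: %s",
                retry_state.attempt_number,
                schema.__name__,
                str(retry_state.outcome.exception())[:200],
            )

        # Assumption: Instructor accepts a tenacity AsyncRetrying object as
        # max_retries for async clients. Passing it is what registers the
        # callback; with a plain int the callback above would never run.
        result = await self.client.chat.completions.create(
            model=self.model,
            response_model=schema,
            max_retries=AsyncRetrying(
                stop=stop_after_attempt(self.max_retries),
                before_sleep=on_retry,
            ),
            messages=messages,
            temperature=0,
        )

        # A namespace is used instead of a nested class: `attempts = attempts[0]`
        # inside a class body raises NameError because of class-scope lookup rules.
        return SimpleNamespace(data=result, attempts=failed_attempts[0] + 1)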

    @staticmethod
    def _classify_failure(error_message: str) -> FailureType:
        """Classify failure type from error message (keyword heuristic)."""
        error_lower = error_message.lower()
        if "refus" in error_lower or "cannot" in error_lower:
            return FailureType.REFUSAL
        if "validation error" in error_lower or "field" in error_lower:
            return FailureType.SEMANTIC
        if "json" in error_lower or "parse" in error_lower:
            return FailureType.STRUCTURAL
        return FailureType.UNKNOWN

    async def extract_batch(
        self,
        schema: type[T],
        documents: List[str],
        system_prompt: Optional[str] = None,
    ) -> List[ExtractionResult[T]]:
        """Process a batch of documents concurrently."""
        tasks = [
            self.extract(schema, doc, system_prompt)
            for doc in documents
        ]
        return await asyncio.gather(*tasks)

Schema Versioning: Evolving Without Breaking Pipelines

As your application evolves, your Pydantic schemas will change. Without a versioning strategy, schema changes break running pipelines:

import hashlib
import json
from datetime import datetime
from typing import Optional, Literal

from pydantic import BaseModel, Field, validator


class SchemaRegistry:
    """
    Track schema versions and ensure backward-compatible evolution.
    """

    def __init__(self):
        self._schemas = {}

    def register(self, schema: type[BaseModel], version: str) -> None:
        """Register a schema version."""
        schema_hash = self._compute_schema_hash(schema)
        self._schemas[f"{schema.__name__}:{version}"] = {
            "schema": schema,
            "hash": schema_hash,
            "registered_at": datetime.now().isoformat(),
        }

    def get_schema_hash(self, schema: type[BaseModel]) -> str:
        return self._compute_schema_hash(schema)

    @staticmethod
    def _compute_schema_hash(schema: type[BaseModel]) -> str:
        """Content-hash of the schema for change detection."""
        schema_json = json.dumps(schema.model_json_schema(), sort_keys=True)
        return hashlib.sha256(schema_json.encode()).hexdigest()[:12]


# Schema evolution pattern: additive changes only
class InvoiceV1(BaseModel):
    vendor_name: str
    total: float


class InvoiceV2(BaseModel):
    vendor_name: str
    total: float
    # New field: optional with default, backward compatible
    currency: str = "USD"


class InvoiceV3(BaseModel):
    vendor_name: str
    total: float
    currency: str = "USD"
    # New field: optional, backward compatible
    payment_terms: Optional[str] = None

# Safe schema evolution rules:
# 1. ADD optional fields with defaults (backward compatible)
# 2. REMOVE fields only after verifying downstream code doesn't use them
# 3. CHANGE field types: always widen (str -> Optional[str]) not narrow
# 4. RENAME fields: add new name + keep old name as alias, then remove alias


class SafeInvoiceEvolution(BaseModel):
    vendor_name: str
    total: float
    # Renamed from "total" to "total_amount" - keep alias for migration period
    total_amount: Optional[float] = None  # New name

    @validator("total_amount", pre=True, always=True)
    def backfill_total_amount(cls, v, values):
        """Backfill new field from old field during migration period."""
        if v is None and "total" in values:
            return values["total"]
        return v

Monitoring and Alerting

import time
from collections import defaultdict, deque
from threading import Lock
from typing import Dict, List, Optional


class ExtractionMonitor:
    """
    Real-time monitoring for extraction pipeline health.
    Tracks failure rates, latency percentiles, and retry patterns.
    """

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self._lock = Lock()
        # Each entry: dict with timestamp, latency_ms, success, failure_type
        self._events: Dict[str, deque] = defaultdict(deque)

    def record(
        self,
        schema_name: str,
        success: bool,
        latency_ms: float,
        failure_type: Optional[str] = None,
    ):
        now = time.time()
        with self._lock:
            events = self._events[schema_name]
            events.append({
                "timestamp": now,
                "latency_ms": latency_ms,
                "success": success,
                "failure_type": failure_type,
            })
            # Drop events that fell out of the window so memory stays bounded
            cutoff = now - self.window_seconds
            while events and events[0]["timestamp"] <= cutoff:
                events.popleft()

    def _get_recent_events(self, schema_name: str) -> list:
        """Get events within the monitoring window."""
        cutoff = time.time() - self.window_seconds
        with self._lock:
            # Copy under the lock so concurrent record() calls cannot mutate mid-iteration
            events = list(self._events.get(schema_name, ()))
        return [e for e in events if e["timestamp"] > cutoff]

    def get_health_report(self, schema_name: str) -> dict:
        """Generate health report for a schema's extraction pipeline."""
        events = self._get_recent_events(schema_name)

        if not events:
            return {"status": "no_data", "events": 0}

        total = len(events)
        successes = sum(1 for e in events if e["success"])
        failures = total - successes

        latencies = [e["latency_ms"] for e in events]
        latencies.sort()

        failure_breakdown = defaultdict(int)
        for e in events:
            if not e["success"] and e.get("failure_type"):
                failure_breakdown[e["failure_type"]] += 1

        health_status = "healthy"
        failure_rate = failures / total
        if failure_rate > 0.05:
            health_status = "degraded"
        if failure_rate > 0.15:
            health_status = "critical"

        return {
            "status": health_status,
            "window_seconds": self.window_seconds,
            "total_requests": total,
            "success_rate": successes / total,
            "failure_rate": failure_rate,
            "failure_breakdown": dict(failure_breakdown),
            "latency_p50_ms": latencies[int(total * 0.5)],
            "latency_p95_ms": latencies[int(total * 0.95)],
            "latency_p99_ms": latencies[int(total * 0.99)] if total > 100 else None,
            "alert": failure_rate > 0.05,
        }

    def check_alerts(self) -> List[dict]:
        """Return list of active alerts."""
        alerts = []
        # Iterate over a snapshot of the keys: record() may add schemas concurrently
        for schema_name in list(self._events):
            report = self.get_health_report(schema_name)
            if report.get("alert"):
                alerts.append({
                    "schema": schema_name,
                    "severity": "critical" if report["failure_rate"] > 0.15 else "warning",
                    "failure_rate": report["failure_rate"],
                    "message": (
                        f"Schema '{schema_name}' failure rate is "
                        f"{report['failure_rate']:.1%} "
                        f"(threshold: 5%)"
                    ),
                })
        return alerts


# Integration example
monitor = ExtractionMonitor(window_seconds=300) # 5-minute window

async def extract_with_monitoring(engine, schema, document):
    result = await engine.extract(schema, document)
    monitor.record(
        schema_name=schema.__name__,
        success=result.success,
        latency_ms=result.latency_ms,
        failure_type=result.failure.failure_type.value if result.failure else None,
    )
    return result

Caching Structured Outputs

Caching works well for extraction at temperature=0, where repeated calls on the same input are expected to agree, but the cache key and the choice of what to cache both require care:

import hashlib
import json
from typing import Optional, TypeVar

import redis
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class StructuredOutputCache:
    """
    Redis-backed cache for structured extraction results.
    Only caches successful extractions at temperature=0.
    """

    def __init__(self, redis_client: redis.Redis, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    def _cache_key(
        self, schema: type[BaseModel], document: str, model: str, system_prompt: str = ""
    ) -> str:
        """Create a deterministic cache key.

        The key covers the schema's JSON content (not just its name), the model,
        and the system prompt, so a change to any of them is a cache miss.
        """
        schema_json = json.dumps(schema.model_json_schema(), sort_keys=True)
        content = json.dumps([schema_json, model, system_prompt, document])
        return "extraction:" + hashlib.sha256(content.encode()).hexdigest()

    def get(
        self, schema: type[T], document: str, model: str, system_prompt: str = ""
    ) -> Optional[T]:
        """Retrieve cached extraction if available."""
        key = self._cache_key(schema, document, model, system_prompt)
        cached = self.redis.get(key)
        if cached:
            return schema.model_validate_json(cached)
        return None

    def set(
        self, schema: type[BaseModel], document: str, model: str, result: BaseModel,
        system_prompt: str = "",
    ) -> None:
        """Cache a successful extraction result."""
        key = self._cache_key(schema, document, model, system_prompt)
        # model_dump_json also handles types json.dumps cannot (dates, decimals)
        self.redis.setex(key, self.ttl, result.model_dump_json())

    def is_cacheable(self, document: str) -> bool:
        """
        Only cache if document is deterministic content.
        Don't cache if document contains current timestamps, random IDs, etc.
        """
        # Simple heuristic: skip very short documents (likely dynamic)
        if len(document) < 50:
            return False
        # Skip documents mentioning "current time" or "now" (dynamic content)
        dynamic_signals = ["current time", "right now", "today at", "just happened"]
        doc_lower = document.lower()
        if any(signal in doc_lower for signal in dynamic_signals):
            return False
        return True

Handling Edge Cases

The most common edge cases in production structured generation:

from typing import Optional
from pydantic import BaseModel, validator


# Edge Case 1: LLM refuses to extract from certain inputs
# Solution: explicit refusal detection + graceful default

class SafeExtraction(BaseModel):
    """Use this wrapper for inputs that might trigger refusals."""
    was_extractable: bool
    reason_not_extractable: Optional[str] = None
    # Your actual fields, all optional:
    vendor_name: Optional[str] = None
    total: Optional[float] = None


# Edge Case 2: Partial extraction - some fields present, some missing
# Solution: make fields Optional, then validate completeness at a higher level

class PartialInvoice(BaseModel):
    """Accept partial extractions and track what's missing."""
    vendor_name: Optional[str] = None
    total: Optional[float] = None
    invoice_date: Optional[str] = None

    @property
    def completeness_score(self) -> float:
        """What fraction of required fields are present?"""
        required = ["vendor_name", "total", "invoice_date"]
        present = sum(1 for f in required if getattr(self, f) is not None)
        return present / len(required)

    @property
    def is_usable(self) -> bool:
        """Is this extraction complete enough to be useful?"""
        # Compare against 2 / 3 exactly: a 0.67 threshold would reject 2 of 3 fields (0.666...)
        return self.completeness_score >= 2 / 3  # At least 2/3 fields


# Edge Case 3: Model outputs values outside expected ranges
# Solution: clamp in a Pydantic validator when a safe default exists
# (or raise with a model-friendly error message to trigger a retry)

class BoundedExtraction(BaseModel):
    confidence: float

    @validator("confidence")
    def clamp_confidence(cls, v):
        """Clamp confidence to [0, 1] rather than rejecting out-of-range values."""
        if v < 0:
            return 0.0
        if v > 1:
            return 1.0
        return v


# Edge Case 4: Very long documents that exceed context window
# Solution: smart truncation with important-content preservation

def truncate_for_extraction(
    document: str,
    max_chars: int = 12000,
    preserve_start_chars: int = 4000,
    preserve_end_chars: int = 2000,
    middle_segments: int = 4,
) -> str:
    """
    Truncate a document while preserving the most extraction-relevant parts.

    Invoices: key info is often at the start (header) and end (totals)
    Legal docs: key terms in preamble and signature sections
    Articles: title + first paragraphs + conclusion
    """
    if len(document) <= max_chars:
        return document

    middle_chars = max_chars - preserve_start_chars - preserve_end_chars
    if middle_chars <= 0 or preserve_end_chars <= 0:
        # No budget for a middle (or an end) section: just take the start
        return document[:max_chars]

    start = document[:preserve_start_chars]
    end = document[-preserve_end_chars:]
    middle = document[preserve_start_chars:-preserve_end_chars]

    # Sample evenly spaced contiguous segments from the middle.
    # (Slicing with middle[::step] would keep every step-th character,
    # which yields unreadable text.)
    if len(middle) > middle_chars:
        segment_len = max(1, middle_chars // middle_segments)
        stride = max(segment_len, len(middle) // middle_segments)
        middle = "\n[...]\n".join(
            middle[i * stride : i * stride + segment_len]
            for i in range(middle_segments)
        )

    truncation_note = f"\n[... {len(document) - max_chars:,} characters truncated ...]\n"
    return start + truncation_note + middle + truncation_note + end

Complete Reference Architecture

"""
Reference: Production-scale structured extraction service.
Combines all patterns from this module.
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel as FastAPIBaseModel
import asyncio

app = FastAPI()

# Initialize once at startup
engine = StructuredExtractionEngine(
    model="gpt-4o-mini",
    max_retries=2,
    timeout_seconds=30,
    max_concurrent=20,
)
monitor = ExtractionMonitor(window_seconds=300)


class ExtractionRequest(FastAPIBaseModel):
    document: str
    schema_name: str  # "Invoice", "Person", etc.


SCHEMA_MAP = {
    "Invoice": Invoice,
    # Add more schemas here
}


@app.post("/extract")
async def extract_endpoint(request: ExtractionRequest):
    """Structured extraction endpoint."""
    schema = SCHEMA_MAP.get(request.schema_name)
    if not schema:
        raise HTTPException(status_code=400, detail=f"Unknown schema: {request.schema_name}")

    # Truncate if needed
    document = truncate_for_extraction(request.document)

    # Extract
    result = await extract_with_monitoring(engine, schema, document)

    if result.success:
        return {
            "success": True,
            "data": result.data.model_dump(),
            "latency_ms": result.latency_ms,
            "attempts": result.attempts,
        }
    else:
        return {
            "success": False,
            "failure_type": result.failure.failure_type.value,
            "error": result.failure.error_message,
            "latency_ms": result.latency_ms,
        }


@app.get("/health/{schema_name}")
async def health_endpoint(schema_name: str):
    """Pipeline health for a specific schema."""
    return monitor.get_health_report(schema_name)


@app.get("/alerts")
async def alerts_endpoint():
    """Active pipeline alerts."""
    return monitor.check_alerts()

Common Mistakes

:::danger Not Separating Structural and Semantic Failures in Monitoring

A structural failure (invalid JSON, wrong field type) indicates a library or configuration problem. A semantic failure (valid structure, wrong content like negative prices) indicates a model understanding problem or data quality issue. A refusal (model explicitly says it can't extract) indicates an upstream data problem. These require entirely different responses. Logging all three as "extraction_error" hides which layer is failing and makes debugging much harder. Always tag failures with their type.

:::

:::warning Caching Extractions Without Considering Prompt Version

If you cache extraction results by document hash and later update your system prompt or Pydantic schema, you will serve stale cached results with the old behavior. Include the model name, system prompt hash, and schema hash in your cache key. This prevents stale cache hits after updates and ensures cache invalidation happens automatically when any component changes.

:::
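The cache-key advice above can be sketched with the standard library alone. This is a minimal illustration, not the lesson's cache class: `build_cache_key` and its parameters are hypothetical names, and `schema_json` is assumed to be the dict a Pydantic model returns from `model_json_schema()`.

```python
import hashlib
import json


def _sha256(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()


def build_cache_key(
    document: str,
    model: str,
    system_prompt: str,
    schema_json: dict,
) -> str:
    """Hash every input that can change the extraction result."""
    parts = {
        "model": model,
        "prompt_hash": _sha256(system_prompt),
        # sort_keys makes the schema hash independent of dict ordering
        "schema_hash": _sha256(json.dumps(schema_json, sort_keys=True)),
        "document_hash": _sha256(document),
    }
    return "extraction:" + _sha256(json.dumps(parts, sort_keys=True))
```

Because the prompt and schema are part of the key, editing either one produces a new key and the old entries simply expire with their TTL; no explicit invalidation step is needed.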

:::danger Silent Semantic Failures Are the Worst Outcome

A model that returns {"total": -150.0} for an invoice has produced valid JSON conforming to a poorly-specified schema. If your schema allows negative totals and your code doesn't validate business rules, this silently enters your database. Silent data corruption is far more damaging than a loud failure. Always add semantic validators (cross-field consistency, business rule checks, range validation) in addition to structural validation.

:::

:::warning Async Batching Without Rate Limiting

Sending 1000 documents through asyncio.gather() without a semaphore will hit OpenAI's rate limits and generate 429 errors, causing your batch to fail partway through. Always use a semaphore to limit concurrent requests to a safe level (typically 20-50 for GPT-4o-mini tier 2 limits). Use exponential backoff for 429 errors rather than failing immediately.

:::
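A minimal sketch of the semaphore-plus-backoff pattern, using only `asyncio`. `RateLimitError` is a hypothetical stand-in for whatever exception your client raises on HTTP 429, and `run_batch`, `call_with_backoff`, and the delay defaults are illustrative assumptions rather than tuned values.

```python
import asyncio
import random


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's HTTP 429 error."""


async def call_with_backoff(call, max_attempts=5, base_delay=0.5, max_delay=30.0):
    """Await call(); on RateLimitError, wait exponentially longer and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except RateLimitError:
            if attempt == max_attempts:
                raise
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Jitter spreads retries out so workers do not retry in lockstep
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))


async def run_batch(items, worker, max_concurrent=20, **backoff_kwargs):
    """Run worker(item) for every item with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with semaphore:
            return await call_with_backoff(lambda: worker(item), **backoff_kwargs)

    # return_exceptions=True keeps one failed item from cancelling the batch
    return await asyncio.gather(*(guarded(i) for i in items), return_exceptions=True)
```

The semaphore is held during the backoff sleep, so a burst of 429s also slows the rate at which new requests start.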

Interview Q&A

Q1: How do you design a production structured extraction pipeline that distinguishes between different failure types?

Design three separate code paths: (1) Structural failures - occur when the model produces output that fails JSON parsing or schema structure validation. With Instructor + OpenAI Structured Outputs, these should be near-zero. If they occur, they indicate a library misconfiguration or model API issue. Response: alert immediately, do not retry. (2) Semantic failures - valid structure, invalid business logic. For example, an invoice where total doesn't match subtotal + tax. These are retryable with more explicit prompting. Response: retry once with additional constraints in the prompt, then escalate to human review. (3) Refusals - the model explicitly declines to extract. These indicate upstream data problems (wrong document type, poor image quality, non-relevant content). Response: route to human review, do not retry with the same model. Track the document for quality analysis.
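The three code paths can be expressed as a small routing function. This is a sketch under the policy described in the answer above; `Failure`, `Action`, and `next_action` are hypothetical names, not part of the pipeline code.

```python
from enum import Enum


class Failure(str, Enum):
    STRUCTURAL = "structural"
    SEMANTIC = "semantic"
    REFUSAL = "refusal"


class Action(str, Enum):
    ALERT = "alert"
    RETRY = "retry_with_constraints"
    HUMAN_REVIEW = "human_review"


def next_action(failure: Failure, retries_so_far: int) -> Action:
    """Map a classified failure to the response described above."""
    if failure is Failure.STRUCTURAL:
        # Points at library or API misconfiguration: alert, do not retry
        return Action.ALERT
    if failure is Failure.SEMANTIC:
        # One retry with tighter constraints, then escalate
        return Action.RETRY if retries_so_far < 1 else Action.HUMAN_REVIEW
    # Refusals indicate upstream data problems: never retry the same model
    return Action.HUMAN_REVIEW
```

Keeping the policy in one pure function makes it easy to unit-test and to change thresholds without touching the extraction code.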

Q2: How do you handle schema versioning in a production extraction pipeline?

Four principles: (1) Additive evolution only - new fields must be Optional with defaults, backward-compatible with existing code; (2) Content-hash schemas - compute a hash of the JSON schema and include it in cache keys, database records, and monitoring labels so you know which schema version produced each result; (3) Migration periods for renames - when renaming a field, add the new name as an additional field with a validator that backfills from the old name; remove the old name after all downstream code has been updated; (4) Never remove required fields without a migration - if a field becomes optional, change it to Optional[T] with a default; if you want to remove it entirely, make it optional for one release cycle, then remove it.
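One way to enforce the additive-only rule is a check that compares two JSON schemas before a new version ships. The sketch below assumes flat schemas shaped like Pydantic's `model_json_schema()` output (`properties` and `required` keys); `breaking_changes` is a hypothetical helper, and it is deliberately conservative: any change to a field's `type` entry is flagged, including widening to an optional type.

```python
def breaking_changes(old_schema: dict, new_schema: dict) -> list[str]:
    """List changes that violate additive-only schema evolution."""
    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})
    old_required = set(old_schema.get("required", []))
    new_required = set(new_schema.get("required", []))

    problems = []
    for name in old_props:
        if name not in new_props:
            problems.append(f"removed field: {name}")
    for name in new_required - old_required:
        if name in old_props:
            problems.append(f"field became required: {name}")
        else:
            problems.append(f"new field is required: {name}")
    for name in old_props.keys() & new_props.keys():
        if old_props[name].get("type") != new_props[name].get("type"):
            problems.append(f"type changed: {name}")
    return problems
```

Running this in CI against the previously registered schema turns an accidental breaking change into a failed build instead of a broken pipeline.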

Q3: Describe the monitoring setup you would build for a structured generation pipeline.

Three layers: (1) Request-level metrics - per-schema counters for total requests, successes, structural failures, semantic failures, and refusals. Histograms for latency with p50/p95/p99 buckets. Counter for retry attempts. These go into Prometheus/Datadog. (2) Alert thresholds - alert on failure_rate > 5% (warning) or > 15% (critical) per schema over a 5-minute rolling window. Alert on p95 latency > 10 seconds. Alert on any increase in refusal rate (might indicate upstream data quality degradation). (3) Debug logging - for every failure, log: the document preview (first 200 chars, no PII), the failure type and error message, the last model response, and the attempt count. This is the information needed to reproduce and fix failures.
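The alert thresholds in layer (2) can be sketched as a pure function over one window of data. `evaluate_alerts` and `nearest_rank_percentile` are hypothetical helpers; the baseline refusal rate is assumed to come from a previous window that the caller tracks.

```python
import math


def nearest_rank_percentile(sorted_values, pct):
    """Nearest-rank percentile of an ascending list (pct in 0..100)."""
    rank = max(1, math.ceil(pct * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def evaluate_alerts(latencies_ms, failure_count, refusal_rate, baseline_refusal_rate):
    """Apply the failure-rate, p95 latency, and refusal-rate rules to one window."""
    alerts = []
    total = len(latencies_ms)
    if total == 0:
        return alerts

    failure_rate = failure_count / total
    if failure_rate > 0.15:
        alerts.append("critical: failure rate")
    elif failure_rate > 0.05:
        alerts.append("warning: failure rate")

    if nearest_rank_percentile(sorted(latencies_ms), 95) > 10_000:  # 10 seconds
        alerts.append("warning: p95 latency")

    if refusal_rate > baseline_refusal_rate:
        alerts.append("warning: refusal rate increased")
    return alerts
```

In a real deployment these rules would more likely live in the metrics backend's alerting configuration; the function form is mainly useful for testing thresholds against recorded traffic.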

Q4: When is caching structured outputs safe, and when is it not?

Safe to cache: extraction at temperature=0 from static documents, where the same document processed with the same model, system prompt, and schema should produce the same output in practice (temperature=0 greatly reduces variation but does not strictly guarantee identical outputs). Cache key: SHA256 of (schema_json + system_prompt + model_name + document). Cache with a TTL (for example, 24h for invoices, 1h for news articles). Unsafe to cache: (1) documents containing timestamps, session IDs, or other dynamic content; (2) extractions with temperature > 0 (non-deterministic); (3) user-personalized extractions where the same document might produce different valid results for different users. Schema updates and system prompt changes must also invalidate the cache, which happens automatically when both are part of the key. Always include the schema content hash (not just schema name) in cache keys - a schema rename without a content change should be a cache hit; a content change should be a cache miss.

Q5: How do you handle very long documents that exceed the model's context window?

Three strategies with different tradeoffs: (1) Smart truncation - preserve the most information-dense sections (document start and end for invoices, introduction and conclusion for articles). Inject a truncation notice so the model knows content was removed. Works when the key information is concentrated in specific document regions. (2) Hierarchical extraction - split into chunks, extract from each chunk, merge results. For invoices: extract header (vendor, date) from first 2K tokens, extract line items from middle chunks, extract totals from last 2K tokens, merge. Requires careful merge logic. (3) Summarize then extract - first pass: summarize the document to fit in context; second pass: extract from the summary. Works for semantic extraction (themes, sentiment) but loses specific details. Choose based on where the important information lives in your documents and what level of detail is required. Always test truncation strategies on your specific document type before deploying.
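Strategy (2), hierarchical extraction, needs a chunker and a merge step around the per-chunk model calls. The sketch below covers only those two pieces and leaves the extraction call out; `split_into_chunks` and `merge_partial_results` are hypothetical helpers, and the merge policy (first non-null scalar wins, lists concatenated without duplicates) is one simple choice among several.

```python
def split_into_chunks(text: str, chunk_chars: int, overlap_chars: int = 0) -> list[str]:
    """Split text into fixed-size chunks that overlap by overlap_chars."""
    if chunk_chars <= overlap_chars:
        raise ValueError("chunk_chars must exceed overlap_chars")
    step = chunk_chars - overlap_chars
    return [text[i:i + chunk_chars] for i in range(0, len(text), step)]


def merge_partial_results(partials: list[dict]) -> dict:
    """Merge per-chunk extractions into one record."""
    merged: dict = {}
    for partial in partials:
        for key, value in partial.items():
            if value is None:
                continue  # this chunk did not contain the field
            if isinstance(value, list):
                bucket = merged.setdefault(key, [])
                for item in value:
                    if item not in bucket:  # overlap can repeat line items
                        bucket.append(item)
            elif key not in merged:
                merged[key] = value  # first non-null scalar wins
    return merged
```

Overlapping chunks reduce the chance that a line item is split across a boundary, at the cost of duplicates that the merge step has to remove.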


© 2026 EngineersOfAI. All rights reserved.