Embedding Models in Production
Reading time: 40–50 minutes

Interview relevance: Very High - embedding model selection and operational challenges appear in almost every ML/AI engineering system design interview

Target roles: AI Engineer, ML Engineer, Backend Engineer (AI), MLOps Engineer, Research Engineer
The Crisis at 2 AM
The Slack message arrives at 2:07 AM: "RAG is broken. All similarity scores are near zero. Nothing is being retrieved."
The on-call engineer opens the dashboard. Retrieval accuracy - the metric they track as their north star for answer quality - has collapsed from 87% to 3%. The system is serving answers, but they are hallucinated garbage because almost no relevant context is being retrieved. Three hundred thousand active users are affected.
She traces back the deployment log. Six hours ago, a well-intentioned ML engineer updated the embedding model from voyage-2 to voyage-3. The motivation was valid: voyage-3 had scored 4 points higher on the MTEB benchmark. The engineer updated the embedding API call in the query path, deployed it, and went home. What they did not do was re-embed the 2 million documents in the knowledge base.
Here is the catastrophe in geometric terms: every document embedding in the vector database was generated by voyage-2. Those embeddings live in a high-dimensional space shaped by voyage-2's learned parameters - its particular notion of what "similar" means. The new query embeddings come from voyage-3, which lives in an entirely different geometric space. Measuring cosine similarity between a voyage-2 document embedding and a voyage-3 query embedding is like measuring the distance between a point on Mars and a point on Earth using a ruler that only measures distances within a single planet. The numbers are meaningless.
The fix requires re-embedding all 2 million documents using voyage-3. At 1,000 documents per second (a generous throughput estimate accounting for API rate limits), that takes 33 minutes of pure API time. But the actual migration takes 3 days - because the team must validate each batch, maintain the old index for rollback, manage dual-write during the transition, and respect a 6-hour rate limit window negotiated with the embedding provider. For 3 days, the system serves degraded results. The company loses user trust, and the post-mortem leads to an engineering-wide policy change.
The root cause was not the decision to adopt a better embedding model. The root cause was that the team never established an embedding compatibility policy. They did not understand that an embedding model is not a swappable component like a sorting algorithm - it is a permanent architectural commitment that shapes every vector in your entire data store. Changing it is not a configuration change. It is a migration on the scale of a database schema change.
This lesson is about understanding embeddings deeply enough that you never make this mistake, and about building the systems that make your team resilient even when someone else does.
Why Embeddings Matter
What an Embedding Actually Is
An embedding is a dense vector - a list of floating point numbers - that represents the semantic meaning of a piece of text in a continuous geometric space. When a model encodes the sentence "The dog chased the cat," it produces something like:
[0.023, -0.847, 0.412, 0.001, ..., -0.234] # 1024 numbers
The genius of these representations is that semantic similarity becomes geometric proximity. "The puppy ran after the kitten" produces a nearly identical vector. "The stock market crashed" produces a completely different vector, far away in this space. Retrieval becomes a geometric search problem: find the stored vectors nearest to the query vector.
This geometric metaphor is not just a convenient analogy - it is exactly how similarity search is implemented. You measure distances (usually cosine similarity) between vectors, and the ones with the smallest angular distance (highest similarity score) are the most semantically relevant.
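To make the "retrieval is geometric search" idea concrete, here is a minimal sketch of brute-force nearest-neighbor lookup by cosine similarity. The three-dimensional vectors are made-up toy numbers (real embeddings have hundreds or thousands of dimensions), and `top_k_cosine` is a hypothetical helper name, not part of any library.

```python
import numpy as np

def top_k_cosine(query: np.ndarray, corpus: np.ndarray, k: int = 3) -> list[tuple[int, float]]:
    """Return (row_index, cosine_similarity) for the k corpus rows closest to the query."""
    q = query / np.linalg.norm(query)
    c = corpus / np.linalg.norm(corpus, axis=1, keepdims=True)
    scores = c @ q                      # cosine similarity of every row against the query
    order = np.argsort(-scores)[:k]     # highest similarity first
    return [(int(i), float(scores[i])) for i in order]

# Toy "embeddings" (illustrative values only, not real model output)
corpus = np.array([
    [0.9, 0.1, 0.0],   # 0: "The puppy ran after the kitten"
    [0.0, 0.2, 0.9],   # 1: "The stock market crashed"
    [0.8, 0.3, 0.1],   # 2: "A dog chased a cat"
])
query = np.array([1.0, 0.2, 0.0])       # "The dog chased the cat"

results = top_k_cosine(query, corpus, k=2)
for idx, score in results:
    print(f"row {idx}: similarity {score:.3f}")
```

The two animal sentences rank first and the finance sentence is left out, purely because of where the vectors point.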
The Compatibility Catastrophe
The critical insight the opening story illustrates: embeddings from different models are not comparable. There is no meaningful number you can produce by computing cosine similarity between a vector from model-A and a vector from model-B. Each model defines its own geometry for the embedding space - the axes mean different things, the clusters form around different concepts, the distances follow different distributions.
This is why embedding model selection is a permanent architectural decision:
Documents embedded with Model A → Vector DB ← Queries encoded with Model A ✓ Works
Documents embedded with Model A → Vector DB ← Queries encoded with Model B ✗ Garbage
Once you embed your corpus with Model A, every query must also go through Model A. Every new document must go through Model A. If you ever want to switch to Model B, you must re-embed your entire corpus.
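The incompatibility can be simulated without calling any embedding API. In the sketch below, each "model" is a stand-in: a random orthogonal rotation applied to a shared latent "meaning" vector. This is an assumption made purely for illustration (real models differ in far more than a rotation), but it shows the effect: within one space the query lands next to its document, across spaces the similarity collapses toward zero.

```python
import numpy as np

rng = np.random.default_rng(0)
DIM = 256

def random_rotation(dim: int) -> np.ndarray:
    """Random orthogonal matrix standing in for one model's private geometry."""
    q, _ = np.linalg.qr(rng.standard_normal((dim, dim)))
    return q

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Hypothetical stand-ins for two different embedding models
model_a = random_rotation(DIM)
model_b = random_rotation(DIM)

# A document and a query that mean almost the same thing
doc_meaning = rng.standard_normal(DIM)
query_meaning = doc_meaning + 0.1 * rng.standard_normal(DIM)

doc_vec_a = model_a @ doc_meaning                          # stored in the index by model A
same_model = cosine(doc_vec_a, model_a @ query_meaning)    # query also encoded by model A
cross_model = cosine(doc_vec_a, model_b @ query_meaning)   # query encoded by model B

print(f"same model:  {same_model:.3f}")    # close to 1.0
print(f"cross model: {cross_model:.3f}")   # close to 0.0
```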
Cosine Similarity: The Right Metric for Embeddings
For high-dimensional embedding vectors, cosine similarity is almost always the correct similarity metric:

$$\text{cosine\_similarity}(a, b) = \frac{a \cdot b}{\|a\| \, \|b\|} = \cos\theta$$

This measures the angle $\theta$ between two vectors, not their magnitude. Why this matters:

Dot product works when vectors are normalized to unit length, because then $\|a\| = \|b\| = 1$ and $a \cdot b = \cos\theta$. Many embedding models output normalized vectors by default, making dot product equivalent to cosine similarity and computationally cheaper.
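Euclidean distance (L2) is a poor fit for raw, unnormalized embedding vectors. In high dimensions, pairwise distances concentrate (a consequence of the "concentration of measure" phenomenon): points tend to be approximately equidistant from each other, so the gap between the nearest and the farthest neighbor shrinks and raw distance becomes less informative. For embeddings specifically, Euclidean distance also conflates magnitude (which can vary with factors such as text length) with direction (what the content means). You want direction. Once vectors are normalized to unit length, squared Euclidean distance equals $2 - 2\cos\theta$, so it ranks neighbors exactly as cosine similarity does; the problem is magnitude, not the L2 formula itself.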
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Compute cosine similarity between two vectors."""
    # Normalize both vectors
    a_norm = a / np.linalg.norm(a)
    b_norm = b / np.linalg.norm(b)
    # Dot product of normalized vectors = cosine similarity
    return float(np.dot(a_norm, b_norm))

# Demonstration: same direction, different magnitude
a = np.array([1.0, 2.0, 3.0])
b = np.array([2.0, 4.0, 6.0])  # Exactly 2x the magnitude of a

print(f"Cosine similarity: {cosine_similarity(a, b):.4f}")   # 1.0 - identical direction
print(f"Euclidean distance: {np.linalg.norm(a - b):.4f}")    # 3.74 - misleading "far apart"

# Cosine correctly identifies semantic equivalence.
# Euclidean incorrectly penalizes for longer document text.
The cosine similarity correctly identifies that a and b have the same meaning (same direction). Euclidean distance incorrectly marks them as distant.
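Once vectors are scaled to unit length, the disagreement between the two metrics disappears: squared Euclidean distance becomes $2 - 2\cos\theta$, so both produce the same neighbor ranking. The sketch below checks this numerically on random vectors (random data is an assumption here; it stands in for real embeddings).

```python
import numpy as np

rng = np.random.default_rng(42)

# Five random 64-dimensional vectors, scaled to unit length
vectors = rng.standard_normal((5, 64))
unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)

query, docs = unit[0], unit[1:]

cos = docs @ query                              # cosine similarity (unit vectors)
l2_sq = np.sum((docs - query) ** 2, axis=1)     # squared Euclidean distance

# For unit vectors: ||a - b||^2 = 2 - 2 * cos(theta)
identity_holds = bool(np.allclose(l2_sq, 2.0 - 2.0 * cos))

# Highest cosine first vs. smallest distance first: same ordering
same_ranking = bool(np.array_equal(np.argsort(-cos), np.argsort(l2_sq)))

print(identity_holds, same_ranking)
```

This is why the choice of metric matters mostly for unnormalized vectors: after normalization, cosine, dot product, and L2 all agree on which neighbors are nearest.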
The Embedding Model Landscape
Voyage AI: The Anthropic-Recommended Choice
When building RAG systems with Claude, Anthropic explicitly recommends Voyage AI as the embedding provider. Voyage was purpose-built for retrieval - their models are trained specifically for the retrieval task, not as a general-purpose byproduct.
voyage-3: Voyage's flagship model. 1024 dimensions. Excels at general retrieval tasks, code, and domain-specific content. The right default for Claude RAG systems.
voyage-3-lite: Smaller, faster, cheaper. 512 dimensions. The quality gap vs. voyage-3 is small enough that you should benchmark on your specific data before assuming you need the larger model.
voyage-code-3: Specialized for code retrieval. When your RAG system answers questions about codebases, this significantly outperforms general-purpose models because it understands programming language syntax, function signatures, and code structure.
voyage-finance-2: Domain-specialized for financial documents. Particularly strong on 10-Ks, earnings calls, and financial analysis tasks.
Key advantage: tight integration with Claude's context. Voyage models are tuned with Claude's context window in mind, meaning retrieved chunks tend to have the right granularity for Claude's generation step.
OpenAI Embeddings
text-embedding-3-small: 1536 dimensions (can request fewer). Excellent price-performance ratio. The go-to if you are already on OpenAI infrastructure.
text-embedding-3-large: 3072 dimensions. OpenAI's highest quality option. Worth the cost for applications where retrieval quality is business-critical.
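Key feature: OpenAI's models support dimension reduction via the dimensions parameter. You can request 256, 512, or any size up to the maximum. The models were trained so that the leading dimensions carry most of the information, so a shortened embedding (in effect the first N values, re-normalized) retains most of the quality, unlike naive truncation of a model that was not trained for it. Useful for memory-constrained deployments.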
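For models trained to support shortening, the same reduction can be approximated client-side on a full-size vector you already have: keep the leading values and re-normalize to unit length. The helper below is a hypothetical sketch using only NumPy; it makes no API call, and applying it to a model that was not trained for shortened outputs will likely cost much more quality.

```python
import numpy as np

def shorten_embedding(embedding: list[float], dimensions: int) -> list[float]:
    """Keep the first `dimensions` values and re-normalize to unit length."""
    arr = np.asarray(embedding, dtype=np.float64)
    if not 0 < dimensions <= arr.size:
        raise ValueError(f"dimensions must be in 1..{arr.size}")
    head = arr[:dimensions]
    norm = np.linalg.norm(head)
    if norm == 0.0:
        raise ValueError("cannot normalize an all-zero prefix")
    return (head / norm).tolist()

# Toy 3-value "embedding" shortened to 2 values
full = [3.0, 4.0, 12.0]
short = shorten_embedding(full, 2)
print(short)
```

Whichever route you take, the shortened size becomes part of the model's identity: a 256-dimension index and a 1536-dimension query are just as incompatible as two different models.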
Cohere Embeddings
embed-english-v3.0: 1024 dimensions. One unique feature: Cohere requires you to specify input_type - search_document or search_query. This asymmetric embedding (different representations for documents vs. queries) is a genuine quality boost for retrieval tasks.
embed-multilingual-v3.0: 1024 dimensions. When you need cross-language retrieval (user asks in English, documents are in Spanish), this is the best managed option.
Open Source Options
BGE (BAAI General Embedding): The BAAI institute's series (bge-small-en, bge-base-en, bge-large-en, bge-m3) represents the state of the art in open source retrieval models. bge-m3 is particularly impressive: it handles multilingual text and produces both dense and sparse embeddings from a single model. If you can self-host, BGE offers nearly API-level quality at the cost of your own compute.
E5 (Microsoft): e5-large-v2 and multilingual variants are strong performers that punch above their size class. Like BGE, freely available on HuggingFace.
GTE (Alibaba DAMO Academy): A strong competitor to E5, particularly for multilingual tasks. The gte-large model is competitive with models 3-4x its size.
Dimension and Cost Comparison
| Model | Dimensions | MTEB Score | Cost (per 1M tokens) |
|---|---|---|---|
| voyage-3 | 1024 | 68.2 | $0.06 |
| voyage-3-lite | 512 | 67.1 | $0.02 |
| text-embedding-3-large | 3072 | 64.6 | $0.13 |
| text-embedding-3-small | 1536 | 62.3 | $0.02 |
| cohere-embed-v3 | 1024 | 64.5 | $0.10 |
| bge-large-en-v1.5 | 1024 | 63.9 | Self-hosted |
| bge-m3 | 1024 | 66.5 | Self-hosted |
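Within a model family, higher dimensions generally mean better quality - more "room" to encode semantic nuance. Across families the relationship is much weaker: in the table above, the 512-dimension voyage-3-lite scores higher than the 3072-dimension text-embedding-3-large. Higher dimensions also mean more storage, slower ANN search, and higher memory requirements.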
The practical sweet spot: 1024 dimensions for most production systems. Beyond 1024, you pay increasingly large costs for increasingly small quality gains.
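The per-token prices in the table translate directly into a one-time embedding bill for a corpus. The sketch below does that arithmetic; the corpus size (2.5 million chunks) and the average of 400 tokens per chunk are assumptions chosen for illustration, and the prices are copied from the table above.

```python
def embedding_cost_usd(
    num_chunks: int,
    avg_tokens_per_chunk: int,
    usd_per_million_tokens: float,
) -> float:
    """One-time cost of embedding a corpus at a flat per-token price."""
    total_tokens = num_chunks * avg_tokens_per_chunk
    return total_tokens / 1_000_000 * usd_per_million_tokens

# Prices per 1M tokens, taken from the comparison table
PRICES = {
    "voyage-3": 0.06,
    "voyage-3-lite": 0.02,
    "text-embedding-3-large": 0.13,
    "text-embedding-3-small": 0.02,
}

# Assumed corpus: 2.5M chunks averaging 400 tokens each (1 billion tokens)
for model, price in PRICES.items():
    cost = embedding_cost_usd(2_500_000, 400, price)
    print(f"{model:24s} ${cost:,.2f}")
```

Remember that a model migration pays this bill again in full, because every chunk must be re-embedded.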
Critical Production Decisions
Decision 1: Dimension Selection
Do not default to maximum dimensions. Work through the storage math:
def calculate_storage_requirements(
    num_documents: int,
    chunks_per_document: float,
    dimensions: int,
    bytes_per_float: int = 4,
) -> dict:
    """Calculate RAM and storage requirements for a vector index."""
    total_vectors = int(num_documents * chunks_per_document)
    bytes_raw = total_vectors * dimensions * bytes_per_float
    # HNSW index overhead: approximately 1.5-2x the raw vector data
    # (2.0 is used here as the conservative end of that range)
    bytes_with_index = bytes_raw * 2.0
    return {
        "total_vectors": f"{total_vectors:,}",
        "raw_storage_gb": round(bytes_raw / (1024**3), 1),
        "with_index_gb": round(bytes_with_index / (1024**3), 1),
    }

# 500K documents, average 5 chunks each
for dims in [256, 512, 1024, 1536, 3072]:
    req = calculate_storage_requirements(500_000, 5, dims)
    print(f"{dims:4d} dims: {req['raw_storage_gb']:5.1f} GB raw, "
          f"{req['with_index_gb']:5.1f} GB with index")
Output:
256 dims: 2.4 GB raw, 4.8 GB with index
512 dims: 4.8 GB raw, 9.5 GB with index
1024 dims: 9.5 GB raw, 19.1 GB with index
1536 dims: 14.3 GB raw, 28.6 GB with index
3072 dims: 28.6 GB raw, 57.2 GB with index
A 3072-dimension index for 500K documents requires 57GB of RAM. For most teams, 1024 dimensions at 19GB is far more manageable.
Decision 2: Always Normalize Your Embeddings
Normalize before storing. Not lazily at search time. Not sometimes. Always, before storage, and apply the same normalization to every query vector:
import numpy as np

def normalize_embedding(embedding: list[float]) -> list[float]:
    """Normalize embedding to unit length for cosine similarity."""
    arr = np.array(embedding, dtype=np.float32)
    norm = np.linalg.norm(arr)
    if norm < 1e-10:
        return arr.tolist()  # Guard against zero vectors
    return (arr / norm).tolist()
When all vectors are unit-length normalized, cosine similarity equals dot product. HNSW is an index algorithm rather than a library; ANN libraries such as FAISS and hnswlib support inner-product search directly. Normalizing upfront gives you the correctness of cosine similarity at the speed of dot product.
Decision 3: Batch Size Optimization
Embedding APIs charge per token, but throughput depends heavily on batching:
- Too small batches: you pay per-request overhead repeatedly, total time is dominated by network round trips
- Too large batches: you hit rate limits, memory constraints, or API timeouts
Good starting points:
- Managed APIs (Voyage, OpenAI): 100–256 texts per batch
- Self-hosted models: limited by GPU memory, typically 32–128 texts per batch
Always benchmark with your actual text lengths - a batch of 100 short sentences has very different characteristics than a batch of 100 long paragraphs.
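Because text length matters as much as text count, one reasonable approach is to cap batches on both. The generator below is a minimal sketch: `batch_texts` is a hypothetical helper, the limits are placeholder values to tune against your provider's documented limits, and the default token counter is a crude "about four characters per token" heuristic, not a real tokenizer.

```python
from typing import Callable, Iterable, Iterator

def batch_texts(
    texts: Iterable[str],
    max_texts: int = 128,
    max_tokens: int = 100_000,
    count_tokens: Callable[[str], int] = lambda t: max(1, len(t) // 4),
) -> Iterator[list[str]]:
    """Yield batches that respect both a text-count cap and a token budget."""
    batch: list[str] = []
    batch_tokens = 0
    for text in texts:
        n = count_tokens(text)
        # Close the current batch if adding this text would break either limit
        if batch and (len(batch) >= max_texts or batch_tokens + n > max_tokens):
            yield batch
            batch, batch_tokens = [], 0
        batch.append(text)
        batch_tokens += n
    if batch:
        yield batch

# Five texts of roughly 10 "tokens" each under the heuristic
texts = ["a" * 40] * 5
by_count = [len(b) for b in batch_texts(texts, max_texts=2)]
by_tokens = [len(b) for b in batch_texts(texts, max_texts=10, max_tokens=25)]
print(by_count, by_tokens)
```

A single text larger than `max_tokens` still goes out alone in its own batch; truncating or splitting oversized texts is a separate chunking decision.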
Decision 4: Embedding Caching
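Embeddings are effectively deterministic: same text + same model = same embedding (hosted APIs may show tiny floating-point differences between calls, which do not matter for retrieval). That makes them ideal caching targets.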
import hashlib
import json

def cache_key(text: str, model_version_key: str) -> str:
    """
    Cache key that includes model version.
    Changing the model automatically invalidates all old cache entries.
    """
    content = json.dumps(
        {"text": text.strip(), "model": model_version_key},
        sort_keys=True,
        ensure_ascii=False,
    )
    return f"emb:v1:{hashlib.sha256(content.encode()).hexdigest()}"
Cache aggressively for:
- Frequently repeated queries (user questions cluster around common patterns)
- Document chunks during re-indexing passes
- Development and testing (avoid burning API quota)
Do not cache if:
- Documents change frequently (cache invalidation becomes complex)
- Memory constraints prohibit caching millions of embeddings
- You are doing semantic deduplication (need fresh embeddings)
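A get-or-compute wrapper shows how a model-aware key is used in practice. This is a sketch under stated assumptions: `CachedEmbedder` is a hypothetical class, the store is a plain dict (a real deployment would more likely use Redis or similar), and `fake_embed` is a stub standing in for a real embedding API call.

```python
import hashlib
import json
from typing import Callable

def cache_key(text: str, model_version_key: str) -> str:
    """Model-aware cache key: a different model yields a different key."""
    content = json.dumps(
        {"text": text.strip(), "model": model_version_key},
        sort_keys=True,
        ensure_ascii=False,
    )
    return f"emb:v1:{hashlib.sha256(content.encode('utf-8')).hexdigest()}"

class CachedEmbedder:
    """Wraps any embed function with a get-or-compute cache."""

    def __init__(self, embed_fn: Callable[[str], list[float]], model_version_key: str):
        self._embed_fn = embed_fn
        self._model = model_version_key
        self._store: dict[str, list[float]] = {}
        self.api_calls = 0

    def embed(self, text: str) -> list[float]:
        key = cache_key(text, self._model)
        if key not in self._store:
            self._store[key] = self._embed_fn(text)  # only pay for cache misses
            self.api_calls += 1
        return self._store[key]

def fake_embed(text: str) -> list[float]:
    """Stub standing in for a real embedding API call."""
    return [float(len(text)), 1.0]

embedder = CachedEmbedder(fake_embed, "voyage/voyage-3/1024")
embedder.embed("What is our refund policy?")
embedder.embed("  What is our refund policy?  ")  # same key after strip()
print(embedder.api_calls)
```

Because the model identifier is hashed into the key, pointing the wrapper at a different model never serves a stale vector from the old one.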
Decision 5: Model Versioning
Treat embedding model versions like database schema versions. Every stored embedding must carry metadata about what generated it:
from dataclasses import dataclass
from datetime import datetime

@dataclass
class EmbeddingModelRecord:
    """Metadata that must travel with every stored embedding."""
    model_id: str          # e.g., "voyage-3"
    model_version: str     # e.g., "2024-10-01" (provider release date)
    dimensions: int        # e.g., 1024
    normalize: bool        # Whether outputs are normalized
    embedded_at: datetime  # When this batch was generated

    def is_compatible_with(self, other: "EmbeddingModelRecord") -> bool:
        """Two model records are compatible only if they are identical."""
        return (
            self.model_id == other.model_id
            and self.model_version == other.model_version
            and self.dimensions == other.dimensions
        )
Decision 6: Multilingual Requirements
Three strategies, in ascending complexity:
Strategy 1 - Single multilingual model: One model handles all languages. Cross-language search works out of the box. Simpler to operate. Quality may lag behind monolingual models. Use embed-multilingual-v3.0 (Cohere) or bge-m3.
Strategy 2 - Language detection + routing: Detect the document language, use a per-language specialist model, maintain per-language indexes. Best quality per language, much more complex to operate.
Strategy 3 - Translation pipeline: Translate everything to English before embedding. Works surprisingly well. Adds latency and translation costs.
Start with Strategy 1. Only invest in Strategy 2 if benchmarking shows meaningful quality gaps on your actual data.
Architecture and Diagrams
Embedding Pipeline with Caching and Batching
Embedding Model Selection Decision Tree
Model Migration Risk Analysis
Production Code
The following implementation covers the full production embedding stack: configuration, caching, batching, version management, and end-to-end RAG with Claude.
"""
Production Embedding System for RAG
=====================================
Covers:
- EmbeddingConfig: model configuration with versioning
- EmbeddingCache: content-addressed cache with model-aware keys
- ProductionEmbedder: batched, cached, retry-enabled embedding
- EmbeddingVersionManager: detect incompatibilities, manage migrations
- EmbeddingAwareRAGPipeline: embed → store → retrieve → Claude generates
Install: pip install voyageai anthropic numpy
"""
import anthropic
import numpy as np
import hashlib
import json
import time
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────
class EmbeddingProvider(Enum):
    VOYAGE = "voyage"
    OPENAI = "openai"
    COHERE = "cohere"
    LOCAL = "local"


@dataclass(frozen=True)  # frozen: fields cannot be reassigned after creation
class EmbeddingConfig:
    """
    Immutable configuration for an embedding model.
    Treat like a database schema version - changing any field
    means all existing embeddings are incompatible with new ones.
    """
    model_id: str
    provider: EmbeddingProvider
    dimensions: int
    normalize: bool = True
    max_batch_size: int = 128
    max_tokens_per_text: int = 32000
    input_type: Optional[str] = None  # Cohere: "search_document" / "search_query"

    @property
    def version_key(self) -> str:
        """
        Stable identifier for this exact model configuration.
        Used as part of cache keys and compatibility checks.
        """
        return f"{self.provider.value}/{self.model_id}/{self.dimensions}"


# Pre-defined configs for common models
VOYAGE_3 = EmbeddingConfig(
    model_id="voyage-3",
    provider=EmbeddingProvider.VOYAGE,
    dimensions=1024,
    normalize=True,
    max_batch_size=128,
)

VOYAGE_3_LITE = EmbeddingConfig(
    model_id="voyage-3-lite",
    provider=EmbeddingProvider.VOYAGE,
    dimensions=512,
    normalize=True,
    max_batch_size=256,
)

OPENAI_3_SMALL = EmbeddingConfig(
    model_id="text-embedding-3-small",
    provider=EmbeddingProvider.OPENAI,
    dimensions=1536,
    normalize=True,
    max_batch_size=100,
)
# ─────────────────────────────────────────────
# Cache Layer
# ─────────────────────────────────────────────
class EmbeddingCache:
    """
    Embedding cache with model-aware keys.
    Keys include the model version, so switching models
    automatically invalidates old cache entries without
    requiring an explicit cache clear.
    In production: replace self._store with a Redis client:
        import redis
        self._redis = redis.Redis(host="localhost", port=6379)
    """

    def __init__(self, ttl_seconds: int = 86400 * 7):
        self.ttl = ttl_seconds
        # Production: redis.Redis(...)
        self._store: dict[str, tuple[list[float], float]] = {}
        self.hits = 0
        self.misses = 0

    def _make_key(self, text: str, model_version_key: str) -> str:
        """
        Deterministic, model-aware, fixed-length cache key.
        Different model → different key → automatic cache miss.
        """
        content = json.dumps(
            {"text": text.strip(), "model": model_version_key},
            sort_keys=True,
            ensure_ascii=False,
        )
        return f"emb:v1:{hashlib.sha256(content.encode('utf-8')).hexdigest()}"

    def get(self, text: str, config: EmbeddingConfig) -> Optional[list[float]]:
        """Retrieve embedding from cache. Returns None on miss or expiry."""
        key = self._make_key(text, config.version_key)
        if key in self._store:
            embedding, stored_at = self._store[key]
            if time.time() - stored_at < self.ttl:
                self.hits += 1
                return embedding
            del self._store[key]  # Expired entry: evict and count as a miss
        self.misses += 1
        return None

    def set(self, text: str, config: EmbeddingConfig, embedding: list[float]) -> None:
        """Store embedding in cache."""
        key = self._make_key(text, config.version_key)
        self._store[key] = (embedding, time.time())

    def get_batch(
        self,
        texts: list[str],
        config: EmbeddingConfig,
    ) -> dict[int, list[float]]:
        """Batch cache lookup. Returns {index: embedding} for hits only."""
        results = {}
        for i, text in enumerate(texts):
            cached = self.get(text, config)
            if cached is not None:
                results[i] = cached
        return results

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

    def stats(self) -> dict:
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate_pct": f"{self.hit_rate:.1%}",
            "cached_items": len(self._store),
        }
# ─────────────────────────────────────────────
# Core Embedder
# ─────────────────────────────────────────────
@dataclass
class EmbeddingResult:
    """Result from a single embedding call."""
    text: str
    embedding: list[float]
    model_version_key: str
    from_cache: bool
    latency_ms: float


class ProductionEmbedder:
    """
    Production-grade embedder with:
    - Automatic batching (respects max_batch_size)
    - Content-addressed caching with model-aware keys
    - Retry with exponential backoff
    - Normalization enforcement
    - Throughput tracking
    """

    def __init__(
        self,
        config: EmbeddingConfig,
        api_key: str,
        cache: Optional[EmbeddingCache] = None,
        max_retries: int = 3,
    ):
        self.config = config
        self.cache = cache or EmbeddingCache()
        self.max_retries = max_retries
        self._total_calls = 0
        if config.provider == EmbeddingProvider.VOYAGE:
            try:
                import voyageai
                self._client = voyageai.Client(api_key=api_key)
            except ImportError:
                raise ImportError("pip install voyageai")
        elif config.provider == EmbeddingProvider.OPENAI:
            try:
                from openai import OpenAI
                self._client = OpenAI(api_key=api_key)
            except ImportError:
                raise ImportError("pip install openai")
        else:
            raise ValueError(f"Provider {config.provider} not yet implemented")

    def _call_api(self, texts: list[str], input_type: str = "document") -> list[list[float]]:
        """Raw API call with retry and exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                if self.config.provider == EmbeddingProvider.VOYAGE:
                    response = self._client.embed(
                        texts=texts,
                        model=self.config.model_id,
                        input_type=input_type,
                    )
                    embeddings = response.embeddings
                elif self.config.provider == EmbeddingProvider.OPENAI:
                    # OpenAI has no document/query distinction; input_type is unused here
                    response = self._client.embeddings.create(
                        input=texts,
                        model=self.config.model_id,
                        dimensions=self.config.dimensions,
                    )
                    embeddings = [item.embedding for item in response.data]
                self._total_calls += 1
                return embeddings
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                wait = 2 ** attempt  # 1s, 2s, 4s
                logger.warning(
                    f"Embedding API failed (attempt {attempt + 1}/{self.max_retries}): {e}. "
                    f"Retrying in {wait}s..."
                )
                time.sleep(wait)
        raise RuntimeError("Unreachable")

    def _normalize(self, embedding: list[float]) -> list[float]:
        """Normalize to unit length for cosine similarity via dot product."""
        if not self.config.normalize:
            return embedding
        arr = np.array(embedding, dtype=np.float32)
        norm = np.linalg.norm(arr)
        if norm < 1e-10:
            logger.warning("Near-zero embedding vector - possible bad input text")
            return embedding
        return (arr / norm).tolist()

    def embed_single(self, text: str, use_cache: bool = True) -> EmbeddingResult:
        """Embed a single text with caching and retry."""
        start = time.time()
        if use_cache:
            cached = self.cache.get(text, self.config)
            if cached is not None:
                return EmbeddingResult(
                    text=text,
                    embedding=cached,
                    model_version_key=self.config.version_key,
                    from_cache=True,
                    latency_ms=(time.time() - start) * 1000,
                )
        embeddings = self._call_api([text], input_type="document")
        embedding = self._normalize(embeddings[0])
        if use_cache:
            self.cache.set(text, self.config, embedding)
        return EmbeddingResult(
            text=text,
            embedding=embedding,
            model_version_key=self.config.version_key,
            from_cache=False,
            latency_ms=(time.time() - start) * 1000,
        )

    def embed_batch(
        self,
        texts: list[str],
        use_cache: bool = True,
        show_progress: bool = False,
    ) -> list[EmbeddingResult]:
        """
        Embed a batch of texts efficiently.
        Strategy:
        1. Check cache for all texts (O(n) cache reads)
        2. Make API calls only for cache misses
        3. Split misses into sub-batches respecting max_batch_size
        4. Return results in original order
        """
        results: list[Optional[EmbeddingResult]] = [None] * len(texts)

        # Step 1: Cache lookup for all
        cache_hits = self.cache.get_batch(texts, self.config) if use_cache else {}
        for idx, embedding in cache_hits.items():
            results[idx] = EmbeddingResult(
                text=texts[idx],
                embedding=embedding,
                model_version_key=self.config.version_key,
                from_cache=True,
                latency_ms=0.1,
            )

        # Step 2: Collect cache misses
        miss_indices = [i for i in range(len(texts)) if i not in cache_hits]
        miss_texts = [texts[i] for i in miss_indices]
        if not miss_texts:
            if show_progress:
                print(f" 100% cache hit ({len(texts)} texts)")
            return results  # type: ignore

        # Step 3: Batch API calls for misses
        batch_size = self.config.max_batch_size
        total_batches = (len(miss_texts) + batch_size - 1) // batch_size
        for batch_num in range(total_batches):
            b_start = batch_num * batch_size
            b_end = min(b_start + batch_size, len(miss_texts))
            batch_texts = miss_texts[b_start:b_end]
            batch_indices = miss_indices[b_start:b_end]
            if show_progress:
                print(
                    f" Batch {batch_num + 1}/{total_batches}: "
                    f"{len(batch_texts)} texts",
                    end="\r",
                )
            t0 = time.time()
            batch_embeddings = self._call_api(batch_texts, input_type="document")
            elapsed_ms = (time.time() - t0) * 1000
            per_text_ms = elapsed_ms / len(batch_texts)
            for original_idx, text, embedding in zip(
                batch_indices, batch_texts, batch_embeddings
            ):
                normalized = self._normalize(embedding)
                if use_cache:
                    self.cache.set(text, self.config, normalized)
                results[original_idx] = EmbeddingResult(
                    text=text,
                    embedding=normalized,
                    model_version_key=self.config.version_key,
                    from_cache=False,
                    latency_ms=per_text_ms,
                )
        if show_progress:
            # Report texts and API calls separately: one call embeds a whole batch
            print(
                f"\n Done: {len(miss_texts)} texts embedded in "
                f"{total_batches} API calls, {len(cache_hits)} cache hits"
            )
        return results  # type: ignore

    def embed_query(self, query: str) -> list[float]:
        """
        Embed a search query.
        Uses input_type='query' for models that distinguish
        document vs. query representations (Voyage, Cohere).
        Query embeddings are also cached - user queries cluster heavily.
        """
        # The "query:" prefix keeps query vectors separate from document
        # vectors for the same text, since the two input types differ.
        cached = self.cache.get(f"query:{query}", self.config)
        if cached is not None:
            return cached
        embeddings = self._call_api([query], input_type="query")
        normalized = self._normalize(embeddings[0])
        self.cache.set(f"query:{query}", self.config, normalized)
        return normalized
# ─────────────────────────────────────────────
# Version Manager
# ─────────────────────────────────────────────
@dataclass
class CorpusMetadata:
    """What model generated the embeddings in a given corpus."""
    corpus_id: str
    model_version_key: str
    dimensions: int
    document_count: int
    embedded_at: datetime
    is_migrating: bool = False
    migration_target: Optional[str] = None


class EmbeddingVersionManager:
    """
    Manages embedding model version metadata for each corpus.
    This is the safety layer that prevents the 2 AM crisis.
    Call check_compatibility() before every search operation.
    """

    def __init__(self):
        # Production: persist in PostgreSQL or DynamoDB
        self._corpora: dict[str, CorpusMetadata] = {}

    def register(
        self,
        corpus_id: str,
        config: EmbeddingConfig,
        document_count: int,
    ) -> None:
        """Register a corpus with its embedding model configuration."""
        self._corpora[corpus_id] = CorpusMetadata(
            corpus_id=corpus_id,
            model_version_key=config.version_key,
            dimensions=config.dimensions,
            document_count=document_count,
            embedded_at=datetime.utcnow(),
        )
        logger.info(
            f"Corpus '{corpus_id}' registered: {config.version_key}, "
            f"{document_count:,} docs"
        )

    def check_compatibility(
        self,
        corpus_id: str,
        query_config: EmbeddingConfig,
    ) -> tuple[bool, str]:
        """
        Verify query model is compatible with stored corpus.
        Returns (is_compatible, human_readable_reason).
        ALWAYS call this before ANN search. If it returns False,
        NEVER proceed - results will be meaningless.
        """
        meta = self._corpora.get(corpus_id)
        if meta is None:
            return False, f"Corpus '{corpus_id}' is not registered"
        if meta.is_migrating:
            return False, (
                f"Corpus '{corpus_id}' is migrating to {meta.migration_target}. "
                f"System is in degraded mode - searches temporarily disabled."
            )
        if meta.model_version_key != query_config.version_key:
            return False, (
                f"MODEL MISMATCH - corpus was embedded with "
                f"'{meta.model_version_key}' but query uses "
                f"'{query_config.version_key}'. "
                f"Results would be completely wrong. "
                f"Re-embed the corpus or use the original model."
            )
        return True, "Compatible"

    def plan_migration(
        self,
        corpus_id: str,
        new_config: EmbeddingConfig,
        document_count: int,
        batch_size: int = 128,
        api_calls_per_second: float = 2.0,
    ) -> dict:
        """
        Estimate cost and time for migrating a corpus to a new model.
        Present to stakeholders BEFORE committing to a migration.
        """
        meta = self._corpora.get(corpus_id)
        if not meta:
            return {"error": f"Corpus '{corpus_id}' not found"}
        # Ceiling division: a partially filled final batch still costs one call
        api_calls = -(-document_count // batch_size)
        total_secs = api_calls / api_calls_per_second
        total_hours = total_secs / 3600
        return {
            "corpus_id": corpus_id,
            "current_model": meta.model_version_key,
            "target_model": new_config.version_key,
            "documents": f"{document_count:,}",
            "estimated_api_calls": f"{api_calls:,}",
            "estimated_hours": f"{total_hours:.1f}",
            "strategy": "dual-index (keep old live during migration)",
            "rollback_possible": True,
            "recommendation": (
                "Benchmark new model on 500 held-out queries before starting. "
                "Use A/B traffic split (10/50/100%) to validate improvement. "
                "Never hard-cutover without quality gate."
            ),
        }

    def begin_migration(self, corpus_id: str, target: str) -> None:
        """Mark corpus as migrating - blocks searches during migration window."""
        if corpus_id in self._corpora:
            self._corpora[corpus_id].is_migrating = True
            self._corpora[corpus_id].migration_target = target

    def complete_migration(
        self, corpus_id: str, new_config: EmbeddingConfig, document_count: int
    ) -> None:
        """Update metadata to new model after successful migration."""
        self._corpora[corpus_id] = CorpusMetadata(
            corpus_id=corpus_id,
            model_version_key=new_config.version_key,
            dimensions=new_config.dimensions,
            document_count=document_count,
            embedded_at=datetime.utcnow(),
        )
        logger.info(f"Migration complete: '{corpus_id}' → {new_config.version_key}")
# ─────────────────────────────────────────────
# Simple In-Memory Vector Store
# ─────────────────────────────────────────────
@dataclass
class DocumentChunk:
    chunk_id: str
    document_id: str
    text: str
    embedding: Optional[list[float]] = None
    metadata: dict = field(default_factory=dict)


class InMemoryVectorStore:
    """
    Brute-force vector store for prototyping and small corpora (< 100K vectors).
    In production: use Qdrant, Pinecone, Weaviate, or pgvector.
    """

    def __init__(self):
        self.chunks: list[DocumentChunk] = []

    def add(self, chunk: DocumentChunk) -> None:
        assert chunk.embedding is not None, "Chunk must have an embedding before storage"
        self.chunks.append(chunk)

    def search(
        self,
        query_embedding: list[float],
        k: int = 5,
    ) -> list[tuple[DocumentChunk, float]]:
        """
        Brute-force cosine similarity search.
        O(n*d) - fine for prototyping, not for > 100K vectors.
        """
        if not self.chunks:
            return []
        q = np.array(query_embedding, dtype=np.float32)
        scores = []
        for chunk in self.chunks:
            d = np.array(chunk.embedding, dtype=np.float32)
            # Both normalized → dot product = cosine similarity
            score = float(np.dot(q, d))
            scores.append((chunk, score))
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:k]
# ─────────────────────────────────────────────
# Complete RAG Pipeline
# ─────────────────────────────────────────────
class EmbeddingAwareRAGPipeline:
    """
    Production RAG pipeline with embedding version safety.
    Safety guarantee: if the embedding model used for queries does not
    exactly match the model used to build the index, this pipeline
    raises a hard error rather than silently returning garbage results.
    """

    def __init__(
        self,
        embedder: ProductionEmbedder,
        vector_store: InMemoryVectorStore,
        version_manager: EmbeddingVersionManager,
        corpus_id: str,
        anthropic_api_key: str,
    ):
        self.embedder = embedder
        self.vector_store = vector_store
        self.version_manager = version_manager
        self.corpus_id = corpus_id
        self.claude = anthropic.Anthropic(api_key=anthropic_api_key)

    def index_documents(self, documents: list[dict]) -> None:
        """
        Embed and index a list of documents.
        Each document: {"id": str, "text": str, "metadata": dict}
        """
        texts = [doc["text"] for doc in documents]
        print(f"Embedding {len(texts)} documents...")
        results = self.embedder.embed_batch(texts, show_progress=True)
        for doc, result in zip(documents, results):
            chunk = DocumentChunk(
                chunk_id=f"{doc['id']}_chunk_0",
                document_id=doc["id"],
                text=doc["text"],
                embedding=result.embedding,
                metadata=doc.get("metadata", {}),
            )
            self.vector_store.add(chunk)
        self.version_manager.register(
            corpus_id=self.corpus_id,
            config=self.embedder.config,
            document_count=len(documents),
        )
        print(f"Indexed {len(documents)} documents | Model: {self.embedder.config.version_key}")

    def retrieve(self, query: str, k: int = 5) -> list[tuple[DocumentChunk, float]]:
        """
        Retrieve top-k relevant chunks.
        Performs model compatibility check before every search.
        If models are incompatible, raises RuntimeError - never silently fails.
        """
        # SAFETY CHECK - must pass before any ANN search
        ok, reason = self.version_manager.check_compatibility(
            corpus_id=self.corpus_id,
            query_config=self.embedder.config,
        )
        if not ok:
            raise RuntimeError(
                f"Embedding compatibility failure: {reason}\n"
                "Search aborted to prevent incorrect results."
            )
        query_embedding = self.embedder.embed_query(query)
        return self.vector_store.search(query_embedding, k=k)

    def generate(
        self,
        query: str,
        retrieved: list[tuple[DocumentChunk, float]],
    ) -> str:
        """Generate an answer using Claude with retrieved context."""
        context = "\n\n".join(
            f'<document index="{i}" relevance="{score:.3f}">\n{chunk.text}\n</document>'
            for i, (chunk, score) in enumerate(retrieved, 1)
        )
        prompt = f"""You are a helpful assistant. Answer the question using ONLY the provided documents.
If the documents lack sufficient information, say so clearly.
<retrieved_documents>
{context}
</retrieved_documents>
<question>{query}</question>
Provide a clear, accurate answer. Cite document numbers when making specific claims."""
        response = self.claude.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text

    def answer(self, query: str, k: int = 5) -> dict:
        """Full pipeline: retrieve + generate. Returns answer with sources."""
        t0 = time.time()
        retrieved = self.retrieve(query, k=k)
        answer = self.generate(query, retrieved)
        return {
            "query": query,
            "answer": answer,
            "sources": [
                {
                    "document_id": chunk.document_id,
                    "preview": chunk.text[:150] + "...",
                    "relevance_score": round(score, 4),
                }
                for chunk, score in retrieved
            ],
            "latency_ms": round((time.time() - t0) * 1000, 1),
            "model_used": self.embedder.config.version_key,
            "cache_stats": self.embedder.cache.stats(),
        }


# ─────────────────────────────────────────────
# Demonstration
# ─────────────────────────────────────────────
def demonstrate_safety_layer():
    """
    Show the model compatibility safety layer in action.
    Demonstrates what the system should do when models are mismatched.
    """
    print("=== Embedding Compatibility Safety Demo ===\n")
    version_manager = EmbeddingVersionManager()

    # Simulate corpus indexed with voyage-3
    version_manager._corpora["my_corpus"] = CorpusMetadata(
        corpus_id="my_corpus",
        model_version_key=VOYAGE_3.version_key,
        dimensions=1024,
        document_count=2_000_000,
        embedded_at=datetime.utcnow(),
    )

    # Correct model - should pass
    ok, reason = version_manager.check_compatibility("my_corpus", VOYAGE_3)
    print(f"Query with voyage-3 (correct): compatible={ok}, reason='{reason}'")

    # Wrong model - should fail
    ok, reason = version_manager.check_compatibility("my_corpus", VOYAGE_3_LITE)
    print(f"Query with voyage-3-lite (wrong): compatible={ok}")
    print(f" → {reason}\n")

    # Migration plan
    plan = version_manager.plan_migration(
        corpus_id="my_corpus",
        new_config=VOYAGE_3_LITE,
        document_count=2_000_000,
    )
    print("Migration plan:")
    for k, v in plan.items():
        print(f" {k}: {v}")

    print("\n=== Cache Model-Awareness Demo ===\n")
    cache = EmbeddingCache()
    fake_emb = [0.1] * 1024
    cache.set("test text", VOYAGE_3, fake_emb)
    hit_correct = cache.get("test text", VOYAGE_3)
    hit_wrong = cache.get("test text", VOYAGE_3_LITE)  # Different model key
    print(f" Cache hit with correct model: {'HIT' if hit_correct else 'MISS'}")
    print(f" Cache hit with wrong model: {'MISS (correct)' if not hit_wrong else 'HIT (bug!)'}")
    print(f" Cache stats: {cache.stats()}")


if __name__ == "__main__":
    demonstrate_safety_layer()

Production Metrics and Monitoring
A production embedding system needs observability across four dimensions:
Throughput
Embedding throughput (texts/second): Track over time. A sudden drop indicates API degradation, rate limiting, or infrastructure issues. Alert when throughput falls below 50% of your baseline.
Cache Performance
Cache hit rate: Should reach 40–80% in a stable production system. Below 20% suggests cache TTL is too short or query distribution is too diverse. Above 95% may indicate you are doing redundant re-indexing.
Query Latency Breakdown
| Phase | Typical Target | Alert Threshold |
|---|---|---|
| Query embedding (cache miss) | less than 50ms | greater than 200ms |
| Query embedding (cache hit) | less than 2ms | greater than 20ms |
| ANN search | less than 20ms | greater than 100ms |
| Claude generation | less than 2000ms | greater than 5000ms |
| Total pipeline | less than 2500ms | greater than 6000ms |
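
One way to collect this breakdown is a small per-phase timer wrapped around each stage of the pipeline. The sketch below is an illustration only: `LatencyTracker`, the phase names, and `ALERT_THRESHOLDS_MS` are hypothetical names, and the thresholds are simply copied from the alert column of the table.

```python
import time
from contextlib import contextmanager

# Alert thresholds in milliseconds, copied from the table above.
# The phase names are hypothetical labels chosen for this sketch.
ALERT_THRESHOLDS_MS = {
    "query_embedding_miss": 200,
    "query_embedding_hit": 20,
    "ann_search": 100,
    "generation": 5000,
    "total": 6000,
}


class LatencyTracker:
    """Records wall-clock time per pipeline phase for a single request."""

    def __init__(self):
        self.timings_ms: dict[str, float] = {}

    @contextmanager
    def phase(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the timing even if the phase raised
            self.timings_ms[name] = (time.perf_counter() - start) * 1000

    def breaches(self) -> list[str]:
        """Return the phases whose recorded latency exceeds its alert threshold."""
        return [
            name
            for name, ms in self.timings_ms.items()
            if ms > ALERT_THRESHOLDS_MS.get(name, float("inf"))
        ]


tracker = LatencyTracker()
with tracker.phase("ann_search"):
    pass  # the vector search call would go here
print(tracker.timings_ms, tracker.breaches())
```

In a real service the recorded timings would be exported to whatever metrics backend is already in use rather than printed.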
Embedding Coverage
Coverage = (chunks with embeddings) / (total chunks in corpus). Should be 100%. Alert when it drops below 99.9%. Coverage drops after system failures or during migration windows - both require investigation.
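
The throughput, cache, and coverage thresholds above can be folded into a single health check. This is a minimal sketch under stated assumptions: `HealthSnapshot` and `evaluate_health` are hypothetical names, and the counters are assumed to come from your own metrics system.

```python
from dataclasses import dataclass


@dataclass
class HealthSnapshot:
    """Hypothetical container for counters collected elsewhere."""
    throughput_per_s: float
    baseline_throughput_per_s: float
    cache_hits: int
    cache_misses: int
    chunks_total: int
    chunks_embedded: int


def evaluate_health(s: HealthSnapshot) -> list[str]:
    """Apply the alert rules from this section and return any alerts raised."""
    alerts = []

    # Throughput: alert below 50% of baseline
    if s.throughput_per_s < 0.5 * s.baseline_throughput_per_s:
        alerts.append("throughput below 50% of baseline")

    # Cache hit rate: below 20% suggests a short TTL or very diverse queries
    lookups = s.cache_hits + s.cache_misses
    if lookups and s.cache_hits / lookups < 0.20:
        alerts.append("cache hit rate below 20%")

    # Coverage: an empty corpus is treated as fully covered
    coverage = s.chunks_embedded / s.chunks_total if s.chunks_total else 1.0
    if coverage < 0.999:
        alerts.append(f"embedding coverage {coverage:.4%} below 99.9%")

    return alerts


snapshot = HealthSnapshot(
    throughput_per_s=900, baseline_throughput_per_s=1000,
    cache_hits=600, cache_misses=400,
    chunks_total=2_000_000, chunks_embedded=1_990_000,
)
print(evaluate_health(snapshot))
```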
Common Mistakes
:::danger Model Mismatch is Silent and Catastrophic
Never change the embedding model for queries without re-embedding the entire corpus. The system will not throw an error - it will silently return near-zero similarity scores for everything. Use EmbeddingVersionManager.check_compatibility() before every search, and make incompatibility a hard error.
:::
:::danger Never Skip Normalization If you fail to normalize embeddings before storage, dot-product scoring no longer equals cosine similarity and results become biased toward high-magnitude vectors. Normalize at write time, store normalized vectors, and never rely on normalization happening "somewhere downstream." :::
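
To make the magnitude bias concrete, here is a small stdlib-only sketch. `l2_normalize` and `dot` are hypothetical helper names, not part of the pipeline code in this lesson. Two vectors pointing in the same direction should score identically against a query, but the raw dot product rewards the longer one.

```python
import math


def l2_normalize(vec: list[float]) -> list[float]:
    """Scale a vector to unit length; refuse zero vectors instead of dividing by zero."""
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0.0:
        raise ValueError("Cannot normalize a zero vector")
    return [x / norm for x in vec]


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


query = [3.0, 4.0]
doc_short = [0.3, 0.4]   # same direction as the query, small magnitude
doc_long = [30.0, 40.0]  # same direction as the query, large magnitude

# Raw dot products differ by 100x even though the direction is identical
raw_short = dot(query, doc_short)
raw_long = dot(query, doc_long)

# After normalization, both documents score the same (cosine similarity of 1)
q = l2_normalize(query)
norm_short = dot(q, l2_normalize(doc_short))
norm_long = dot(q, l2_normalize(doc_long))

print(raw_short, raw_long, norm_short, norm_long)
```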
:::warning Benchmark on Your Data, Not MTEB MTEB scores measure academic benchmarks. Your data is different. A model ranking third on MTEB may outperform the top-ranked model on your specific domain. Always run 100–500 representative queries on candidate models before committing to one for production. :::
:::tip Cache Query Embeddings Aggressively User queries cluster around common patterns. In a customer support RAG system, "How do I reset my password?" appears hundreds of times daily. Caching query embeddings costs almost nothing and can reduce query embedding latency from 50ms to under 2ms for common patterns. :::
:::tip Set Different input_type for Query vs. Document
For Voyage and Cohere models, use input_type="query" for search queries and input_type="document" for document chunks. These models apply different learned transformations to optimize query-document matching. Using the wrong type silently degrades retrieval quality by 5–15%.
:::
:::warning Self-Hosted Models Need Warm-Up Time If you switch from managed embedding APIs to self-hosted BGE or E5 models, account for GPU warm-up time (first inference is slow) and GPU memory requirements (bge-large-en needs ~2GB VRAM). Factor this into your capacity planning and health check endpoints. :::
Interview Questions and Answers
Q1: Why can't you use different embedding models for documents and queries?
Each embedding model defines its own geometric space. The axes represent different learned features, clusters form around different semantic concepts, and similarity scores from different models live on different scales. Computing cosine similarity between a vector from Model A and a vector from Model B produces a meaningless number - it is a distance measurement between two incomparable coordinate systems. The embedding space is model-specific: voyage-3 defines one geometry for "semantically similar," and voyage-3-lite defines a completely different geometry. You must use the same model for both documents and queries - this is the fundamental constraint of dense retrieval, and violating it produces silent failures that are catastrophically hard to debug.
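
A toy simulation can make this tangible. The sketch below is an illustration under strong assumptions, not how real embedding models work: each "model" is just a different random linear map applied to the same underlying meaning vector, and all names (`make_toy_model`, `embed`, `cosine`) are hypothetical. Within one model, a text and its paraphrase stay close; across models, even the identical text scores near zero.

```python
import math
import random

DIM = 256


def make_toy_model(seed: int) -> list[list[float]]:
    """A toy 'embedding model': a random DIM x DIM matrix fixed by its seed."""
    rng = random.Random(seed)
    return [[rng.gauss(0.0, 1.0) for _ in range(DIM)] for _ in range(DIM)]


def embed(model: list[list[float]], latent: list[float]) -> list[float]:
    """Project the latent meaning through the model and L2-normalize the result."""
    raw = [sum(w * x for w, x in zip(row, latent)) for row in model]
    norm = math.sqrt(sum(v * v for v in raw))
    return [v / norm for v in raw]


def cosine(a: list[float], b: list[float]) -> float:
    # Inputs are unit length, so the dot product is the cosine similarity
    return sum(x * y for x, y in zip(a, b))


rng = random.Random(0)
meaning = [rng.gauss(0.0, 1.0) for _ in range(DIM)]
paraphrase = [x + 0.1 * rng.gauss(0.0, 1.0) for x in meaning]  # slightly perturbed meaning

model_a = make_toy_model(seed=1)
model_b = make_toy_model(seed=2)

same_model = cosine(embed(model_a, meaning), embed(model_a, paraphrase))
cross_model = cosine(embed(model_a, meaning), embed(model_b, meaning))

print(f"same model, paraphrase:   {same_model:.3f}")
print(f"different models, same text: {cross_model:.3f}")
```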
Q2: How would you handle an embedding model being deprecated by the provider?
A managed migration: First, measure the migration cost - how many documents, what throughput, over what time window. Second, benchmark the replacement model on your actual retrieval tasks with 500 held-out queries to confirm it meets your quality bar. Third, spin up a new empty index and begin re-embedding documents into it using the replacement model, while keeping the old index live. This is the dual-index strategy - you never take the system down. Fourth, validate the new index on your held-out query set. Fifth, traffic shift: route 10% of queries to the new index, monitor retrieval metrics for 24 hours, then 50%, then 100%. Sixth, decommission the old index. The key principle: never hard-cutover. Always dual-write during migration, always have a rollback path.
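
The gradual traffic shift in step five can be sketched with deterministic hash bucketing. This is one possible approach, not the only one; `route_index` and the `"old"`/`"new"` labels are hypothetical. Whichever index a query is routed to, it must be embedded with the model that built that index.

```python
import hashlib


def route_index(routing_key: str, new_index_percent: int) -> str:
    """
    Deterministically assign a request to the old or new index.
    The same key always lands in the same bucket, so raising the percentage
    from 10 to 50 to 100 only ever moves traffic from old to new.
    """
    digest = hashlib.sha256(routing_key.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100  # bucket in 0..99
    return "new" if bucket < new_index_percent else "old"


# Roughly 10% of keys should be routed to the new index at the first stage
keys = [f"user-{i}" for i in range(10_000)]
share_new = sum(route_index(k, 10) == "new" for k in keys) / len(keys)
print(f"share routed to new index at 10%: {share_new:.1%}")
```

Keying on a stable identifier such as a user or session ID (an assumption here) keeps each user on one index for the duration of a stage, which makes retrieval metrics easier to compare.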
Q3: What dimensions would you choose for a production RAG system with 5 million documents?
I would start with 1024 dimensions. The math: 5M vectors at 1024 dims at 4 bytes = 20GB raw, roughly 40GB with HNSW index overhead. That fits on a 64GB server. Moving to 1536 dims gives 30GB raw, 60GB with index - pushing against practical limits. Moving to 3072 dims gives about 60GB raw and 120GB with index - very expensive hardware or distributed search. The quality improvement from 1024 to higher dimensions is real but diminishing. I would benchmark voyage-3 (1024 dims) against text-embedding-3-large (3072 dims) on our specific retrieval tasks. Unless the quality gap exceeds 5 percentage points on Recall@10, the roughly 40GB index vs. 120GB index makes 1024 dims the clear winner for operational simplicity.
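
The arithmetic in this answer is easy to reproduce. The sketch below assumes float32 storage (4 bytes per value) and uses the answer's rule of thumb that the HNSW index roughly doubles the raw footprint; the 2x factor is an estimate, and real overhead depends on the index parameters. `index_memory_gb` is a hypothetical helper name.

```python
def index_memory_gb(
    num_vectors: int,
    dims: int,
    bytes_per_value: int = 4,      # float32
    index_overhead: float = 2.0,   # rule-of-thumb multiplier, an assumption
) -> tuple[float, float]:
    """Return (raw_gb, with_index_gb) using decimal gigabytes."""
    raw_gb = num_vectors * dims * bytes_per_value / 1e9
    return raw_gb, raw_gb * index_overhead


for dims in (1024, 1536, 3072):
    raw, total = index_memory_gb(5_000_000, dims)
    print(f"{dims:>5} dims: {raw:6.2f} GB raw, {total:7.2f} GB with index")
```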
Q4: How would you implement embedding caching for documents that change frequently?
Content-hash-based cache keys. The cache key includes a hash of the document text, so when a document changes, its hash changes, and the old cache entry becomes a miss - automatic invalidation with no explicit cache clear needed. Implementation: key = sha256(text_content + model_version). Because a changed document can never hit its old entry, the TTL is not what protects correctness; its job is to evict orphaned entries so they do not accumulate. Match it to your document update frequency: if documents change weekly, a TTL of about a week is enough. For real-time updated documents, skip caching document embeddings entirely and focus caching effort on query embeddings instead - queries cluster heavily and are usually worth caching even when the corpus is dynamic.
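
A minimal sketch of such a key function follows. `embedding_cache_key` is a hypothetical name, and the null-byte separator is an added assumption: it keeps the model version and the text from running together, so that two different (text, model) pairs cannot concatenate to the same byte string.

```python
import hashlib


def embedding_cache_key(text: str, model_version: str) -> str:
    """Build a cache key from the model version and the exact document text."""
    h = hashlib.sha256()
    h.update(model_version.encode("utf-8"))
    h.update(b"\x00")  # separator between the two fields (assumption of this sketch)
    h.update(text.encode("utf-8"))
    return h.hexdigest()


original = embedding_cache_key("Reset your password in Settings.", "voyage-3")
edited = embedding_cache_key("Reset your password in Account Settings.", "voyage-3")
other_model = embedding_cache_key("Reset your password in Settings.", "voyage-3-lite")

# An edited document or a different model yields a different key, so the
# old entry is simply never looked up again.
print(original != edited, original != other_model)
```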
Q5: Design an embedding system that supports multiple clients with their own document corpora using potentially different embedding models.
The key insight is complete isolation between clients at the embedding model level. Each client corpus gets a CorpusConfig locked at creation time - model choice is immutable after corpus creation. Each client gets their own namespace in the vector database (separate collection in Qdrant, separate index in Pinecone). A VersionManager service stores corpus metadata in a shared database: which model, what dimensions, when last re-embedded, who owns it. Before any search, the system validates that the query model matches the corpus model - this check happens in a middleware layer before reaching the vector DB. If a client wants to upgrade their embedding model, they must trigger an explicit migration workflow through the admin API, not by changing a config flag. The embedding service maintains a pool of model clients and routes calls based on the corpus configuration - one client using voyage-3 and another using bge-large-en do not interfere with each other. Rate limits and costs are tracked per client.
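
The isolation and immutability described in this answer could look roughly like the sketch below. `CorpusConfig` is named in the answer, but its fields here, along with `CorpusRegistry` and its methods, are hypothetical and only illustrate the idea of locking the model at creation time and validating before every search.

```python
from dataclasses import dataclass


@dataclass(frozen=True)  # frozen: fields cannot be reassigned after creation
class CorpusConfig:
    client_id: str
    corpus_id: str
    model_version_key: str
    dimensions: int


class CorpusRegistry:
    """Maps (client, corpus) to its locked embedding configuration."""

    def __init__(self):
        self._configs: dict[tuple[str, str], CorpusConfig] = {}

    def create(self, config: CorpusConfig) -> None:
        key = (config.client_id, config.corpus_id)
        if key in self._configs:
            # Changing the model requires an explicit migration, not a re-create
            raise ValueError(f"Corpus {key} already exists; use the migration workflow")
        self._configs[key] = config

    def validate_query(self, client_id: str, corpus_id: str, query_model: str) -> CorpusConfig:
        """Middleware-style check run before any vector search."""
        config = self._configs.get((client_id, corpus_id))
        if config is None:
            raise KeyError(f"Unknown corpus {corpus_id!r} for client {client_id!r}")
        if config.model_version_key != query_model:
            raise RuntimeError(
                f"Query model {query_model!r} does not match corpus model "
                f"{config.model_version_key!r}"
            )
        return config


registry = CorpusRegistry()
registry.create(CorpusConfig("client_a", "docs", "voyage-3", 1024))
registry.create(CorpusConfig("client_b", "docs", "bge-large-en", 1024))
print(registry.validate_query("client_a", "docs", "voyage-3"))
```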
Q6: What is the vocabulary mismatch problem and how do embeddings solve it?
Vocabulary mismatch occurs when a user query uses different words than the relevant document, even though they mean the same thing. A user asks about "car insurance" but the document discusses "automobile coverage." Traditional keyword search (BM25) fails completely - it looks for exact term overlap and finds none. Embedding models solve this by encoding semantic meaning rather than surface form. Because they are trained on large text corpora containing phrases like "a car is an automobile" and "insurance is a form of coverage," the model's parameters capture these equivalences. The embedding for "car insurance" ends up geometrically close to the embedding for "automobile coverage" even with zero word overlap. This is the core value proposition of dense retrieval - it handles paraphrasing, synonyms, and concept-level similarity. The limitation is exact term matching: if a user searches for a specific error code like ERRCODE_0x4A3F, embeddings may not help much - that is where hybrid search with BM25 becomes important.
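
One common way to combine a dense ranking with a BM25 ranking is reciprocal rank fusion (RRF). The answer above does not name a fusion method, so treat this as one option among several; `reciprocal_rank_fusion`, the document IDs, and the conventional constant `k=60` are illustrative assumptions.

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """
    Fuse several ranked lists of document IDs.
    Each document scores sum(1 / (k + rank)) over the lists it appears in,
    so documents ranked well by both retrievers rise to the top.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


# Hypothetical results for a query containing an exact error code
dense_hits = ["doc_overview", "doc_faq", "doc_errcodes"]
bm25_hits = ["doc_errcodes", "doc_overview", "doc_changelog"]

fused = reciprocal_rank_fusion([dense_hits, bm25_hits])
print(fused)
```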
Summary
Embedding models are a permanent architectural commitment, not a swappable component. The key production principles:
- Choose once, carefully - benchmark on your specific data and domain before committing. MTEB is a starting point, not the answer.
- Version everything - tag every stored embedding with the model that generated it. Make model incompatibility a hard error.
- Normalize at write time - normalize when you generate embeddings, store normalized vectors, never rely on downstream normalization.
- Cache aggressively - query embeddings are highly cacheable. Document embeddings use content-hash keys for automatic invalidation.
- Plan migrations carefully - dual-index strategy, quality validation, gradual traffic shifting. Never hard-cutover.
- Monitor continuously - throughput, cache hit rate, latency breakdown, embedding coverage.
The 2 AM crisis from the opening story is entirely preventable. With EmbeddingVersionManager.check_compatibility() running before every search, the incompatible model deployment would have produced a hard error immediately - not a silent collapse discovered hours later by end users.
