Building Embedding Pipelines
The Upgrade That Took 3 Days
The retrieval team at a mid-stage startup had 50 million documents indexed with text-embedding-ada-002. A new OpenAI model - text-embedding-3-large - promised significantly higher retrieval quality on their internal benchmarks. The decision to upgrade seemed straightforward until someone asked: "How do we re-index 50 million documents without taking search offline?"
The naive approach - drop the index, re-embed everything, rebuild - would take 3 days of compute and leave search completely unavailable for users during that window. The platform was a revenue-generating product. Three days of downtime was not an option.
What followed was a two-week engineering project to build a zero-downtime re-indexing pipeline. It involved dual-model serving, shadow indexing, a traffic router, incremental cutover, and a rollback mechanism. Along the way, the team discovered three categories of bugs they had never thought about: embedding drift (similarity scores changing even for documents that had not changed), normalization inconsistencies between old and new vectors, and a subtle chunking change that made the new embeddings for old documents incompatible in a way that was invisible from the vector store's perspective.
This lesson gives you the architecture for building embedding pipelines that avoid this pain. You will learn how to make model upgrades a planned, zero-downtime operation rather than an emergency sprint.
Why This Exists - The Pipeline Problem
A vector database is only as good as the embeddings inside it. But embeddings are not static artifacts - they degrade over time. Documents get added. Models improve. Domain language evolves. The embedding pipeline is the continuous operation that keeps your vector index current, accurate, and aligned with the latest model capabilities.
Most teams build this pipeline as an afterthought. They write a one-time ingestion script, dump 1 million documents into the vector database, and then add documents incrementally via a simple "embed and upsert" operation. This works until:
- The embedding model is deprecated and must be upgraded
- The document corpus grows 100× and batch re-embedding is needed
- Embedding quality drifts undetected because no one is monitoring similarity score distributions
- A bug in the chunking logic corrupts a subset of embeddings with no way to identify which ones
Building a production embedding pipeline requires thinking about all four of these scenarios from day one.
Embedding Model Selection
The Model Landscape
The embedding model market has three tiers:
API-based models (OpenAI, Cohere, Voyage AI): High quality, easy to start, and no GPU required, but they carry a per-token cost and vendor dependency. text-embedding-3-large (3072 dims, also available at 1024 and 256 via MRL) is the current OpenAI standard. Cohere's embed-english-v3.0 and Voyage AI's voyage-3 are strong alternatives at comparable prices.
Open-weight sentence-transformers (all-MiniLM, e5, BGE, mxbai): Free to run, can be self-hosted on CPU or GPU. BAAI/bge-large-en-v1.5 (1024 dims) and intfloat/e5-large-v2 (1024 dims) are competitive with API models on MTEB benchmarks. Run on CPU for small-scale; GPU needed for production throughput.
Task-specific models: For code search, use models trained on code (CodeBERT, UniXcoder, StarCoder embeddings). For multilingual, use LaBSE or e5-multilingual. Domain-specific fine-tuning (fine-tuned on your domain's document pairs) typically yields 5–15% recall improvement over general-purpose models.
The MTEB Benchmark
The Massive Text Embedding Benchmark (MTEB; Muennighoff et al., 2022) is the standard evaluation for embedding models, covering 56 datasets across task types such as retrieval, semantic textual similarity (STS), and classification. Check the MTEB leaderboard (https://huggingface.co/spaces/mteb/leaderboard) for current rankings. Look at the task category that matches your use case: Retrieval for RAG, STS for semantic similarity, Classification if that matters.
:::warning MTEB scores do not always predict production performance MTEB measures retrieval quality on fixed academic benchmarks. Your production queries may differ significantly in length, formality, domain vocabulary, and language. Always evaluate embedding models on a sample of your actual production data before committing to a model upgrade. :::
```python
from typing import List, Tuple

import numpy as np
from sentence_transformers import SentenceTransformer


def evaluate_embedding_model_on_domain_data(
    model_name: str,
    query_doc_pairs: List[Tuple[str, str, str]],  # (query, relevant_doc, irrelevant_doc)
) -> dict:
    """
    Evaluate an embedding model on your domain data using triplet ranking.
    relevant_doc should rank higher than irrelevant_doc for each query.
    """
    if not query_doc_pairs:
        raise ValueError("query_doc_pairs must not be empty")

    model = SentenceTransformer(model_name)
    correct = 0
    scores_relevant = []
    scores_irrelevant = []

    for query, relevant_doc, irrelevant_doc in query_doc_pairs:
        texts = [query, relevant_doc, irrelevant_doc]
        # Unit-normalized vectors, so the dot product is the cosine similarity
        embeddings = model.encode(texts, normalize_embeddings=True)
        q_emb, rel_emb, irr_emb = embeddings

        score_relevant = float(np.dot(q_emb, rel_emb))
        score_irrelevant = float(np.dot(q_emb, irr_emb))
        scores_relevant.append(score_relevant)
        scores_irrelevant.append(score_irrelevant)

        if score_relevant > score_irrelevant:
            correct += 1

    return {
        "model": model_name,
        "triplet_accuracy": correct / len(query_doc_pairs),
        "mean_relevant_score": float(np.mean(scores_relevant)),
        "mean_irrelevant_score": float(np.mean(scores_irrelevant)),
        "mean_score_gap": float(np.mean(scores_relevant) - np.mean(scores_irrelevant)),
    }
```
Batching for Throughput
Embedding 50 million documents one at a time means 50 million API calls. Even at 100 ms per call, that is roughly 1,400 hours (about 58 days) of sequential calls. Batching is not optional - it is the difference between a viable pipeline and one that takes nearly 60 days to run.
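The arithmetic is easy to reproduce. The sketch below is a back-of-the-envelope estimate only: it assumes strictly sequential calls and a fixed latency per call, and the helper name `sequential_hours` is illustrative. The batched figures (500 ms per batch of 100) are the ones used in the Common Mistakes section of this lesson.

```python
def sequential_hours(num_docs: int, seconds_per_call: float, docs_per_call: int = 1) -> float:
    """Wall-clock hours if calls are issued strictly one after another."""
    calls = -(-num_docs // docs_per_call)  # ceiling division
    return calls * seconds_per_call / 3600


# One document per call at 100 ms per call
one_at_a_time = sequential_hours(50_000_000, 0.1)

# 100 documents per call at 500 ms per call
batched = sequential_hours(50_000_000, 0.5, docs_per_call=100)

print(f"one at a time: {one_at_a_time:.0f} h")  # about 1389 h
print(f"batched:       {batched:.0f} h")        # about 69 h
```

Running requests concurrently divides these figures further, up to whatever your rate limit allows.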
API Batching (OpenAI, Cohere)
OpenAI's embedding API accepts batches of up to 2048 inputs per request. The binding throughput limit is usually tokens per minute (TPM) rather than requests per minute. Maximize throughput by:
- Using large batches to minimize HTTP overhead
- Running concurrent requests up to the rate limit
- Using async/await to overlap I/O
- Using OpenAI's Batch API for non-time-sensitive workloads (50% cheaper, up to 24 hours turnaround)
```python
import asyncio
from typing import List

import numpy as np
import openai

client = openai.AsyncOpenAI()


async def embed_batch_openai(
    texts: List[str],
    model: str = "text-embedding-3-large",
    dimensions: int = 1536,
) -> List[List[float]]:
    """Embed a batch of texts using OpenAI API."""
    response = await client.embeddings.create(
        input=texts,
        model=model,
        dimensions=dimensions,
    )
    return [item.embedding for item in response.data]


async def embed_corpus_concurrent(
    texts: List[str],
    batch_size: int = 512,
    max_concurrent: int = 10,
    model: str = "text-embedding-3-large",
) -> np.ndarray:
    """
    Embed a large corpus concurrently with rate limiting.
    max_concurrent controls how many API requests are in-flight simultaneously.
    """
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    semaphore = asyncio.Semaphore(max_concurrent)
    all_embeddings = []

    async def embed_with_semaphore(batch: List[str]) -> List[List[float]]:
        async with semaphore:
            for attempt in range(3):
                try:
                    return await embed_batch_openai(batch, model=model)
                except openai.RateLimitError:
                    # Exponential backoff: 1s, 2s, 4s
                    await asyncio.sleep(2 ** attempt)
            raise RuntimeError("Failed after 3 retries")

    tasks = [embed_with_semaphore(batch) for batch in batches]
    # gather() returns results in task order, so embeddings stay aligned with texts
    results = await asyncio.gather(*tasks)

    for batch_result in results:
        all_embeddings.extend(batch_result)

    return np.array(all_embeddings, dtype=np.float32)
```
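Because the limit that matters is usually tokens rather than requests, fixed-size batches can overshoot when documents vary in length. The sketch below packs texts into batches under both an input-count cap and a token budget. It is illustrative only: `estimate_tokens` uses a crude 4-characters-per-token heuristic (an assumption; use a real tokenizer for accurate counts), and the default `max_tokens_per_batch` is a placeholder, not a documented API limit.

```python
from typing import Iterator, List

MAX_INPUTS_PER_REQUEST = 2048  # per-request input cap mentioned above


def estimate_tokens(text: str) -> int:
    """Crude heuristic (assumption): roughly 4 characters per token for English."""
    return max(1, len(text) // 4)


def pack_batches(
    texts: List[str],
    max_tokens_per_batch: int = 100_000,  # placeholder budget, tune to your rate limit
    max_inputs: int = MAX_INPUTS_PER_REQUEST,
) -> Iterator[List[str]]:
    """Yield batches in input order, closing a batch before it exceeds either cap."""
    batch: List[str] = []
    batch_tokens = 0
    for text in texts:
        n = estimate_tokens(text)
        if batch and (len(batch) >= max_inputs or batch_tokens + n > max_tokens_per_batch):
            yield batch
            batch, batch_tokens = [], 0
        # A single oversized text still gets a batch of its own
        batch.append(text)
        batch_tokens += n
    if batch:
        yield batch
```

Order is preserved, so the packed batches can be fed to a concurrent embedder and the results concatenated without re-sorting.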
Local Model Batching (sentence-transformers)
```python
from typing import List

import numpy as np
import torch
from sentence_transformers import SentenceTransformer


def embed_corpus_local(
    texts: List[str],
    model_name: str = "BAAI/bge-large-en-v1.5",
    batch_size: int = 256,
    device: str = "cuda" if torch.cuda.is_available() else "cpu",
    show_progress: bool = True,
) -> np.ndarray:
    """
    Embed with a local model. batch_size depends on GPU VRAM.
    For A100 40GB: batch_size=512 at d=1024 is feasible.
    For CPU: batch_size=32 is typical.
    """
    model = SentenceTransformer(model_name, device=device)
    embeddings = model.encode(
        texts,
        batch_size=batch_size,
        normalize_embeddings=True,  # unit normalize for cosine similarity
        show_progress_bar=show_progress,
        convert_to_numpy=True,
    )
    return embeddings.astype(np.float32)
```
Incremental Indexing
In production, documents are added continuously. You need an incremental pipeline that embeds and indexes new documents without re-processing existing ones.
```python
import hashlib
from datetime import datetime, timezone


def content_hash(text: str) -> str:
    """Hash used only for change detection, not for security."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


class EmbeddingMetadataStore:
    """
    Track which documents have been embedded with which model version.
    Critical for zero-downtime model upgrades.
    """

    def __init__(self, db_connection):
        # Any key-value client exposing get(key) and set(key, value)
        self.db = db_connection

    def get_embedding_status(self, doc_id: str) -> dict | None:
        """Returns None if document has never been embedded."""
        return self.db.get(f"embedding:{doc_id}")

    def record_embedding(
        self,
        doc_id: str,
        model_name: str,
        model_version: str,
        content_hash: str,
        dimensions: int,
    ):
        record = {
            "doc_id": doc_id,
            "model_name": model_name,
            "model_version": model_version,
            "content_hash": content_hash,
            "dimensions": dimensions,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "embedded_at": datetime.now(timezone.utc).isoformat(),
        }
        self.db.set(f"embedding:{doc_id}", record)

    def needs_re_embedding(
        self,
        doc_id: str,
        current_content: str,
        target_model_version: str,
    ) -> bool:
        """Return True if document needs to be re-embedded."""
        status = self.get_embedding_status(doc_id)
        if status is None:
            return True  # Never embedded

        # Re-embed if content changed
        if status["content_hash"] != content_hash(current_content):
            return True

        # Re-embed if model version changed
        if status["model_version"] != target_model_version:
            return True

        return False
```
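To see how hash and version tracking drives an incremental run, here is a self-contained sketch that sorts a set of documents into work buckets. It stands in a plain dictionary for the metadata store, and the function name `plan_embedding_work` and the bucket labels are illustrative choices rather than part of any library.

```python
import hashlib
from typing import Dict, List


def content_hash(text: str) -> str:
    """Hash used only for change detection."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def plan_embedding_work(
    documents: Dict[str, str],   # doc_id -> current text
    records: Dict[str, dict],    # doc_id -> stored metadata record
    target_model_version: str,
) -> Dict[str, List[str]]:
    """Classify each document by why (or whether) it needs embedding."""
    plan: Dict[str, List[str]] = {
        "new": [], "content_changed": [], "model_outdated": [], "up_to_date": [],
    }
    for doc_id, text in documents.items():
        record = records.get(doc_id)
        if record is None:
            plan["new"].append(doc_id)
        elif record["content_hash"] != content_hash(text):
            plan["content_changed"].append(doc_id)
        elif record["model_version"] != target_model_version:
            plan["model_outdated"].append(doc_id)
        else:
            plan["up_to_date"].append(doc_id)
    return plan


docs = {"a": "hello", "b": "world", "c": "brand new", "d": "same"}
stored = {
    "a": {"content_hash": content_hash("hello"), "model_version": "v1"},
    "b": {"content_hash": content_hash("old world"), "model_version": "v2"},
    "d": {"content_hash": content_hash("same"), "model_version": "v2"},
}
plan = plan_embedding_work(docs, stored, target_model_version="v2")
print(plan)
```

Separating the buckets is a design choice: new and changed documents are usually urgent, while model-outdated documents can be queued for a cheaper bulk job.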
Zero-Downtime Model Upgrades
This is the hardest operational challenge in embedding pipeline management. When you upgrade from model A to model B, you must:
1. Re-embed all documents with model B
2. Build a new index using model B embeddings
3. Switch all queries to use model B for query embedding
4. Do steps 1–3 without downtime
The key constraint: during the migration, every query must use the same model as the index it is searching. A model-A query against a model-B index produces garbage.
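A vector store will not catch this mismatch for you: if both models emit vectors of the same dimensionality, the search simply returns poor results. One way to make the constraint explicit is to attach a small spec to every index and every embedder and check it before searching. The sketch below is a minimal illustration; `EmbeddingSpec` and `check_query_compatible` are hypothetical names, not part of any vector database client.

```python
from dataclasses import dataclass

import numpy as np


@dataclass(frozen=True)
class EmbeddingSpec:
    model_version: str
    dimensions: int
    normalized: bool


def check_query_compatible(
    query_vec: np.ndarray,
    query_spec: EmbeddingSpec,
    index_spec: EmbeddingSpec,
) -> None:
    """Raise ValueError if the query vector must not be searched against this index."""
    if query_spec != index_spec:
        raise ValueError(f"spec mismatch: query={query_spec}, index={index_spec}")
    if query_vec.shape[-1] != index_spec.dimensions:
        raise ValueError(
            f"expected {index_spec.dimensions} dims, got {query_vec.shape[-1]}"
        )
    if index_spec.normalized and not np.isclose(np.linalg.norm(query_vec), 1.0, atol=1e-3):
        raise ValueError("index expects unit-normalized vectors")


spec_a = EmbeddingSpec("model-a", dimensions=4, normalized=True)
spec_b = EmbeddingSpec("model-b", dimensions=4, normalized=True)
q = np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32)

check_query_compatible(q, spec_a, spec_a)  # passes silently

try:
    check_query_compatible(q, spec_a, spec_b)
except ValueError as err:
    print("blocked:", err)
```

The normalization check also guards against the old-versus-new normalization inconsistency described in the opening story.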
```python
class DualModelSearchRouter:
    """
    Routes queries to the appropriate index during model migration.
    Old documents live in index_v1 (model A).
    New documents go to index_v2 (model B).
    Migration gradually moves documents from v1 to v2.
    """

    def __init__(self, index_v1, index_v2, embedder_v1, embedder_v2):
        self.index_v1 = index_v1
        self.index_v2 = index_v2
        self.embedder_v1 = embedder_v1
        self.embedder_v2 = embedder_v2
        # Migration progress: 0.0 = fully on v1, 1.0 = fully on v2
        self.migration_fraction = 0.0

    def search(self, query: str, k: int = 10) -> list:
        """
        During migration: search both indices and merge results.
        After migration (fraction=1.0): only search v2.
        """
        if self.migration_fraction >= 1.0:
            # Migration complete: query only v2
            q_emb = self.embedder_v2.encode(query)
            return self.index_v2.search(q_emb, k=k)
        elif self.migration_fraction <= 0.0:
            # Migration not started: query only v1
            q_emb = self.embedder_v1.encode(query)
            return self.index_v1.search(q_emb, k=k)
        else:
            # Mid-migration: query both, merge by score.
            # Each query is embedded with the model that built the index it searches.
            q_emb_v1 = self.embedder_v1.encode(query)
            q_emb_v2 = self.embedder_v2.encode(query)

            # Each index has a subset of documents
            # Need more candidates to compensate for partial coverage
            k_per_index = k * 2
            results_v1 = self.index_v1.search(q_emb_v1, k=k_per_index)
            results_v2 = self.index_v2.search(q_emb_v2, k=k_per_index)

            # Merge: deduplicate by doc_id, take top-k by score.
            # Caveat: raw scores from two different models are not on the same
            # scale, so this merge can favor whichever model scores higher overall.
            seen_ids = set()
            merged = []
            for result in sorted(results_v1 + results_v2, key=lambda x: x.score, reverse=True):
                if result.doc_id not in seen_ids:
                    seen_ids.add(result.doc_id)
                    merged.append(result)
                if len(merged) >= k:
                    break
            return merged

    def advance_migration(self, fraction_complete: float):
        """Update migration progress. Call this as background re-indexing completes."""
        self.migration_fraction = min(1.0, max(0.0, fraction_complete))
```
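Merging by raw score assumes the two models produce comparable similarity values, which is not guaranteed. One alternative, swapped in here for illustration rather than taken from the router above, is Reciprocal Rank Fusion (RRF), which merges result lists using only each document's rank. The sketch assumes each index returns an ordered list of document IDs; `rrf_k=60` is the constant commonly used with RRF.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(
    rankings: List[List[str]],  # one ranked list of doc_ids per index, best first
    k: int = 10,
    rrf_k: int = 60,
) -> List[str]:
    """Merge ranked lists by summing 1 / (rrf_k + rank) for each document."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (rrf_k + rank)
    # Sort by fused score, breaking ties by doc_id for deterministic output
    ordered = sorted(scores, key=lambda d: (-scores[d], d))
    return ordered[:k]


merged = reciprocal_rank_fusion([["a", "b", "c"], ["b", "d"]], k=4)
print(merged)  # ['b', 'a', 'd', 'c']
```

Note that a document present in both indices receives contributions from both lists and is boosted. If documents are moved rather than copied during migration, each appears in only one list and RRF reduces to interleaving by rank.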
Embedding Drift Detection
Embedding drift is silent and insidious. Even without any code changes, similarity score distributions can shift because:
- The corpus changes composition (new document types, different vocabulary distribution)
- The embedding model is updated by the provider without a version change (OpenAI has done this)
- Infrastructure changes alter how text is preprocessed before embedding
- A bug in preprocessing affects a subset of documents
```python
import numpy as np
from scipy.stats import ks_2samp


class EmbeddingDriftMonitor:
    """
    Monitor for shifts in embedding quality by tracking similarity score distributions.
    """

    def __init__(self, reference_scores: np.ndarray):
        """
        reference_scores: distribution of top-1 similarity scores from a healthy baseline period.
        """
        self.reference_scores = reference_scores
        self.reference_mean = float(np.mean(reference_scores))
        self.reference_std = float(np.std(reference_scores))

    def check_distribution(self, current_scores: np.ndarray) -> dict:
        """
        Compare current similarity score distribution to reference.
        Uses Kolmogorov-Smirnov test for distribution shift.
        """
        # KS test: p-value < 0.05 indicates significant distribution shift
        ks_stat, ks_p_value = ks_2samp(self.reference_scores, current_scores)

        current_mean = float(np.mean(current_scores))
        mean_shift = current_mean - self.reference_mean

        return {
            "current_mean": current_mean,
            "reference_mean": self.reference_mean,
            "mean_shift": mean_shift,
            "mean_shift_pct": mean_shift / self.reference_mean * 100,
            "ks_statistic": float(ks_stat),
            "ks_p_value": float(ks_p_value),
            "drift_detected": ks_p_value < 0.05 or abs(mean_shift) > 0.05,
            "severity": (
                "HIGH" if abs(mean_shift) > 0.10 or ks_p_value < 0.001
                else "MEDIUM" if abs(mean_shift) > 0.05 or ks_p_value < 0.01
                else "LOW"
            ),
        }
```
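Score distributions shift for many reasons, so they cannot tell you whether the model itself changed. A complementary check, sketched here as one possible approach, is to keep the stored embeddings of a small fixed set of canary texts and periodically re-embed the same texts: if the model and preprocessing are unchanged, the new vectors should be nearly identical to the stored ones. The function name and the `min_cosine` threshold are assumptions to tune for your setup.

```python
import numpy as np


def canary_drift(
    reference: np.ndarray,   # stored embeddings of the canary texts, shape (n, d)
    current: np.ndarray,     # fresh embeddings of the same texts, same order
    min_cosine: float = 0.999,
) -> dict:
    """Flag canary texts whose fresh embedding no longer matches the stored one."""
    ref = reference / np.linalg.norm(reference, axis=1, keepdims=True)
    cur = current / np.linalg.norm(current, axis=1, keepdims=True)
    cosines = np.sum(ref * cur, axis=1)  # row-wise cosine similarity
    drifted = np.where(cosines < min_cosine)[0]
    return {
        "min_cosine": float(cosines.min()),
        "drifted_indices": drifted.tolist(),
        "drift_detected": bool(len(drifted) > 0),
    }


rng = np.random.default_rng(0)
stored = rng.normal(size=(5, 8))

unchanged = canary_drift(stored, stored.copy())

changed_vectors = stored.copy()
changed_vectors[2] = -stored[2]  # simulate one canary embedding changing
changed = canary_drift(stored, changed_vectors)
print(unchanged["drift_detected"], changed["drifted_indices"])
```

Because the canary texts never change, any drift this check reports points at the model or the preprocessing path rather than at the corpus.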
Normalization and Dimensionality Reduction
PCA for Dimensionality Reduction
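For cost-sensitive applications, reducing 1536 dimensions to 512 cuts vector storage and distance-computation cost by 3×, often while losing less than 5% recall. The actual loss depends on your data, so measure recall before and after the reduction.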
```python
import numpy as np
from sklearn.decomposition import PCA


class EmbeddingReducer:
    """Fit PCA on a sample, then apply to all embeddings."""

    def __init__(self, target_dims: int):
        self.target_dims = target_dims
        self.pca = PCA(n_components=target_dims, random_state=42)
        self.fitted = False

    def fit(self, sample_embeddings: np.ndarray):
        """Fit PCA on a representative sample (100K embeddings is sufficient)."""
        self.pca.fit(sample_embeddings)
        self.fitted = True
        explained_variance = self.pca.explained_variance_ratio_.sum()
        print(f"PCA {sample_embeddings.shape[1]}d → {self.target_dims}d")
        print(f"Explained variance: {explained_variance:.4f}")

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        if not self.fitted:
            raise RuntimeError("Call fit() first")
        reduced = self.pca.transform(embeddings)
        # Re-normalize after PCA (PCA does not preserve unit norm)
        norms = np.linalg.norm(reduced, axis=1, keepdims=True) + 1e-8
        return (reduced / norms).astype(np.float32)
```
Production Engineering Notes
Track model version per document. Every document in your vector store should have a metadata field recording which embedding model version produced it. When you run a model upgrade, you can query for documents with the old version and batch re-embed only those. Without this tracking, you cannot do targeted re-embedding.
Use OpenAI Batch API for bulk re-indexing. OpenAI's Batch API (launched 2024) processes large embedding jobs at 50% of the synchronous pricing with up to 24-hour turnaround. For re-indexing 50M documents where freshness is not critical, Batch API cuts embedding cost in half.
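A Batch API job takes a JSONL input file with one request per line. The sketch below builds those lines for an embedding job, keyed by document ID so results can be matched back afterwards. The line shape (`custom_id`, `method`, `url`, `body`) follows the documented batch input format as understood at the time of writing; verify it, along with file-size and request-count limits, against the current API reference before relying on it. Uploading the file and creating the batch are left out.

```python
import json
from typing import Dict, Iterator, Optional


def batch_request_lines(
    docs: Dict[str, str],                      # doc_id -> text to embed
    model: str = "text-embedding-3-large",
    dimensions: Optional[int] = None,
) -> Iterator[str]:
    """Yield one JSON line per document for a Batch API embeddings job."""
    for doc_id, text in docs.items():
        body = {"model": model, "input": text}
        if dimensions is not None:
            body["dimensions"] = dimensions
        yield json.dumps({
            "custom_id": doc_id,        # echoed back in the output file
            "method": "POST",
            "url": "/v1/embeddings",
            "body": body,
        })


lines = list(batch_request_lines({"doc-1": "hello", "doc-2": "world"}, dimensions=1024))
print("\n".join(lines))
```

Using the document ID as `custom_id` matters because batch output is not guaranteed to arrive in input order.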
Test chunking logic changes as carefully as model changes. Changing how documents are split into chunks changes the content of each chunk, which changes its embedding, which invalidates the entire index. Treat chunking logic changes with the same rigor as model upgrades - they require full re-indexing.
Common Mistakes
:::danger Re-embedding without dual-index serving Stopping the old index while building the new one means search is unavailable during re-embedding. For 50M documents at 500ms per batch of 100, re-embedding takes 70 hours. Always use the dual-model router pattern: keep the old index serving while building the new one in the background. :::
:::warning Not validating recall after a model upgrade A new embedding model can score higher on MTEB while performing worse on your specific domain. Always run recall@K validation on a held-out set of query-document pairs from your production data before cutting traffic to the new index. :::
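Recall@K validation can be as simple as a brute-force search over a held-out evaluation set. The sketch below assumes unit-normalized vectors, exactly one known relevant document per query, and an evaluation set small enough for an exhaustive dot-product search. Run it once with the old model's vectors and once with the new model's vectors, then compare.

```python
import numpy as np


def recall_at_k(
    query_vecs: np.ndarray,        # shape (num_queries, d), unit-normalized
    doc_vecs: np.ndarray,          # shape (num_docs, d), unit-normalized
    relevant_doc_idx: np.ndarray,  # shape (num_queries,), index of the relevant doc
    k: int = 10,
) -> float:
    """Fraction of queries whose relevant document appears in the top-k results."""
    scores = query_vecs @ doc_vecs.T                 # cosine similarity matrix
    top_k = np.argsort(-scores, axis=1)[:, :k]       # best k doc indices per query
    hits = (top_k == relevant_doc_idx[:, None]).any(axis=1)
    return float(hits.mean())


# Toy check: 4 orthogonal documents, 3 queries that each equal one document
toy_docs = np.eye(4, dtype=np.float32)
toy_queries = toy_docs[[0, 1, 2]]
toy_relevant = np.array([0, 1, 3])  # the third label deliberately mismatches

print(recall_at_k(toy_queries, toy_docs, toy_relevant, k=1))
print(recall_at_k(toy_queries, toy_docs, toy_relevant, k=4))
```

For a production-sized corpus, replace the exhaustive matrix product with queries against the actual index so the measurement includes ANN approximation error.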
:::tip MRL models support free dimensionality reduction
OpenAI's text-embedding-3-large and text-embedding-3-small use Matryoshka Representation Learning - you can specify dimensions=512 and get a valid lower-dimensional embedding without running PCA. This is better than post-hoc PCA because the model was trained to produce high-quality embeddings at multiple dimensionalities. Use this before implementing custom PCA reduction.
:::
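If you already hold full-size embeddings from an MRL-trained model, the same reduction can be applied offline by keeping the leading dimensions and re-normalizing. This is a minimal sketch under that assumption: truncating embeddings from a model that was not trained with MRL will generally hurt quality far more.

```python
import numpy as np


def truncate_and_renormalize(embeddings: np.ndarray, dims: int) -> np.ndarray:
    """Keep the first `dims` dimensions of each row and rescale to unit length."""
    if dims > embeddings.shape[1]:
        raise ValueError(f"cannot truncate {embeddings.shape[1]} dims to {dims}")
    truncated = embeddings[:, :dims]
    norms = np.linalg.norm(truncated, axis=1, keepdims=True)
    # Guard against division by zero for all-zero rows
    return (truncated / np.maximum(norms, 1e-12)).astype(np.float32)


rng = np.random.default_rng(0)
full = rng.normal(size=(3, 16))
reduced = truncate_and_renormalize(full, dims=4)
print(reduced.shape, np.linalg.norm(reduced, axis=1))
```

Whichever route you take, queries and documents must be reduced to the same dimensionality in the same way.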
Interview Questions
Q1: You need to upgrade from OpenAI ada-002 to text-embedding-3-large for 50M documents. Walk through the migration plan.
- Phase 1 (1 week): Stand up a new index (index_v2) alongside the existing index (index_v1). Start a background re-embedding job using the OpenAI Batch API for cost efficiency. Write all new documents to both indices. All queries still go to index_v1 only.
- Phase 2 (ongoing): As Batch API jobs complete, upsert the re-embedded documents into index_v2. Keep them in index_v1 as well: it is still serving every query, so deleting from it would silently drop documents from search results. Track progress via the embedding metadata store.
- Phase 3 (validation): When migration_fraction reaches 0.8, begin A/B testing: route 10% of queries to the dual router, which searches both indices. Measure recall@10 and downstream task performance.
- Phase 4 (cutover): When index_v2 is fully populated and validation passes, set migration_fraction=1.0. All queries go to index_v2.
- Phase 5 (cleanup): After 48 hours of monitoring, shut down index_v1.
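For the A/B step, the split should be deterministic so that the same user or session always lands in the same arm. A common way to do this, sketched here with hypothetical names, is to hash a stable key into a bucket and compare it to the treatment fraction.

```python
import hashlib


def in_treatment(key: str, treatment_fraction: float, salt: str = "index-v2-rollout") -> bool:
    """Deterministically assign a key (user ID, session ID) to the treatment arm."""
    digest = hashlib.sha256(f"{salt}:{key}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big")       # uniform in [0, 2**64)
    return bucket < int(treatment_fraction * 2**64)


# Roughly 10% of keys should be routed to the new path
share = sum(in_treatment(f"user-{i}", 0.10) for i in range(10_000)) / 10_000
print(f"treatment share: {share:.3f}")
```

Changing the `salt` reshuffles assignments, which is useful when you want a fresh split for a later experiment.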
Q2: How do you detect embedding drift in production?
Monitor the distribution of top-1 similarity scores from production queries using a rolling window. Establish a baseline distribution during a stable period. Use KS test or mean-shift monitoring to detect distribution changes. Alert when: mean top-1 similarity drops more than 0.05 (model behavior change), KS p-value drops below 0.01 (significant distribution shift), or recall@K measured against a fixed evaluation set drops more than 3%. Also monitor embedding norm distributions - unnormalized magnitude changes indicate preprocessing bugs.
Q3: What is the difference between OpenAI Batch API and synchronous embedding API, and when do you use each?
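Synchronous API: results in seconds, at the standard (higher) price. Use it for query-time embedding and for incremental indexing of new documents, where results are needed immediately. Batch API: results within 24 hours at 50% of the synchronous price (0.065 USD per million tokens for text-embedding-3-large, versus 0.13 USD synchronously). Use it for bulk re-indexing, initial corpus ingestion, and offline pipelines where latency to search availability is not critical. For a 50M document re-index, the Batch API saves up to roughly 2,000 USD, depending on average document length.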
Q4: A colleague proposes storing all embeddings in PostgreSQL as float arrays instead of a dedicated vector DB. When is this acceptable?
Acceptable when: dataset is under 500K documents and you are okay with sequential scan latency (a few seconds), or you plan to add pgvector extension and can accept pgvector's HNSW performance. Not acceptable when: you need ANN search with sub-100ms latency at scale, you need efficient metadata filtering on vectors, or you need horizontal scaling beyond a single PostgreSQL instance. The real advantage of storing in PostgreSQL (without pgvector) is ACID transactions and full SQL expressiveness - but you give up all ANN search performance.
Q5: How do you handle chunking strategy changes without full re-indexing?
You cannot avoid re-indexing if chunks change - a different chunk has a different embedding. What you can do is minimize the scope: maintain a content hash per document (not per chunk). When the chunking logic changes, only re-embed documents that will produce different chunks under the new strategy. For fixed-length chunking with overlap, a document that fits in a single chunk under both the old and the new settings is unaffected; any document longer than the smaller of the two chunk sizes must be re-chunked. Use your embedding metadata store to track which documents were processed with which chunking version. Stage the change - apply the new chunking to new ingestion immediately and gradually re-process the historical corpus.
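Finding the affected documents does not require any embedding calls: run both chunkers and compare their output. The sketch below uses a simple character-based fixed-length chunker as a stand-in for your real one; the function names and the `(size, overlap)` configuration tuple are illustrative.

```python
from typing import Dict, List, Tuple


def chunk_fixed(text: str, size: int, overlap: int) -> List[str]:
    """Fixed-length character chunks, each sharing `overlap` characters with the previous."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must satisfy 0 <= overlap < size")
    if not text:
        return []
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]


def docs_affected_by_chunking_change(
    docs: Dict[str, str],
    old_cfg: Tuple[int, int],  # (size, overlap) used to build the current index
    new_cfg: Tuple[int, int],  # (size, overlap) proposed
) -> List[str]:
    """Return IDs of documents whose chunks differ between the two configurations."""
    return [
        doc_id for doc_id, text in docs.items()
        if chunk_fixed(text, *old_cfg) != chunk_fixed(text, *new_cfg)
    ]


corpus = {"short": "x" * 100, "mid": "z" * 300, "long": "y" * 1000}
affected = docs_affected_by_chunking_change(corpus, old_cfg=(512, 64), new_cfg=(256, 32))
print(affected)  # ['mid', 'long']
```

The `mid` document shows why the threshold is the smaller chunk size: it was a single chunk at 512 characters but splits in two at 256.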
