
RAG Pipeline Ops

The Knowledge Base That Stopped Being Knowledge

The enterprise knowledge management platform had launched with a strong pitch: "Ask any question about company policy, procedures, or products - get an accurate answer instantly." The internal launch was well-received. Employees who previously had to search through SharePoint, email colleagues, or wait for HR to respond were getting answers in seconds.

Six months later, the feedback had shifted. "The system told me the parental leave policy was 12 weeks, but HR said it changed to 16 weeks in January." "I asked about the current vendor list and it gave me names of companies we stopped working with." "The product pricing information is completely wrong - those prices are from last year."

The team investigated. The knowledge base had been indexed once at launch and never refreshed. In six months, the company had updated 23 policy documents, discontinued 4 product lines, changed pricing on 15 SKUs, and added 2 new office locations. None of these changes had propagated to the vector database. The RAG system was confidently answering questions about a company that no longer existed in the form described by its knowledge base.

The failure was not in the retrieval or generation - those worked correctly. The failure was in operations: no refresh pipeline, no staleness monitoring, no alerting when source documents changed, no process for detecting when the vector database had drifted from the truth.



The RAG Operations Problem

A RAG system has three components that can each fail in different ways:

The source documents: Policies change, products get discontinued, facts become outdated. If your source documents change and your index does not, you are serving stale knowledge.

The chunking and embedding pipeline: The strategy for splitting documents into chunks and embedding them is a design choice with quality implications. A chunk size that worked for short policy documents may fail for long technical manuals. An embedding model that was state-of-the-art at launch may have been superseded.

The vector database: Indexes can drift (documents deleted from source but not from index), become corrupted, or develop performance issues (slow queries, high memory usage) as they grow.

Operational discipline for RAG means treating each of these as a managed system component with monitoring, alerting, and maintenance procedures.


The RAG Pipeline Architecture


Building a Change Detection Pipeline

The foundation of RAG operations: detect when source documents change and trigger index updates.

import hashlib
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class DocumentRecord:
    """Tracks the state of a source document for change detection."""
    doc_id: str
    source_path: str
    content_hash: str
    last_modified: str
    indexed_at: str
    chunk_ids: List[str] = field(default_factory=list)  # vector DB chunk IDs
    metadata: Dict = field(default_factory=dict)


class DocumentChangeDetector:
    """
    Detects changes in source documents by comparing content hashes.
    Maintains a state file of known document states.

    Supports:
    - New documents (added to source, not yet indexed)
    - Modified documents (content changed, need re-indexing)
    - Deleted documents (removed from source, must be deleted from index)
    """

    def __init__(self, state_path: str = "rag_document_state.json"):
        self.state_path = state_path
        self._records: Dict[str, DocumentRecord] = {}
        self._load_state()

    def _load_state(self):
        """Load tracked document states from persistent storage."""
        try:
            with open(self.state_path) as f:
                data = json.load(f)
            self._records = {
                doc_id: DocumentRecord(**record)
                for doc_id, record in data.items()
            }
        except FileNotFoundError:
            self._records = {}

    def _save_state(self):
        """Persist document states."""
        with open(self.state_path, "w") as f:
            json.dump(
                {doc_id: asdict(record) for doc_id, record in self._records.items()},
                f, indent=2
            )

    def compute_content_hash(self, content: bytes) -> str:
        """SHA256 hash of document content."""
        return hashlib.sha256(content).hexdigest()

    def scan_directory(
        self, source_dir: str, extensions: Optional[List[str]] = None
    ) -> Tuple[dict, dict]:
        """
        Scan source directory for document changes.

        Returns a (changes, current_docs) tuple.

        changes:
            {
                "new": [doc_id, ...],        # not yet indexed
                "modified": [doc_id, ...],   # content changed
                "deleted": [doc_id, ...],    # removed from source
                "unchanged": [doc_id, ...],  # no changes
            }
        current_docs:
            {doc_id: {"path": str, "hash": str, "modified": str}, ...}
        """
        extensions = extensions or [".pdf", ".md", ".txt", ".docx"]
        source_path = Path(source_dir)

        current_docs = {}
        unreadable = set()  # files that exist but could not be read this scan
        for file_path in source_path.rglob("*"):
            if file_path.is_file() and file_path.suffix.lower() in extensions:
                doc_id = str(file_path.relative_to(source_path))
                try:
                    with open(file_path, "rb") as f:
                        content = f.read()
                    content_hash = self.compute_content_hash(content)
                    last_modified = datetime.fromtimestamp(
                        file_path.stat().st_mtime
                    ).isoformat()
                    current_docs[doc_id] = {
                        "path": str(file_path),
                        "hash": content_hash,
                        "modified": last_modified,
                    }
                except OSError:  # includes PermissionError
                    unreadable.add(doc_id)
                    continue

        tracked_doc_ids = set(self._records.keys())
        current_doc_ids = set(current_docs.keys())

        changes = {
            "new": [],
            "modified": [],
            "deleted": [],
            "unchanged": [],
        }

        for doc_id, info in current_docs.items():
            if doc_id not in tracked_doc_ids:
                changes["new"].append(doc_id)
            elif self._records[doc_id].content_hash != info["hash"]:
                changes["modified"].append(doc_id)
            else:
                changes["unchanged"].append(doc_id)

        for doc_id in tracked_doc_ids:
            # A temporarily unreadable file is not a deletion: skip it so its
            # chunks are not removed from the index by mistake.
            if doc_id not in current_doc_ids and doc_id not in unreadable:
                changes["deleted"].append(doc_id)

        return changes, current_docs

    def mark_indexed(
        self,
        doc_id: str,
        source_path: str,
        content_hash: str,
        last_modified: str,
        chunk_ids: List[str],
        metadata: Optional[Dict] = None
    ):
        """Record that a document has been successfully indexed."""
        self._records[doc_id] = DocumentRecord(
            doc_id=doc_id,
            source_path=source_path,
            content_hash=content_hash,
            last_modified=last_modified,
            indexed_at=datetime.now().isoformat(),
            chunk_ids=chunk_ids,
            metadata=metadata or {}
        )
        self._save_state()

    def get_chunk_ids_for_doc(self, doc_id: str) -> List[str]:
        """Get vector DB chunk IDs for a document (needed to delete it)."""
        if doc_id in self._records:
            return self._records[doc_id].chunk_ids
        return []

    def mark_deleted(self, doc_id: str):
        """Remove document record after deletion from vector DB."""
        if doc_id in self._records:
            del self._records[doc_id]
            self._save_state()

Chunking Strategy: The Hidden Quality Lever

The chunking strategy - how you split documents into pieces for embedding - has an outsized impact on retrieval quality that is often underestimated.

import re
from typing import Callable, List, Optional, Tuple

import numpy as np


class ChunkingStrategy:
    """
    Different chunking strategies for different document types.

    The right chunk size depends on:
    - Document structure (headers, sections, tables)
    - Query type (short factual queries vs long analytical queries)
    - LLM context window (how many chunks can fit in the context)
    - Embedding model (optimal input length for your embedding model)
    """

    @staticmethod
    def recursive_character_split(
        text: str,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        separators: Optional[List[str]] = None
    ) -> List[str]:
        """
        LangChain-style recursive character splitting.
        Tries to split on semantic boundaries (paragraphs, sentences, words)
        before falling back to raw character splitting.
        """
        if separators is None:
            separators = ["\n\n", "\n", ". ", " ", ""]

        def split_on_separator(text: str, separator: str) -> List[str]:
            if separator == "":
                return list(text)
            return text.split(separator)

        def merge_splits(splits: List[str], separator: str) -> List[str]:
            chunks = []
            current_chunk = ""

            for split in splits:
                if len(current_chunk) + len(split) + len(separator) <= chunk_size:
                    if current_chunk:
                        current_chunk += separator + split
                    else:
                        current_chunk = split
                else:
                    if current_chunk:
                        chunks.append(current_chunk)
                    # Handle overlap: keep the end of the previous chunk,
                    # but only if the result still fits within chunk_size
                    overlap_text = current_chunk[-chunk_overlap:] if chunk_overlap > 0 else ""
                    candidate = overlap_text + separator + split
                    if (
                        overlap_text
                        and len(current_chunk) > chunk_overlap
                        and len(candidate) <= chunk_size
                    ):
                        current_chunk = candidate
                    else:
                        current_chunk = split

            if current_chunk:
                chunks.append(current_chunk)

            return chunks

        if len(text) <= chunk_size:
            return [text]

        for idx, separator in enumerate(separators):
            splits = split_on_separator(text, separator)
            if len(splits) > 1:
                # Keep pieces in document order. A piece that is still too long
                # is split again with the remaining, finer separators and its
                # sub-pieces are inserted where the piece was.
                pieces = []
                for s in splits:
                    if len(s) <= chunk_size:
                        pieces.append(s)
                    else:
                        pieces.extend(ChunkingStrategy.recursive_character_split(
                            s, chunk_size, chunk_overlap, separators[idx + 1:]
                        ))
                return merge_splits(pieces, separator)

        return [text]

    @staticmethod
    def semantic_chunk(
        text: str,
        embedding_fn: Callable,
        similarity_threshold: float = 0.85,
        max_chunk_size: int = 1000,
    ) -> List[str]:
        """
        Split text into semantically coherent chunks by detecting topic shifts.
        Uses embedding similarity between consecutive sentences to find boundaries.
        """
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        if len(sentences) <= 1:
            return [text]

        # Embed all sentences
        embeddings = embedding_fn(sentences)

        def cosine_sim(a, b):
            return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

        # Find semantic break points
        chunks = []
        current_chunk_sentences = [sentences[0]]

        for i in range(1, len(sentences)):
            sim = cosine_sim(embeddings[i-1], embeddings[i])

            # Also check length constraint
            current_text = " ".join(current_chunk_sentences)
            would_exceed_max = len(current_text) + len(sentences[i]) > max_chunk_size

            if sim < similarity_threshold or would_exceed_max:
                # Semantic break - save current chunk and start new one
                chunks.append(" ".join(current_chunk_sentences))
                current_chunk_sentences = [sentences[i]]
            else:
                current_chunk_sentences.append(sentences[i])

        if current_chunk_sentences:
            chunks.append(" ".join(current_chunk_sentences))

        return chunks

    @staticmethod
    def parent_child_chunk(
        document: str,
        parent_chunk_size: int = 2000,
        child_chunk_size: int = 200,
    ) -> List[Tuple[str, str, List[str]]]:
        """
        Parent-child chunking: embed small chunks for retrieval,
        return large chunks for context.

        Returns: [(parent_id, parent_text, [child_chunks]), ...]
        The vector DB stores child_chunks with a pointer to parent_id.
        At query time: retrieve by child similarity, return parent context.
        """
        parent_chunks = ChunkingStrategy.recursive_character_split(
            document, parent_chunk_size, 100
        )

        result = []
        for i, parent in enumerate(parent_chunks):
            parent_id = f"parent_{i:04d}"
            children = ChunkingStrategy.recursive_character_split(
                parent, child_chunk_size, 20
            )
            result.append((parent_id, parent, children))

        return result


# Compare chunking strategies on a sample document
sample_policy = """
# Parental Leave Policy

Effective January 1, 2024, all full-time employees are eligible for parental leave.

## Primary Caregiver
Employees designated as the primary caregiver are eligible for 16 weeks of paid parental leave following the birth, adoption, or foster placement of a child. Leave may be taken continuously or intermittently within 12 months of the qualifying event.

## Secondary Caregiver
Employees designated as the secondary caregiver are eligible for 4 weeks of paid parental leave. This leave must be taken within 6 months of the qualifying event.

## Eligibility Requirements
To be eligible for paid parental leave, employees must:
- Have completed 90 days of employment at the time of the qualifying event
- Be employed in a full-time position (minimum 30 hours per week)
- Notify HR at least 30 days before the anticipated start of leave when possible

## How to Apply
Complete the Parental Leave Request form in the HR portal at least 30 days before your anticipated leave start date. The form requires documentation of the qualifying event (birth certificate, adoption papers, or foster placement letter).

Contact HR at [email protected] or extension 1234 for questions.
"""

chunks_512 = ChunkingStrategy.recursive_character_split(sample_policy, chunk_size=512, chunk_overlap=50)
chunks_256 = ChunkingStrategy.recursive_character_split(sample_policy, chunk_size=256, chunk_overlap=25)

print(f"Original document: {len(sample_policy)} characters")
print(f"Chunks at 512 chars: {len(chunks_512)} chunks")
print(f"Chunks at 256 chars: {len(chunks_256)} chunks")
print(f"\nFirst chunk (512):\n{chunks_512[0][:300]}...")

Evaluating Retrieval Quality

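Before you can evaluate generation quality, you need to know whether retrieval is working. Poor retrieval is one of the most common root causes of RAG failures: if the right chunk never reaches the prompt, no amount of prompt tuning will fix the answer.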

import numpy as np
from typing import Callable, List


class RetrievalQualityEvaluator:
    """
    Measure retrieval quality independently from generation quality.
    This is critical for debugging: when RAG fails, is it retrieval or generation?
    """

    def __init__(self, retriever_fn: Callable[[str], List[dict]]):
        """
        Args:
            retriever_fn: function that takes a query string and returns
                a ranked list of retrieved chunks with metadata
                (each chunk is a dict with at least a "doc_id" key)
        """
        self.retrieve = retriever_fn

    def hit_rate_at_k(
        self,
        queries: List[str],
        relevant_doc_ids: List[List[str]],  # ground truth relevant doc IDs per query
        k: int = 5
    ) -> float:
        """
        Hit rate @ k: fraction of queries where at least one relevant
        document is in the top-k retrieved results.

        This is the most important retrieval metric for RAG:
        if the answer is not in the context, the LLM cannot generate it correctly.
        """
        if not queries:
            return 0.0  # avoid division by zero on an empty eval set

        hits = 0
        for query, relevant_ids in zip(queries, relevant_doc_ids):
            retrieved = self.retrieve(query)[:k]
            retrieved_ids = {chunk.get("doc_id") for chunk in retrieved}
            if retrieved_ids & set(relevant_ids):
                hits += 1

        return hits / len(queries)

    def mean_reciprocal_rank(
        self,
        queries: List[str],
        relevant_doc_ids: List[List[str]],
    ) -> float:
        """
        MRR: mean of 1/rank of first relevant document.
        Higher is better. 1.0 means relevant doc is always at position 1.
        """
        if not queries:
            return 0.0

        reciprocal_ranks = []
        for query, relevant_ids in zip(queries, relevant_doc_ids):
            retrieved = self.retrieve(query)
            retrieved_ids = [chunk.get("doc_id") for chunk in retrieved]

            rr = 0.0  # stays 0 when no relevant document is retrieved
            for rank, doc_id in enumerate(retrieved_ids, 1):
                if doc_id in relevant_ids:
                    rr = 1.0 / rank
                    break
            reciprocal_ranks.append(rr)

        return float(np.mean(reciprocal_ranks))

    def context_relevance(
        self,
        query: str,
        retrieved_chunks: List[str],
        judge_fn: Callable[[str, str], float]
    ) -> float:
        """
        For each retrieved chunk, score how relevant it is to the query.
        Uses LLM judge. Returns mean relevance score (0-1).
        """
        if not retrieved_chunks:
            return 0.0

        scores = [judge_fn(query, chunk) for chunk in retrieved_chunks]
        return float(np.mean(scores))

    def evaluate_full_suite(
        self,
        eval_queries: List[dict],  # [{"query": str, "relevant_doc_ids": [str, ...]}]
    ) -> dict:
        """Run all retrieval metrics and return summary."""
        queries = [ex["query"] for ex in eval_queries]
        relevant = [ex["relevant_doc_ids"] for ex in eval_queries]

        hit_1 = self.hit_rate_at_k(queries, relevant, k=1)
        hit_3 = self.hit_rate_at_k(queries, relevant, k=3)
        hit_5 = self.hit_rate_at_k(queries, relevant, k=5)
        mrr = self.mean_reciprocal_rank(queries, relevant)

        return {
            "hit_rate@1": hit_1,
            "hit_rate@3": hit_3,
            "hit_rate@5": hit_5,
            "mrr": mrr,
            "n_queries": len(queries),
        }

Monitoring Embedding Drift

The embedding model is a hidden dependency. When you upgrade it (or the provider changes the model behavior), the embeddings in your vector database become incompatible with new query embeddings - queries and documents are now in different embedding spaces.

import numpy as np
from typing import Callable, List
from sklearn.metrics.pairwise import cosine_similarity


class EmbeddingDriftDetector:
    """
    Compare a candidate embedding model against the model used to build
    the current index.

    The structural metrics (CKA, neighbor overlap) estimate how much retrieval
    behavior will change once the index is rebuilt with the candidate model.
    They are invariant to rotations of the embedding space, so a high score
    does NOT mean old and new vectors can be mixed. Only the direct
    cross-model cosine check speaks to that.
    """

    def __init__(self, reference_embedding_fn: Callable, candidate_embedding_fn: Callable):
        self.reference_fn = reference_embedding_fn
        self.candidate_fn = candidate_embedding_fn

    def measure_drift(
        self,
        sample_texts: List[str],
        n_samples: int = 200
    ) -> dict:
        """
        Measure embedding drift between reference and candidate models.

        Key metrics:
        - Alignment (CKA): how similar is the structure of the two embedding spaces?
        - Nearest neighbor overlap: for each text, do the two models agree on its
          nearest neighbors?
        - Cross-model cosine: are the two models' vectors for the same text
          nearly identical (only defined when dimensions match)?
        """
        texts = sample_texts[:n_samples]
        if len(texts) < 2:
            raise ValueError("Need at least two sample texts to measure drift")

        # Embed with both models
        ref_embeddings = np.array(self.reference_fn(texts))
        cand_embeddings = np.array(self.candidate_fn(texts))

        # Normalize
        ref_norm = ref_embeddings / np.linalg.norm(ref_embeddings, axis=1, keepdims=True)
        cand_norm = cand_embeddings / np.linalg.norm(cand_embeddings, axis=1, keepdims=True)

        # Compute similarity matrices
        ref_sim_matrix = cosine_similarity(ref_norm)
        cand_sim_matrix = cosine_similarity(cand_norm)

        # Centered Kernel Alignment (CKA): measures structural similarity of embedding spaces
        def cka(K, L):
            def center(M):
                n = M.shape[0]
                H = np.eye(n) - np.ones((n, n)) / n
                return H @ M @ H

            Kc = center(K)
            Lc = center(L)
            return np.sum(Kc * Lc) / (np.linalg.norm(Kc, 'fro') * np.linalg.norm(Lc, 'fro'))

        cka_score = float(cka(ref_sim_matrix, cand_sim_matrix))

        # Nearest-neighbor overlap: for each text, do both models agree on its
        # top-k neighbors? The text itself is excluded, otherwise it would
        # always count as a shared neighbor and inflate the score.
        k = min(10, len(texts) - 1)
        overlaps = []
        for i in range(min(50, len(texts))):
            ref_ranks = [j for j in np.argsort(ref_sim_matrix[i])[::-1] if j != i]
            cand_ranks = [j for j in np.argsort(cand_sim_matrix[i])[::-1] if j != i]
            overlaps.append(len(set(ref_ranks[:k]) & set(cand_ranks[:k])) / k)

        nn_consistency = float(np.mean(overlaps))

        # Absolute cosine similarity difference (how different are pairwise similarities?)
        mean_sim_diff = float(np.mean(np.abs(ref_sim_matrix - cand_sim_matrix)))

        # Direct comparability of the vectors themselves. The 0.99 cutoff is an
        # illustrative assumption, not a calibrated threshold.
        if ref_norm.shape[1] == cand_norm.shape[1]:
            cross_model_cosine = float(np.mean(np.sum(ref_norm * cand_norm, axis=1)))
        else:
            cross_model_cosine = None
        vectors_interchangeable = cross_model_cosine is not None and cross_model_cosine > 0.99

        drift_level = "low" if cka_score > 0.90 else ("medium" if cka_score > 0.75 else "high")

        if vectors_interchangeable:
            recommendation = "Vectors are near-identical across models - existing index can be kept"
        elif drift_level == "low":
            recommendation = "Re-index before switching - retrieval behavior should stay similar"
        elif drift_level == "medium":
            recommendation = "Re-index before switching - expect some retrieval changes, re-run retrieval evals"
        else:
            recommendation = "Re-index before switching - expect large retrieval changes, re-validate thresholds"

        return {
            "cka_alignment": cka_score,
            "nn_consistency_top10": nn_consistency,
            "mean_similarity_difference": mean_sim_diff,
            "cross_model_cosine": cross_model_cosine,
            "drift_level": drift_level,
            "requires_reindexing": not vectors_interchangeable,
            "recommendation": recommendation,
        }
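A toy example can make the "different embedding spaces" point concrete. The sketch below is illustrative only: `model_a` and `model_b` are hypothetical stand-ins for two embedding models, with `model_b` defined as `model_a` rotated by 90 degrees. Each model agrees perfectly with itself about which texts are similar, yet a query embedded with one model has zero similarity to the same text embedded with the other.

```python
import math


def cosine(a, b):
    """Cosine similarity between two 2-D vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))


def model_a(text_id):
    # Hypothetical "old" embedding model: a fixed lookup table of 2-D vectors.
    table = {"leave": (1.0, 0.0), "vacation": (0.9, 0.1), "pricing": (0.0, 1.0)}
    return table[text_id]


def model_b(text_id):
    # Hypothetical "new" model: same geometry, rotated by 90 degrees.
    x, y = model_a(text_id)
    return (-y, x)


texts = ["leave", "vacation", "pricing"]
pairs = [(s, t) for i, s in enumerate(texts) for t in texts[i + 1:]]

# Pairwise similarities inside each model's own space
within_a = [cosine(model_a(s), model_a(t)) for s, t in pairs]
within_b = [cosine(model_b(s), model_b(t)) for s, t in pairs]

# Similarity between the two models' vectors for the SAME text
cross = [cosine(model_a(t), model_b(t)) for t in texts]

print("within A:", [round(v, 3) for v in within_a])
print("within B:", [round(v, 3) for v in within_b])
print("cross   :", [round(v, 3) for v in cross])
```

Any metric built only on within-model similarity matrices sees these two models as identical, which is why stored document vectors and query vectors should come from the same model.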

Index Refresh Strategies

Different document types and update frequencies require different refresh strategies:

from datetime import datetime
from enum import Enum
from typing import Callable, List


class RefreshStrategy(Enum):
    FULL_REBUILD = "full_rebuild"        # Re-index everything from scratch
    INCREMENTAL_UPDATE = "incremental"   # Only re-index changed documents
    APPEND_ONLY = "append_only"          # Only add new documents, never delete
    SCHEDULED_NIGHTLY = "nightly"        # Batch all changes, apply nightly


class IndexRefreshOrchestrator:
    """
    Orchestrates index refresh based on the appropriate strategy
    for each document collection.

    vector_db_client is assumed to expose delete(ids=[...]) and
    upsert(id=..., embedding=..., metadata=...); adapt to your client.
    """

    def __init__(
        self,
        change_detector: DocumentChangeDetector,
        vector_db_client,
        embedding_fn: Callable,
        chunker: ChunkingStrategy
    ):
        self.detector = change_detector
        self.vector_db = vector_db_client
        self.embed = embedding_fn
        self.chunk = chunker

    def refresh_incremental(self, source_dir: str) -> dict:
        """
        Incremental refresh: only process changed documents.
        Most efficient for large stable knowledge bases with frequent small updates.
        """
        changes, current_docs = self.detector.scan_directory(source_dir)
        stats = {"processed": 0, "errors": 0, "deleted": 0}

        # 1. Delete removed documents from vector DB
        for doc_id in changes["deleted"]:
            chunk_ids = self.detector.get_chunk_ids_for_doc(doc_id)
            try:
                if chunk_ids:
                    self.vector_db.delete(ids=chunk_ids)
                # Drop the record even when it had no chunks, otherwise the
                # document is reported as deleted on every future scan.
                self.detector.mark_deleted(doc_id)
                stats["deleted"] += 1
            except Exception as e:
                print(f"Error deleting {doc_id}: {e}")
                stats["errors"] += 1

        # 2. Index new and modified documents. Chunk IDs are deterministic, so
        #    upserting overwrites old chunks in place; the old chunks stay
        #    queryable until then instead of being deleted up front.
        docs_to_index = changes["new"] + changes["modified"]
        for doc_id in docs_to_index:
            doc_info = current_docs[doc_id]
            old_chunk_ids = self.detector.get_chunk_ids_for_doc(doc_id)
            try:
                chunk_ids = self._index_document(doc_id, doc_info["path"])

                # 3. Remove leftover chunks if the document got shorter
                new_ids = set(chunk_ids)
                stale_ids = [cid for cid in old_chunk_ids if cid not in new_ids]
                if stale_ids:
                    self.vector_db.delete(ids=stale_ids)

                self.detector.mark_indexed(
                    doc_id=doc_id,
                    source_path=doc_info["path"],
                    content_hash=doc_info["hash"],
                    last_modified=doc_info["modified"],
                    chunk_ids=chunk_ids,
                )
                stats["processed"] += 1
            except Exception as e:
                print(f"Error indexing {doc_id}: {e}")
                stats["errors"] += 1

        # Report detected changes separately so they do not overwrite the
        # counts of operations that actually succeeded.
        return {
            **stats,
            "detected": {kind: len(doc_ids) for kind, doc_ids in changes.items()},
        }

    def _index_document(self, doc_id: str, file_path: str) -> List[str]:
        """
        Process a single document: extract text, chunk, embed, store.
        Returns list of chunk IDs stored in vector DB.
        """
        # Extract text (simplified - production uses UnstructuredIO or similar;
        # reading .pdf or .docx as UTF-8 text will not yield usable content)
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            text = f.read()

        # Chunk the document
        chunks = self.chunk.recursive_character_split(text, chunk_size=512, chunk_overlap=50)

        # Embed chunks
        embeddings = self.embed(chunks)

        # Store in vector DB with metadata
        chunk_ids = []
        for i, (chunk_text, embedding) in enumerate(zip(chunks, embeddings)):
            chunk_id = f"{doc_id}_chunk_{i:04d}"
            self.vector_db.upsert(
                id=chunk_id,
                embedding=embedding,
                metadata={
                    "doc_id": doc_id,
                    "chunk_index": i,
                    "text": chunk_text,
                    "source_path": file_path,
                    "indexed_at": datetime.now().isoformat(),
                }
            )
            chunk_ids.append(chunk_id)

        return chunk_ids

Production Engineering Notes

Index freshness SLA: Define what "fresh" means for your use case. A customer support knowledge base may require updates within 1 hour of document changes. A research knowledge base may be acceptable with weekly refreshes. Document your freshness SLA and monitor against it: alert when the gap between source document modification time and index update time exceeds 2x your SLA.
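A freshness check along these lines can be sketched in a few lines. This is a minimal illustration, not a monitoring system: the record fields (`source_modified`, `indexed_at`), the function name `find_sla_violations`, and the sample data are all assumptions made for the example.

```python
from datetime import datetime, timedelta


def find_sla_violations(records, now, sla, alert_factor=2.0):
    """
    Return (doc_id, lag) pairs for documents whose source changed after the
    last index update and whose lag exceeds alert_factor * sla.
    """
    violations = []
    for rec in records:
        if rec["indexed_at"] >= rec["source_modified"]:
            continue  # index already reflects the latest source version
        lag = now - rec["source_modified"]
        if lag > sla * alert_factor:
            violations.append((rec["doc_id"], lag))
    # Worst offenders first
    return sorted(violations, key=lambda v: v[1], reverse=True)


now = datetime(2024, 1, 10, 12, 0)
records = [
    # Changed 3 hours ago, not re-indexed since
    {"doc_id": "policy.md",
     "source_modified": datetime(2024, 1, 10, 9, 0),
     "indexed_at": datetime(2024, 1, 9, 0, 0)},
    # Changed 1 hour ago, still within tolerance
    {"doc_id": "pricing.md",
     "source_modified": datetime(2024, 1, 10, 11, 0),
     "indexed_at": datetime(2024, 1, 9, 0, 0)},
    # Indexed after its last change: fresh
    {"doc_id": "handbook.md",
     "source_modified": datetime(2024, 1, 8, 0, 0),
     "indexed_at": datetime(2024, 1, 9, 0, 0)},
]

violations = find_sla_violations(records, now, sla=timedelta(hours=1))
for doc_id, lag in violations:
    print(f"ALERT {doc_id}: source changed {lag} ago and is not yet indexed")
```

With a 1-hour SLA and a 2x alert factor, only the document that has been stale for 3 hours triggers an alert.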

Metadata filtering: Always store rich metadata with each chunk: document ID, creation date, document type, department, access level. This enables metadata-filtered retrieval - "find policies from HR department updated after January 2024" - which dramatically improves precision for well-structured knowledge bases.
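As an illustration of the idea, the sketch below filters candidate chunks on metadata before any similarity ranking would run. It is an in-process toy: real vector databases usually apply such filters inside the query, and the metadata keys (`department`, `updated_at`) and the helper name `filter_chunks` are assumptions for this example.

```python
from datetime import date


def filter_chunks(chunks, department=None, updated_after=None):
    """Keep only chunks whose metadata satisfies every filter that is set."""
    kept = []
    for chunk in chunks:
        meta = chunk["metadata"]
        if department is not None and meta.get("department") != department:
            continue
        if updated_after is not None and meta["updated_at"] <= updated_after:
            continue
        kept.append(chunk)
    return kept


chunks = [
    {"id": "c1", "metadata": {"department": "HR", "updated_at": date(2024, 2, 1)}},
    {"id": "c2", "metadata": {"department": "HR", "updated_at": date(2023, 6, 1)}},
    {"id": "c3", "metadata": {"department": "Sales", "updated_at": date(2024, 3, 1)}},
]

# "Find policies from the HR department updated after January 2024"
candidates = filter_chunks(chunks, department="HR", updated_after=date(2024, 1, 31))
print([c["id"] for c in candidates])
```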

Hybrid search: Pure vector search misses exact matches (product names, error codes, IDs). Pure keyword search misses semantic similarity. Combine both with reciprocal rank fusion (RRF): rank independently by vector similarity and by BM25 keyword relevance, then merge the two ranked lists. This often outperforms either method alone; gains of 5–15% on hit rate metrics are typical, though the margin depends on the corpus and the query mix, so measure it on your own evaluation set.
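The merge step can be sketched as follows. This is a minimal version of reciprocal rank fusion, assuming each retriever returns a ranked list of document IDs; the function name and sample rankings are made up for the example, and `k=60` is the commonly used smoothing constant rather than a tuned value.

```python
def reciprocal_rank_fusion(rankings, k=60):
    """
    Fuse several ranked lists of doc IDs into one.
    Each list contributes 1 / (k + rank) per document, with rank starting at 1.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties broken by doc ID for determinism
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


vector_ranking = ["doc_a", "doc_b", "doc_c"]   # from vector similarity
keyword_ranking = ["doc_c", "doc_a", "doc_d"]  # from BM25

fused = reciprocal_rank_fusion([vector_ranking, keyword_ranking])
print(fused)
```

Documents that appear near the top of both lists rise to the top of the fused list, while a document found by only one retriever is still kept as a candidate.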

Embedding model pinning: Pin the exact version of your embedding model and do not update it without re-indexing. Two different embedding models, or two versions of the same model, generally produce incompatible embedding spaces even when the vector dimensions match. Include the embedding model version as metadata on every chunk so you can identify which chunks need re-indexing after an upgrade.
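With the model version stored per chunk, finding what still needs re-indexing becomes a simple scan. The sketch below assumes a metadata key named `embedding_model` and uses invented model identifiers; both are placeholders for whatever your pipeline records.

```python
def chunks_needing_reindex(chunk_metadata, current_model):
    """Return IDs of chunks embedded with anything other than current_model."""
    return sorted(
        chunk_id
        for chunk_id, meta in chunk_metadata.items()
        # Chunks with no recorded model are treated as outdated
        if meta.get("embedding_model") != current_model
    )


chunk_metadata = {
    "policy_chunk_0000": {"embedding_model": "example-embed-v1"},
    "policy_chunk_0001": {"embedding_model": "example-embed-v2"},
    "pricing_chunk_0000": {},  # indexed before the version field existed
}

outdated = chunks_needing_reindex(chunk_metadata, current_model="example-embed-v2")
print(outdated)
```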


Common Mistakes

:::danger Deleting and Re-Indexing Documents While Serving Queries If you delete old chunks and add new chunks for a modified document while the system is serving queries, there is a window where the document is completely absent from the index. Users querying during this window get zero retrieval for that document's content. Use a two-phase approach: add new chunks first (with a new version tag), verify them, then delete old chunks. Or maintain a blue-green index where you build the new index in the background and atomically swap to it. :::
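The two-phase approach can be sketched with a toy in-memory index. Everything here is an assumption for illustration: `InMemoryIndex` stands in for a real vector database, versions are plain strings, and "verification" is reduced to counting the chunks that were written.

```python
class InMemoryIndex:
    """Toy stand-in for a vector DB; tracks chunk text and versions only."""

    def __init__(self):
        self.chunks = {}        # chunk_id -> {"doc_id", "version", "text"}
        self.live_version = {}  # doc_id -> version currently served to queries

    def chunks_for(self, doc_id, version):
        return [cid for cid, c in self.chunks.items()
                if c["doc_id"] == doc_id and c["version"] == version]

    def search_doc(self, doc_id):
        """Return the text of the live chunks for a document."""
        live = self.live_version.get(doc_id)
        return [self.chunks[cid]["text"] for cid in self.chunks_for(doc_id, live)]


def two_phase_update(index, doc_id, new_texts, new_version):
    # Phase 1: write the new chunks next to the old ones. Queries still see
    # the old version because live_version has not changed yet.
    for i, text in enumerate(new_texts):
        index.chunks[f"{doc_id}@{new_version}#{i}"] = {
            "doc_id": doc_id, "version": new_version, "text": text,
        }

    # Verify before switching (here: just check that every chunk landed)
    if len(index.chunks_for(doc_id, new_version)) != len(new_texts):
        raise RuntimeError(f"Incomplete write for {doc_id} {new_version}")

    # Phase 2: switch queries to the new version, then delete the old chunks
    index.live_version[doc_id] = new_version
    stale = [cid for cid, c in index.chunks.items()
             if c["doc_id"] == doc_id and c["version"] != new_version]
    for cid in stale:
        del index.chunks[cid]


index = InMemoryIndex()
two_phase_update(index, "parental_leave.md", ["Primary caregivers get 12 weeks."], "v1")
two_phase_update(index, "parental_leave.md", ["Primary caregivers get 16 weeks."], "v2")
print(index.search_doc("parental_leave.md"))
```

At no point between the two phases does `search_doc` return an empty result for a document that already had a live version.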

:::danger Not Handling Document Deletions "We only ever add documents, we never delete them." This is never true. Products get discontinued, policies get superseded, employees leave and their documents become outdated. If you do not track and propagate document deletions, your index accumulates stale content indefinitely. Users get confident answers about discontinued products and superseded policies. Always track the mapping from source documents to vector DB chunk IDs so you can delete cleanly. :::

:::warning Chunking Without Overlap A chunk boundary that falls in the middle of a key sentence means the embedding of one chunk does not capture the full semantic context of that sentence. A query about that concept may not retrieve the chunk, because the concept is split across a boundary. Chunk overlap (50–100 characters) makes it much more likely that a sentence near a boundary appears intact in at least one chunk's embedding; sentences longer than the overlap can still be split, so prefer sentence-aware separators as well. The cost is slightly larger index size - worth it for retrieval quality. :::

:::warning Not Testing Retrieval Quality Separately from Generation Quality When a RAG system gives a wrong answer, the failure could be in retrieval (wrong documents retrieved) or generation (correct documents retrieved, but LLM misreads them). If you only evaluate end-to-end accuracy, you cannot distinguish these failure modes and cannot fix them. Always measure hit rate and MRR on your retrieval component independently using a labeled query-document relevance dataset. This is the most actionable diagnostic in RAG debugging. :::


Interview Q&A

Q: What operational problems are specific to RAG pipelines that do not apply to traditional ML?

A: Several. First, index staleness: RAG depends on a vector index of source documents. When source documents change (policy updates, product changes, new information), the index must be updated. Traditional models do not have this problem - the knowledge is baked into weights, which change only when the model is retrained. Second, chunk boundary issues: the way documents are chunked affects retrieval quality in ways that are hard to anticipate. A chunk that splits a key sentence across a boundary may never be retrieved for relevant queries. Chunking strategy is a hyperparameter that requires empirical evaluation. Third, embedding model upgrades: when you upgrade your embedding model, query and document embeddings may be in incompatible spaces. You must re-index the entire knowledge base before deploying a new embedding model. Fourth, retrieval evaluation: measuring whether retrieval is working requires a labeled query-document relevance dataset, which is expensive to curate. Teams often skip this and only measure end-to-end quality, making retrieval failures invisible.

Q: How do you detect and handle index drift in a RAG system?

A: Index drift happens when the vector index no longer accurately reflects the source documents. Two types: content drift (documents changed but index not updated) and coverage drift (documents deleted from source but chunks remain in index). Detection: hash-based change detection - compute a hash of each source document, compare to the hash stored when it was last indexed. Any mismatch indicates content drift. For coverage drift: periodically scan the index for chunk IDs that reference documents no longer in the source. Handling: use an incremental update strategy that (1) detects new and modified documents on every scan, (2) deletes old chunks before adding new ones (or uses versioning to avoid query gaps), (3) deletes chunks for removed documents. Alert when source-to-index lag exceeds your freshness SLA. Log a "staleness score" per document (hours since source modification vs hours since last index update) and monitor it as an operational metric.
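The coverage-drift check described above can be sketched as a set comparison. This is a simplified illustration: it assumes you can export a mapping from chunk ID to source document ID from the index, and the function name and sample IDs are invented for the example.

```python
def find_orphaned_chunks(index_chunks, source_doc_ids):
    """
    index_chunks: mapping of chunk_id -> doc_id as stored in the index.
    source_doc_ids: doc IDs that currently exist in the source.
    Returns {doc_id: [chunk_id, ...]} for documents missing from the source.
    """
    live_docs = set(source_doc_ids)
    orphans = {}
    for chunk_id, doc_id in index_chunks.items():
        if doc_id not in live_docs:
            orphans.setdefault(doc_id, []).append(chunk_id)
    return orphans


index_chunks = {
    "leave.md_chunk_0000": "leave.md",
    "leave.md_chunk_0001": "leave.md",
    "old_product.md_chunk_0000": "old_product.md",
}
source_doc_ids = ["leave.md", "pricing.md"]

orphans = find_orphaned_chunks(index_chunks, source_doc_ids)
print(orphans)
```

Running a check like this on a schedule catches deletions that the incremental pipeline missed, for example when the state file was lost or a refresh job failed midway.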

Q: Why is hybrid search (vector + keyword) better than pure vector search for RAG?

A: Pure vector search excels at semantic similarity: finding documents that are about the same concept even if they use different words. But it fails for exact matches - product codes like "SKU-4892", error messages, proper nouns, and technical identifiers. A user querying "error code E-2041" wants exact keyword matching, not semantic approximation. Pure keyword search (BM25) excels at exact matching but fails for semantic queries where vocabulary differs between the query and the document. Hybrid search combines both: independently rank by vector similarity and BM25 keyword relevance, then merge using reciprocal rank fusion (each document's combined score is 1/(k + rank_vector) + 1/(k + rank_keyword), where k is a smoothing constant, commonly 60). This often outperforms either method alone, typically by 5–15% on hit rate, because it handles both the semantic and exact-match use cases. The cost is running two search pipelines and a merge step, which adds some latency over pure vector search (roughly 20–30ms, depending on the stack).

Q: What is parent-child chunking and when should you use it?

A: Parent-child chunking creates two levels of chunks: small child chunks for retrieval and large parent chunks for context injection. At indexing time, embed the small child chunks (e.g., 200 characters) and store them with a pointer to their parent chunk (e.g., 2000 characters). At query time: find relevant child chunks by vector similarity, then return the parent chunk as context. The intuition: smaller chunks produce more precise embeddings (a 200-character chunk usually covers a single topic, so its embedding is a sharper semantic representation), while the larger parent gives the LLM more surrounding information to generate an accurate response. Use parent-child chunking when: your documents have a clear hierarchical structure (paragraphs within sections), precise retrieval is important (factual knowledge bases), and context length is not severely constrained. It is more complex to implement than flat chunking but often improves both retrieval precision and generation quality for structured documents.
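The query-time half of this scheme can be sketched as below. It assumes the vector search has already returned child hits sorted by descending score; the lookup tables `child_to_parent` and `parents`, the function name, and the sample data are hypothetical.

```python
def retrieve_parents(child_hits, child_to_parent, parents, max_parents=3):
    """
    child_hits: list of (child_id, score), best match first.
    Returns parent texts in order of their best-scoring child, without repeats.
    """
    seen = set()
    contexts = []
    for child_id, _score in child_hits:
        parent_id = child_to_parent[child_id]
        if parent_id in seen:
            continue  # several children of one parent matched; send it once
        seen.add(parent_id)
        contexts.append(parents[parent_id])
        if len(contexts) == max_parents:
            break
    return contexts


parents = {
    "parent_0000": "Primary Caregiver section (full text)...",
    "parent_0001": "Eligibility Requirements section (full text)...",
}
child_to_parent = {
    "child_a": "parent_0000",
    "child_b": "parent_0000",
    "child_c": "parent_0001",
}
child_hits = [("child_b", 0.91), ("child_a", 0.88), ("child_c", 0.75)]

contexts = retrieve_parents(child_hits, child_to_parent, parents)
print(contexts)
```

Deduplicating parents matters in practice: neighboring children often match the same query, and sending the same parent twice wastes context window.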

© 2026 EngineersOfAI. All rights reserved.