Episodic Memory with Vector Store
The Returning User Problem
The agent helps a user. They spend forty minutes together. The user says: "I'm Sarah, startup CTO, Python only, hate verbose responses. We're building a fintech API using FastAPI and PostgreSQL." They debug a JWT validation bug. They agree on a code style. They settle on a deployment approach.
Next session, the agent greets them: "Hello! How can I help you today?"
Everything is gone. The agent does not know Sarah is a CTO. Does not know she hates verbose responses. Does not remember the JWT bug or that they agreed to use Pydantic V2 for validation. Sarah has to re-explain everything. This happens every session.
This is not a model limitation. It is an architecture failure. The information was available - it just was not stored anywhere that survives session boundaries.
Episodic memory solves this. Not by making the context window larger, but by giving the agent a persistent store of past experiences that it can retrieve selectively. When Sarah returns, the agent queries its episodic store, finds the memories of past interactions with Sarah, and injects them into context. The conversation picks up where it left off.
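The retrieve-then-inject loop can be sketched without any vector database. This minimal sketch uses a toy word-overlap (Jaccard) score in place of embedding similarity. The names `ToyEpisodicStore` and `build_system_prompt` are hypothetical and exist only for illustration.

```python
import re
from dataclasses import dataclass, field


def _words(text: str) -> set[str]:
    # Lowercased word set; a crude stand-in for an embedding
    return set(re.findall(r"[a-z0-9]+", text.lower()))


@dataclass
class ToyEpisodicStore:
    # user_id -> memory strings; outlives any single conversation ("session")
    memories: dict[str, list[str]] = field(default_factory=dict)

    def store(self, user_id: str, content: str) -> None:
        self.memories.setdefault(user_id, []).append(content)

    def retrieve(self, user_id: str, query: str, top_k: int = 2) -> list[str]:
        # Rank only this user's memories by Jaccard word overlap with the query
        q = _words(query)
        scored = []
        for m in self.memories.get(user_id, []):
            w = _words(m)
            union = q | w
            overlap = len(q & w) / len(union) if union else 0.0
            if overlap > 0:
                scored.append((overlap, m))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [m for _, m in scored[:top_k]]


def build_system_prompt(store: ToyEpisodicStore, user_id: str, message: str) -> str:
    # Inject whatever the store recalls into the new session's system prompt
    base = "You are a helpful assistant."
    recalled = store.retrieve(user_id, message)
    if not recalled:
        return base
    return base + "\n## Relevant Past Experiences\n" + "\n".join(f"- {m}" for m in recalled)


store = ToyEpisodicStore()
# Session 1: facts worth keeping are stored
store.store("sarah", "Sarah is a CTO and prefers short Python-only answers")
store.store("sarah", "Sarah fixed a JWT validation bug in her FastAPI service")
# Session 2: a fresh conversation, but the store still holds session 1's memories
prompt = build_system_prompt(store, "sarah", "Any tips for my JWT middleware in FastAPI?")
print(prompt)
```

Only the memory that overlaps the new question is injected. The rest of this lesson replaces the toy score with vector similarity plus recency and importance weighting.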
This lesson covers the full episodic memory lifecycle: the memory schema, when to create memories, how to retrieve them with relevance and recency weighting, how to consolidate duplicates, and how to implement forgetting. You will build a complete episodic memory system backed by ChromaDB.
Why Episodic Memory Exists
The concept comes from cognitive psychology. Endel Tulving (1972) distinguished episodic memory - autobiographical, time-stamped, experiential ("I ate at that restaurant last Thursday") - from semantic memory - factual, decontextualized ("restaurants serve food").
Episodic memory is unique in that it:
- Is tied to a specific time, place, and context
- Is personally experienced by the rememberer
- Allows "mental time travel" - you can re-experience the event
For AI agents, episodic memory serves a different but analogous purpose: storing interaction records that are personalized, time-stamped, and retrievable for future reference.
Before episodic stores: Agents used long static system prompts to maintain user context. This worked for a single user but did not scale: hand-maintaining user-specific context for thousands of users was impractical.
The MemGPT insight (2023): Packer et al. framed LLM memory management as an operating-system problem. The context window plays the role of RAM, external storage plays the role of disk, and the agent itself decides what to page in and out. Episodic memory is the "disk" for interaction history.
Production adoption: By 2024, major agent frameworks such as LangChain, LlamaIndex, CrewAI, and AutoGen offered memory components that can be backed by vector stores, making persistent interaction history a standard building block.
Memory Schema Design
A well-designed episodic memory schema is the foundation of a useful system. Include too little and retrieval is imprecise. Include too much and storage and retrieval become slow.
```python
from dataclasses import dataclass, field
from typing import Optional
import time
import uuid


@dataclass
class EpisodicMemoryRecord:
    """
    Schema for a single episodic memory.
    Every field serves a specific retrieval or management purpose.
    """
    # Identity
    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    # Core content - what was stored
    content: str = ""                         # The memory text (what to store)
    embedding: Optional[list[float]] = None   # Vector representation for similarity search

    # Temporal metadata
    timestamp: float = field(default_factory=time.time)  # Unix timestamp of creation
    last_accessed: float = field(default_factory=time.time)
    access_count: int = 0

    # Importance and quality
    importance: float = 0.5  # 0.0 (trivial) to 1.0 (critical)
    novelty: float = 0.5     # How different from existing memories

    # Attribution
    user_id: str = "default"
    session_id: str = "default"
    agent_id: str = "default"

    # Classification
    tags: list[str] = field(default_factory=list)
    memory_type: str = "observation"  # observation | preference | fact | error | decision

    # Lifecycle
    is_archived: bool = False            # Soft delete - archived but not removed
    superseded_by: Optional[str] = None  # ID of newer memory that replaces this one

    def age_hours(self) -> float:
        return (time.time() - self.timestamp) / 3600

    def age_days(self) -> float:
        return self.age_hours() / 24

    def to_metadata(self) -> dict:
        """Convert to a ChromaDB-compatible metadata dict.

        Metadata values must be scalars (str, int, float, bool): the tag list is
        flattened to a string, and fields that may be None (embedding,
        superseded_by) are left out. The embedding is passed to ChromaDB
        separately, not as metadata.
        """
        return {
            "user_id": self.user_id,
            "session_id": self.session_id,
            "agent_id": self.agent_id,
            "timestamp": self.timestamp,
            "last_accessed": self.last_accessed,
            "access_count": self.access_count,
            "importance": self.importance,
            "novelty": self.novelty,
            "memory_type": self.memory_type,
            "tags": ",".join(self.tags),      # list -> comma-separated string
            "is_archived": self.is_archived,  # bool is a valid metadata type; keeps where-filters simple
        }
```
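Because vector-store metadata holds only flat scalar values, list fields such as `tags` have to be encoded on the way in and decoded on the way out. A minimal sketch, assuming metadata values are limited to str, int, float, and bool. The helpers `flatten_metadata` and `restore_tags` are hypothetical and not part of ChromaDB's API.

```python
import json

SCALAR_TYPES = (str, int, float, bool)


def flatten_metadata(record: dict) -> dict:
    """Encode lists as JSON strings, drop None values, and reject other non-scalars."""
    flat = {}
    for key, value in record.items():
        if value is None:
            continue  # e.g. superseded_by before any replacement exists
        if isinstance(value, list):
            flat[key] = json.dumps(value)
        elif isinstance(value, SCALAR_TYPES):
            flat[key] = value
        else:
            raise TypeError(f"metadata field {key!r} has unsupported type {type(value).__name__}")
    return flat


def restore_tags(metadata: dict) -> list[str]:
    """Decode the JSON-encoded tags field back into a list."""
    return json.loads(metadata.get("tags", "[]"))


meta = flatten_metadata({
    "user_id": "sarah",
    "importance": 0.9,
    "is_archived": False,
    "tags": ["auth", "jwt"],
    "superseded_by": None,
})
print(meta)
print(restore_tags(meta))
```

JSON encoding (which the full implementation later in this lesson also uses) round-trips tags that themselves contain commas. A plain comma join does not.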
The Episodic Memory Lifecycle
When to Create Memories
Not everything that happens in an agent interaction deserves to be stored. Indiscriminate storage creates noise that degrades retrieval quality. Use a combination of rules and scoring.
Rule-Based Triggers
```python
import re


class MemoryFormationTrigger:
    """Determines whether an observation warrants episodic storage."""

    # Patterns that strongly indicate storage-worthy content
    HIGH_VALUE_PATTERNS = [
        "i prefer", "i always", "i never", "i hate", "i love",
        "we decided", "the plan is", "going forward",
        "my name is", "i am a", "i work at", "i'm the",
        "remember that", "important:", "don't forget",
        "error:", "failed", "bug:", "issue:",
    ]

    # Low-value noise to avoid storing
    LOW_VALUE_PATTERNS = [
        "thank you", "thanks", "you're welcome", "ok",
        "sounds good", "got it", "sure", "yes", "no",
        "alright", "great", "perfect", "awesome",
    ]

    def __init__(self, importance_threshold: float = 0.4):
        self.importance_threshold = importance_threshold
        # Match low-value phrases as whole words, so "no" does not fire on
        # "not" or "know", and "ok" does not fire on "token"
        self._low_value_re = re.compile(
            r"\b(?:" + "|".join(re.escape(p) for p in self.LOW_VALUE_PATTERNS) + r")\b"
        )

    def should_store(self, content: str, importance: float) -> tuple[bool, str]:
        """
        Returns (should_store, reason).
        """
        content_lower = content.lower()

        # Always store explicit user instructions
        if any(p in content_lower for p in ["remember", "don't forget", "important:"]):
            return True, "explicit_instruction"

        # Skip short messages that contain an acknowledgment word (e.g. "ok thanks")
        words = content_lower.split()
        if len(words) <= 5 and self._low_value_re.search(content_lower):
            return False, "low_value_acknowledgment"

        # Score-based decision
        base_score = importance

        # Boost for high-value patterns
        pattern_hits = sum(1 for p in self.HIGH_VALUE_PATTERNS if p in content_lower)
        base_score += pattern_hits * 0.1

        # Boost for substantive content (length proxy)
        if len(words) > 30:
            base_score += 0.1

        if base_score >= self.importance_threshold:
            return True, f"score_{base_score:.2f}"
        return False, f"below_threshold_{base_score:.2f}"
```
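Plain substring checks such as `p in content_lower` also match inside longer words, which matters most for short patterns like "ok" and "no". A small sketch contrasting substring matching with word-boundary regex matching. `LOW_VALUE` here is a trimmed, illustrative copy of the low-value list, not the full list.

```python
import re

LOW_VALUE = ["ok", "no", "yes", "thanks", "got it"]


def substring_hit(text: str) -> bool:
    # Naive check: a pattern anywhere, even inside another word, counts
    t = text.lower()
    return any(p in t for p in LOW_VALUE)


WORD_RE = re.compile(r"\b(?:" + "|".join(re.escape(p) for p in LOW_VALUE) + r")\b")


def word_hit(text: str) -> bool:
    # Word-boundary check: a pattern must appear as a whole word or phrase
    return WORD_RE.search(text.lower()) is not None


for msg in ["I do not know", "Rotate the token", "ok thanks", "Got it!"]:
    print(f"{msg!r}: substring={substring_hit(msg)} word={word_hit(msg)}")
```

With substring matching, "I do not know" and "Rotate the token" would both be discarded as acknowledgments. The word-boundary version only flags genuine filler.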
LLM-Based Importance Scoring
For more nuanced judgments than keyword rules can make, use a small, fast model to score importance:
```python
import anthropic


class LLMImportanceScorer:
    """Use a fast model to score memory importance."""

    SCORING_PROMPT = """Rate the importance of this information for an AI assistant to remember about a user.
Score from 0.0 to 1.0:
- 0.0-0.2: Trivial (greetings, filler words, obvious facts)
- 0.2-0.4: Mildly useful (temporary preferences, passing comments)
- 0.4-0.6: Useful (user preferences, project context, decisions)
- 0.6-0.8: Important (user identity, key requirements, errors found)
- 0.8-1.0: Critical (explicit instructions, hard constraints, major failures)
Respond with ONLY a number between 0.0 and 1.0. Nothing else."""

    def __init__(self):
        self.client = anthropic.Anthropic()

    def score(self, content: str) -> float:
        try:
            response = self.client.messages.create(
                model="claude-haiku-4-5",
                max_tokens=10,
                system=self.SCORING_PROMPT,
                messages=[{"role": "user", "content": content}],
            )
            value = float(response.content[0].text.strip())
        except (anthropic.APIError, ValueError, IndexError, AttributeError):
            return 0.5  # Neutral default if the API call or parsing fails
        return min(max(value, 0.0), 1.0)  # Clamp in case the model strays outside [0, 1]
```
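Even with a strict "ONLY a number" instruction, a reply may carry extra words or fall outside the requested range. A dependency-free sketch of a more tolerant parser. `parse_importance` is a hypothetical helper, and taking the first number in the reply is an assumption about how replies tend to deviate.

```python
import re

NUMBER_RE = re.compile(r"-?\d+(?:\.\d+)?")


def parse_importance(reply: str, default: float = 0.5) -> float:
    """Extract the first number in a model reply and clamp it to [0.0, 1.0]."""
    match = NUMBER_RE.search(reply)
    if match is None:
        return default  # No number at all: fall back to a neutral score
    return min(max(float(match.group()), 0.0), 1.0)


for reply in ["0.8", "Score: 0.65", "1.4", "not sure"]:
    print(repr(reply), "->", parse_importance(reply))
```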
Retrieval: Relevance + Recency Weighting
Pure semantic similarity is insufficient for episodic retrieval. A semantically similar memory from two years ago may be less useful than a moderately similar memory from last week. Good retrieval combines:
- Semantic similarity: How related is the memory to the current query?
- Recency: How recent is the memory?
- Importance: How important was this memory when stored?
- Access frequency: How often has it been retrieved? Frequently accessed memories have proven useful.
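Composite scoring formula:

$$
\text{score}(m) = w_s \cdot \text{sim}(q, m) + w_r \cdot e^{-\lambda \, \Delta t_m} + w_i \cdot \text{importance}(m)
$$

Where:
- $w_s = 0.5$ (semantic similarity weight)
- $w_r = 0.3$ (recency weight)
- $w_i = 0.2$ (importance weight)
- $e^{-\lambda \Delta t_m}$ (exponential decay), where $\Delta t_m$ is the memory's age in hours and $\lambda = 0.05$ per hour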
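Setting the recency term equal to one half shows what a decay rate of 0.05 per hour means in practice. This worked derivation assumes the exponential form of the recency term:

$$
e^{-\lambda t_{1/2}} = \tfrac{1}{2}
\;\Longrightarrow\;
t_{1/2} = \frac{\ln 2}{\lambda} = \frac{0.693}{0.05\ \text{h}^{-1}} \approx 13.9\ \text{h}
$$

A memory stored 24 hours ago keeps $e^{-1.2} \approx 0.30$ of its recency signal, contributing about $0.3 \times 0.30 \approx 0.09$ to the score. For a week-old memory the recency term is $e^{-8.4} \approx 0.0002$, so it competes on similarity and importance alone.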
```python
import math


def compute_retrieval_score(
    similarity: float,      # Cosine similarity from vector search (0-1)
    age_hours: float,       # Age of the memory in hours
    importance: float,      # Stored importance score (0-1)
    access_count: int = 0,  # How many times retrieved before
    w_similarity: float = 0.50,
    w_recency: float = 0.30,
    w_importance: float = 0.20,
    decay_rate: float = 0.05,  # Recency decay rate per hour
) -> float:
    """
    Composite retrieval score for episodic memory ranking.
    """
    # Exponential recency decay: exp(-0.05 * h) has a half-life of ln(2) / 0.05 ~ 13.9 hours
    recency = math.exp(-decay_rate * age_hours)

    # Mild boost for frequently-accessed memories (they've proven useful)
    frequency_boost = math.log1p(access_count) * 0.05

    score = (
        similarity * w_similarity
        + recency * w_recency
        + importance * w_importance
        + frequency_boost
    )
    return min(score, 1.0)  # Cap at 1.0
```
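The over-fetch-then-re-rank pattern can be checked with a few hand-made candidates. This sketch restates the composite score (exponential recency at 0.05 per hour) so it runs on its own. The candidate tuples stand in for vector-search results and are invented for illustration.

```python
import math


def composite(similarity: float, age_hours: float, importance: float,
              access_count: int = 0) -> float:
    # 0.5 * similarity + 0.3 * exp(-0.05 * age) + 0.2 * importance + small frequency boost
    recency = math.exp(-0.05 * age_hours)
    boost = math.log1p(access_count) * 0.05
    return min(0.5 * similarity + 0.3 * recency + 0.2 * importance + boost, 1.0)


# (memory text, cosine similarity, age in hours, importance), as a vector search might return them
candidates = [
    ("Discussed JWT expiry settings", 0.90, 240.0, 0.5),           # very similar, 10 days old
    ("Switched JWT validation to cached keys", 0.75, 2.0, 0.5),    # less similar, 2 hours old
    ("Prefers terse answers", 0.40, 1.0, 0.9),                     # weakly related, important
]

ranked = sorted(candidates, key=lambda c: composite(c[1], c[2], c[3]), reverse=True)
for text, sim, age, imp in ranked:
    print(f"{composite(sim, age, imp):.3f}  {text}")
```

With these weights, recency and importance together can outrank raw similarity: the 10-day-old memory drops to last place. If that is too aggressive for a workload, lowering `w_recency` or the decay rate shifts the balance back toward similarity.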
Full Implementation: ChromaDB Episodic Memory System
```python
"""
Production-grade episodic memory system using ChromaDB.
Install: pip install chromadb anthropic  (plus openai if use_openai_embeddings=True)

Uses MemoryFormationTrigger from the "Rule-Based Triggers" section above;
define that class (and its `re` import) in this module before EpisodicMemorySystem.
"""
from __future__ import annotations

import json
import math
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional

import anthropic
import chromadb
from chromadb.utils import embedding_functions

# ─────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────
EMBEDDING_MODEL = "text-embedding-3-small"  # OpenAI embedding model (optional)
CHROMA_COLLECTION = "episodic_memories"
IMPORTANCE_THRESHOLD = 0.35     # Minimum importance to store
TOP_K_DEFAULT = 5               # Default memories to retrieve
DECAY_DAYS_HALF_LIFE = 30       # Importance half-life in days
RECENCY_DECAY_PER_HOUR = 0.05   # exp(-0.05 * hours): recency half-life of ~13.9 hours


def _where_all(*conditions: dict) -> dict:
    """Combine filter conditions with $and.

    ChromaDB expects a single top-level key per where dict, so a filter on
    user_id AND is_archived must be written as {"$and": [...]}.
    """
    return conditions[0] if len(conditions) == 1 else {"$and": list(conditions)}


# ─────────────────────────────────────────────
# MEMORY SCHEMA
# ─────────────────────────────────────────────
@dataclass
class MemoryRecord:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    content: str = ""
    timestamp: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)
    decayed_at: float = field(default_factory=time.time)  # When decay was last applied
    access_count: int = 0
    importance: float = 0.5
    user_id: str = "default"
    session_id: str = "default"
    memory_type: str = "observation"  # observation | preference | decision | error
    tags: list[str] = field(default_factory=list)
    is_archived: bool = False

    def age_hours(self) -> float:
        return (time.time() - self.timestamp) / 3600

    def to_metadata(self) -> dict:
        """ChromaDB metadata - all values must be str, int, float, or bool."""
        return {
            "user_id": self.user_id,
            "session_id": self.session_id,
            "timestamp": self.timestamp,
            "last_accessed": self.last_accessed,
            "decayed_at": self.decayed_at,
            "access_count": self.access_count,
            "importance": self.importance,
            "memory_type": self.memory_type,
            "tags": json.dumps(self.tags),  # serialize list as JSON string
            "is_archived": self.is_archived,
        }

    @classmethod
    def from_metadata(cls, id: str, content: str, metadata: dict) -> MemoryRecord:
        tags = json.loads(metadata.get("tags", "[]"))
        timestamp = float(metadata.get("timestamp", time.time()))
        return cls(
            id=id,
            content=content,
            timestamp=timestamp,
            last_accessed=float(metadata.get("last_accessed", timestamp)),
            decayed_at=float(metadata.get("decayed_at", timestamp)),
            access_count=int(metadata.get("access_count", 0)),
            importance=float(metadata.get("importance", 0.5)),
            user_id=str(metadata.get("user_id", "default")),
            session_id=str(metadata.get("session_id", "default")),
            memory_type=str(metadata.get("memory_type", "observation")),
            tags=tags,
            is_archived=bool(metadata.get("is_archived", False)),
        )


# ─────────────────────────────────────────────
# EPISODIC MEMORY SYSTEM
# ─────────────────────────────────────────────
class EpisodicMemorySystem:
    """
    Persistent episodic memory using ChromaDB for vector storage.

    Features:
    - Semantic similarity retrieval with recency + importance weighting
    - Importance-based storage filtering
    - Memory consolidation (deduplication)
    - Time-based importance decay
    - Soft deletion (archiving)
    - Per-user memory isolation
    """

    def __init__(
        self,
        persist_dir: str = "./episodic_memory_db",
        use_openai_embeddings: bool = False,
    ):
        # Initialize ChromaDB with persistence
        self.chroma_client = chromadb.PersistentClient(path=persist_dir)

        # Embedding function - defaults to ChromaDB's built-in (no API key needed)
        # Set use_openai_embeddings=True for better quality in production
        if use_openai_embeddings:
            ef = embedding_functions.OpenAIEmbeddingFunction(model_name=EMBEDDING_MODEL)
        else:
            ef = embedding_functions.DefaultEmbeddingFunction()

        self.collection = self.chroma_client.get_or_create_collection(
            name=CHROMA_COLLECTION,
            embedding_function=ef,
            metadata={"hnsw:space": "cosine"},
        )
        self.anthropic_client = anthropic.Anthropic()
        self.trigger = MemoryFormationTrigger(importance_threshold=IMPORTANCE_THRESHOLD)

    # ─── STORAGE ────────────────────────────────────────────────
    def store(
        self,
        content: str,
        user_id: str,
        session_id: str,
        importance: float = 0.5,
        memory_type: str = "observation",
        tags: list[str] | None = None,
        force: bool = False,
    ) -> Optional[MemoryRecord]:
        """
        Store a new episodic memory.
        Returns the stored (or consolidated) MemoryRecord, or None if below threshold.
        """
        if not force:
            should_store, _reason = self.trigger.should_store(content, importance)
            if not should_store:
                return None

        record = MemoryRecord(
            content=content,
            timestamp=time.time(),
            importance=importance,
            user_id=user_id,
            session_id=session_id,
            memory_type=memory_type,
            tags=tags or [],
        )

        # Check for similar existing memories before storing
        existing = self._find_similar(content, user_id, threshold=0.92)
        if existing:
            # Consolidate: update existing memory instead of creating a duplicate
            self._consolidate(existing, record)
            return existing

        # Store new memory in ChromaDB
        self.collection.add(
            ids=[record.id],
            documents=[record.content],
            metadatas=[record.to_metadata()],
        )
        return record

    def _find_similar(
        self,
        content: str,
        user_id: str,
        threshold: float = 0.90,
    ) -> Optional[MemoryRecord]:
        """Find an existing memory with very high similarity to the new content."""
        try:
            results = self.collection.query(
                query_texts=[content],
                n_results=1,
                where=_where_all({"user_id": user_id}, {"is_archived": False}),
            )
        except Exception:
            return None

        if not results["ids"] or not results["ids"][0]:
            return None

        distance = results["distances"][0][0]
        similarity = 1.0 - distance  # ChromaDB cosine distance -> similarity
        if similarity >= threshold:
            return MemoryRecord.from_metadata(
                id=results["ids"][0][0],
                content=results["documents"][0][0],
                metadata=results["metadatas"][0][0],
            )
        return None

    def _consolidate(self, existing: MemoryRecord, new: MemoryRecord) -> None:
        """
        Merge a new memory into an existing similar one.
        Keep the higher importance (plus a small boost) and refresh last_accessed.
        """
        updated_importance = max(existing.importance, new.importance)
        # Slightly boost importance of confirmed/repeated information
        updated_importance = min(updated_importance + 0.05, 1.0)
        self.collection.update(
            ids=[existing.id],
            metadatas=[{
                **existing.to_metadata(),
                "importance": updated_importance,
                "last_accessed": time.time(),
                "access_count": existing.access_count + 1,
            }],
        )

    # ─── RETRIEVAL ───────────────────────────────────────────────
    def retrieve(
        self,
        query: str,
        user_id: str,
        top_k: int = TOP_K_DEFAULT,
        memory_types: list[str] | None = None,
        min_importance: float = 0.0,
        max_age_days: float | None = None,
    ) -> list[tuple[MemoryRecord, float]]:
        """
        Retrieve relevant memories with composite scoring.
        Returns list of (MemoryRecord, score) sorted by score descending.
        """
        # Build the ChromaDB where filter (one condition per field, joined with $and)
        conditions: list[dict] = [{"user_id": user_id}, {"is_archived": False}]
        if min_importance > 0:
            conditions.append({"importance": {"$gte": min_importance}})
        if memory_types:
            conditions.append({"memory_type": {"$in": memory_types}})
        where_filter = _where_all(*conditions)

        # Fetch more than top_k from the vector DB; re-rank with the composite score
        fetch_k = min(top_k * 3, 30)
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=fetch_k,
                where=where_filter,
                include=["documents", "metadatas", "distances"],
            )
        except Exception as e:
            print(f"[EpisodicStore] Retrieval error: {e}")
            return []

        if not results["ids"] or not results["ids"][0]:
            return []

        scored_memories = []
        for i, mem_id in enumerate(results["ids"][0]):
            similarity = 1.0 - results["distances"][0][i]
            record = MemoryRecord.from_metadata(
                id=mem_id,
                content=results["documents"][0][i],
                metadata=results["metadatas"][0][i],
            )

            # Apply age filter if specified
            age_h = record.age_hours()
            if max_age_days is not None and age_h > max_age_days * 24:
                continue

            # Composite score: similarity + exponential recency + importance + frequency
            recency = math.exp(-RECENCY_DECAY_PER_HOUR * age_h)
            freq_boost = math.log1p(record.access_count) * 0.05
            composite = (
                similarity * 0.50
                + recency * 0.30
                + record.importance * 0.20
                + freq_boost
            )
            scored_memories.append((record, composite))

        # Sort by composite score
        scored_memories.sort(key=lambda x: x[1], reverse=True)

        # Update access metadata for returned memories
        returned = scored_memories[:top_k]
        for record, _ in returned:
            self._update_access(record)
        return returned

    def _update_access(self, record: MemoryRecord) -> None:
        """Mark a memory as accessed (refreshes last_accessed, feeds the frequency boost)."""
        try:
            self.collection.update(
                ids=[record.id],
                metadatas=[{
                    **record.to_metadata(),
                    "last_accessed": time.time(),
                    "access_count": record.access_count + 1,
                }],
            )
        except Exception:
            pass  # Non-critical update

    # ─── CONSOLIDATION & FORGETTING ─────────────────────────────
    def decay_old_memories(
        self,
        user_id: str,
        half_life_days: float = DECAY_DAYS_HALF_LIFE,
    ) -> int:
        """
        Apply time-based importance decay to active memories.
        Decay covers only the time since the last decay pass (decayed_at), so
        running this repeatedly does not compound it.
        Memories whose importance falls below 0.1 are archived.
        Returns the number of archived memories.
        """
        try:
            results = self.collection.get(
                where=_where_all({"user_id": user_id}, {"is_archived": False}),
                include=["metadatas", "documents"],
            )
        except Exception:
            return 0

        if not results["ids"]:
            return 0

        now = time.time()
        archived_count = 0
        for i, mem_id in enumerate(results["ids"]):
            record = MemoryRecord.from_metadata(
                id=mem_id,
                content=results["documents"][i],
                metadata=results["metadatas"][i],
            )

            # Exponential decay over the time elapsed since the last decay pass
            elapsed_days = (now - record.decayed_at) / 86400
            decay_factor = math.pow(0.5, elapsed_days / half_life_days)
            new_importance = record.importance * decay_factor

            if new_importance < 0.10:
                # Archive (soft delete) - preserve for audit but exclude from retrieval
                self.collection.update(
                    ids=[mem_id],
                    metadatas=[{**record.to_metadata(), "is_archived": True}],
                )
                archived_count += 1
            elif record.importance - new_importance > 0.05:
                # Persist only significant decay; smaller amounts keep accumulating
                self.collection.update(
                    ids=[mem_id],
                    metadatas=[{
                        **record.to_metadata(),
                        "importance": new_importance,
                        "decayed_at": now,
                    }],
                )
        return archived_count

    def get_user_summary(self, user_id: str) -> dict:
        """Get memory statistics for a user."""
        try:
            all_results = self.collection.get(
                where={"user_id": user_id},
                include=["metadatas"],
            )
            total = len(all_results["ids"])
            active = sum(
                1 for m in all_results["metadatas"] if not m.get("is_archived", False)
            )
            types: dict[str, int] = {}
            for m in all_results["metadatas"]:
                if not m.get("is_archived", False):
                    t = m.get("memory_type", "unknown")
                    types[t] = types.get(t, 0) + 1
            return {
                "user_id": user_id,
                "total_memories": total,
                "active_memories": active,
                "archived_memories": total - active,
                "memory_types": types,
            }
        except Exception:
            return {"user_id": user_id, "error": "Could not retrieve stats"}

    def format_for_context(
        self,
        memories: list[tuple[MemoryRecord, float]],
        max_chars: int = 2000,
    ) -> str:
        """
        Format retrieved memories for injection into the system prompt.
        Respects a character budget (including the joining newlines).
        """
        if not memories:
            return ""

        lines = ["## Relevant Past Experiences"]
        char_budget = max_chars - len(lines[0])
        for record, _score in memories:
            age_desc = self._age_description(record.age_hours())
            line = f"- [{age_desc}, importance:{record.importance:.1f}] {record.content}"
            if len(line) + 1 > char_budget:  # +1 for the newline that joins it
                break
            lines.append(line)
            char_budget -= len(line) + 1
        return "\n".join(lines)

    @staticmethod
    def _age_description(age_hours: float) -> str:
        if age_hours < 1:
            return "just now"
        if age_hours < 24:
            return f"{int(age_hours)}h ago"
        days = int(age_hours / 24)
        return f"{days}d ago"


# ─────────────────────────────────────────────
# AGENT WITH EPISODIC MEMORY
# ─────────────────────────────────────────────
class EpisodicAgent:
    """
    Agent that builds episodic memory from every interaction.
    """

    MODEL = "claude-opus-4-6"

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.session_id = str(uuid.uuid4())
        self.client = anthropic.Anthropic()
        self.memory = EpisodicMemorySystem(
            persist_dir=f"./memory/{user_id}",
        )
        self.conversation: list[dict] = []

    def respond(self, user_message: str) -> str:
        # Retrieve relevant memories
        relevant = self.memory.retrieve(
            query=user_message,
            user_id=self.user_id,
            top_k=4,
        )
        memory_context = self.memory.format_for_context(relevant)

        # Build system prompt with memory
        system = (
            "You are a personalized AI assistant with memory of past conversations.\n"
            "Use the provided past experiences to give contextually-aware responses.\n"
            "Reference specific past interactions when relevant - this builds trust.\n"
        )
        if memory_context:
            system += f"\n{memory_context}"

        # Add user message and call LLM
        self.conversation.append({"role": "user", "content": user_message})
        response = self.client.messages.create(
            model=self.MODEL,
            max_tokens=1024,
            system=system,
            messages=self.conversation,
        )
        assistant_text = response.content[0].text
        self.conversation.append({"role": "assistant", "content": assistant_text})

        # Extract and store memories from this interaction
        self._extract_and_store_memories(user_message, assistant_text)
        return assistant_text

    def _extract_and_store_memories(
        self,
        user_message: str,
        assistant_response: str,
    ) -> None:
        """
        Use an LLM to extract memorable facts from the interaction.
        Store each as a separate episodic memory.
        """
        extraction_prompt = f"""Extract memorable facts about this user from the conversation below.
Return a JSON list of objects with fields: content (string), importance (0.0-1.0), memory_type (observation|preference|decision|error), tags (list of strings).
Only extract information that would be useful in future conversations.
Return [] if nothing is worth storing.
User: {user_message}
Assistant: {assistant_response}
Return only valid JSON, nothing else."""
        try:
            result = self.client.messages.create(
                model="claude-haiku-4-5",
                max_tokens=400,
                messages=[{"role": "user", "content": extraction_prompt}],
            )
            text = result.content[0].text.strip()
            # Models sometimes wrap JSON in a Markdown code fence; strip it before parsing
            if text.startswith("`"):
                text = text.strip("`").removeprefix("json").strip()
            extracted = json.loads(text)
            for item in extracted:
                self.memory.store(
                    content=item["content"],
                    user_id=self.user_id,
                    session_id=self.session_id,
                    importance=float(item.get("importance", 0.5)),
                    memory_type=item.get("memory_type", "observation"),
                    tags=item.get("tags", []),
                )
        except Exception:
            # Non-critical: fall back to a simple summary. It still passes
            # through the storage trigger, so it may be filtered out.
            summary = f"User discussed: {user_message[:200]}"
            self.memory.store(
                content=summary,
                user_id=self.user_id,
                session_id=self.session_id,
                importance=0.3,
            )

    def remember(self, content: str, importance: float = 0.7) -> None:
        """Explicitly store a memory about this user."""
        self.memory.store(
            content=content,
            user_id=self.user_id,
            session_id=self.session_id,
            importance=importance,
            memory_type="preference",
            force=True,
        )


# ─────────────────────────────────────────────
# DEMONSTRATION
# ─────────────────────────────────────────────
def demo():
    print("=" * 60)
    print("EPISODIC MEMORY SYSTEM - FULL DEMONSTRATION")
    print("=" * 60)

    # Session 1: First interaction with Sarah
    print("\n=== SESSION 1: First interaction ===")
    agent_s1 = EpisodicAgent(user_id="sarah_chen")
    agent_s1.remember(
        "Sarah Chen is CTO at Finvault, a B2B fintech startup. "
        "Python-only developer. Hates verbose responses. "
        "Uses macOS with zsh.",
        importance=0.95,
    )
    agent_s1.remember(
        "Sarah's team uses FastAPI + PostgreSQL + Redis stack.",
        importance=0.80,
    )
    r1 = agent_s1.respond(
        "Hi! I need to optimize our authentication service. "
        "We're seeing JWT validation taking 200ms which is too slow."
    )
    print(f"Agent (Session 1): {r1[:300]}...")

    # End of session 1: inspect what was stored
    stats_s1 = agent_s1.memory.get_user_summary("sarah_chen")
    print(f"\nSession 1 memory stats: {stats_s1}")

    # Session 2: Sarah returns - agent should remember context
    print("\n\n=== SESSION 2: Returning user ===")
    agent_s2 = EpisodicAgent(user_id="sarah_chen")  # New session, same user_id
    r2 = agent_s2.respond(
        "Hey, I implemented the JWT caching we talked about. "
        "Now I need help with our Redis connection pooling."
    )
    print(f"Agent (Session 2): {r2[:300]}...")

    # Check memory stats after both sessions
    stats_s2 = agent_s2.memory.get_user_summary("sarah_chen")
    print(f"\nSession 2 memory stats: {stats_s2}")

    # Demonstrate memory decay
    print("\n=== MEMORY DECAY DEMONSTRATION ===")
    archived = agent_s2.memory.decay_old_memories("sarah_chen", half_life_days=0.001)  # Very short for demo
    print(f"Memories archived by decay: {archived}")
    final_stats = agent_s2.memory.get_user_summary("sarah_chen")
    print(f"Final memory stats: {final_stats}")


if __name__ == "__main__":
    demo()
```
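Periodic half-life decay has a subtle pitfall. If each pass multiplies the *current* importance by a factor computed from the memory's total age, repeated passes compound and over-decay. Measuring only the time elapsed since the previous pass avoids this. A stdlib sketch comparing both approaches for a single memory; both function names are hypothetical.

```python
def decay_since_creation(importance: float, pass_days: list[float], half_life: float) -> float:
    # Naive: each pass applies the factor for the memory's full age to the already-decayed value
    for t in pass_days:
        importance *= 0.5 ** (t / half_life)
    return importance


def decay_since_last_pass(importance: float, pass_days: list[float], half_life: float) -> float:
    # Correct: each pass applies only the decay for time elapsed since the previous pass
    last = 0.0
    for t in pass_days:
        importance *= 0.5 ** ((t - last) / half_life)
        last = t
    return importance


# Memory created on day 0 with importance 0.8; decay passes run on days 10, 20, and 30
passes = [10.0, 20.0, 30.0]
naive = decay_since_creation(0.8, passes, half_life=30.0)
correct = decay_since_last_pass(0.8, passes, half_life=30.0)
print(f"naive={naive:.3f} correct={correct:.3f} expected={0.8 * 0.5:.3f}")
```

After exactly one half-life the importance should have halved to 0.4. The naive version reaches 0.2 instead, and the gap keeps growing the more often maintenance runs.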
Tool Comparison: ChromaDB vs Pinecone vs pgvector
| Feature | ChromaDB | Pinecone | pgvector |
|---|---|---|---|
| Deployment | Local or Docker | Fully managed cloud | Postgres extension (self-hosted or on managed Postgres services) |
| Setup complexity | Low (pip install) | Medium (API key, index config) | Medium (Postgres + extension) |
| Scale | Millions of records | Billions of records | Depends on Postgres size |
| Cost | Free, open source (you pay for hosting) | Paid managed service (usage-based pricing) | Your Postgres hosting cost |
| Filtering | Metadata filters | Metadata filters | Full SQL + vector search |
| Production readiness | Good for single-server | Enterprise production | Production (existing Postgres) |
| Best for | Local dev, small agents | High-scale cloud agents | Agents already using Postgres |
Recommendation: ChromaDB for development and small deployments. pgvector if you are already running PostgreSQL - no new infrastructure. Pinecone for high-scale production with dedicated search infrastructure.
:::danger User Data Isolation
Episodic memory contains personal user information. A bug that allows user A to retrieve user B's memories is a serious privacy violation. Always pass user_id to both storage and retrieval operations and enforce it at the database layer via where filters. Never trust user-supplied user_id values - derive them from authenticated session tokens only.
:::
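One way to follow the last rule is to have the server sign the user ID at login and to accept it back only from a verified token. A minimal stdlib sketch: the `user_id.signature` token format, `SECRET_KEY`, and the helper names are hypothetical. Production systems would typically rely on an established session or JWT library instead.

```python
import hashlib
import hmac
from typing import Optional

SECRET_KEY = b"replace-with-a-server-side-secret"  # hypothetical; load from secure config in practice


def issue_token(user_id: str) -> str:
    # Server side, after authentication: bind the user ID to an HMAC-SHA256 signature
    sig = hmac.new(SECRET_KEY, user_id.encode(), hashlib.sha256).hexdigest()
    return f"{user_id}.{sig}"


def verify_token(token: str) -> Optional[str]:
    """Return the user ID if the signature checks out, else None."""
    user_id, _, sig = token.rpartition(".")
    if not user_id:
        return None
    expected = hmac.new(SECRET_KEY, user_id.encode(), hashlib.sha256).hexdigest()
    # Constant-time comparison; bytes avoid TypeError on non-ASCII input
    return user_id if hmac.compare_digest(sig.encode(), expected.encode()) else None


token = issue_token("sarah_chen")
print(verify_token(token))  # the only user_id that memory calls should ever receive
```

Memory operations would then take `user_id` from `verify_token`, never from a request field the client controls.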
:::warning Memory Extraction Quality
LLM-based memory extraction (using a model to decide what to remember from an interaction) is an excellent pattern but requires careful prompting. A poorly prompted extraction model can: (1) store too much, creating noise; (2) store too little, losing valuable context; (3) hallucinate memories not actually present in the conversation. Validate extracted content against the source conversation before storing. Use importance thresholds to filter out low-confidence extractions.
:::
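A cheap first check against hallucinated extractions is to confirm that most of a candidate memory's content words occur in the source conversation. This is a lexical heuristic, not a guarantee: the stopword list, the 0.6 threshold, and the name `is_grounded` are assumptions to tune, and correct but paraphrased memories can fail it.

```python
import re

STOPWORDS = {"the", "a", "an", "is", "are", "to", "of", "and", "in", "on", "for", "user", "with"}


def content_words(text: str) -> set[str]:
    # Lowercased alphanumeric tokens minus common function words
    return {w for w in re.findall(r"[a-z0-9]+", text.lower()) if w not in STOPWORDS}


def is_grounded(memory: str, conversation: str, threshold: float = 0.6) -> bool:
    """Accept a memory only if enough of its content words appear in the conversation."""
    words = content_words(memory)
    if not words:
        return False
    support = len(words & content_words(conversation)) / len(words)
    return support >= threshold


conversation = "User: We use FastAPI and PostgreSQL. JWT validation takes 200ms."
print(is_grounded("Uses FastAPI with PostgreSQL", conversation))       # supported by the source
print(is_grounded("Prefers Django and MongoDB on AWS", conversation))  # nothing in the source
```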
Interview Questions and Answers
Q: How do you implement episodic memory retrieval that balances relevance with recency?
A: Pure semantic similarity fails because a highly relevant but year-old memory might be less useful than a moderately similar memory from yesterday. The solution is a composite scoring formula that combines cosine similarity from vector search with time-based recency decay and stored importance scores. A typical formula: score = similarity × 0.5 + recency × 0.3 + importance × 0.2, where recency = exp(-0.05 × age_hours). This gives exponential decay with a half-life of about 14 hours (ln 2 / 0.05 ≈ 13.9). The weights are tunable: for tasks where historical accuracy matters more than recency, increase the importance weight; for rapidly changing environments, increase the recency weight. Always fetch more candidates from the vector store than you need (3× to 5×) and re-rank with the composite formula.
Q: How do you prevent episodic memory from filling up with noise over time?
A: Multiple strategies working together. (1) Importance thresholds at storage: only store memories that score above a minimum (0.3–0.4). This filters out greetings, acknowledgments, and low-value exchanges at ingestion time. (2) Deduplication: before storing a new memory, check for highly similar existing memories (cosine similarity > 0.9). If found, consolidate: update the existing memory's importance and last-accessed time rather than creating a duplicate. (3) Time-based decay: periodically reduce the importance of old memories using exponential decay. Archive memories that fall below a minimum importance threshold (0.1). (4) LLM-based filtering: use a cheap model to score importance before storage. This is more accurate than heuristic scoring but adds latency. (5) Maximum capacity enforcement: if memory count exceeds a limit per user, prune the lowest-importance non-archived memories.
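Strategy (5) can be sketched as a pruning pass that keeps the highest-importance active memories and soft-deletes the rest. The in-memory dicts and the `prune_to_capacity` name are hypothetical; against a vector store, the same decision would translate into metadata updates that set `is_archived`.

```python
def prune_to_capacity(memories: list[dict], max_active: int) -> list[str]:
    """Archive the lowest-importance active memories beyond max_active; return archived IDs."""
    active = [m for m in memories if not m["is_archived"]]
    if len(active) <= max_active:
        return []
    # Lowest importance first; among equal importance, the older memory goes first
    active.sort(key=lambda m: (m["importance"], m["timestamp"]))
    to_archive = active[: len(active) - max_active]
    for m in to_archive:
        m["is_archived"] = True  # soft delete: kept for audit, excluded from retrieval
    return [m["id"] for m in to_archive]


memories = [
    {"id": "a", "importance": 0.9, "timestamp": 1.0, "is_archived": False},
    {"id": "b", "importance": 0.2, "timestamp": 2.0, "is_archived": False},
    {"id": "c", "importance": 0.2, "timestamp": 3.0, "is_archived": False},
    {"id": "d", "importance": 0.6, "timestamp": 4.0, "is_archived": False},
    {"id": "e", "importance": 0.1, "timestamp": 5.0, "is_archived": True},  # already archived
]
archived = prune_to_capacity(memories, max_active=2)
print(archived)
```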
Q: What is the difference between episodic memory consolidation and summarization?
A: Consolidation is merging two similar memories about the same fact or event into one more accurate record. Example: memory A says "User prefers Python" (importance 0.6) and memory B says "User works exclusively in Python" (importance 0.7) - these should consolidate into a single stronger memory. Consolidation preserves specific information while removing redundancy. Summarization is compressing a collection of diverse memories into a higher-level abstract summary, often losing specific details in exchange for breadth. Example: summarizing 20 conversation memories into "User is a Python developer debugging authentication issues in a FastAPI service." Consolidation is best for duplicate or near-duplicate memories about the same subject. Summarization is best for archiving old memories while preserving a gist.
Q: How would you structure episodic memory for a multi-agent system where multiple agents interact with the same user?
A: The key design decision is whether memories are agent-scoped or user-scoped. For memories that are inherently agent-specific (what this specific agent did or said), scope them to agent_id. For memories about the user (preferences, context, history), scope them to user_id and share them across agents. Implementation: add agent_id and user_id as separate metadata fields. User-scoped memories filter on user_id alone. Agent-scoped memories filter on agent_id and user_id together, so one agent's notes about one user never surface for another user. For shared memories, route writes through a coordination layer to prevent race conditions: two agents writing the same user preference simultaneously can create conflicting records. In practice, designate one agent as the "memory coordinator" that handles all writes, or use a message queue to serialize memory updates.
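The message-queue option can be sketched in-process with a queue and a single writer thread: agents only enqueue writes, and only the writer touches the shared store, so concurrent updates to the same preference are applied one at a time in a single order. `MemoryWriter` is a hypothetical class; across processes or machines, an external message broker would play the queue's role.

```python
import queue
import threading


class MemoryWriter:
    """Single consumer that applies memory writes in arrival order."""

    def __init__(self) -> None:
        self.store: dict[tuple[str, str], str] = {}     # (user_id, key) -> value; last write wins
        self.log: list[tuple[str, str, str, str]] = []  # applied writes, in order
        self._queue: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, agent_id: str, user_id: str, key: str, value: str) -> None:
        # Called from any agent thread; never touches the store directly
        self._queue.put((agent_id, user_id, key, value))

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is None:  # shutdown sentinel
                break
            _agent_id, user_id, key, value = item
            self.store[(user_id, key)] = value
            self.log.append(item)

    def close(self) -> None:
        # Writes queued before the sentinel are applied first (FIFO), then the thread exits
        self._queue.put(None)
        self._thread.join()


writer = MemoryWriter()
agents = [
    threading.Thread(target=writer.submit, args=(f"agent-{i}", "sarah", "editor", f"choice-{i}"))
    for i in range(4)
]
for t in agents:
    t.start()
for t in agents:
    t.join()
writer.close()
print(len(writer.log), writer.store[("sarah", "editor")])
```

Which agent's value wins still depends on arrival order, but every write is applied exactly once and the log records one unambiguous history to reconcile against.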
Q: How do you test episodic memory retrieval quality?
A: Build an evaluation dataset of (query, relevant_memory_id) pairs - essentially a retrieval ground truth. Then measure recall@k (what fraction of relevant memories appear in the top-k results) and precision@k (what fraction of returned memories are actually relevant). For composite scoring specifically: run ablations where you test pure similarity, pure recency, and pure importance separately, then compare with the composite. The composite should outperform any single signal. For production monitoring: track the distribution of retrieval scores over time - if average scores drop, it means retrieval quality is degrading (possibly because memory volume has grown but embedding quality has not scaled). Also monitor memory access frequency: if most retrieved memories have access_count of 1, they are being fetched but not reinforced, suggesting retrieval relevance is low.
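Both metrics can be computed directly from a ranked result list and a ground-truth set of relevant IDs. A minimal sketch; the memory IDs are made up for illustration. Precision here divides by the number of results actually returned when fewer than k come back, which is one common convention.

```python
def recall_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of relevant memories that appear in the top-k results."""
    if not relevant:
        return 0.0
    return len(set(retrieved[:k]) & relevant) / len(relevant)


def precision_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of the top-k results that are relevant."""
    top = retrieved[:k]
    if not top:
        return 0.0
    return len(set(top) & relevant) / len(top)


# One query's ranked results versus its ground truth
retrieved = ["m3", "m7", "m1", "m9", "m4"]
relevant = {"m1", "m3", "m8"}
print(recall_at_k(retrieved, relevant, k=5), precision_at_k(retrieved, relevant, k=5))
```

Averaging these per-query values over the evaluation set gives the numbers to compare across the similarity-only, recency-only, importance-only, and composite ablations.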
