# Multi-Tenant AI Systems

## The 3 AM Incident
It started with a Slack message at 3:07 AM: "Enterprise customer Acme Corp says they're seeing responses referencing another company's data."
Your on-call engineer sits up, heart pounding. Your AI assistant product serves 400 companies. Each has uploaded their internal documents - proprietary contracts, technical specifications, financial data. The system uses a shared vector database with metadata filtering to isolate tenant data. But now one of your largest customers is claiming they saw a competitor's document in a response.
You trace the incident for ninety minutes. A vector similarity search, under high load, had a metadata filter that wasn't applied atomically - the embedding similarity ran first, retrieved candidates, then the tenant filter was applied in application code. A race condition during a database index rebuild briefly caused cross-tenant document leakage. One response, one customer, catastrophic breach of trust.
The enterprise deal is worth $400K ARR. Their legal team calls at 9 AM. Your CTO is in damage-control mode.
This is the defining challenge of multi-tenant AI systems: you're not just building a product for one user - you're building a product for thousands of users who each believe they have an isolated, private AI. The technical requirements are severe. Tenant isolation must be absolute, not approximate. Cost attribution must be exact. Rate limiting must be per-tenant, not global. And when something goes wrong - and it will - you need audit trails that can answer "who saw what" to within the millisecond.
This lesson teaches you to build multi-tenant AI systems that are architecturally sound from the start.
## Why This Exists
Early SaaS AI products made the simplest possible choice: one shared system, one shared database, one shared LLM context. This worked fine for demos and small betas. It failed spectacularly at scale for three reasons.
Data isolation failures. Shared vector databases with application-layer tenant filtering are fragile. Any bug in the filter logic causes cross-tenant leakage. Any database operation that bypasses the application layer (admin scripts, migrations, index rebuilds) risks exposing data. Enterprise customers discovered this quickly and demanded architectural guarantees, not application-layer promises.
Cost attribution impossibility. When all tenants share a single LLM billing account, you can't answer "how much did Tenant X cost us this month?" without heroic logging efforts. This makes pricing opaque and prevents you from identifying which customers are unprofitable.
Noisy neighbor problems. One power user hammering your API slows everyone else down. Without per-tenant rate limiting, a single customer's bulk import job could degrade the entire product for all other tenants.
The solution is a layered architecture where tenant isolation is enforced at every layer: request routing, prompt construction, vector storage, LLM calls, and cost tracking.
## The Tenant Isolation Spectrum
Multi-tenancy exists on a spectrum from "shared everything" to "dedicated everything":
| Level | Cost | Security | Complexity |
|---|---|---|---|
| Shared everything | Low | Low | Low |
| Row-level security | Medium | Medium | Medium |
| Schema-level isolation | High | High | High |
| Full isolation | Very High | Very High | Very High |
Most production AI SaaS products target row-level security with namespace isolation - enforced at the database layer, not the application layer. Full isolation is reserved for enterprise-tier customers who pay for it explicitly.
## Architecture: The Tenant-Aware Request Pipeline

Every request carries its tenant context through every stage of the pipeline: authentication, rate limiting, retrieval, prompt construction, the LLM call, and cost tracking.

## Core Implementation
### 1. Tenant Context and Configuration

Every request must carry a `TenantContext` object that flows through the entire pipeline:

```python
import copy
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class TenantTier(str, Enum):
    FREE = "free"
    STARTER = "starter"
    PROFESSIONAL = "professional"
    ENTERPRISE = "enterprise"


@dataclass
class TenantConfig:
    tenant_id: str
    tier: TenantTier
    # Rate limits
    requests_per_minute: int
    tokens_per_day: int
    max_context_tokens: int
    # LLM config
    allowed_models: list[str]
    default_model: str
    # Data isolation
    vector_namespace: str
    db_schema: str
    # Budget
    monthly_token_budget: int
    custom_system_prompt_prefix: Optional[str] = None
    data_retention_days: int = 90


# Default configs by tier. STARTER has no entry here, so add one
# before assigning that tier to a tenant.
TIER_CONFIGS = {
    TenantTier.FREE: TenantConfig(
        tenant_id="",  # set per tenant
        tier=TenantTier.FREE,
        requests_per_minute=10,
        tokens_per_day=50_000,
        max_context_tokens=8_000,
        allowed_models=["claude-haiku-4-5-20251001"],
        default_model="claude-haiku-4-5-20251001",
        vector_namespace="",  # set per tenant
        db_schema="",  # set per tenant
        monthly_token_budget=1_000_000,
        data_retention_days=30,
    ),
    TenantTier.PROFESSIONAL: TenantConfig(
        tenant_id="",
        tier=TenantTier.PROFESSIONAL,
        requests_per_minute=60,
        tokens_per_day=500_000,
        max_context_tokens=32_000,
        allowed_models=["claude-haiku-4-5-20251001", "claude-sonnet-4-6"],
        default_model="claude-sonnet-4-6",
        vector_namespace="",
        db_schema="",
        monthly_token_budget=10_000_000,
        data_retention_days=90,
    ),
    TenantTier.ENTERPRISE: TenantConfig(
        tenant_id="",
        tier=TenantTier.ENTERPRISE,
        requests_per_minute=600,
        tokens_per_day=10_000_000,
        max_context_tokens=200_000,
        allowed_models=["claude-haiku-4-5-20251001", "claude-sonnet-4-6", "claude-opus-4-6"],
        default_model="claude-sonnet-4-6",
        vector_namespace="",
        db_schema="",
        monthly_token_budget=200_000_000,
        data_retention_days=365,
    ),
}


@dataclass
class TenantContext:
    tenant_id: str
    user_id: str
    config: TenantConfig
    request_id: str
    request_start: float = field(default_factory=time.time)

    @classmethod
    def from_api_key(cls, api_key: str, user_id: str, request_id: str) -> "TenantContext":
        """Extract tenant context from API key (in production: verify against DB)."""
        # API key format: "ten_{tenant_id}_{secret}". The tenant_id must not
        # contain underscores; the secret may (token_urlsafe can emit them).
        parts = api_key.split("_", 2)
        if len(parts) != 3 or parts[0] != "ten" or not parts[1] or not parts[2]:
            # Fail closed: never map malformed keys onto a shared fallback tenant.
            raise ValueError("Malformed API key")
        tenant_id = parts[1]
        # Load from DB in production
        config = _load_tenant_config(tenant_id)
        return cls(
            tenant_id=tenant_id,
            user_id=user_id,
            config=config,
            request_id=request_id,
        )


def _load_tenant_config(tenant_id: str) -> TenantConfig:
    """Load tenant config from database (simplified)."""
    # In production: SELECT * FROM tenants WHERE tenant_id = ?
    tier = TenantTier.PROFESSIONAL  # fetched from DB
    # Deep copy so per-tenant edits never mutate the shared tier template.
    config = copy.deepcopy(TIER_CONFIGS[tier])
    config.tenant_id = tenant_id
    config.vector_namespace = f"tenant_{tenant_id}"
    config.db_schema = f"tenant_{tenant_id}"
    return config
```
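One way to let tenant context flow through async code without adding a parameter to every function is Python's `contextvars`, since asyncio gives each task its own copy of the context. The sketch below is a minimal illustration under that assumption; `current_tenant`, `require_tenant`, and `handle` are hypothetical names, not part of the lesson's classes.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical module-level variable holding the tenant for the current task.
current_tenant: ContextVar[str] = ContextVar("current_tenant")


def require_tenant() -> str:
    """Return the tenant bound to this task, failing closed if none is set."""
    try:
        return current_tenant.get()
    except LookupError:
        raise RuntimeError("No tenant bound to this request") from None


async def handle(tenant_id: str) -> str:
    token = current_tenant.set(tenant_id)
    try:
        await asyncio.sleep(0)  # yield so the two requests interleave
        return require_tenant()
    finally:
        current_tenant.reset(token)


async def demo() -> list[str]:
    # gather() wraps each coroutine in a task with its own context copy.
    return await asyncio.gather(handle("acme"), handle("globex"))


results = asyncio.run(demo())
```

Each request sees only its own tenant even though the two coroutines interleave, and code that runs outside any request fails loudly instead of picking up a default tenant.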
### 2. Per-Tenant Rate Limiting

Rate limiting must be enforced per-tenant at the infrastructure layer, not the application layer. Use a sliding-window log stored in a Redis sorted set:

```python
import datetime
import time
import uuid

import redis.asyncio as redis


def _utc_day_key(tenant_id: str) -> str:
    """Daily counter key; UTC so every server agrees on when the day rolls over."""
    today = datetime.datetime.now(datetime.timezone.utc).date().isoformat()
    return f"tokens:{tenant_id}:{today}"


class TenantRateLimiter:
    """
    Sliding window rate limiter per tenant.
    Uses Redis sorted sets for O(log N) performance.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def check_rate_limit(
        self,
        tenant_id: str,
        limit: int,
        window_seconds: int = 60,
    ) -> tuple[bool, dict]:
        """
        Returns (allowed, info_dict).
        Uses atomic Lua script to avoid race conditions.
        """
        now = time.time()
        window_start = now - window_seconds
        key = f"rate_limit:{tenant_id}:{window_seconds}"
        lua_script = """
        local key = KEYS[1]
        local now = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])
        local ttl = tonumber(ARGV[4])
        local member = ARGV[5]
        -- Remove expired entries
        redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
        -- Count current requests in window
        local count = redis.call('ZCARD', key)
        if count < limit then
            -- Add current request under a caller-supplied unique member
            redis.call('ZADD', key, now, member)
            redis.call('EXPIRE', key, ttl)
            return {1, count + 1, limit}
        else
            return {0, count, limit}
        end
        """
        result = await self.redis.eval(
            lua_script,
            1,
            key,
            now,
            window_start,
            limit,
            window_seconds * 2,
            uuid.uuid4().hex,  # unique member so same-timestamp requests never collide
        )
        allowed = bool(result[0])
        current = int(result[1])
        max_limit = int(result[2])
        return allowed, {
            "allowed": allowed,
            "current": current,
            "limit": max_limit,
            "window_seconds": window_seconds,
            # Upper bound: the oldest entry may expire sooner than this.
            "retry_after": window_seconds if not allowed else None,
        }

    async def check_daily_token_budget(
        self,
        tenant_id: str,
        estimated_tokens: int,
        daily_limit: int,
    ) -> tuple[bool, int]:
        """Check if tenant has sufficient token budget for today.

        This check and record_tokens_used() are separate steps, so concurrent
        requests can overshoot the limit slightly.
        """
        current_tokens = int(await self.redis.get(_utc_day_key(tenant_id)) or 0)
        would_be = current_tokens + estimated_tokens
        allowed = would_be <= daily_limit
        return allowed, daily_limit - current_tokens

    async def record_tokens_used(
        self,
        tenant_id: str,
        tokens_used: int,
    ) -> None:
        """Record actual tokens used after an LLM call."""
        key = _utc_day_key(tenant_id)
        ttl = 86400 * 2  # 2 days
        pipe = self.redis.pipeline()
        pipe.incrby(key, tokens_used)
        pipe.expire(key, ttl)
        await pipe.execute()
```
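Checking the daily budget and recording usage as two separate steps leaves room for concurrent requests to pass the check together. One common alternative is reserve-then-settle: claim the estimate up front, then correct it with the real usage. The sketch below shows the accounting in memory only; `TokenBudget` is a hypothetical class, and a Redis version would need the reservation to run atomically (for example in a Lua script).

```python
class TokenBudget:
    """In-memory sketch of reserve-then-settle accounting for one tenant-day."""

    def __init__(self, daily_limit: int) -> None:
        self.daily_limit = daily_limit
        self.used = 0

    def reserve(self, estimated_tokens: int) -> bool:
        """Claim the estimate before the call; refuse if it would exceed the limit."""
        if self.used + estimated_tokens > self.daily_limit:
            return False
        self.used += estimated_tokens
        return True

    def settle(self, estimated_tokens: int, actual_tokens: int) -> None:
        """Swap the earlier estimate for the real usage once the call returns."""
        self.used += actual_tokens - estimated_tokens


budget = TokenBudget(daily_limit=1_000)
first = budget.reserve(600)   # fits within the limit
second = budget.reserve(600)  # would bring the total to 1_200
budget.settle(estimated_tokens=600, actual_tokens=450)
third = budget.reserve(500)   # 450 already used, 500 more still fits
```

Because the reservation is counted before the LLM call starts, a second request arriving mid-call sees the first request's claim rather than a stale total.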
### 3. Tenant-Isolated Vector Search

The most dangerous failure mode is cross-tenant vector leakage. The fix: use database-enforced namespace isolation, not application-layer filtering.

```python
from typing import Optional


class TenantVectorStore:
    """
    Tenant-isolated vector storage using namespace partitioning.
    In-memory mock; the same pattern applies to Pinecone, Weaviate,
    Qdrant, or pgvector.
    """

    def __init__(self, backend: str = "qdrant"):
        self.backend = backend
        # In production: initialize actual client
        self._collections: dict[str, list] = {}

    def _get_collection_name(self, tenant_id: str) -> str:
        """
        Each tenant gets their own collection/namespace.
        This is isolation at the database level, not application level.
        """
        # Reject unsafe IDs instead of stripping characters: stripping would
        # map "a-b" and "ab" onto the same collection.
        if not tenant_id or not all(c.isalnum() or c == "_" for c in tenant_id):
            raise ValueError(f"Invalid tenant_id: {tenant_id!r}")
        return f"tenant_{tenant_id}_documents"

    async def upsert_documents(
        self,
        tenant_id: str,
        documents: list[dict],
        embeddings: list[list[float]],
    ) -> None:
        """Insert documents into tenant's isolated collection."""
        collection = self._get_collection_name(tenant_id)
        # Qdrant example: each tenant has their own collection
        # await self.client.upsert(
        #     collection_name=collection,
        #     points=[
        #         PointStruct(id=doc["id"], vector=emb, payload=doc)
        #         for doc, emb in zip(documents, embeddings)
        #     ]
        # )
        bucket = self._collections.setdefault(collection, [])
        for doc, emb in zip(documents, embeddings):
            # Defense-in-depth metadata; copy so the caller's dict is not mutated
            payload = {**doc, "_tenant_id": tenant_id}
            bucket.append({"embedding": emb, "payload": payload})
        print(f"Upserted {len(documents)} docs to collection {collection}")

    async def search(
        self,
        tenant_id: str,
        query_embedding: list[float],
        top_k: int = 5,
        filters: Optional[dict] = None,
    ) -> list[dict]:
        """
        Search within tenant's isolated collection.
        Tenant isolation guaranteed by collection-level partitioning.
        """
        collection = self._get_collection_name(tenant_id)
        # The isolation guarantee: we ONLY search within this tenant's collection
        # No metadata filter needed - the collection itself is the boundary
        # await self.client.search(
        #     collection_name=collection,  # ← isolation enforced here
        #     query_vector=query_embedding,
        #     limit=top_k,
        #     query_filter=filters,
        # )
        # Simplified similarity search (replace with actual client)
        results = self._mock_search(collection, query_embedding, top_k)
        # Defense-in-depth: verify tenant_id in every result
        verified = []
        for r in results:
            if r.get("_tenant_id") == tenant_id:
                verified.append(r)
            else:
                # Log security alert - this should never happen
                print(f"SECURITY ALERT: Cross-tenant result detected! "
                      f"Expected {tenant_id}, got {r.get('_tenant_id')}")
        return verified

    def _mock_search(
        self, collection: str, query: list[float], top_k: int
    ) -> list[dict]:
        """Mock search - ignores the query; replace with actual vector DB call."""
        docs = self._collections.get(collection, [])
        return [d["payload"] for d in docs[:top_k]]

    async def delete_tenant_data(self, tenant_id: str) -> None:
        """
        Complete tenant data deletion (GDPR/CCPA right to erasure).
        Collection-level isolation makes this a single operation.
        """
        collection = self._get_collection_name(tenant_id)
        # await self.client.delete_collection(collection_name=collection)
        self._collections.pop(collection, None)
        print(f"Deleted all data for tenant {tenant_id}")
```
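A cross-tenant regression test is cheap to write once the collection is the boundary. The sketch below is a self-contained toy, not the lesson's `TenantVectorStore`: `InMemoryTenantStore` is a hypothetical stand-in that ranks by cosine similarity and stores two tenants' documents under identical vectors, the worst case for leakage.

```python
import math


def _cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemoryTenantStore:
    """Toy stand-in for a vector DB: one list of points per tenant collection."""

    def __init__(self) -> None:
        self._collections: dict[str, list[tuple[list[float], dict]]] = {}

    @staticmethod
    def _collection(tenant_id: str) -> str:
        # Reject rather than strip, so distinct IDs can never share a collection.
        if not tenant_id or not all(c.isalnum() or c == "_" for c in tenant_id):
            raise ValueError(f"invalid tenant_id: {tenant_id!r}")
        return f"tenant_{tenant_id}_documents"

    def upsert(self, tenant_id: str, vector: list[float], payload: dict) -> None:
        self._collections.setdefault(self._collection(tenant_id), []).append(
            (vector, payload)
        )

    def search(self, tenant_id: str, query: list[float], top_k: int = 3) -> list[dict]:
        # Only this tenant's collection is ever read.
        points = self._collections.get(self._collection(tenant_id), [])
        ranked = sorted(points, key=lambda p: _cosine(query, p[0]), reverse=True)
        return [payload for _, payload in ranked[:top_k]]


store = InMemoryTenantStore()
store.upsert("acme", [1.0, 0.0], {"title": "Acme contract"})
store.upsert("globex", [1.0, 0.0], {"title": "Globex pricing"})
acme_hits = store.search("acme", [1.0, 0.0])
```

Even with identical embeddings, the Acme search returns only Acme's document, because the Globex collection is never opened.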
### 4. Tenant-Aware Prompt Construction

System prompts must be tenant-aware. Each tenant may have custom instructions, persona settings, and access restrictions:

```python
import re
from dataclasses import dataclass


@dataclass
class ConversationMessage:
    role: str
    content: str


class TenantPromptBuilder:
    """
    Builds tenant-aware system prompts and conversation contexts.
    Reduces the risk of prompt injection and context leakage between tenants.
    """

    BASE_SYSTEM_PROMPT = """You are an AI assistant for {company_name}.
You have access to the company's internal knowledge base. Only reference
information from the provided context. Do not reveal information from
your training about competitor companies or sensitive topics not in context.

Company: {company_name}
Assistant Name: {assistant_name}
Tenant ID: {tenant_id}"""

    def build_system_prompt(
        self,
        ctx: TenantContext,
        company_name: str,
        assistant_name: str = "AI Assistant",
    ) -> str:
        """Build tenant-specific system prompt."""
        base = self.BASE_SYSTEM_PROMPT.format(
            company_name=company_name,
            assistant_name=assistant_name,
            tenant_id=ctx.tenant_id,
        )
        # Prepend tenant's custom instructions (validated, not raw user input)
        if ctx.config.custom_system_prompt_prefix:
            base = ctx.config.custom_system_prompt_prefix + "\n\n" + base
        # Append tier-specific capabilities
        if ctx.config.tier == TenantTier.FREE:
            base += "\n\nNote: You are operating in free tier mode. Responses are limited."
        return base

    def build_rag_context(
        self,
        documents: list[dict],
        tenant_id: str,
        max_tokens: int = 4000,
    ) -> str:
        """
        Build RAG context from retrieved documents.
        Always verify documents belong to this tenant.
        """
        if not documents:
            return ""
        context_parts = ["## Relevant Documents from Knowledge Base\n"]
        token_estimate = 0
        for i, doc in enumerate(documents, 1):
            # Defense-in-depth: verify tenant ownership
            if doc.get("_tenant_id") != tenant_id:
                print(f"SECURITY: Skipping doc from tenant {doc.get('_tenant_id')}")
                continue
            doc_text = f"### Document {i}: {doc.get('title', 'Untitled')}\n"
            doc_text += f"Source: {doc.get('source', 'Internal')}\n"
            doc_text += f"{doc.get('content', '')}\n\n"
            # Rough token estimate (4 chars ≈ 1 token)
            token_estimate += len(doc_text) // 4
            if token_estimate > max_tokens:
                context_parts.append("*(Additional documents omitted due to context limit)*")
                break
            context_parts.append(doc_text)
        return "".join(context_parts)

    def sanitize_user_input(self, user_input: str, max_length: int = 4000) -> str:
        """
        Basic prompt injection defense for user inputs.
        Phrase matching is easy to evade; treat it as one signal, not a guarantee.
        """
        # Truncate to prevent token exhaustion (max_length counts characters)
        sanitized = user_input[:max_length]
        # Detect obvious injection attempts
        injection_patterns = [
            "ignore previous instructions",
            "ignore all instructions",
            "disregard the above",
            "new instructions:",
            "system: you are now",
            "act as if you have no restrictions",
        ]
        for pattern in injection_patterns:
            # Case-insensitive match AND replace; a plain str.replace() would
            # miss "Ignore Previous Instructions".
            regex = re.compile(re.escape(pattern), re.IGNORECASE)
            if regex.search(sanitized):
                # Log attempt and sanitize
                print("Prompt injection attempt detected from user")
                sanitized = regex.sub("[redacted]", sanitized)
        return sanitized
```
### 5. Per-Tenant Cost Tracking

```python
import time
from collections import defaultdict
from typing import Optional

import anthropic

# Illustrative token costs in USD per 1M tokens. Provider prices change,
# so load real values from configuration rather than hard-coding them.
MODEL_COSTS = {
    "claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
    "claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
    "claude-opus-4-6": {"input": 15.00, "output": 75.00},
}


class TenantCostTracker:
    """
    Tracks LLM costs per tenant with monthly budget enforcement.
    In production: persist to database, not in-memory, and key usage by
    billing month so budgets reset (this sketch only accumulates).
    """

    def __init__(self):
        self._usage: dict[str, dict] = defaultdict(lambda: {
            "total_input_tokens": 0,
            "total_output_tokens": 0,
            "total_cost_usd": 0.0,
            "request_count": 0,
            "model_breakdown": defaultdict(lambda: {
                "input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0
            }),
        })

    def record_usage(
        self,
        tenant_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
    ) -> float:
        """Record usage and return cost in USD."""
        # Unknown models fall back to a mid-range price rather than zero cost.
        costs = MODEL_COSTS.get(model, {"input": 3.00, "output": 15.00})
        cost = (
            input_tokens * costs["input"] / 1_000_000
            + output_tokens * costs["output"] / 1_000_000
        )
        usage = self._usage[tenant_id]
        usage["total_input_tokens"] += input_tokens
        usage["total_output_tokens"] += output_tokens
        usage["total_cost_usd"] += cost
        usage["request_count"] += 1
        model_data = usage["model_breakdown"][model]
        model_data["input_tokens"] += input_tokens
        model_data["output_tokens"] += output_tokens
        model_data["cost_usd"] += cost
        return cost

    def get_tenant_report(self, tenant_id: str) -> dict:
        """Get cost report for a specific tenant."""
        # .get() avoids creating an empty entry for unknown tenants.
        usage = self._usage.get(tenant_id, {})
        return {
            "tenant_id": tenant_id,
            "total_requests": usage.get("request_count", 0),
            "total_input_tokens": usage.get("total_input_tokens", 0),
            "total_output_tokens": usage.get("total_output_tokens", 0),
            "total_cost_usd": round(usage.get("total_cost_usd", 0.0), 4),
            "model_breakdown": dict(usage.get("model_breakdown", {})),
        }

    def check_budget(
        self,
        tenant_id: str,
        monthly_budget_tokens: int,
    ) -> tuple[bool, int]:
        """
        Check if tenant has remaining monthly token budget.
        Returns (within_budget, remaining_tokens).
        """
        usage = self._usage[tenant_id]
        used = usage["total_input_tokens"] + usage["total_output_tokens"]
        remaining = monthly_budget_tokens - used
        return remaining > 0, max(0, remaining)
```
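As a worked example of the cost formula, the sketch below prices a single request and shows one possible way to bucket usage by billing month. The prices are the illustrative figures from the lesson's `MODEL_COSTS`, and `month_key` is a hypothetical helper, not an existing API.

```python
from datetime import datetime, timezone

# Illustrative USD prices per 1M tokens, taken from the lesson's MODEL_COSTS.
PRICES = {"claude-sonnet-4-6": {"input": 3.00, "output": 15.00}}


def request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """cost = (input_tokens * input_price + output_tokens * output_price) / 1e6"""
    p = PRICES[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000


def month_key(tenant_id: str, now: datetime) -> str:
    """Hypothetical storage key that gives each billing month its own counter."""
    return f"usage:{tenant_id}:{now:%Y-%m}"


# 1,200 input tokens cost $0.0036 and 300 output tokens cost $0.0045.
cost = request_cost("claude-sonnet-4-6", input_tokens=1_200, output_tokens=300)
key = month_key("acme", datetime(2025, 3, 31, 23, 59, tzinfo=timezone.utc))
```

The request costs $0.0081 in total. Keying counters by month means a new month starts from zero without any reset job, and old months stay available for invoices.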
### 6. The Multi-Tenant AI Service
```python
import hashlib
import time

import anthropic


class MultiTenantAIService:
    """
    Main orchestrator for multi-tenant AI requests.
    Enforces isolation, rate limiting, and cost tracking at every layer.
    """

    def __init__(
        self,
        redis_client,
        vector_store: TenantVectorStore,
        cost_tracker: TenantCostTracker,
    ):
        # Async client: the sync client would block the event loop for every tenant.
        self.client = anthropic.AsyncAnthropic()
        self.rate_limiter = TenantRateLimiter(redis_client)
        self.vector_store = vector_store
        self.cost_tracker = cost_tracker
        self.prompt_builder = TenantPromptBuilder()

    async def handle_request(
        self,
        ctx: TenantContext,
        user_message: str,
        conversation_history: list[ConversationMessage],
        company_name: str,
    ) -> dict:
        """
        Handle a tenant request with full isolation guarantees.
        """
        start_time = time.time()
        # 1. Rate limit check (per-tenant, per-minute)
        allowed, rate_info = await self.rate_limiter.check_rate_limit(
            tenant_id=ctx.tenant_id,
            limit=ctx.config.requests_per_minute,
            window_seconds=60,
        )
        if not allowed:
            return {
                "error": "rate_limit_exceeded",
                "retry_after": rate_info["retry_after"],
                "tenant_id": ctx.tenant_id,
            }
        # 2. Budget checks: monthly budget, then daily token limit
        within_budget, remaining_tokens = self.cost_tracker.check_budget(
            ctx.tenant_id,
            ctx.config.monthly_token_budget,
        )
        if not within_budget:
            return {
                "error": "monthly_budget_exhausted",
                "tenant_id": ctx.tenant_id,
                "message": "Monthly token budget reached. Contact support to upgrade.",
            }
        tokens_ok, _ = await self.rate_limiter.check_daily_token_budget(
            ctx.tenant_id,
            estimated_tokens=len(user_message) // 4,  # rough: 4 chars ≈ 1 token
            daily_limit=ctx.config.tokens_per_day,
        )
        if not tokens_ok:
            return {"error": "daily_token_limit_exceeded", "tenant_id": ctx.tenant_id}
        # 3. Sanitize user input (max_length is a character cap, so at roughly
        #    4 chars per token this stays well under half the context window)
        safe_input = self.prompt_builder.sanitize_user_input(
            user_message,
            max_length=ctx.config.max_context_tokens // 2,
        )
        # 4. Vector search (tenant-isolated)
        query_embedding = await self._embed(safe_input)
        documents = await self.vector_store.search(
            tenant_id=ctx.tenant_id,
            query_embedding=query_embedding,
            top_k=5,
        )
        # 5. Build tenant-aware system prompt
        system_prompt = self.prompt_builder.build_system_prompt(
            ctx, company_name
        )
        rag_context = self.prompt_builder.build_rag_context(
            documents,
            ctx.tenant_id,
            max_tokens=ctx.config.max_context_tokens // 3,
        )
        if rag_context:
            system_prompt += f"\n\n{rag_context}"
        # 6. Build message history (within token budget)
        messages = self._build_message_history(
            conversation_history,
            safe_input,
            ctx.config.max_context_tokens,
        )
        # 7. Select model (respect tenant's tier)
        model = ctx.config.default_model
        # 8. LLM call
        response = await self.client.messages.create(
            model=model,
            max_tokens=min(2048, ctx.config.max_context_tokens // 4),
            system=system_prompt,
            messages=messages,
        )
        # 9. Record cost (per tenant)
        cost = self.cost_tracker.record_usage(
            tenant_id=ctx.tenant_id,
            model=model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
        )
        # 10. Record tokens for daily limit
        await self.rate_limiter.record_tokens_used(
            ctx.tenant_id,
            response.usage.input_tokens + response.usage.output_tokens,
        )
        elapsed = time.time() - start_time
        return {
            "response": response.content[0].text,
            "tenant_id": ctx.tenant_id,
            "model": model,
            "usage": {
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens,
                "cost_usd": round(cost, 6),
            },
            "latency_ms": round(elapsed * 1000),
            "documents_retrieved": len(documents),
        }

    def _build_message_history(
        self,
        history: list[ConversationMessage],
        new_message: str,
        max_context_tokens: int,
    ) -> list[dict]:
        """Build message list within token budget."""
        token_budget = max_context_tokens - 2048  # reserve for response
        # Walk history newest-first, then reverse back to chronological order
        recent = []
        token_count = len(new_message) // 4  # rough estimate
        for msg in reversed(history):
            msg_tokens = len(msg.content) // 4
            if token_count + msg_tokens > token_budget:
                break
            recent.append({"role": msg.role, "content": msg.content})
            token_count += msg_tokens
        messages = list(reversed(recent))
        messages.append({"role": "user", "content": new_message})
        return messages

    async def _embed(self, text: str) -> list[float]:
        """Generate embedding for vector search."""
        # Placeholder only: a SHA-256 digest is 32 bytes, so this yields a
        # 32-dimensional vector with no semantic meaning.
        # In production: use actual embedding model
        hash_bytes = hashlib.sha256(text.encode()).digest()
        return [float(b) / 255 for b in hash_bytes]
```
## Tenant Data Lifecycle

Multi-tenant AI systems must handle the full data lifecycle, especially deletion:

```python
import secrets


class TenantLifecycleManager:
    """Manages tenant provisioning and deprovisioning."""

    def __init__(self, vector_store: TenantVectorStore, redis_client):
        self.vector_store = vector_store
        self.redis = redis_client

    async def provision_tenant(
        self,
        tenant_id: str,
        tier: TenantTier,
        company_name: str,
    ) -> dict:
        """
        Provision all resources for a new tenant.
        The commented resource-creation steps are idempotent; note that each
        call generates a fresh API key.
        """
        # 1. Create vector namespace (collection per tenant)
        namespace = f"tenant_{tenant_id}"
        # await vector_client.create_collection(namespace, vector_size=1536)
        # 2. Create DB schema (if using PostgreSQL with schemas).
        #    Schema names cannot be bound as parameters, so validate
        #    tenant_id strictly before interpolating it.
        # await db.execute(f"CREATE SCHEMA IF NOT EXISTS {namespace}")
        # await db.execute(f"CREATE TABLE IF NOT EXISTS {namespace}.conversations ...")
        # 3. Generate API key
        api_key = f"ten_{tenant_id}_{secrets.token_urlsafe(32)}"
        # 4. Store tenant config in DB
        # await db.execute("INSERT INTO tenants ...", ...)
        print(f"Provisioned tenant {tenant_id} (tier: {tier.value})")
        return {
            "tenant_id": tenant_id,
            "api_key": api_key,  # Show ONCE - not stored in plaintext
            "vector_namespace": namespace,
            "tier": tier.value,
        }

    async def deprovision_tenant(self, tenant_id: str) -> None:
        """
        Complete tenant removal. GDPR right to erasure.
        Records an audit entry once deletion completes.
        """
        # 1. Disable API keys immediately (prevent new requests)
        # await db.execute("UPDATE tenant_keys SET active=false WHERE tenant_id=?", tenant_id)
        # 2. Delete vector data
        await self.vector_store.delete_tenant_data(tenant_id)
        # 3. Delete conversation history from DB
        # await db.execute(f"DROP SCHEMA IF EXISTS tenant_{tenant_id} CASCADE")
        # 4. Delete Redis keys (rate limits, token counts, cache).
        #    Iterate with SCAN; KEYS blocks Redis while it walks the keyspace.
        pattern = f"*:{tenant_id}:*"
        # async for key in self.redis.scan_iter(match=pattern):
        #     await self.redis.delete(key)
        # 5. Create deletion audit record (retain for 7 years for compliance)
        print(f"Deprovisioned tenant {tenant_id} - all data deleted")
        # await audit_log.record("TENANT_DELETED", tenant_id=tenant_id)
```
## Multi-Tenant Conversation Isolation

Conversations must be stored and retrieved with tenant-scoped keys. Never allow conversation IDs to be guessable:

```python
import hashlib
import secrets
from typing import Optional


def generate_conversation_id(tenant_id: str) -> str:
    """
    Generate non-guessable, tenant-scoped conversation ID.
    Never use sequential integers - predictable IDs enable IDOR attacks.
    """
    random_bytes = secrets.token_bytes(32)
    # The random bytes provide the unpredictability. Mixing in tenant_id only
    # binds the ID to its tenant; it does not replace the ownership check.
    combined = f"{tenant_id}:{random_bytes.hex()}".encode()
    return hashlib.sha256(combined).hexdigest()[:32]


async def get_conversation(
    db,
    conversation_id: str,
    tenant_id: str,  # Always enforce tenant scope
) -> Optional[dict]:
    """
    Fetch conversation, enforcing tenant ownership in the query.
    Never trust client-provided tenant_id alone - use authenticated context.
    """
    # SQL: always AND tenant_id = ? - prevents IDOR
    # (placeholder syntax depends on the driver, e.g. $1 for asyncpg)
    result = await db.fetchone(
        "SELECT * FROM conversations WHERE id = ? AND tenant_id = ?",
        (conversation_id, tenant_id),
    )
    return result
```
:::danger IDOR Vulnerability Pattern
Never look up a resource by ID alone without also filtering by tenant:

```python
# WRONG - IDOR vulnerability
conversation = await db.get("conversations", id=conversation_id)

# CORRECT - always scope to tenant
conversation = await db.get("conversations", id=conversation_id, tenant_id=ctx.tenant_id)
```

An attacker can enumerate conversation IDs to access other tenants' data.
:::
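The tenant-scoped lookup can be exercised end to end with an in-memory SQLite database. This is a minimal sketch with a hypothetical `conversations` table and helper name; it only demonstrates that a valid ID presented by the wrong tenant returns nothing.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE conversations (id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, title TEXT)"
)
conn.executemany(
    "INSERT INTO conversations VALUES (?, ?, ?)",
    [("c1", "acme", "Acme Q3 plan"), ("c2", "globex", "Globex roadmap")],
)


def get_conversation_scoped(conversation_id: str, tenant_id: str) -> Optional[tuple]:
    """Return the row only if it belongs to the authenticated tenant."""
    return conn.execute(
        "SELECT id, tenant_id, title FROM conversations WHERE id = ? AND tenant_id = ?",
        (conversation_id, tenant_id),
    ).fetchone()


own = get_conversation_scoped("c1", "acme")
stolen = get_conversation_scoped("c2", "acme")  # real ID, wrong tenant
```

Returning `None` for both "does not exist" and "belongs to someone else" also avoids confirming to an attacker that a guessed ID is valid.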
## Noisy Neighbor Mitigation

One tenant's heavy use must not degrade others. Cap each tenant's concurrent requests on top of a global concurrency limit:

```python
import asyncio
from collections import defaultdict


class TenantConcurrencyLimitError(Exception):
    """Raised when a tenant already holds its maximum number of slots."""


class TenantAwareQueue:
    """
    Concurrency limiter that stops any single tenant from monopolizing resources.
    Combines a global semaphore with a per-tenant cap on in-flight requests.
    (It does not reorder waiting requests, so it is not full fair queuing.)
    """

    def __init__(self, max_concurrent: int = 50):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self._tenant_active: dict[str, int] = defaultdict(int)
        self._per_tenant_limit = 10  # max concurrent per tenant
        self._global_active = 0

    async def execute(
        self,
        tenant_id: str,
        coro,
    ):
        """Run a coroutine subject to the per-tenant and global limits."""
        # Per-tenant concurrency limit
        if self._tenant_active[tenant_id] >= self._per_tenant_limit:
            coro.close()  # avoid a "coroutine was never awaited" warning
            raise TenantConcurrencyLimitError(f"Tenant {tenant_id} at concurrency limit")
        self._tenant_active[tenant_id] += 1
        try:
            async with self.semaphore:
                self._global_active += 1
                try:
                    return await coro
                finally:
                    self._global_active -= 1
        finally:
            # Runs even if the task is cancelled while waiting for the semaphore.
            self._tenant_active[tenant_id] -= 1

    def get_tenant_utilization(self) -> dict:
        """Report per-tenant resource utilization."""
        return {
            # Tracked explicitly instead of reading the private Semaphore._value
            "global_active": self._global_active,
            "per_tenant": dict(self._tenant_active),
        }
```
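Concurrency caps bound how much one tenant can run at once, but they do not decide who goes next when work is waiting. A round-robin queue is one simple way to get fair ordering. The sketch below is an assumption-laden illustration (`RoundRobinTenantQueue` is a hypothetical class, and all tenants get equal weight), not the lesson's scheduler.

```python
from collections import OrderedDict, deque
from typing import Any, Optional


class RoundRobinTenantQueue:
    """Serve one job per tenant in turn so a large backlog cannot starve others."""

    def __init__(self) -> None:
        self._queues: "OrderedDict[str, deque]" = OrderedDict()

    def put(self, tenant_id: str, job: Any) -> None:
        self._queues.setdefault(tenant_id, deque()).append(job)

    def get(self) -> Optional[tuple[str, Any]]:
        if not self._queues:
            return None
        # Take the tenant at the front of the rotation.
        tenant_id, jobs = next(iter(self._queues.items()))
        job = jobs.popleft()
        if jobs:
            self._queues.move_to_end(tenant_id)  # back of the line
        else:
            del self._queues[tenant_id]
        return tenant_id, job


queue = RoundRobinTenantQueue()
for n in range(1, 5):
    queue.put("acme", n)      # bulk import: four jobs queued first
queue.put("globex", "g1")     # a single interactive request arrives later

order = []
while (item := queue.get()) is not None:
    order.append(item)
```

Globex's single request is served second instead of fifth, even though Acme queued its whole batch first. Weighted variants give paid tiers more turns per rotation.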
## Audit Logging

Every AI response in a multi-tenant system needs an immutable audit trail:

```python
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class AuditLogEntry:
    timestamp: float
    request_id: str
    tenant_id: str
    user_id: str
    action: str  # "llm_call", "document_access", "rate_limited", etc.
    model: Optional[str]
    input_tokens: Optional[int]
    output_tokens: Optional[int]
    cost_usd: Optional[float]
    documents_accessed: list[str]
    latency_ms: Optional[int]
    error: Optional[str]


class TenantAuditLogger:
    """
    Append-only audit log for compliance and security forensics.
    Write to immutable storage (S3, CloudWatch, etc.) - never edit/delete.
    """

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self._buffer: list[AuditLogEntry] = []
        self._flush_threshold = 100

    async def log(self, entry: AuditLogEntry) -> None:
        """Log an audit entry. Buffer and flush periodically.

        Buffered entries are lost if the process crashes, so also call
        flush() on shutdown and on a timer.
        """
        self._buffer.append(entry)
        if len(self._buffer) >= self._flush_threshold:
            await self.flush()

    async def flush(self) -> None:
        """Flush buffer to immutable storage."""
        if not self._buffer:
            return
        # In production: write to S3/CloudWatch/Kafka
        for entry in self._buffer:
            print(json.dumps(asdict(entry)))
        self._buffer.clear()

    async def query_tenant_audit(
        self,
        tenant_id: str,
        start_time: float,
        end_time: float,
    ) -> list[AuditLogEntry]:
        """
        Query audit log for a tenant (for compliance reports).
        In production: query S3 or time-series DB.
        """
        # SELECT * FROM audit_log WHERE tenant_id = ? AND timestamp BETWEEN ? AND ?
        return []
```
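Append-only storage prevents edits; a hash chain additionally makes edits detectable. The sketch below is one possible approach, assuming SHA-256 over each record plus the previous entry's hash. `append_entry` and `verify_chain` are hypothetical helpers, not part of `TenantAuditLogger`.

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder "previous hash" for the first entry


def _digest(prev_hash: str, record: dict) -> str:
    body = json.dumps(record, sort_keys=True)
    return hashlib.sha256((prev_hash + body).encode()).hexdigest()


def append_entry(chain: list[dict], record: dict) -> dict:
    """Link a new record to the hash of the entry before it."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    entry = {"record": record, "prev_hash": prev_hash, "hash": _digest(prev_hash, record)}
    chain.append(entry)
    return entry


def verify_chain(chain: list[dict]) -> bool:
    """Recompute every hash; any edited or removed entry breaks the chain."""
    prev_hash = GENESIS
    for entry in chain:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["hash"] != _digest(prev_hash, entry["record"]):
            return False
        prev_hash = entry["hash"]
    return True


log: list[dict] = []
append_entry(log, {"tenant_id": "acme", "action": "llm_call", "request_id": "r1"})
append_entry(log, {"tenant_id": "acme", "action": "document_access", "request_id": "r2"})
intact = verify_chain(log)

log[0]["record"]["action"] = "rate_limited"  # simulate tampering
tampered_ok = verify_chain(log)
```

Verification passes on the untouched log and fails after the first record is altered. Publishing the latest hash somewhere the application cannot write strengthens the guarantee further.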
## Production Architecture Overview

## Production Engineering Notes
Vector DB choice for multi-tenancy:

- Qdrant: Native collection-per-tenant support, which gives a hard boundary. Each collection carries overhead, so check Qdrant's guidance on payload-based partitioning before creating thousands of collections.
- Pinecone: Namespace isolation (shared index, logical partition) - weaker guarantee.
- Weaviate: Multi-tenancy built-in since v1.20 - strong isolation with shared infrastructure.
- pgvector: Schema-per-tenant in PostgreSQL - strong SQL guarantees, full ACID.

API key design:

- Encode tier in key prefix: `ten_pro_...` vs `ten_ent_...` - enables fast routing without DB lookup.
- Rotate keys without service interruption: support multiple active keys per tenant.
- Hash keys before storing: never store plaintext API secrets.

Token counting before the call:

- Use Anthropic's token counting API (or `tiktoken`, which targets OpenAI models and is only an approximation for others) to estimate cost before making the call.
- Block requests that would exceed budget before spending any money.

Conversation storage:

- Store per-tenant, with `tenant_id` as partition key in DynamoDB or as PostgreSQL partition.
- Cap conversation history length - enforce `max_context_tokens` server-side, not client-side.
:::tip Separate Billing Accounts for Enterprise
For truly large enterprise tenants (>$10K/month), provision a separate Anthropic API account. This gives you hard per-tenant spending limits enforced by the API provider, and completely eliminates the risk of one tenant's runaway costs affecting others.
:::

:::warning Context Window Leakage
Always build system prompts programmatically - never concatenate user-provided strings into the system prompt without sanitization. A malicious tenant could inject instructions that affect other users' sessions if shared infrastructure is misconfigured.
:::
:::danger Never Rely Solely on Application-Layer Filtering
If your tenant isolation depends on `WHERE tenant_id = ?` in application code but the underlying vector search runs first, unfiltered cross-tenant candidates exist in memory before the filter runs, and any bug or race in that filter step leaks them. Enforce isolation at the storage layer with separate collections/schemas per tenant. Application-layer filtering is defense-in-depth, not the primary guarantee.
:::
## Interview Questions
Q: How do you architect a multi-tenant AI system to prevent cross-tenant data leakage?
Use collection-level or schema-level isolation at the storage layer, not application-layer metadata filtering. For vector databases: each tenant gets their own collection in Qdrant or Weaviate (not shared index with metadata filters). For relational data: PostgreSQL schemas per tenant or a dedicated RDS instance for enterprise. Application-layer filtering is defense-in-depth - the primary isolation must be at the storage boundary. Also: never trust client-provided tenant IDs; always derive tenant identity from authenticated API key or JWT.
Q: How do you implement per-tenant rate limiting that can't be circumvented?
Use Redis sliding window counters with atomic Lua scripts (ZADD + ZCARD in one transaction). The key insight: rate limit enforcement must happen before the request reaches your application servers. Implement it as middleware that reads authenticated tenant_id (not a client-provided header). Use sliding windows, not fixed windows - fixed windows allow burst attacks at window boundaries (e.g., 100 requests at 11:59:59 + 100 at 12:00:01).
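The boundary burst is easy to reproduce in a few lines. The sketch below simulates both strategies in memory with hypothetical helper functions; it uses the same expiry rule as a sorted-set sliding window (drop entries at or before `now - window`) but is not the Redis implementation itself.

```python
from collections import deque


def fixed_window_allowed(timestamps: list[float], limit: int, window: int) -> int:
    """Count accepted requests when counters reset at each window boundary."""
    counts: dict[int, int] = {}
    allowed = 0
    for t in timestamps:
        bucket = int(t // window)
        if counts.get(bucket, 0) < limit:
            counts[bucket] = counts.get(bucket, 0) + 1
            allowed += 1
    return allowed


def sliding_window_allowed(timestamps: list[float], limit: int, window: int) -> int:
    """Count accepted requests when the window trails each request."""
    recent: deque = deque()
    allowed = 0
    for t in timestamps:
        while recent and recent[0] <= t - window:
            recent.popleft()  # expired: older than the trailing window
        if len(recent) < limit:
            recent.append(t)
            allowed += 1
    return allowed


# 100 requests one second before a minute boundary, 100 one second after.
burst = [59.0] * 100 + [61.0] * 100
fixed = fixed_window_allowed(burst, limit=100, window=60)
sliding = sliding_window_allowed(burst, limit=100, window=60)
```

With a limit of 100 per minute, the fixed window accepts all 200 requests within two seconds, while the sliding window accepts only the first 100.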
Q: A tenant claims they're being charged for other tenants' usage. How do you investigate?
Pull the audit log filtered by tenant_id for the disputed period. Audit logs must record: timestamp, tenant_id, model, input_tokens, output_tokens, request_id. Compare against your cost_tracker records. If there's a discrepancy, check if any background jobs (embedding pipelines, batch jobs) are running under the wrong tenant context. Also verify your cost allocation doesn't have a bug where tokens from one tenant's request are attributed to another's open connection.
Q: How do you handle GDPR right to erasure for a tenant's data?
Collection-level isolation makes this straightforward: DELETE COLLECTION tenant_{id} removes all vectors. For relational data: DROP SCHEMA tenant_{id} CASCADE. For Redis: delete all keys matching *:{tenant_id}:*. For audit logs: retain per legal requirement but mark as deleted in your application layer - audit logs may need to be retained for 7 years regardless of GDPR. The key is that complete tenant removal must be a single, automated operation, not a multi-step manual process that can be forgotten.
Q: How do you prevent a "noisy neighbor" tenant from degrading service for others?
Three layers: (1) Per-tenant concurrency limits in your async executor - no single tenant can hold more than N concurrent LLM connections. (2) Per-tenant rate limiting at the API gateway. (3) Priority queuing - paid tiers get higher queue priority. For extreme cases: tenant-level circuit breakers that temporarily disable a tenant's access if they've exceeded their fair share. Monitor p99 latency per tenant, not just global p99.
Q: How do you design API keys for a multi-tenant system?
Format: {prefix}_{tenant_id}_{random_secret}. Store only the hash (bcrypt or SHA-256). Include tier prefix for fast routing without DB lookup. Support multiple active keys per tenant (key rotation without downtime). Store: tenant_id, key_hash, created_at, last_used_at, expires_at, scopes. Never log the full key - only the prefix. For enterprise: support IP allowlisting per key.
