
:::tip 🎮 Interactive Playground Visualize this concept: Try the A/B Testing demo on the EngineersOfAI Playground - no code required. :::

AI Feature Flags and Rollouts

The $2 Million Prompt Bug

A major legal tech company was rolling out a new AI contract analysis feature to their enterprise customers. The feature scanned uploaded agreements, flagged potentially problematic clauses, and generated a risk summary. It had passed internal testing, cleared the QA process, and received enthusiastic feedback from pilot customers. Product leadership had pushed hard for a full rollout. The engineering team convinced them to start with a 5% canary.

Within 36 hours of the 5% canary going live, the company's analytics flagged an anomaly: the AI was consistently rating clauses related to GDPR compliance as "low risk." The legal team investigated and found the cause: the system prompt had been updated with a new compliance template, but the template referenced an outdated GDPR framework. The AI was confidently assessing clause compliance against rules that had been superseded 14 months earlier.

The impact of the 5% canary: 23 contracts incorrectly assessed. The legal team could review and correct all of them within a day. Estimated cost: approximately $8,000 in legal review time. The estimated cost of a full rollout with the same bug: thousands of contracts across hundreds of enterprise customers, potential legal liability, and the kind of trust destruction that ends products. The canary saved them. The kill switch that disabled the feature in 12 seconds saved them twice - they flipped it while the investigation was still ongoing, preventing any further incorrect analyses from being stored or acted on.

AI feature flags are not the same as traditional software feature flags. They need to control not just whether a feature is enabled, but which model version, which prompt version, what response format, what cost budget, and whether quality metrics are passing. This lesson covers the full system.


Why AI Rollouts Are Different

Traditional feature flags answer one question: is this feature on or off for this user? AI feature flags need to manage a much richer set of variables simultaneously.

An AI feature flag bundle includes all of:

  • Enabled: master on/off switch
  • Model: which LLM version to use for this cohort
  • Prompt version: which prompt template from the registry
  • Rollout percentage: fraction of users who see this flag
  • Cost limit: maximum tokens per day before auto-disable
  • Quality gate: minimum eval score required to stay active
  • Kill switch: override that disables in under 60 seconds

The AI Feature Flag System

# feature_flags/ai_flags.py
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
import hashlib
import time
import logging

logger = logging.getLogger(__name__)


class RolloutStrategy(str, Enum):
    PERCENTAGE = "percentage"        # Random X% of users (deterministic hash)
    USER_LIST = "user_list"          # Specific user IDs
    TENANT_LIST = "tenant_list"      # Specific tenant/org IDs
    TENANT_TIER = "tenant_tier"      # By subscription tier (e.g., enterprise only)
    INTERNAL_ONLY = "internal_only"  # Only employees/beta testers
    BETA_USERS = "beta_users"        # Opted-in beta users


class FlagStatus(str, Enum):
    DRAFT = "draft"          # Not yet live
    ACTIVE = "active"        # Live, receiving traffic
    PAUSED = "paused"        # Temporarily disabled (quality gate failure)
    KILLED = "killed"        # Emergency disabled
    COMPLETED = "completed"  # Rollout complete - 100%


@dataclass
class AIFeatureFlag:
    flag_id: str
    name: str
    description: str

    # Rollout control
    enabled: bool = False
    rollout_percent: float = 0.0  # 0.0 to 1.0
    strategy: RolloutStrategy = RolloutStrategy.PERCENTAGE
    allowed_users: list[str] = field(default_factory=list)
    allowed_tenants: list[str] = field(default_factory=list)
    allowed_tiers: list[str] = field(default_factory=list)

    # AI-specific configuration
    model: str = "claude-opus-4-6"
    prompt_version: Optional[str] = None  # Semver: "2.1.0"
    max_tokens: int = 2048
    temperature_override: Optional[float] = None

    # Cost controls
    daily_token_budget: int = 5_000_000  # Auto-disable when exceeded
    tokens_used_today: int = 0           # Updated by cost tracker

    # Quality gates
    min_eval_score: float = 0.70         # Rollout paused below this
    min_satisfaction_rate: float = 0.65  # Thumbs-up rate threshold
    eval_failure_action: str = "pause"   # "pause" or "kill"
    quality_check_interval_minutes: int = 30

    # Kill switch
    status: FlagStatus = FlagStatus.DRAFT
    killed_at: Optional[float] = None
    killed_reason: Optional[str] = None
    killed_by: Optional[str] = None

    # Rollout metadata
    created_at: float = field(default_factory=time.time)
    last_updated_at: float = field(default_factory=time.time)
    launched_at: Optional[float] = None
    rollout_stage: int = 0  # Which stage in the progressive rollout


class AIFlagEvaluator:
    """
    Evaluates AI feature flags for a given user/tenant context.

    Design principles:
    - Deterministic: same input → same result every time (critical for UX consistency)
    - Fast: evaluated on every request, must be < 1ms
    - Safe: kill switch must take effect immediately, not on next deploy

    In production: store flags in Redis without an expiry (no TTL) so a flag
    never silently disappears, and read from Redis on every evaluation so all
    instances see the same state.
    """

    def __init__(
        self,
        flags: dict[str, AIFeatureFlag],
        internal_user_ids: Optional[set[str]] = None,
        beta_user_ids: Optional[set[str]] = None,
    ):
        self.flags = flags
        self._internal_users = internal_user_ids or set()
        self._beta_users = beta_user_ids or set()

    def evaluate(
        self,
        flag_id: str,
        user_id: str,
        tenant_id: str = "",
        tenant_tier: str = "free",
    ) -> tuple[bool, Optional[AIFeatureFlag]]:
        """
        Evaluate flag for a user.

        Returns:
            (is_active, flag_config_if_active)

        Never raises - returns (False, None) on any error.
        """
        try:
            flag = self.flags.get(flag_id)
            if not flag:
                return False, None

            # Kill switch and status checks first - highest priority
            if flag.status == FlagStatus.KILLED:
                return False, None
            if flag.status == FlagStatus.PAUSED:
                return False, None
            if not flag.enabled:
                return False, None

            # Budget check
            if flag.tokens_used_today >= flag.daily_token_budget:
                logger.warning(f"Flag {flag_id} over daily budget - blocking request")
                return False, None

            # Strategy-specific check
            active = self._evaluate_strategy(flag, user_id, tenant_id, tenant_tier)
            return active, (flag if active else None)

        except Exception as e:
            logger.error(f"Flag evaluation error for {flag_id}: {e}")
            return False, None  # Fail safe - feature off on any error

    def _evaluate_strategy(
        self,
        flag: AIFeatureFlag,
        user_id: str,
        tenant_id: str,
        tenant_tier: str,
    ) -> bool:
        strategy = flag.strategy

        if strategy == RolloutStrategy.USER_LIST:
            return user_id in flag.allowed_users

        elif strategy == RolloutStrategy.TENANT_LIST:
            return tenant_id in flag.allowed_tenants

        elif strategy == RolloutStrategy.TENANT_TIER:
            return tenant_tier in flag.allowed_tiers

        elif strategy == RolloutStrategy.INTERNAL_ONLY:
            return user_id in self._internal_users

        elif strategy == RolloutStrategy.BETA_USERS:
            return user_id in self._beta_users

        else:  # PERCENTAGE - deterministic hash-based bucketing
            return self._hash_bucket(user_id, flag.flag_id) < flag.rollout_percent

    def _hash_bucket(self, user_id: str, flag_id: str) -> float:
        """
        Deterministic bucket assignment: same user always gets the same bucket.
        Using flag_id in the hash ensures users are independently bucketed
        across different flags - user A can be in the 5% for flag X but not flag Y.

        Hash → int → float in [0.0, 1.0)
        """
        key = f"{flag_id}:{user_id}"
        hash_int = int(hashlib.sha256(key.encode()).hexdigest(), 16)
        return (hash_int % 10_000) / 10_000.0

    def kill(
        self,
        flag_id: str,
        reason: str,
        operator: str,
    ) -> None:
        """
        Emergency kill switch.
        Takes effect immediately on the next flag evaluation.

        In production: also write to Redis and emit kill_switch_activated event
        to all monitoring systems simultaneously.
        """
        flag = self.flags.get(flag_id)
        if not flag:
            raise ValueError(f"Flag not found: {flag_id}")

        flag.status = FlagStatus.KILLED
        flag.killed_at = time.time()
        flag.killed_reason = reason
        flag.killed_by = operator
        flag.enabled = False

        logger.critical(
            f"KILL SWITCH ACTIVATED | flag={flag_id} | "
            f"reason={reason} | operator={operator}"
        )
        # Production: persist to DB, emit alert, notify on-call, post to Slack

    def pause(self, flag_id: str, reason: str) -> None:
        """Pause (auto-restart possible). Different from kill (manual restart required)."""
        flag = self.flags.get(flag_id)
        if flag:
            flag.status = FlagStatus.PAUSED
            logger.warning(f"Flag PAUSED | flag={flag_id} | reason={reason}")

    def set_rollout(self, flag_id: str, percent: float, operator: str = "system") -> None:
        """Adjust rollout percentage. Log every change for audit trail."""
        flag = self.flags.get(flag_id)
        if not flag:
            raise ValueError(f"Flag not found: {flag_id}")

        if not 0.0 <= percent <= 1.0:
            raise ValueError(f"Rollout percent must be 0.0-1.0, got {percent}")

        old = flag.rollout_percent
        flag.rollout_percent = percent
        flag.last_updated_at = time.time()

        logger.info(
            f"Rollout updated | flag={flag_id} | "
            f"{old:.1%} → {percent:.1%} | operator={operator}"
        )
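
The bucketing logic has two properties worth checking directly: a user's bucket never changes, and raising the percentage only adds users to the cohort. The sketch below re-implements `_hash_bucket` as a standalone function so it runs on its own; the `cohort` helper, the user IDs, and the flag names are hypothetical and exist only for illustration.

```python
import hashlib


def hash_bucket(user_id: str, flag_id: str) -> float:
    """Map (flag_id, user_id) to a stable value in [0.0, 1.0)."""
    key = f"{flag_id}:{user_id}"
    hash_int = int(hashlib.sha256(key.encode()).hexdigest(), 16)
    return (hash_int % 10_000) / 10_000.0


def cohort(user_ids: list[str], flag_id: str, rollout_percent: float) -> set[str]:
    """Users whose bucket falls below the rollout threshold."""
    return {u for u in user_ids if hash_bucket(u, flag_id) < rollout_percent}


users = [f"user-{i}" for i in range(2_000)]  # hypothetical user IDs
at_5 = cohort(users, "contract-analysis-v2", 0.05)
at_10 = cohort(users, "contract-analysis-v2", 0.10)

# Raising the percentage only adds users; nobody already exposed is dropped.
assert at_5 <= at_10

# A different flag_id reshuffles the buckets, so the two 5% cohorts
# are drawn independently rather than being the same people.
other_flag = cohort(users, "summary-v3", 0.05)
print(len(at_5), len(at_10), len(at_5 & other_flag))
```

Because the threshold comparison is `bucket < rollout_percent`, the 5% cohort is always a subset of the 10% cohort, which is what keeps a user's experience stable as a rollout advances.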

Quality-Gated Rollouts

The defining feature of AI rollouts is the ability to automatically pause or roll back based on quality metrics, not just error rates.

# feature_flags/quality_gate.py
import anthropic
import asyncio
from dataclasses import dataclass
from collections import deque
from typing import Optional
import time
import json
import random
import statistics

# Assumes the package layout implied by the file-path comments
from feature_flags.ai_flags import AIFlagEvaluator


@dataclass
class QualitySample:
    timestamp: float
    flag_id: str
    user_id: str
    model: str
    prompt_version: str
    response_length: int
    thumbs_up: Optional[bool] = None         # Explicit user feedback
    llm_judge_score: Optional[float] = None  # Automated evaluation
    copy_event: bool = False                 # Implicit quality signal
    response_time_ms: float = 0.0


class QualityGate:
    """
    Monitors rolling quality metrics for an active flag.
    Automatically pauses the flag if quality drops below thresholds.

    Sampling strategy:
    - Collect explicit feedback from all users who provide it
    - Run LLM-as-judge on a random 10% sample for automated scoring
    - Track implicit signals (copy rate) on all responses

    Alert thresholds (configurable per flag):
    - satisfaction_rate < 0.65: pause flag
    - llm_judge_score < 0.70: pause flag
    - hallucination_rate > 0.05: kill flag (listed as a target; not implemented in this class)
    """

    def __init__(
        self,
        evaluator: AIFlagEvaluator,
        window_size: int = 300,           # Rolling window (last N samples)
        judge_sample_rate: float = 0.10,  # Run LLM judge on 10% of responses
    ):
        self.evaluator = evaluator
        self.window_size = window_size
        self.judge_sample_rate = judge_sample_rate
        self._samples: dict[str, deque] = {}
        self._judge_client = anthropic.Anthropic()
        self._judge_tasks: set = set()  # Keeps background judge tasks alive

    def record(
        self,
        flag_id: str,
        user_id: str,
        model: str,
        prompt_version: str,
        response: str,
        user_query: str,
        response_time_ms: float,
        thumbs_up: Optional[bool] = None,
    ) -> None:
        """Record a response for quality monitoring."""
        sample = QualitySample(
            timestamp=time.time(),
            flag_id=flag_id,
            user_id=user_id,
            model=model,
            prompt_version=prompt_version,
            response_length=len(response),
            thumbs_up=thumbs_up,
            response_time_ms=response_time_ms,
        )

        if flag_id not in self._samples:
            self._samples[flag_id] = deque(maxlen=self.window_size)
        self._samples[flag_id].append(sample)

        # Run LLM judge in the background on a random fraction of responses.
        # Scheduling a task needs a running event loop, so judging is skipped
        # when record() is called from synchronous code.
        if random.random() < self.judge_sample_rate:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                return
            task = loop.create_task(
                self._run_judge_async(sample, user_query, response)
            )
            # Hold a reference so the task is not garbage-collected mid-flight
            self._judge_tasks.add(task)
            task.add_done_callback(self._judge_tasks.discard)

    async def _run_judge_async(
        self,
        sample: QualitySample,
        query: str,
        response: str,
    ) -> None:
        """Run LLM-as-judge evaluation asynchronously."""
        try:
            # The client is synchronous, so run the call in a worker thread
            judge_response = await asyncio.to_thread(
                self._judge_client.messages.create,
                # Cheaper model as judge; a different model family than the
                # one under test would reduce self-preference bias
                model="claude-haiku-4-5-20251001",
                max_tokens=150,
                messages=[{
                    "role": "user",
                    "content": (
                        f"Rate this AI response (0.0-1.0):\n"
                        f"Query: {query[:300]}\n"
                        f"Response: {response[:500]}\n\n"
                        f"Score criteria: accuracy, helpfulness, safety.\n"
                        f"Output JSON: {{\"score\": 0.0-1.0, \"issues\": []}}"
                    ),
                }],
            )

            result = json.loads(judge_response.content[0].text)
            sample.llm_judge_score = float(result.get("score", 0.5))

        except Exception:
            pass  # Don't block on judge failures; the sample stays unscored

    def check_and_maybe_gate(self, flag_id: str) -> dict:
        """
        Evaluate current quality metrics for a flag.
        Auto-pauses or kills the flag if thresholds are violated.

        Returns quality report dict for monitoring dashboard.
        """
        flag = self.evaluator.flags.get(flag_id)
        if not flag or not flag.enabled:
            return {"status": "inactive"}

        samples = list(self._samples.get(flag_id, []))
        if len(samples) < 20:
            return {
                "status": "insufficient_data",
                "sample_size": len(samples),
                "message": "Need at least 20 samples to evaluate quality",
            }

        # Satisfaction rate from explicit feedback
        feedback_samples = [s for s in samples if s.thumbs_up is not None]
        satisfaction_rate = (
            sum(1 for s in feedback_samples if s.thumbs_up) / len(feedback_samples)
            if feedback_samples else None
        )

        # Average LLM judge score
        judge_samples = [s for s in samples if s.llm_judge_score is not None]
        avg_judge = (
            statistics.mean(s.llm_judge_score for s in judge_samples)
            if judge_samples else None
        )

        # Copy rate (implicit quality signal)
        copy_rate = sum(1 for s in samples if s.copy_event) / len(samples)

        # Evaluate against flag thresholds
        violations = []

        if satisfaction_rate is not None and satisfaction_rate < flag.min_satisfaction_rate:
            violations.append(
                f"Low satisfaction: {satisfaction_rate:.1%} < {flag.min_satisfaction_rate:.1%}"
            )

        if avg_judge is not None and avg_judge < flag.min_eval_score:
            violations.append(
                f"Low LLM judge score: {avg_judge:.2f} < {flag.min_eval_score:.2f}"
            )

        severity = "ok"
        if violations:
            # Report the action actually taken ("pause" or "kill")
            severity = flag.eval_failure_action
            if severity == "pause":
                self.evaluator.pause(flag_id, f"Quality gate: {'; '.join(violations)}")
            elif severity == "kill":
                self.evaluator.kill(
                    flag_id,
                    reason=f"Quality gate (auto): {'; '.join(violations)}",
                    operator="system",
                )

        return {
            "flag_id": flag_id,
            "status": severity,
            "sample_size": len(samples),
            "satisfaction_rate": satisfaction_rate,
            "avg_judge_score": avg_judge,
            "copy_rate": copy_rate,
            "violations": violations,
            "flag_status": flag.status.value,
        }
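
The class docstring lists a hallucination threshold that kills the flag, while the satisfaction and judge thresholds only pause it. One way to combine rules with different severities is to let the most severe violated rule win. The following is a minimal sketch under that assumption: `decide_action`, `SEVERITY_ORDER`, and the `hallucination_rate` input are hypothetical names, and how a hallucination rate would be measured is left open.

```python
from typing import Optional

SEVERITY_ORDER = {"ok": 0, "pause": 1, "kill": 2}


def decide_action(
    satisfaction_rate: Optional[float],
    avg_judge_score: Optional[float],
    hallucination_rate: Optional[float],
    min_satisfaction_rate: float = 0.65,
    min_eval_score: float = 0.70,
    max_hallucination_rate: float = 0.05,
) -> tuple[str, list[str]]:
    """Return (action, violations); the most severe violated rule wins."""
    action = "ok"
    violations: list[str] = []

    def escalate(new_action: str, message: str) -> None:
        nonlocal action
        violations.append(message)
        if SEVERITY_ORDER[new_action] > SEVERITY_ORDER[action]:
            action = new_action

    # A metric that is None (no data yet) is skipped, not treated as a failure.
    if satisfaction_rate is not None and satisfaction_rate < min_satisfaction_rate:
        escalate("pause", f"Low satisfaction: {satisfaction_rate:.1%}")
    if avg_judge_score is not None and avg_judge_score < min_eval_score:
        escalate("pause", f"Low LLM judge score: {avg_judge_score:.2f}")
    if hallucination_rate is not None and hallucination_rate > max_hallucination_rate:
        escalate("kill", f"High hallucination rate: {hallucination_rate:.1%}")

    return action, violations


print(decide_action(0.80, 0.90, 0.01))
print(decide_action(0.50, 0.90, 0.10))
```

The returned action could then be passed to `pause()` or `kill()` on the evaluator, so a pause-level violation never masks a kill-level one.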

A/B Testing Prompt Versions

The most common AI A/B test: comparing two versions of a system prompt to determine which produces better user outcomes.

# feature_flags/ab_test.py
import anthropic
import hashlib
from dataclasses import dataclass
from typing import Optional
import logging
import json

logger = logging.getLogger(__name__)


@dataclass
class PromptVariant:
    variant_id: str          # "control", "treatment_a", "treatment_b"
    prompt_version: str      # Semver from prompt registry
    model: str               # Model to use with this variant
    traffic_fraction: float  # Must sum to 1.0 across all variants


class PromptABTest:
    """
    A/B test between prompt versions.

    Critical requirement: deterministic user assignment.
    The same user must always see the same variant.
    If a user switches between variants mid-experiment:
    - Their experience is inconsistent (trust broken)
    - The data is contaminated (their feedback can no longer be attributed to one variant)

    Implementation: hash(test_id + user_id) → deterministic bucket.
    """

    def __init__(self, test_id: str, variants: list[PromptVariant]):
        total_traffic = sum(v.traffic_fraction for v in variants)
        if abs(total_traffic - 1.0) > 0.001:
            raise ValueError(f"Variant fractions must sum to 1.0, got {total_traffic:.3f}")
        self.test_id = test_id
        self.variants = variants

    def assign_variant(self, user_id: str) -> PromptVariant:
        """Deterministically assign user to variant."""
        key = f"{self.test_id}:{user_id}"
        bucket = (int(hashlib.sha256(key.encode()).hexdigest(), 16) % 10_000) / 10_000.0

        cumulative = 0.0
        for variant in self.variants:
            cumulative += variant.traffic_fraction
            if bucket < cumulative:
                return variant

        return self.variants[-1]  # Fallback (floating point safety)

    async def run(
        self,
        user_id: str,
        messages: list[dict],
        prompt_registry,
    ) -> tuple[str, str]:
        """
        Run the A/B test for a user.
        Returns (response, variant_id) for tracking.
        """
        variant = self.assign_variant(user_id)

        system_prompt = await prompt_registry.get(
            template_id="base_assistant",
            version=variant.prompt_version,
        )

        client = anthropic.AsyncAnthropic()
        response = await client.messages.create(
            model=variant.model,
            max_tokens=2048,
            system=system_prompt,
            messages=messages,
        )

        logger.info(
            f"A/B test | test={self.test_id} | "
            f"user={user_id[:8]} | variant={variant.variant_id} | "
            f"model={variant.model} | prompt_v={variant.prompt_version}"
        )

        return response.content[0].text, variant.variant_id

    def analyze_results(self, variant_metrics: dict[str, dict]) -> dict:
        """
        Analyze A/B test results and recommend a winner.

        Expected input format:
        {
            "control": {"satisfaction_rate": 0.72, "sample_size": 450},
            "treatment": {"satisfaction_rate": 0.79, "sample_size": 448}
        }
        """
        import math

        results = {}
        baseline = None

        for variant_id, metrics in variant_metrics.items():
            n = metrics["sample_size"]
            rate = metrics["satisfaction_rate"]

            # 95% confidence interval using Wilson score
            z = 1.96
            center = (rate + z**2 / (2 * n)) / (1 + z**2 / n)
            margin = (z * math.sqrt(rate * (1 - rate) / n + z**2 / (4 * n**2))) / (1 + z**2 / n)

            results[variant_id] = {
                "satisfaction_rate": rate,
                "sample_size": n,
                "ci_lower": round(center - margin, 3),
                "ci_upper": round(center + margin, 3),
            }

            if variant_id == "control":
                baseline = rate

        # Compute lift vs control
        if baseline:
            for vid, r in results.items():
                if vid != "control":
                    lift = (r["satisfaction_rate"] - baseline) / baseline
                    results[vid]["lift_vs_control"] = round(lift * 100, 1)
                    # Non-overlapping CIs are a conservative significance check:
                    # intervals that overlap slightly can still differ
                    # significantly under a two-proportion test.
                    results[vid]["significant"] = (
                        r["ci_lower"] > results["control"]["ci_upper"] or
                        r["ci_upper"] < results["control"]["ci_lower"]
                    )

        # Recommend the significant treatment with the highest rate, if any
        significant_wins = sorted(
            (
                vid for vid, r in results.items()
                if vid != "control" and r.get("significant", False)
                and r["satisfaction_rate"] > baseline
            ),
            key=lambda vid: results[vid]["satisfaction_rate"],
            reverse=True,
        )

        return {
            "test_id": self.test_id,
            "variants": results,
            "recommendation": (
                significant_wins[0] if significant_wins
                else "control"  # No significant winner - keep control
            ),
            "reason": (
                f"Treatment shows {results[significant_wins[0]].get('lift_vs_control', 0):.1f}% lift with statistical significance"
                if significant_wins
                else "No statistically significant improvement found - recommend keeping control"
            ),
        }
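
To see how the interval-overlap rule behaves, it helps to run the numbers from the docstring example (72% of 450 vs 79% of 448). The sketch below uses the same Wilson formula as `analyze_results` and, for comparison, a pooled two-proportion z-test. The z-test is not part of the original code; it is included here only as an assumed alternative, and the function names are illustrative.

```python
import math


def wilson_interval(rate: float, n: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval for a proportion (same formula as analyze_results)."""
    denom = 1 + z**2 / n
    center = (rate + z**2 / (2 * n)) / denom
    margin = z * math.sqrt(rate * (1 - rate) / n + z**2 / (4 * n**2)) / denom
    return center - margin, center + margin


def two_proportion_z(rate_a: float, n_a: int, rate_b: float, n_b: int) -> float:
    """Pooled two-proportion z statistic for rate_b - rate_a."""
    pooled = (rate_a * n_a + rate_b * n_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    return (rate_b - rate_a) / se


control_ci = wilson_interval(0.72, 450)
treatment_ci = wilson_interval(0.79, 448)

# The overlap rule calls the result significant only if the intervals are disjoint.
intervals_overlap = treatment_ci[0] <= control_ci[1]
z_stat = two_proportion_z(0.72, 450, 0.79, 448)

print(f"control CI:   {control_ci[0]:.3f} - {control_ci[1]:.3f}")
print(f"treatment CI: {treatment_ci[0]:.3f} - {treatment_ci[1]:.3f}")
print(f"intervals overlap: {intervals_overlap}, z = {z_stat:.2f}")
```

For these inputs the two intervals overlap slightly, so the overlap rule recommends keeping control, while the z statistic is above 1.96. That is the sense in which the overlap rule is conservative: it trades some statistical power for fewer false winners, which may or may not be the right trade for a given test.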

Progressive Rollout Automation

# feature_flags/progressive_rollout.py
import asyncio
import logging
import time
from typing import Optional

# Assumes the package layout implied by the file-path comments
from feature_flags.ai_flags import AIFlagEvaluator, FlagStatus
from feature_flags.quality_gate import QualityGate

logger = logging.getLogger(__name__)

# Standard progressive rollout stages
# Each stage declares a minimum time AND minimum sample size before advancing.
# Note: advance() below checks only the quality gate; it does not itself
# enforce min_hours or min_samples.
ROLLOUT_STAGES = [
    {"percent": 0.01, "label": "Internal canary (1%)", "min_hours": 24, "min_samples": 50},
    {"percent": 0.05, "label": "Expanded canary (5%)", "min_hours": 48, "min_samples": 100},
    {"percent": 0.10, "label": "10% rollout", "min_hours": 48, "min_samples": 200},
    {"percent": 0.25, "label": "25% rollout", "min_hours": 48, "min_samples": 300},
    {"percent": 0.50, "label": "50% rollout", "min_hours": 24, "min_samples": 400},
    {"percent": 1.00, "label": "Full rollout (100%)", "min_hours": 0, "min_samples": 0},
]


class ProgressiveRollout:
    """
    Manages automated progressive rollout with quality gates.

    Rollout philosophy:
    - Start at 1% (catch show-stoppers with minimal blast radius)
    - Advance only when quality gates pass AND min samples are collected
    - Pause automatically on quality gate failure; rollback() steps back one stage
    - Never advance more than one stage at a time
    - Always require manual approval to go from 50% → 100%
    """

    def __init__(
        self,
        flag_id: str,
        evaluator: AIFlagEvaluator,
        quality_gate: QualityGate,
        require_approval_for_full: bool = True,
    ):
        self.flag_id = flag_id
        self.evaluator = evaluator
        self.quality = quality_gate
        self.require_approval = require_approval_for_full
        self._current_stage = 0
        self._approval_received = False

    async def start(self) -> None:
        """Start the rollout at stage 0 (1%)."""
        flag = self.evaluator.flags.get(self.flag_id)
        if not flag:
            raise ValueError(f"Flag not found: {self.flag_id}")

        flag.enabled = True
        flag.status = FlagStatus.ACTIVE
        flag.launched_at = time.time()
        self._current_stage = 0

        first_stage = ROLLOUT_STAGES[0]
        self.evaluator.set_rollout(
            self.flag_id,
            first_stage["percent"],
            operator="progressive_rollout",
        )
        logger.info(f"Rollout started | flag={self.flag_id} | stage=0 | {first_stage['label']}")

    async def advance(self, approval_by: Optional[str] = None) -> dict:
        """
        Try to advance to the next rollout stage.
        Returns status report.
        """
        if self._current_stage >= len(ROLLOUT_STAGES) - 1:
            return {"status": "complete", "message": "Already at 100%"}

        next_stage_idx = self._current_stage + 1
        next_stage = ROLLOUT_STAGES[next_stage_idx]

        # Final stage requires manual approval
        if next_stage["percent"] == 1.0 and self.require_approval and not approval_by:
            return {
                "status": "approval_required",
                "message": "Manual approval required before 100% rollout",
                "current_percent": ROLLOUT_STAGES[self._current_stage]["percent"],
            }

        # Check quality gate
        quality_report = self.quality.check_and_maybe_gate(self.flag_id)

        if quality_report.get("status") == "insufficient_data":
            return {
                "status": "waiting_for_data",
                "message": quality_report["message"],
                "current_stage": self._current_stage,
            }

        if quality_report.get("violations"):
            # Quality gate failed - do not advance, rollout already paused
            return {
                "status": "quality_gate_failed",
                "violations": quality_report["violations"],
                "action": "Flag auto-paused. Fix quality issues before resuming.",
                "current_stage": self._current_stage,
            }

        # Advance to next stage
        self._current_stage = next_stage_idx
        self.evaluator.set_rollout(
            self.flag_id,
            next_stage["percent"],
            operator=approval_by or "progressive_rollout",
        )

        logger.info(
            f"Rollout advanced | flag={self.flag_id} | "
            f"stage={next_stage_idx} | {next_stage['label']}"
        )

        return {
            "status": "advanced",
            "new_stage": next_stage_idx,
            "new_percent": next_stage["percent"],
            "label": next_stage["label"],
            "quality_report": quality_report,
        }

    async def rollback(self, reason: str) -> None:
        """Roll back to the previous stage."""
        if self._current_stage == 0:
            # At stage 0 - kill entirely
            self.evaluator.kill(self.flag_id, reason=reason, operator="progressive_rollout")
            return

        prev_stage = self._current_stage - 1
        self._current_stage = prev_stage
        self.evaluator.set_rollout(
            self.flag_id,
            ROLLOUT_STAGES[prev_stage]["percent"],
            operator="progressive_rollout_rollback",
        )
        logger.warning(
            f"Rollout rolled back | flag={self.flag_id} | "
            f"reason={reason} | stage → {prev_stage}"
        )
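
The stage table pairs each percentage with `min_hours` and `min_samples`, so something has to check both before a stage advances. A minimal sketch of such a check follows; `stage_ready`, `stage_started_at`, and `samples_in_stage` are hypothetical names, not part of the classes above. It assumes `samples_in_stage` is a cumulative counter for the stage: the quality gate's rolling window holds at most `window_size` samples (300 by default), which is below the 400 required at the 50% stage.

```python
import time
from typing import Optional


def stage_ready(
    stage: dict,
    stage_started_at: float,
    samples_in_stage: int,
    now: Optional[float] = None,
) -> tuple[bool, str]:
    """Return (ready, reason) for a stage dict with min_hours and min_samples."""
    now = time.time() if now is None else now
    hours_elapsed = (now - stage_started_at) / 3600

    # Both conditions must hold: enough wall-clock time AND enough samples.
    if hours_elapsed < stage["min_hours"]:
        return False, f"only {hours_elapsed:.1f}h of {stage['min_hours']}h elapsed"
    if samples_in_stage < stage["min_samples"]:
        return False, f"only {samples_in_stage} of {stage['min_samples']} samples"
    return True, "ready"


stage = {"percent": 0.05, "label": "Expanded canary (5%)", "min_hours": 48, "min_samples": 100}

# Plenty of samples, but only 47 hours in: not ready.
print(stage_ready(stage, stage_started_at=0.0, samples_in_stage=500, now=47 * 3600))
# Enough time, but too few samples: not ready.
print(stage_ready(stage, stage_started_at=0.0, samples_in_stage=99, now=49 * 3600))
# Both thresholds met.
print(stage_ready(stage, stage_started_at=0.0, samples_in_stage=100, now=48 * 3600))
```

A scheduler could call a check like this before `advance()`, recording the stage start time whenever the rollout percentage changes.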

Cost Budget Enforcement

# feature_flags/cost_guard.py
import time
import logging

# Assumes the package layout implied by the file-path comments
from feature_flags.ai_flags import AIFeatureFlag

logger = logging.getLogger(__name__)


class FlagCostGuard:
    """
    Enforce per-flag daily token budgets.
    Auto-disables flags that exceed their budget.

    Use case: A/B test accidentally uses a much more expensive model,
    or a prompt change causes dramatically longer responses.
    Budget guard provides automatic cost protection.

    Production implementation: use Redis INCRBY with daily key expiry.
    """

    def __init__(self):
        # "flag_id:YYYY-MM-DD" → tokens used that day (in-memory for demo)
        self._usage: dict[str, int] = {}

    def _today_key(self, flag_id: str) -> str:
        date = time.strftime("%Y-%m-%d")  # Local date; the budget resets at local midnight
        return f"{flag_id}:{date}"

    def check_budget(
        self,
        flag: AIFeatureFlag,
        estimated_tokens: int,
    ) -> bool:
        """
        Returns True if budget allows this request, and reserves the tokens.

        This in-memory version is only safe within a single thread. In
        production the check-and-reserve must be atomic across instances;
        a GET followed by INCRBY is a race. One race-free pattern is to
        increment first and undo on overshoot:
            key = f"flag_cost:{flag_id}:{today}"
            new_total = redis.incrby(key, estimated_tokens)  # atomic
            redis.expire(key, 86400 * 2)
            if new_total > flag.daily_token_budget:
                redis.decrby(key, estimated_tokens)  # release the reservation
                return False
            return True
        """
        key = self._today_key(flag.flag_id)
        current = self._usage.get(key, 0)

        if current + estimated_tokens > flag.daily_token_budget:
            logger.warning(
                f"Flag {flag.flag_id} exceeded daily budget "
                f"({current + estimated_tokens} > {flag.daily_token_budget} tokens)"
            )
            return False

        self._usage[key] = current + estimated_tokens
        return True

    def get_usage_percent(self, flag: AIFeatureFlag) -> float:
        key = self._today_key(flag.flag_id)
        return self._usage.get(key, 0) / max(flag.daily_token_budget, 1)
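
A budget check only protects the budget if checking and reserving happen as one atomic step; otherwise concurrent requests can each see spare budget and together overshoot it. The sketch below shows the idea inside a single process, using a `threading.Lock` as a stand-in for an atomic store operation. `AtomicTokenBudget` and its methods are hypothetical names; with Redis, the analogous approach would rely on an atomic `INCRBY` (undone with `DECRBY` on overshoot) rather than a lock.

```python
import threading


class AtomicTokenBudget:
    """Check-and-reserve under one lock so concurrent callers cannot overshoot."""

    def __init__(self, daily_budget: int) -> None:
        self.daily_budget = daily_budget
        self._used = 0
        self._lock = threading.Lock()

    def reserve(self, tokens: int) -> bool:
        with self._lock:
            if self._used + tokens > self.daily_budget:
                return False
            self._used += tokens
            return True

    def settle(self, reserved: int, actual: int) -> None:
        """Replace an estimate with the actual token count once it is known."""
        with self._lock:
            self._used += actual - reserved

    @property
    def used(self) -> int:
        with self._lock:
            return self._used


budget = AtomicTokenBudget(daily_budget=3_000)
results: list[bool] = []


def worker() -> None:
    results.append(budget.reserve(100))


# 50 concurrent requests of 100 tokens each against a 3,000-token budget.
threads = [threading.Thread(target=worker) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sum(results), budget.used)
```

Exactly 30 of the 50 reservations should succeed and usage should stop at 3,000 tokens. The `settle` method reflects one possible design choice: reserve on an estimate before the call, then correct to the actual usage afterwards.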

Pre-Rollout Checklist

# feature_flags/rollout_checklist.py
from dataclasses import dataclass

# Assumes the package layout implied by the file-path comments
from feature_flags.ai_flags import AIFeatureFlag


@dataclass
class CheckResult:
    name: str
    passed: bool
    blocking: bool
    message: str


class AIRolloutChecklist:
    """
    Automated validation before any AI feature rollout.
    Run before increasing rollout percentage.
    Blocks rollout if any BLOCKING check fails.
    """

    def __init__(self, flag: AIFeatureFlag, prompt_registry, eval_pipeline):
        self.flag = flag
        self.prompt_registry = prompt_registry
        self.eval_pipeline = eval_pipeline

    async def run(self) -> dict:
        estimated_cost = self._estimate_daily_cost()

        checks = [
            CheckResult(
                name="Prompt version exists in registry",
                passed=(
                    await self.prompt_registry.version_exists(self.flag.prompt_version)
                    if self.flag.prompt_version else True
                ),
                blocking=True,
                message="Specified prompt version not found in registry",
            ),
            CheckResult(
                name="Eval suite passes at 95%+",
                passed=await self.eval_pipeline.run_suite(
                    prompt_version=self.flag.prompt_version,
                    min_pass_rate=0.95,
                ),
                blocking=True,
                message="Eval suite below 95% pass rate - investigate failures before rollout",
            ),
            CheckResult(
                name="Kill switch verified functional",
                passed=self._verify_kill_switch(),
                blocking=True,
                message="Kill switch not working - do not roll out without a verified kill switch",
            ),
            CheckResult(
                name="Daily cost estimate within budget",
                passed=estimated_cost < self.flag.daily_token_budget,
                blocking=False,
                message=f"Estimated daily cost ({estimated_cost} tokens) exceeds budget",
            ),
            CheckResult(
                name="Monitoring dashboards configured",
                passed=True,  # Placeholder: check that a Grafana dashboard exists for this flag
                blocking=False,
                message="No monitoring dashboard found for this flag",
            ),
            CheckResult(
                name="Rollback procedure documented",
                passed=bool(self.flag.description),  # Proxy: only checks the description is non-empty
                blocking=False,
                message="Add rollback instructions to flag description",
            ),
        ]

        all_blocking_passed = all(c.passed for c in checks if c.blocking)

        return {
            "flag_id": self.flag.flag_id,
            "approved": all_blocking_passed,
            "checks": [
                {
                    "name": c.name,
                    "passed": c.passed,
                    "blocking": c.blocking,
                    "message": c.message if not c.passed else "OK",
                }
                for c in checks
            ],
            "blocking_failures": [c.name for c in checks if not c.passed and c.blocking],
        }

    def _verify_kill_switch(self) -> bool:
        """Test that kill switch can be activated (without actually killing)."""
        # In production: send a test signal through the kill switch mechanism
        return True

    def _estimate_daily_cost(self) -> int:
        """Estimate daily token usage at current rollout %."""
        # 1000 daily active users × rollout% × 2000 tokens avg
        dau = 1000
        avg_tokens = 2000
        return int(dau * self.flag.rollout_percent * avg_tokens)

Production Engineering Notes

Kill switches must work in under 60 seconds. Store flag state in Redis without an expiry (no TTL), so a flag never silently disappears, and have every flag evaluation read from Redis rather than from in-memory app state, so every instance sees the current value. Never rely on deploy-based kill switches - deployment takes minutes. On kill: set flag.status = KILLED in Redis → every instance's next flag evaluation sees the flag as disabled.
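
The difference between reading shared state on every evaluation and copying it into process memory can be shown with a small sketch. Here `SharedFlagStore` is an in-memory stand-in for a shared store such as Redis, and all class names are hypothetical; the point is only that a kill written to the store is visible to read-through instances but not to an instance holding a startup snapshot.

```python
class SharedFlagStore:
    """In-memory stand-in for a shared store such as Redis (hypothetical)."""

    def __init__(self) -> None:
        self._status: dict[str, str] = {}

    def set_status(self, flag_id: str, status: str) -> None:
        self._status[flag_id] = status

    def get_status(self, flag_id: str) -> str:
        return self._status.get(flag_id, "draft")


class ReadThroughInstance:
    """App instance that asks the shared store on every evaluation."""

    def __init__(self, store: SharedFlagStore) -> None:
        self.store = store

    def is_enabled(self, flag_id: str) -> bool:
        return self.store.get_status(flag_id) == "active"


class SnapshotInstance:
    """Anti-pattern: copies flag state into process memory at startup."""

    def __init__(self, store: SharedFlagStore, flag_ids: list[str]) -> None:
        self._local = {f: store.get_status(f) for f in flag_ids}

    def is_enabled(self, flag_id: str) -> bool:
        return self._local.get(flag_id) == "active"


FLAG = "contract-analysis-v2"  # hypothetical flag ID
store = SharedFlagStore()
store.set_status(FLAG, "active")

instance_a = ReadThroughInstance(store)
instance_b = ReadThroughInstance(store)
stale = SnapshotInstance(store, [FLAG])

store.set_status(FLAG, "killed")  # the kill switch: one write to the shared store

print(instance_a.is_enabled(FLAG), instance_b.is_enabled(FLAG), stale.is_enabled(FLAG))
```

Both read-through instances report the flag as disabled immediately after the write, while the snapshot instance keeps serving the feature until it restarts.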

Canary at 1% before any AI feature, even when you're confident the prompt is correct. AI systems fail in unexpected ways on real user inputs that never appeared in your test set. 1% exposure surfaces those failure modes with minimal blast radius. A 24-48 hour soak at 1% is not optional.

Tag every response with flag metadata. Log {request_id, flag_id, prompt_version, model, variant_id} on every request. This makes it possible to replay or re-evaluate a specific cohort when investigating quality issues weeks later. Without this metadata, you cannot answer "did this prompt version cause the quality drop we saw on Tuesday?"
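
A minimal sketch of such a log record, assuming one JSON object per response: `build_response_log` is a hypothetical helper, and the field values are placeholders. Emitting the record as a single JSON line makes it straightforward to filter a cohort later by `flag_id` and `prompt_version`.

```python
import json
import uuid
from typing import Optional


def build_response_log(
    flag_id: str,
    prompt_version: str,
    model: str,
    variant_id: Optional[str] = None,
    request_id: Optional[str] = None,
) -> str:
    """Serialize the flag metadata for one response as a single JSON line."""
    record = {
        "request_id": request_id or str(uuid.uuid4()),
        "flag_id": flag_id,
        "prompt_version": prompt_version,
        "model": model,
        "variant_id": variant_id,  # None when the request is not part of an A/B test
    }
    return json.dumps(record, sort_keys=True)


line = build_response_log(
    flag_id="contract-analysis-v2",  # placeholder values for illustration
    prompt_version="2.1.0",
    model="claude-opus-4-6",
    variant_id="treatment_a",
    request_id="req-123",
)
print(line)
```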

:::tip Decouple Prompt Rollouts from Code Rollouts Store prompt templates in a versioned registry separate from your code. This lets you: (1) Roll back a bad prompt without a code deploy. (2) A/B test prompt variants without code changes. (3) Update prompts instantly via flag update, not a 10-minute deploy pipeline. Any significant prompt change should go through its own canary rollout, independent of the code release cycle. :::

:::warning Never Test Rollout Cohorts and Eval Sets on the Same Users If your eval suite is built from the traffic of the same users who are in the canary rollout, you're measuring quality on a biased sample. Eval suites should use a static golden dataset with known good answers. Rollout quality monitoring should measure the actual production user population separately. Mixing the two creates a circular evaluation that misses real-world failure modes. :::


Interview Q&A

Q1: How are AI feature flags different from traditional feature flags?

Traditional flags are binary: feature X is on or off for user Y. AI feature flags additionally control which model to use, which prompt version, token budgets, quality thresholds, and A/B variants with different prompts. They also need quality-gated rollouts - the ability to automatically pause or roll back based on satisfaction scores and LLM judge scores, not just error rates. They need kill switches that work in under 60 seconds, since AI quality issues (hallucinations, stale knowledge, wrong format) can affect user trust much faster than code bugs. And they need to be coupled with a prompt version registry so you can roll back a bad prompt without a code deploy.

Q2: How do you safely roll out a new system prompt?

Five-stage process: (1) Update the prompt in the versioned prompt registry. (2) Run the eval suite against the new prompt version - block rollout if pass rate is below 95%. (3) Create a feature flag pointing to the new prompt version and start at 1% canary rollout. (4) Monitor quality signals for 24-48 hours: satisfaction rate, LLM judge score, refusal rate, response length distribution. (5) If signals hold, advance through stages: 5% → 10% → 25% → 50% → 100%, with quality gate checks at each stage. Kill switch available at any point. Every stage should be documented in a rollout log with who advanced it and what the quality signals showed.

Q3: How do you implement a kill switch that works in under 60 seconds?

Store flag state in Redis without an expiry (no TTL), so a flag never disappears on its own. All application instances evaluate flags by reading from Redis on every request - no local cache. On kill: (1) API call → write flag.status = KILLED to Redis. (2) Every instance's next request reads from Redis and sees the flag as killed. (3) The feature is effectively off within seconds. Also: send a kill_switch_activated event to monitoring systems, post to an incident Slack channel, and trigger an on-call alert. Never store flag state only in process memory - each instance would have independent state, and a kill switch would only affect the instance that received the command.

Q4: How do you A/B test two different prompt versions?

Assign users to variants deterministically: hash(test_id + user_id) → bucket → variant. Same user always gets same variant across sessions (critical for UX consistency and data integrity). Track on every response: variant_id, model, prompt_version, response_length. Measure: satisfaction rate (thumbs up/down), LLM judge score, copy rate, refusal rate, response time. Run for sufficient duration (minimum 200 samples per variant, minimum 2 weeks calendar time for weekly patterns). Use Wilson score confidence intervals to determine statistical significance; requiring non-overlapping intervals is a conservative test. Roll out the winner; archive the loser with its results. Never conclude a test early because one variant looks better - repeatedly peeking and stopping at the first significant-looking result inflates false positives, and early leads often disappear.

Q5: What do you do when a quality gate fails during rollout?

Immediate action: the quality gate auto-pauses the flag (stops new traffic from hitting the feature). No new users see the broken behavior. Existing users in the canary: if they're mid-session, they complete normally. On next session, they get the fallback (previous behavior). Investigation: pull all responses from the affected cohort using the flag_id + timestamp metadata. Compare response samples from before and after the quality drop. Look for: prompt version that changed, model behavior change, new user input patterns that weren't in your eval set. Fix: update the prompt, run eval suite, verify fix in staging, restart rollout from 1%. Never re-enable the flag at the percentage it was at when it failed - always restart from 1%.

© 2026 EngineersOfAI. All rights reserved.