
04 - Guardrails and Action Validation

:::info Reading time: ~25 minutes | Last line of defense before irreversible harm :::

The Agent That Wanted to Run rm -rf

An agent with shell access was asked to "clean up temporary build files." It correctly identified that /tmp/build-cache was the target. It then composed the command:

rm -rf /tmp/build-cache /src/build /app/dist

Three directories. The first was the correct target. The second contained uncommitted changes. The third was the production distribution directory. The guardrail caught the command before it ran.

The agent did not intend to cause harm. It identified a reasonable interpretation of "clean up build files" and constructed a plausible command. Without the guardrail, that command would have executed. With it, the action was blocked, the user was shown what the agent was about to do, and no data was lost.

This is what guardrails do. They are the last validation layer before an action causes real harm.



What Guardrails Are

Guardrails are validation checks that run before and after agent actions. They operate independently of the LLM - they do not ask the model to reconsider, they validate inputs and outputs programmatically.

The distinction matters: an LLM instructed to "always check before deleting files" may forget this instruction, have it overridden by injection, or reason its way around it. A guardrail implemented in Python cannot be reasoned around. It either passes or it does not.

Pre-action guardrails: Run before a tool executes. Questions: Is this input well-formed? Is this action on the allowlist? Is this action rate-limited? Has the user confirmed irreversible actions?

Post-action guardrails: Run after a tool returns output, before the output is passed back to the LLM. Questions: Does this output contain sensitive data that should be filtered? Does this output contain injection patterns? Is the output within expected bounds?


The Guardrail Pipeline Architecture
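
In outline, a guarded tool call passes through three stages: pre-action checks on the proposed input, execution of the tool, then post-action checks on the output before it goes back to the LLM. The sketch below is a simplified illustration, not the pipeline class shown later in this lesson. It assumes hypothetical check functions (`block_root_path`, `block_ssn`) that return a block reason or `None`.

```python
import re
from typing import Any, Callable, Dict, List, Optional

# A check returns a block reason, or None to let the call through (assumed convention).
Check = Callable[[str, Any], Optional[str]]


def guarded_call(
    tool_name: str,
    params: Dict[str, Any],
    tool_fn: Callable[..., Any],
    pre_checks: List[Check],
    post_checks: List[Check],
) -> Any:
    """Run pre-action checks, execute the tool, then run post-action checks."""
    for check in pre_checks:
        reason = check(tool_name, params)
        if reason is not None:
            # Blocked before execution: the tool never runs.
            raise PermissionError(f"pre-action block: {reason}")
    output = tool_fn(**params)
    for check in post_checks:
        if check(tool_name, output) is not None:
            # Blocked after execution: the output is withheld from the LLM.
            return "[OUTPUT_FILTERED]"
    return output


def block_root_path(tool_name: str, params: Any) -> Optional[str]:
    """Hypothetical pre-action check: refuse to operate on the filesystem root."""
    return "path is the filesystem root" if params.get("path") == "/" else None


def block_ssn(tool_name: str, output: Any) -> Optional[str]:
    """Hypothetical post-action check: flag anything shaped like a US SSN."""
    return "SSN-like value in output" if re.search(r"\b\d{3}-\d{2}-\d{4}\b", str(output)) else None
```

Note the asymmetry: a pre-action block prevents the side effect entirely, while a post-action block can only withhold what the tool already produced.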


Guardrail Categories

1. Input Validation

The most basic guardrail: ensure tool inputs are well-formed before executing.

  • Type validation: Is the path a string? Is the count an integer? Is the email a valid email format?
  • Schema validation: Does the input match the expected JSON schema?
  • Bounds validation: Is the file size within limits? Is the query within length limits?
  • Encoding validation: Does the input contain null bytes or control characters?

Why this matters for agents: Agents construct tool inputs from LLM output. LLMs hallucinate. An agent might construct a database query with a placeholder like {user_id} still in it, or generate a file path with an invalid character. Input validation catches these before they cause errors or security issues.
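
A minimal sketch of these checks for a single string parameter. The helper name `validate_string_param`, the placeholder regex, and the 4096-character limit are assumptions chosen for illustration.

```python
import re
from typing import Any, List

# Matches template placeholders such as {user_id} that were never filled in.
UNFILLED_PLACEHOLDER = re.compile(r"\{[A-Za-z_][A-Za-z0-9_]*\}")


def validate_string_param(name: str, value: Any, max_len: int = 4096) -> List[str]:
    """Return a list of validation errors; an empty list means the value passed."""
    # Type validation
    if not isinstance(value, str):
        return [f"'{name}' expected str, got {type(value).__name__}"]
    errors = []
    # Bounds validation
    if len(value) > max_len:
        errors.append(f"'{name}' exceeds {max_len} characters")
    # Encoding validation: control characters other than tab, newline, carriage return
    if any(ord(c) < 32 and c not in "\t\n\r" for c in value):
        errors.append(f"'{name}' contains control characters")
    # Hallucination check: a template placeholder the model forgot to substitute
    if UNFILLED_PLACEHOLDER.search(value):
        errors.append(f"'{name}' contains an unfilled placeholder")
    return errors


print(validate_string_param("query", "SELECT * FROM users WHERE id = {user_id}"))
print(validate_string_param("path", "/data/report.csv"))
```

The placeholder pattern would also flag legitimate text that contains `{word}`, so in practice it probably belongs only on parameters where braces are never expected.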

2. Allowlist and Denylist

Allowlist (whitelist): Only these specific actions or patterns are permitted.

Denylist (blacklist): These specific actions or patterns are forbidden.

Allowlist is stronger: if something is not explicitly allowed, it is denied. Denylist is more flexible: most things are allowed, specific things are denied.

For agent safety, use denylist for known-dangerous patterns and allowlist for high-privilege operations.

Shell denylist examples:

rm -rf /
rm -rf ~
chmod 777 /
curl * | bash
wget * | sh
:(){ :|:& };: # fork bomb
nc -e /bin/sh # reverse shell

Email allowlist example:

Only allow sending email to addresses in the @yourcompany.com domain.
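
One way to express this rule, sketched with a hypothetical `recipient_allowed` helper. It compares the parsed domain exactly rather than searching for a substring, because an unanchored match on `@yourcompany.com` would also accept an address such as `bob@yourcompany.com.attacker.example`.

```python
from typing import FrozenSet

ALLOWED_DOMAINS: FrozenSet[str] = frozenset({"yourcompany.com"})


def recipient_allowed(address: str, allowed_domains: FrozenSet[str] = ALLOWED_DOMAINS) -> bool:
    """Allow only addresses whose domain is exactly on the allowlist."""
    local, sep, domain = address.strip().rpartition("@")
    # Reject missing "@", empty local part, extra "@", or embedded whitespace.
    if not sep or not local or "@" in local or any(c.isspace() for c in local + domain):
        return False
    return domain.lower() in allowed_domains
```

This is deliberately strict and is not a full address parser; anything it cannot classify with confidence is denied.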

3. Rate Limiting

Prevent runaway agent execution by limiting how frequently tools can be called.

Rate limiting serves two purposes:

  1. Resource protection: Prevent the agent from exhausting API quotas or compute budgets
  2. Loop detection: A rapidly repeated tool call pattern often indicates the agent is stuck in a loop (possibly injection-induced)

Implement rate limits at multiple time granularities:

  • Per-second (burst protection)
  • Per-minute (sustained use)
  • Per-session (total budget)
  • Per-task (expected usage for this task type)
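
The loop-detection purpose described above can be approximated without any timing information. A minimal sketch, assuming a hypothetical `LoopDetector` class (separate from the rate limiter shown later in this lesson) that fingerprints each call and flags repeats within a sliding window:

```python
import json
from collections import Counter, deque
from typing import Any, Deque, Dict


class LoopDetector:
    """Flag a tool call when the same (tool, params) pair keeps recurring."""

    def __init__(self, window: int = 10, max_repeats: int = 3):
        # Only the most recent `window` calls are remembered.
        self._recent: Deque[str] = deque(maxlen=window)
        self._max_repeats = max_repeats

    def record(self, tool_name: str, params: Dict[str, Any]) -> bool:
        """Record a call; return True if it has repeated more than max_repeats times."""
        fingerprint = f"{tool_name}:{json.dumps(params, sort_keys=True, default=str)}"
        self._recent.append(fingerprint)
        return Counter(self._recent)[fingerprint] > self._max_repeats


detector = LoopDetector(window=10, max_repeats=3)
flags = [detector.record("read_file", {"path": "/a"}) for _ in range(5)]
print(flags)
```

The window size and repeat threshold here are arbitrary. An agent that legitimately polls a resource would need a higher threshold for that tool.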

4. Semantic Validation

Check whether the requested action is semantically coherent with the stated task.

This is the most sophisticated guardrail category because it requires understanding intent. Implementation approaches:

Rule-based: If the task is "summarize emails", block any call to send_email. If the task is "read logs", block any write operation.

LLM-based: Ask a guard model "Given this task, is this tool call expected behavior?" This is expensive but catches subtle semantic violations.

Statistical: Compare current tool call patterns against expected patterns for this task type, flag significant deviations.
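
The rule-based approach is the cheapest of the three to sketch. The task types and most tool names below are hypothetical; only the "summarize emails" and "read logs" examples come from the text above.

```python
from typing import Dict, FrozenSet

# Hypothetical mapping from task type to the tools that task is expected to use.
EXPECTED_TOOLS: Dict[str, FrozenSet[str]] = {
    "summarize_emails": frozenset({"read_email", "format_text"}),
    "read_logs": frozenset({"read_file", "search_logs"}),
}


def is_coherent(task_type: str, tool_name: str) -> bool:
    """Return True only if the tool is expected for this task type.

    Unknown task types have no expected tools, so every call is rejected
    (deny by default).
    """
    return tool_name in EXPECTED_TOOLS.get(task_type, frozenset())


print(is_coherent("summarize_emails", "read_email"))
print(is_coherent("summarize_emails", "send_email"))
```

Because the mapping is plain data, extending it for a new task type does not require touching the check itself.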

5. Confirmation Gate

For high-risk actions (irreversible, broad scope), pause execution and require explicit user confirmation before proceeding.

A confirmation gate is not a guardrail that blocks - it is one that pauses. The action may proceed after confirmation, but the user has had a chance to review it.

Design considerations:

  • Show the user enough context to make an informed decision
  • Do not show so much context that they cannot process it
  • Set a timeout (if the user does not respond, the action is cancelled)
  • Log whether the user confirmed or cancelled - this is an audit trail
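
The timeout and audit-trail considerations can be sketched together. This assumes a hypothetical `ask` callable that blocks until the user answers and returns `True` or `False`; how the question is displayed is left out.

```python
import logging
import queue
import threading
from typing import Callable

logger = logging.getLogger("confirmation_gate")


def confirm_with_timeout(ask: Callable[[], bool], timeout_s: float = 30.0) -> bool:
    """Run `ask` in a background thread; no answer within `timeout_s` counts as a cancel."""
    answers: "queue.Queue[bool]" = queue.Queue(maxsize=1)

    def worker() -> None:
        answers.put(bool(ask()))

    # Daemon thread so an unanswered prompt cannot keep the process alive.
    threading.Thread(target=worker, daemon=True).start()
    try:
        confirmed = answers.get(timeout=timeout_s)
        outcome = "confirmed" if confirmed else "cancelled"
    except queue.Empty:
        confirmed, outcome = False, "timed_out"
    # Audit trail: record how the gate was resolved.
    logger.info("confirmation outcome: %s", outcome)
    return confirmed
```

If `ask` raises, the worker thread exits without answering and the gate falls through to the timeout, so the failure mode is still a cancel.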

Hard Stops vs Soft Interventions

A critical design choice for every guardrail: should it block (hard stop) or warn and pause (soft intervention)?

| Scenario | Hard Stop | Soft Intervention |
|---|---|---|
| rm -rf / | Yes - categorically forbidden | No |
| delete_file("/important.db") | Maybe - depends on context | Yes - confirm first |
| send_email(to="[email protected]") | Yes if out-of-scope | Yes if in-scope but unexpected |
| Rate limit exceeded | Yes | Maybe - depends on how far over |
| Ambiguous file scope | No | Yes - clarify first |
| Unknown tool call | Yes | Maybe - flag for review |

Rule of thumb: Hard stop for actions that are categorically dangerous regardless of context. Soft intervention for actions that might be legitimate but warrant review.


Python: Composable Guardrail System

"""
guardrail_pipeline.py

A composable, production-ready guardrail system for agent tool calls.
Pre-action and post-action validation pipelines with pluggable validators.

Features:
- Schema-based input validation
- Configurable allowlist/denylist
- Multi-granularity rate limiting
- Semantic coherence check (rule-based + optional LLM; not included in this listing)
- PII/secret detection in outputs
- Comprehensive audit logging
"""

import re
import json
import time
import logging
from abc import ABC, abstractmethod
from collections import defaultdict, deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type
import anthropic

logger = logging.getLogger(__name__)
# Reserved for an LLM-based semantic check; the guardrails in this listing do not call it.
client = anthropic.Anthropic()


# ─────────────────────────────────────────────
# Core Types
# ─────────────────────────────────────────────

class GuardrailResult(Enum):
    PASS = "pass"
    SOFT_BLOCK = "soft_block"  # Warn + require confirmation
    HARD_BLOCK = "hard_block"  # Blocked, no override


@dataclass
class GuardrailDecision:
    result: GuardrailResult
    guardrail_name: str
    message: str
    details: Optional[Dict] = None
    requires_confirmation: bool = False


@dataclass
class PipelineDecision:
    """Aggregated decision from running a full guardrail pipeline."""
    approved: bool
    hard_blocked: bool
    soft_blocked: bool
    requires_confirmation: bool
    decisions: List[GuardrailDecision] = field(default_factory=list)
    block_reason: Optional[str] = None

    def summary(self) -> str:
        lines = [
            f"Approved: {self.approved}",
            f"Hard blocked: {self.hard_blocked}",
            f"Soft blocked / confirmation required: {self.soft_blocked or self.requires_confirmation}",
        ]
        if self.block_reason:
            lines.append(f"Block reason: {self.block_reason}")
        for d in self.decisions:
            lines.append(f"  [{d.result.value}] {d.guardrail_name}: {d.message}")
        return "\n".join(lines)


# ─────────────────────────────────────────────
# Base Guardrail
# ─────────────────────────────────────────────

class Guardrail(ABC):
    """Base class for all guardrails."""

    @property
    @abstractmethod
    def name(self) -> str:
        pass

    @abstractmethod
    def check(
        self,
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> GuardrailDecision:
        pass


# ─────────────────────────────────────────────
# Pre-Action Guardrails
# ─────────────────────────────────────────────

class SchemaValidator(Guardrail):
    """
    Validate tool inputs against expected schemas.
    """

    def __init__(self, schemas: Dict[str, Dict[str, type]]):
        """
        schemas: {tool_name: {param_name: expected_type}}
        Example: {"delete_file": {"path": str, "confirm": bool}}
        """
        self._schemas = schemas

    @property
    def name(self) -> str:
        return "SchemaValidator"

    def check(
        self,
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> GuardrailDecision:
        schema = self._schemas.get(tool_name)
        if not schema:
            return GuardrailDecision(
                result=GuardrailResult.PASS,
                guardrail_name=self.name,
                message=f"No schema defined for '{tool_name}' - skipping",
            )

        if not isinstance(data, dict):
            return GuardrailDecision(
                result=GuardrailResult.HARD_BLOCK,
                guardrail_name=self.name,
                message=f"Tool '{tool_name}' params must be a dict, got {type(data).__name__}",
            )

        errors = []
        for param, expected_type in schema.items():
            if param not in data:
                errors.append(f"Missing required param '{param}'")
            elif not isinstance(data[param], expected_type):
                actual_type = type(data[param]).__name__
                errors.append(
                    f"Param '{param}' expected {expected_type.__name__}, got {actual_type}"
                )

        # Check for null bytes and other control characters in string params
        for k, v in data.items():
            if isinstance(v, str):
                if '\x00' in v:
                    errors.append(f"Null byte detected in param '{k}'")
                # Tab, newline, and carriage return are allowed; other C0 controls are not
                if any(ord(c) < 32 and c not in "\x00\t\n\r" for c in v):
                    errors.append(f"Control characters detected in param '{k}'")

        if errors:
            return GuardrailDecision(
                result=GuardrailResult.HARD_BLOCK,
                guardrail_name=self.name,
                message=f"Schema validation failed: {'; '.join(errors)}",
            )

        return GuardrailDecision(
            result=GuardrailResult.PASS,
            guardrail_name=self.name,
            message="Schema validation passed",
        )


class DenylistGuardrail(Guardrail):
    """
    Block specific tools, parameter patterns, or combinations.
    """

    def __init__(
        self,
        forbidden_tools: Optional[Set[str]] = None,
        forbidden_patterns: Optional[Dict[str, List[str]]] = None,
        global_forbidden_patterns: Optional[List[str]] = None,
    ):
        self._forbidden_tools = forbidden_tools or set()
        self._forbidden_patterns = forbidden_patterns or {}
        self._global_patterns = [
            re.compile(p, re.IGNORECASE | re.DOTALL)
            for p in (global_forbidden_patterns or [])
        ]
        self._tool_patterns = {
            tool: [re.compile(p, re.IGNORECASE) for p in patterns]
            for tool, patterns in (forbidden_patterns or {}).items()
        }

    @property
    def name(self) -> str:
        return "DenylistGuardrail"

    def check(
        self,
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> GuardrailDecision:
        # 1. Forbidden tool
        if tool_name in self._forbidden_tools:
            return GuardrailDecision(
                result=GuardrailResult.HARD_BLOCK,
                guardrail_name=self.name,
                message=f"Tool '{tool_name}' is on the denylist",
            )

        # Patterns are matched against the JSON-serialized params
        param_str = json.dumps(data, default=str) if data else ""

        # 2. Global forbidden patterns (apply to any tool)
        for pattern in self._global_patterns:
            if pattern.search(param_str):
                return GuardrailDecision(
                    result=GuardrailResult.HARD_BLOCK,
                    guardrail_name=self.name,
                    message=(
                        f"Global forbidden pattern matched in '{tool_name}' params: "
                        f"'{pattern.pattern[:40]}'"
                    ),
                )

        # 3. Tool-specific forbidden patterns
        tool_specific = self._tool_patterns.get(tool_name, [])
        for pattern in tool_specific:
            if pattern.search(param_str):
                return GuardrailDecision(
                    result=GuardrailResult.HARD_BLOCK,
                    guardrail_name=self.name,
                    message=(
                        f"Tool-specific forbidden pattern matched in '{tool_name}': "
                        f"'{pattern.pattern[:40]}'"
                    ),
                )

        return GuardrailDecision(
            result=GuardrailResult.PASS,
            guardrail_name=self.name,
            message="No denylist matches",
        )


class AllowlistGuardrail(Guardrail):
    """
    For high-privilege tools, only allow if action matches an allowlist.
    """

    def __init__(
        self,
        high_privilege_tools: Set[str],
        allowed_patterns: Dict[str, List[str]],
    ):
        """
        high_privilege_tools: tools that require explicit allowlist match
        allowed_patterns: {tool_name: [regex patterns that are allowed]}
        """
        self._high_privilege_tools = high_privilege_tools
        self._allowed_patterns = {
            tool: [re.compile(p, re.IGNORECASE) for p in patterns]
            for tool, patterns in allowed_patterns.items()
        }

    @property
    def name(self) -> str:
        return "AllowlistGuardrail"

    def check(
        self,
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> GuardrailDecision:
        if tool_name not in self._high_privilege_tools:
            return GuardrailDecision(
                result=GuardrailResult.PASS,
                guardrail_name=self.name,
                message=f"'{tool_name}' is not a high-privilege tool - no allowlist check",
            )

        allowed = self._allowed_patterns.get(tool_name, [])
        if not allowed:
            return GuardrailDecision(
                result=GuardrailResult.HARD_BLOCK,
                guardrail_name=self.name,
                message=(
                    f"Tool '{tool_name}' is high-privilege with empty allowlist - blocked"
                ),
            )

        param_str = json.dumps(data, default=str) if data else ""
        for pattern in allowed:
            if pattern.search(param_str):
                return GuardrailDecision(
                    result=GuardrailResult.PASS,
                    guardrail_name=self.name,
                    message=f"Allowlist match for '{tool_name}'",
                )

        return GuardrailDecision(
            result=GuardrailResult.HARD_BLOCK,
            guardrail_name=self.name,
            message=(
                f"Tool '{tool_name}' has no matching allowlist pattern. "
                "Action not explicitly permitted."
            ),
        )


class RateLimitGuardrail(Guardrail):
    """
    Multi-granularity rate limiting: per-second, per-minute, per-session.
    """

    def __init__(
        self,
        limits: Dict[str, Dict[str, int]],
        global_session_limit: int = 1000,
    ):
        """
        limits: {
            tool_name: {
                "per_second": int,
                "per_minute": int,
                "per_session": int,
            }
        }
        """
        self._limits = limits
        self._global_session_limit = global_session_limit
        self._call_times: Dict[str, deque] = defaultdict(deque)
        self._session_counts: Dict[str, int] = defaultdict(int)
        self._global_session_count: int = 0

    @property
    def name(self) -> str:
        return "RateLimitGuardrail"

    def check(
        self,
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> GuardrailDecision:
        now = time.time()
        tool_limits = self._limits.get(tool_name, {})
        times = self._call_times[tool_name]

        # Prune times older than 60 seconds
        while times and now - times[0] > 60:
            times.popleft()

        # Per-second check
        per_second_limit = tool_limits.get("per_second", 100)
        recent_1s = sum(1 for t in times if now - t < 1)
        if recent_1s >= per_second_limit:
            return GuardrailDecision(
                result=GuardrailResult.HARD_BLOCK,
                guardrail_name=self.name,
                message=(
                    f"Rate limit exceeded: '{tool_name}' called {recent_1s} times "
                    f"in last second (limit: {per_second_limit})"
                ),
            )

        # Per-minute check
        per_minute_limit = tool_limits.get("per_minute", 1000)
        if len(times) >= per_minute_limit:
            return GuardrailDecision(
                result=GuardrailResult.HARD_BLOCK,
                guardrail_name=self.name,
                message=(
                    f"Rate limit exceeded: '{tool_name}' called {len(times)} times "
                    f"in last minute (limit: {per_minute_limit})"
                ),
            )

        # Per-session check
        session_count = self._session_counts[tool_name]
        per_session_limit = tool_limits.get("per_session", 10000)
        if session_count >= per_session_limit:
            return GuardrailDecision(
                result=GuardrailResult.HARD_BLOCK,
                guardrail_name=self.name,
                message=(
                    f"Session limit exceeded: '{tool_name}' called {session_count} times "
                    f"this session (limit: {per_session_limit})"
                ),
            )

        # Global session check
        if self._global_session_count >= self._global_session_limit:
            return GuardrailDecision(
                result=GuardrailResult.HARD_BLOCK,
                guardrail_name=self.name,
                message=(
                    f"Global session limit exceeded: {self._global_session_count} "
                    f"total tool calls (limit: {self._global_session_limit})"
                ),
            )

        # Record this call
        times.append(now)
        self._session_counts[tool_name] += 1
        self._global_session_count += 1

        return GuardrailDecision(
            result=GuardrailResult.PASS,
            guardrail_name=self.name,
            message=(
                f"Rate check passed: {len(times)}/min, "
                f"{session_count + 1} session calls for '{tool_name}'"
            ),
        )


class ConfirmationGuardrail(Guardrail):
    """
    Soft intervention: require user confirmation for high-risk actions.
    """

    def __init__(
        self,
        high_risk_tools: Set[str],
        confirmation_fn: Optional[Callable[[str, str, Dict], bool]] = None,
    ):
        self._high_risk_tools = high_risk_tools
        self._confirmation_fn = confirmation_fn
        self._confirmed_cache: Set[str] = set()

    @property
    def name(self) -> str:
        return "ConfirmationGuardrail"

    def _default_confirm(self, tool_name: str, preview: str, params: Dict) -> bool:
        print(f"\n[CONFIRMATION REQUIRED]\n{preview}")
        resp = input("Approve this action? [yes/no]: ").strip().lower()
        return resp in ("yes", "y")

    def check(
        self,
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> GuardrailDecision:
        if tool_name not in self._high_risk_tools:
            return GuardrailDecision(
                result=GuardrailResult.PASS,
                guardrail_name=self.name,
                message=f"'{tool_name}' does not require confirmation",
            )

        # Generate a cache key for this specific call
        cache_key = f"{tool_name}:{json.dumps(data, sort_keys=True, default=str)}"
        if cache_key in self._confirmed_cache:
            return GuardrailDecision(
                result=GuardrailResult.PASS,
                guardrail_name=self.name,
                message=f"Previously confirmed action for '{tool_name}'",
            )

        preview = (
            f"High-risk action requested:\n"
            f"Tool: {tool_name}\n"
            f"Parameters: {json.dumps(data, indent=2, default=str)}\n"
            f"This action may be irreversible."
        )

        confirm_fn = self._confirmation_fn or self._default_confirm
        confirmed = confirm_fn(tool_name, preview, data if isinstance(data, dict) else {})

        if confirmed:
            self._confirmed_cache.add(cache_key)
            return GuardrailDecision(
                result=GuardrailResult.PASS,
                guardrail_name=self.name,
                message=f"User confirmed action for '{tool_name}'",
                requires_confirmation=True,
            )
        else:
            return GuardrailDecision(
                result=GuardrailResult.SOFT_BLOCK,
                guardrail_name=self.name,
                message=f"User declined action for '{tool_name}'",
                requires_confirmation=True,
            )


# ─────────────────────────────────────────────
# Post-Action Guardrails
# ─────────────────────────────────────────────

PII_PATTERNS = {
    "email": re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    "credit_card": re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'),
    "phone_us": re.compile(r'\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'),
    "api_key": re.compile(r'(?:api[_-]?key|bearer|token)[=:\s]+[a-zA-Z0-9_\-\.]{20,}', re.IGNORECASE),
}


class OutputPIIFilter(Guardrail):
    """
    Filter or flag PII/secrets in tool outputs before they reach the LLM.
    """

    def __init__(self, redact: bool = True, max_emails: int = 3):
        # Note: check() only flags PII. `redact` and `max_emails` are stored
        # but not applied anywhere in this listing.
        self._redact = redact
        self._max_emails = max_emails

    @property
    def name(self) -> str:
        return "OutputPIIFilter"

    def check(
        self,
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> GuardrailDecision:
        output_str = str(data) if not isinstance(data, str) else data
        found = {}

        for pii_type, pattern in PII_PATTERNS.items():
            matches = pattern.findall(output_str)
            if matches:
                found[pii_type] = matches[:3]  # Show sample, not all

        if not found:
            return GuardrailDecision(
                result=GuardrailResult.PASS,
                guardrail_name=self.name,
                message="No PII detected in output",
            )

        return GuardrailDecision(
            result=GuardrailResult.SOFT_BLOCK,
            guardrail_name=self.name,
            message=f"PII detected in tool output from '{tool_name}': {list(found.keys())}",
            details={"pii_types": found},
            requires_confirmation=False,
        )


class OutputBoundsGuardrail(Guardrail):
    """
    Check tool outputs are within expected bounds (size, type, schema).
    """

    def __init__(
        self,
        max_output_size: int = 100_000,
        expected_types: Optional[Dict[str, type]] = None,
    ):
        self._max_output_size = max_output_size
        self._expected_types = expected_types or {}

    @property
    def name(self) -> str:
        return "OutputBoundsGuardrail"

    def check(
        self,
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> GuardrailDecision:
        output_str = json.dumps(data, default=str)

        if len(output_str) > self._max_output_size:
            return GuardrailDecision(
                result=GuardrailResult.SOFT_BLOCK,
                guardrail_name=self.name,
                message=(
                    f"Tool '{tool_name}' output is {len(output_str)} chars, "
                    f"exceeding limit of {self._max_output_size}"
                ),
            )

        expected_type = self._expected_types.get(tool_name)
        if expected_type and not isinstance(data, expected_type):
            return GuardrailDecision(
                result=GuardrailResult.SOFT_BLOCK,
                guardrail_name=self.name,
                message=(
                    f"Tool '{tool_name}' returned {type(data).__name__}, "
                    f"expected {expected_type.__name__}"
                ),
            )

        return GuardrailDecision(
            result=GuardrailResult.PASS,
            guardrail_name=self.name,
            message="Output within bounds",
        )


# ─────────────────────────────────────────────
# Pipeline Orchestrator
# ─────────────────────────────────────────────

class GuardrailPipeline:
    """
    Composable pipeline of guardrails for pre- and post-action validation.

    Usage:
        pipeline = GuardrailPipeline(
            pre_guardrails=[schema_validator, denylist, rate_limiter],
            post_guardrails=[pii_filter, bounds_check],
        )

        pre_decision = pipeline.pre_check("delete_file", {"path": "/data/file.csv"})
        if not pre_decision.approved:
            print(pre_decision.block_reason)
            return

        result = execute_tool("delete_file", {"path": "/data/file.csv"})

        post_decision = pipeline.post_check("delete_file", result)
        if not post_decision.approved:
            result = "[OUTPUT_FILTERED]"
    """

    def __init__(
        self,
        pre_guardrails: Optional[List[Guardrail]] = None,
        post_guardrails: Optional[List[Guardrail]] = None,
        audit_log: Optional[List[Dict]] = None,
        stop_on_first_hard_block: bool = True,
    ):
        self.pre_guardrails = pre_guardrails or []
        self.post_guardrails = post_guardrails or []
        self.audit_log = audit_log if audit_log is not None else []
        self.stop_on_first_hard_block = stop_on_first_hard_block

    def _run_pipeline(
        self,
        guardrails: List[Guardrail],
        tool_name: str,
        data: Any,
        context: Optional[Dict] = None,
    ) -> PipelineDecision:
        decisions = []
        hard_blocked = False
        soft_blocked = False
        requires_confirmation = False
        block_reason = None

        for guardrail in guardrails:
            try:
                decision = guardrail.check(tool_name, data, context)
                decisions.append(decision)

                if decision.result == GuardrailResult.HARD_BLOCK:
                    hard_blocked = True
                    block_reason = decision.message
                    if self.stop_on_first_hard_block:
                        break
                elif decision.result == GuardrailResult.SOFT_BLOCK:
                    soft_blocked = True
                    if not block_reason:
                        block_reason = decision.message

                if decision.requires_confirmation:
                    requires_confirmation = True

            except Exception as e:
                logger.error(f"Guardrail {guardrail.name} threw exception: {e}")
                # Fail safe: treat guardrail failure as soft block
                message = f"Guardrail exception (fail-safe soft block): {e}"
                decisions.append(GuardrailDecision(
                    result=GuardrailResult.SOFT_BLOCK,
                    guardrail_name=guardrail.name,
                    message=message,
                ))
                soft_blocked = True
                # Record why the call was blocked so the audit log is not left empty
                if not block_reason:
                    block_reason = message

        approved = not hard_blocked and not soft_blocked

        return PipelineDecision(
            approved=approved,
            hard_blocked=hard_blocked,
            soft_blocked=soft_blocked,
            requires_confirmation=requires_confirmation,
            decisions=decisions,
            block_reason=block_reason,
        )

    def pre_check(
        self,
        tool_name: str,
        params: Dict[str, Any],
        context: Optional[Dict] = None,
    ) -> PipelineDecision:
        """Run pre-action guardrails."""
        decision = self._run_pipeline(
            self.pre_guardrails, tool_name, params, context
        )
        self._log("pre_action", tool_name, params, decision)
        return decision

    def post_check(
        self,
        tool_name: str,
        output: Any,
        context: Optional[Dict] = None,
    ) -> PipelineDecision:
        """Run post-action guardrails on tool output."""
        decision = self._run_pipeline(
            self.post_guardrails, tool_name, output, context
        )
        self._log("post_action", tool_name, output, decision)
        return decision

    def _log(
        self,
        phase: str,
        tool_name: str,
        data: Any,
        decision: PipelineDecision,
    ) -> None:
        entry = {
            "timestamp": time.time(),
            "phase": phase,
            "tool": tool_name,
            "approved": decision.approved,
            "hard_blocked": decision.hard_blocked,
            "block_reason": decision.block_reason,
            "guardrails_run": [d.guardrail_name for d in decision.decisions],
        }
        self.audit_log.append(entry)
        if not decision.approved:
            logger.warning(f"[{phase}] {tool_name} blocked: {decision.block_reason}")
        else:
            logger.debug(f"[{phase}] {tool_name} approved")


# ─────────────────────────────────────────────
# Factory: Build a Standard Guardrail Pipeline
# ─────────────────────────────────────────────

def build_standard_pipeline(
    task_type: str = "general",
    confirmation_fn: Optional[Callable] = None,
) -> GuardrailPipeline:
    """
    Build a standard guardrail pipeline for common agent tasks.

    task_type: "file_management" | "email" | "shell" | "database" | "general"
    """
    # Patterns run against JSON-serialized params, where a double quote
    # appears as \" (hence the optional backslash in the last pattern).
    SHELL_DENYLIST = [
        r"rm\s+-rf\s+/",
        r"rm\s+-rf\s+~",
        r"chmod\s+777\s+/",
        r":\(\)\s*\{",  # fork bomb
        r"curl\s+\S+\s*\|\s*(?:sh|bash)",
        r"wget\s+\S+\s*-O\s*-\s*\|\s*(?:sh|bash)",
        r"nc\s+.*-e\s+/bin",  # reverse shell
        r"python\s+-c\s+\\?['\"]import\s+socket",  # python shell
    ]

    schema_by_task = {
        "file_management": {
            "read_file": {"path": str},
            "write_file": {"path": str, "content": str},
            "delete_file": {"path": str},
            "move_file": {"src": str, "dst": str},
        },
        "shell": {
            "run_shell": {"command": str},
        },
        "email": {
            "send_email": {"to": str, "subject": str, "body": str},
            "read_email": {"mailbox": str},
        },
        "database": {
            "query_database": {"query": str},
            "update_database": {"query": str, "params": dict},
        },
        "general": {},
    }

    schemas = schema_by_task.get(task_type, {})

    rate_limits = {
        "run_shell": {"per_second": 2, "per_minute": 20, "per_session": 100},
        "delete_file": {"per_second": 5, "per_minute": 30, "per_session": 200},
        "send_email": {"per_second": 1, "per_minute": 5, "per_session": 20},
        "update_database": {"per_second": 5, "per_minute": 50, "per_session": 500},
        "query_database": {"per_second": 10, "per_minute": 100, "per_session": 1000},
    }

    high_risk_tools = {"delete_file", "run_shell", "send_email", "update_database"}

    pre_guardrails = [
        SchemaValidator(schemas),
        DenylistGuardrail(
            global_forbidden_patterns=SHELL_DENYLIST,
            forbidden_patterns={
                "send_email": [r"@attacker\.com", r"@evil\.", r"@malicious\."],
                "query_database": [r"DROP\s+TABLE", r"TRUNCATE\s+TABLE", r"DELETE\s+FROM\s+users"],
            },
        ),
        RateLimitGuardrail(
            limits=rate_limits,
            global_session_limit=2000,
        ),
        ConfirmationGuardrail(
            high_risk_tools=high_risk_tools,
            confirmation_fn=confirmation_fn,
        ),
    ]

    post_guardrails = [
        OutputPIIFilter(redact=True),
        OutputBoundsGuardrail(max_output_size=50_000),
    ]

    return GuardrailPipeline(
        pre_guardrails=pre_guardrails,
        post_guardrails=post_guardrails,
    )


# ─────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    pipeline = build_standard_pipeline(
        task_type="file_management",
        confirmation_fn=lambda name, preview, params: True,  # Auto-approve for demo
    )

    test_cases = [
        ("read_file", {"path": "/data/report.csv"}),
        ("delete_file", {"path": "/important.db"}),
        ("run_shell", {"command": "rm -rf / --no-preserve-root"}),
        ("send_email", {"to": "[email protected]", "subject": "Data", "body": "Here"}),
        ("query_database", {"query": "SELECT * FROM users WHERE id=1"}),
        ("query_database", {"query": "DROP TABLE users"}),
    ]

    for tool, params in test_cases:
        print(f"\n{'='*50}")
        print(f"Testing: {tool}({params})")
        decision = pipeline.pre_check(tool, params)
        print(decision.summary())

    print(f"\n\nAudit log entries: {len(pipeline.audit_log)}")
    blocked = [e for e in pipeline.audit_log if not e["approved"]]
    print(f"Blocked actions: {len(blocked)}")

Production Notes

:::warning NeMo Guardrails and Llama Guard
NVIDIA's NeMo Guardrails (a toolkit for adding programmable rails to LLM applications) and Meta's Llama Guard (an LLM fine-tuned to classify prompts and responses as safe or unsafe) are LLM-based guardrail tools. They are more semantically sophisticated than pattern-based approaches but add latency and cost. Use them when semantic context matters (e.g., "Is this user message requesting harmful information?"). For structural safety (denylist, rate limiting, schema validation), use the programmatic approach - it is faster and deterministic.
:::

:::danger Never Rely on a Single Guardrail
A single guardrail is a single point of failure. Use multiple independent layers: pattern matching catches known attacks, schema validation catches malformed inputs, rate limiting catches loops, confirmation gates catch irreversible surprises. Defense in depth means each layer provides independent protection.
:::


Interview Questions

Q1: What is the difference between a pre-action and post-action guardrail, and when do you need each?

A: Pre-action guardrails validate before a tool executes - they can prevent harmful actions entirely. Post-action guardrails validate the output of a tool before it is returned to the agent - they catch sensitive data leakage (PII, secrets in database results), unexpected output sizes, and injection patterns in retrieved content. You need pre-action guardrails for every tool call to enforce safety constraints before damage occurs. You need post-action guardrails when your tools can return sensitive or potentially dangerous content that should not be passed back to the LLM unfiltered. Both are necessary; pre-action has the advantage of preventing harm, post-action has the advantage of filtering output the agent will reason about.

Q2: How do you design a guardrail system that is not brittle when agent capabilities expand?

A: The answer is composability. Design guardrails as small, single-responsibility validators that can be combined into pipelines. Adding a new tool means adding a schema entry, rate limit entry, and optionally a denylist or allowlist entry - not modifying the guardrail logic itself. Use a registry-based approach where tool specifications (schemas, rate limits, risk levels) are data, not code. This means adding a new tool is a configuration change, not a code change. Also: fail-safe by default - if a guardrail throws an exception, treat it as a soft block rather than a pass. This prevents guardrail failures from creating security holes.

Q3: How do you prevent a denylist from being defeated by encoding attacks?

A: Normalize before matching. Before running denylist patterns, apply: (1) Unicode normalization (NFKC) to collapse confusable characters; (2) homoglyph replacement for common lookalike characters; (3) URL decoding for encoded characters; (4) base64 decoding for encoded payloads (detect base64 strings and decode them before checking). Also: match against normalized versions but preserve the original for logging so you have evidence of the attack attempt. Remember that normalization is a layer, not a complete solution - there are encoding attacks that evade normalization. Use multiple layers including semantic checking for high-risk tools.
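
A minimal sketch of steps (1), (3), and (4), using only the standard library. The helper names `candidate_forms` and `matches_denylist` and the 16-character threshold for base64 detection are assumptions. Homoglyph replacement (step 2) is omitted because NFKC does not map lookalikes across scripts, so it needs a separate mapping table.

```python
import base64
import binascii
import re
import unicodedata
from typing import List
from urllib.parse import unquote

# Runs of 16+ base64-alphabet characters are treated as possible encoded payloads.
BASE64_TOKEN = re.compile(r"[A-Za-z0-9+/]{16,}={0,2}")


def candidate_forms(text: str) -> List[str]:
    """Return normalized variants of `text` to run denylist patterns against."""
    # URL-decode first, then collapse compatibility characters (e.g. fullwidth letters).
    normalized = unicodedata.normalize("NFKC", unquote(text))
    forms = [normalized]
    for token in BASE64_TOKEN.findall(normalized):
        try:
            decoded = base64.b64decode(token, validate=True).decode("utf-8")
        except (binascii.Error, UnicodeDecodeError):
            continue  # not valid base64-encoded text; ignore this token
        forms.append(unicodedata.normalize("NFKC", decoded))
    return forms


def matches_denylist(text: str, patterns: List[re.Pattern]) -> bool:
    """True if any pattern matches any normalized form of the text."""
    return any(p.search(form) for form in candidate_forms(text) for p in patterns)
```

The caller should log the original `text`, not the normalized form, so the evidence of the attempt is preserved. A payload that is encoded twice, or a base64 string fused to neighbouring characters, would slip past this sketch.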

Q4: What makes a confirmation gate meaningful versus rubber-stamp approval?

A: Meaningful confirmation requires: (1) specific, actionable information - show the user exactly what will happen, with enough context to evaluate the risk; (2) appropriate framing - make the risk visible without being alarmist for low-risk confirmations; (3) reasonable cognitive load - a single confirmation per high-risk action, not approval for every step; (4) a clear cancel path - the default should be "cancel" not "approve"; (5) a timeout that cancels rather than approves; (6) monitoring - track approval rates and investigate if > 90% are approved without modification. Rubber-stamp approval happens when (a) users are shown too many confirmations (fatigue), (b) confirmations are poorly explained, or (c) the default is to approve.

Q5: How would you implement semantic validation - checking whether a tool call is coherent with the stated task - without adding a full LLM call to every tool invocation?

A: Tiered approach: (1) Fast rule-based semantic check - map task types to expected tool sets (task "summarize emails" → expected tools: [read_email, format_text]; any call to send_email is semantically anomalous). This is O(1) and adds < 1ms. (2) Statistical anomaly detection - compare current tool call sequence against a distribution of normal sequences for this task type, learned from historical runs. Flag sequences that are more than 2 standard deviations from normal. This is O(n) in sequence length. (3) LLM semantic validation - use a fast, cheap model (Haiku) only when rules and statistics both flag uncertainty. Cache results for identical (task, tool_call) pairs. This approach means the vast majority of calls are validated in microseconds by rules; LLM validation is reserved for novel or anomalous situations.
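
Tier 2 can start much simpler than a full sequence model. The sketch below makes a deliberate simplification: the only feature is the number of tool calls in a run, and `historical_counts` (a hypothetical list) holds that number for past runs of the same task type.

```python
import statistics
from typing import List


def is_anomalous(call_count: int, historical_counts: List[int], threshold: float = 2.0) -> bool:
    """Flag a run whose tool-call count is more than `threshold` standard
    deviations from the historical mean for this task type."""
    if len(historical_counts) < 2:
        # Not enough history to estimate spread: escalate rather than assume normal.
        return True
    mean = statistics.mean(historical_counts)
    stdev = statistics.stdev(historical_counts)
    if stdev == 0:
        # Every past run used the same count; any deviation is unusual.
        return call_count != mean
    return abs(call_count - mean) / stdev > threshold


history = [4, 5, 6, 5, 5]
print(is_anomalous(6, history))
print(is_anomalous(12, history))
```

A flagged run would then go on to the tier 3 LLM check rather than being blocked outright.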

© 2026 EngineersOfAI. All rights reserved.