Python SQL Injection Prevention Practice Problems & Exercises
Practice: SQL Injection Prevention
Easy
Write an is_vulnerable_query(query: str) -> bool heuristic that detects string interpolation in SQL.
import re

QUERIES = [
    ("SELECT * FROM users WHERE name = '" + "user_input" + "'",
     "string concatenation with +"),
    ("SELECT * FROM users WHERE name = ?",
     "parameterized with ?"),
    ("SELECT * FROM users WHERE id = %s" % "42",
     "%-formatted string"),
    ("SELECT * FROM users WHERE id = %s",
     "parameterized with %s placeholder"),
]

def is_vulnerable_query(query: str) -> bool:
    # Heuristic: look for patterns suggesting the query was built by string ops
    # A real-world tool would need AST analysis; this is an educational approximation
    pass

for i, (query, label) in enumerate(QUERIES, 1):
    print(f"Query {i} vulnerable: {is_vulnerable_query(query)}")
Solution
import re

def is_vulnerable_query(query: str) -> bool:
    # Heuristic: a value that appears literally after "=" was probably
    # interpolated into the SQL text; a placeholder (? or %s) was not.
    # 1. Quoted string literal after "=", e.g. WHERE name = 'alice'
    has_string_literal = bool(re.search(r"=\s*'[^']+'", query))
    # 2. Bare numeric literal after "=", e.g. WHERE id = 42
    #    (this is what a %-formatted query looks like once %s is filled in)
    has_numeric_literal = bool(re.search(r"=\s*\d+\b", query))
    return has_string_literal or has_numeric_literal

# The exercise's four queries, written as they look after interpolation
QUERIES = [
    ("SELECT * FROM users WHERE name = 'alice'", True, "interpolated literal"),
    ("SELECT * FROM users WHERE name = ?", False, "parameterized ?"),
    ("SELECT * FROM users WHERE id = 42", True, "interpolated integer literal"),
    ("SELECT * FROM users WHERE id = %s", False, "parameterized %s"),
]

for i, (query, expected, label) in enumerate(QUERIES, 1):
    result = is_vulnerable_query(query)
    assert result == expected, label
    print(f"Query {i} vulnerable: {result}")
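Why the heuristic matters: Static analyzers such as Bandit and Semgrep use similar pattern-based heuristics to flag suspicious query construction during code review (sqlmap, by contrast, probes a running application rather than reading source). A query with literal values baked in was often built by string concatenation somewhere upstream, although a hardcoded constant triggers the same pattern, so treat a hit as a prompt to inspect the code rather than as proof. The safe pattern for values is placeholders: ? (sqlite3), %s (psycopg2/MySQLdb), or :name (SQLAlchemy).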
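As a rough illustration of the placeholder styles named above, the sketch below runs the same lookup with sqlite3's ? and :name styles and then binds a hostile string. The table contents and the hostile value are assumptions made up for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'alice')")

# Illustrative hostile input (an assumption for this sketch)
hostile = "alice' OR '1'='1"

# qmark style: values are supplied as a sequence
by_qmark = conn.execute(
    "SELECT id FROM users WHERE name = ?", ("alice",)
).fetchall()

# named style: values are supplied as a mapping
by_name = conn.execute(
    "SELECT id FROM users WHERE name = :name", {"name": "alice"}
).fetchall()

# The hostile string is bound as one plain value, so it matches no row
hostile_rows = conn.execute(
    "SELECT id FROM users WHERE name = ?", (hostile,)
).fetchall()

print(by_qmark, by_name, hostile_rows)
conn.close()
```

Both styles return the same row for a normal name, and the hostile string is compared as a whole against the name column instead of being parsed as SQL.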
Expected Output
Query 1 vulnerable: True
Query 2 vulnerable: False
Query 3 vulnerable: True
Query 4 vulnerable: False
Hints
Hint 1: Any query that uses string concatenation (+ or %) with user input is vulnerable.
Hint 2: Parameterized queries use ? or %s placeholders — the database driver handles escaping.
Hint 3: Even quotes around the value (WHERE name = '...') are bypassable — use parameters, not quoting.
Demonstrate SQL injection via string concatenation and neutralization via parameterized queries.
import sqlite3

def setup_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, name TEXT, role TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'alice', 'user')")
    conn.execute("INSERT INTO users VALUES (2, 'bob', 'admin')")
    conn.commit()
    return conn

def vulnerable_login(conn, username: str):
    # NEVER do this in production
    query = "SELECT * FROM users WHERE name = '" + username + "'"
    return conn.execute(query).fetchall()

def safe_login(conn, username: str):
    return conn.execute("SELECT * FROM users WHERE name = ?", (username,)).fetchall()

conn = setup_db()
attack = "' OR '1'='1"
vulnerable_rows = vulnerable_login(conn, attack)
safe_rows = safe_login(conn, attack)
print(f"Vulnerable query result: {'injected' if len(vulnerable_rows) > 1 else 'safe'} ({len(vulnerable_rows)} row{'s' if len(vulnerable_rows) != 1 else ''})")
print(f"Safe query result count: {len(safe_rows)} (injection neutralized)")
conn.close()
Solution
import sqlite3

def setup_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, name TEXT, role TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'alice', 'user')")
    conn.execute("INSERT INTO users VALUES (2, 'bob', 'admin')")
    conn.commit()
    return conn

def vulnerable_login(conn, username: str):
    # Concatenation: the input becomes part of the SQL text
    query = "SELECT * FROM users WHERE name = '" + username + "'"
    return conn.execute(query).fetchall()

def safe_login(conn, username: str):
    # Placeholder: the input is bound as a value
    return conn.execute("SELECT * FROM users WHERE name = ?", (username,)).fetchall()

conn = setup_db()
attack = "' OR '1'='1"
vulnerable_rows = vulnerable_login(conn, attack)
safe_rows = safe_login(conn, attack)
print(f"Vulnerable query result: {'injected' if len(vulnerable_rows) > 1 else 'safe'} ({len(vulnerable_rows)} row{'s' if len(vulnerable_rows) != 1 else ''})")
print(f"Safe query result count: {len(safe_rows)} (injection neutralized)")
conn.close()
What the attack does: ' OR '1'='1 closes the name string literal and appends a condition that is always true, so the query returns every row. In an UPDATE or DELETE statement the same trick would modify or destroy data, and a -- comment marker lets the attacker discard the rest of the query. A parameterized query sends the entire input as a data value, so the database engine never interprets it as SQL.
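The comment-marker variant mentioned above can be sketched as follows. The two-column table and the check functions are hypothetical, and the plaintext password exists only to keep the demo short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Plaintext password only for demonstration; real systems store a hash
conn.execute("CREATE TABLE users (name TEXT, password TEXT)")
conn.execute("INSERT INTO users VALUES ('alice', 'correct-horse')")

def vulnerable_check(name: str, password: str) -> bool:
    # Concatenation: a "--" inside name comments out the password test
    query = ("SELECT 1 FROM users WHERE name = '" + name +
             "' AND password = '" + password + "'")
    return conn.execute(query).fetchone() is not None

def safe_check(name: str, password: str) -> bool:
    # Both inputs are bound as values; "--" is just two characters of data
    query = "SELECT 1 FROM users WHERE name = ? AND password = ?"
    return conn.execute(query, (name, password)).fetchone() is not None

payload = "alice'--"
bypassed = vulnerable_check(payload, "wrong")  # password test is commented out
rejected = safe_check(payload, "wrong")        # no user is named "alice'--"
print(f"vulnerable: {bypassed}, safe: {rejected}")
```

With concatenation the password condition never reaches the engine, while the parameterized version looks for a user whose name is literally alice'-- and finds none.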
Expected Output
Vulnerable query result: injected (2 rows)
Safe query result count: 0 (injection neutralized)
Hints
Hint 1: Use sqlite3.connect(":memory:") for in-memory testing.
Hint 2: Vulnerable: cursor.execute("SELECT * FROM users WHERE name = '" + name + "'")
Hint 3: Safe: cursor.execute("SELECT * FROM users WHERE name = ?", (name,))
Implement safe dynamic ORDER BY using a whitelist approach.
ALLOWED_COLUMNS = {"id", "username", "created_at", "email"}
ALLOWED_DIRECTIONS = {"ASC", "DESC"}

def build_safe_order_by(column: str, direction: str) -> str:
    """Return safe ORDER BY clause or raise ValueError."""
    pass

# Valid
print(f"Valid sort: {build_safe_order_by('id', 'ASC')}")

# Attacks
for col, dir_, label in [
    ("id; DROP TABLE users--", "ASC", "Injection blocked"),
    ("id", "DESC; DROP TABLE--", "Invalid direction"),
]:
    try:
        build_safe_order_by(col, dir_)
    except ValueError as e:
        print(f"{label}: ValueError: {e}")
Solution
ALLOWED_COLUMNS = {"id", "username", "created_at", "email"}
ALLOWED_DIRECTIONS = {"ASC", "DESC"}

def build_safe_order_by(column: str, direction: str) -> str:
    # Reject anything that is not an exact member of the whitelist
    if column not in ALLOWED_COLUMNS:
        raise ValueError(f"invalid sort column: {column}")
    if direction.upper() not in ALLOWED_DIRECTIONS:
        raise ValueError(f"invalid sort direction: {direction}")
    return f"{column} {direction.upper()}"

print(f"Valid sort: {build_safe_order_by('id', 'ASC')}")

for col, dir_, label in [
    ("id; DROP TABLE users--", "ASC", "Injection blocked"),
    ("id", "DESC; DROP TABLE--", "Invalid direction"),
]:
    try:
        build_safe_order_by(col, dir_)
    except ValueError as e:
        print(f"{label}: ValueError: {e}")
The fundamental rule: SQL structure (column names, table names, ORDER BY direction) cannot be bound as a parameter; only values can. When you need dynamic SQL structure, validate it against a whitelist of known-good names. Escaping is not a substitute: the sqlite3 module has no escape_identifier() function, and even where a driver offers identifier quoting, it only guarantees valid syntax and cannot decide which columns a caller may sort or filter by. An unquoted slot accepts whole expressions such as (SELECT password FROM admins LIMIT 1).
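A minimal sketch of the rule above: sqlite3 rejects a placeholder in an identifier position, so the column name has to come from a lookup table instead. SORT_COLUMNS and fetch_sorted are hypothetical names invented for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, username TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(2, "bob"), (1, "alice")])

# Binding an identifier fails: "?" is only valid where a value may appear
try:
    conn.execute("SELECT * FROM ?", ("users",))
    bind_error = None
except sqlite3.OperationalError as exc:
    bind_error = str(exc)

# Hypothetical mapping from public sort keys to real column names
SORT_COLUMNS = {"id": "id", "name": "username"}

def fetch_sorted(sort_key: str):
    column = SORT_COLUMNS.get(sort_key)
    if column is None:
        raise ValueError(f"invalid sort key: {sort_key}")
    # column comes from the mapping, never from the caller's string
    return conn.execute(
        f"SELECT id, username FROM users ORDER BY {column}"
    ).fetchall()

rows = fetch_sorted("name")
print(bind_error)
print(rows)
```

Mapping public keys to column names also means callers never learn, or choose, the real schema names.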
Expected Output
Valid sort: id ASC
Injection blocked: ValueError: invalid sort column: id; DROP TABLE users--
Invalid direction: ValueError: invalid sort direction: DESC; DROP TABLE--
Hints
Hint 1: Column names and ORDER BY direction cannot be parameterized — the database driver only parameterizes values, not SQL keywords.
Hint 2: Use a whitelist of allowed column names and directions. Reject anything not in the whitelist.
Hint 3: Never pass user-supplied column names or sort directions directly into a query string.
Audit a set of code snippets and classify each as vulnerable or safe.
import re

CODE_SNIPPETS = [
    (3, "cursor.execute(\"SELECT * FROM users WHERE id = \" + user_id)", "VULNERABLE"),
    (7, "cursor.execute(\"SELECT * FROM users WHERE id = ?\", (user_id,))", "SAFE"),
    (11, 'cursor.execute(f"SELECT * FROM users WHERE name = \'{name}\'")', "VULNERABLE"),
    (15, "User.objects.filter(name=name)", "SAFE"),
]

def audit_snippet(code: str) -> str:
    """Return 'VULNERABLE' or 'SAFE' based on heuristic analysis."""
    # Parameterized patterns
    if re.search(r'execute\([^,]+,\s*\(', code):
        return "SAFE"
    if re.search(r'\.filter\(', code) or re.search(r'\.where\(', code):
        return "SAFE"
    # Dangerous patterns
    if '+' in code and 'execute' in code:
        return "VULNERABLE"
    if 'f"' in code and 'execute' in code:
        return "VULNERABLE"
    if "f'" in code and 'execute' in code:
        return "VULNERABLE"
    return "SAFE"

for line_num, code, expected in CODE_SNIPPETS:
    result = audit_snippet(code)
    if result == "VULNERABLE":
        label = "f-string in SQL" if "f'" in code or 'f"' in code else "string formatting in SQL"
    else:
        # A safe execute() call is parameterized; a safe non-execute call is an ORM query
        label = "parameterized query" if "execute" in code else "ORM query"
    print(f"Line {line_num}: {result} - {label}")
Solution
import re

CODE_SNIPPETS = [
    (3, 'cursor.execute("SELECT * FROM users WHERE id = " + user_id)', "VULNERABLE - string formatting in SQL"),
    (7, 'cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))', "SAFE - parameterized query"),
    (11, 'cursor.execute(f"SELECT * FROM users WHERE name = \'{name}\'")', "VULNERABLE - f-string in SQL"),
    (15, "User.objects.filter(name=name)", "SAFE - ORM query"),
]

def audit_snippet(code: str) -> str:
    # Parameterized: execute(sql, (param,)) pattern
    if re.search(r'execute\([^,]+,\s*[\(\[]', code):
        return "SAFE"
    # ORM patterns
    if re.search(r'\.(filter|where|get)\(', code):
        return "SAFE"
    # Dangerous patterns
    if re.search(r'execute\(.*\+', code):
        return "VULNERABLE"
    if re.search(r'execute\(f["\']', code):
        return "VULNERABLE"
    if re.search(r'execute\(.*%\s', code):
        return "VULNERABLE"
    return "SAFE"

for line_num, code, expected_label in CODE_SNIPPETS:
    result = audit_snippet(code)
    label = expected_label.split(" - ", 1)[1] if " - " in expected_label else ""
    print(f"Line {line_num}: {result} - {label}")
Automated SQL injection auditing: Tools like Bandit (pip install bandit) and Semgrep automate this kind of code review. bandit -r . -t B608 runs only check B608 (hardcoded_sql_expressions), which flags SQL statements assembled by string building. Semgrep's python.lang.security.audit.formatted-sql-query rule catches f-string and format() patterns in SQL. Run these in CI to keep injection-prone code out of the codebase.
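To give a feel for what such rules look for, here is a small regex sketch that names the construction style of a snippet, including the .format() case. formatting_style is a hypothetical helper written for this page; it is far cruder than Bandit's or Semgrep's real analysis.

```python
import re

def formatting_style(code: str) -> str:
    """Name the string-building style a snippet uses, or 'none'."""
    if re.search(r'execute\(\s*f["\']', code):
        return "fstring"
    if re.search(r'["\']\s*\.format\(', code):
        return "format_method"
    # A closing quote followed by % and an operand, e.g. "..." % name
    if re.search(r'["\']\s*%\s*[\w(]', code):
        return "percent_format"
    if re.search(r'["\']\s*\+|\+\s*["\']', code):
        return "string_concat"
    return "none"

samples = [
    'cursor.execute("SELECT * FROM t WHERE id = {}".format(uid))',
    'cursor.execute("SELECT * FROM t WHERE id = %s" % uid)',
    'cursor.execute("SELECT * FROM t WHERE id = %s", (uid,))',
    'cursor.execute("SELECT * FROM t WHERE id = " + uid)',
]
for sample in samples:
    print(formatting_style(sample))
```

The third sample is the interesting one: %s followed by a comma and a parameter tuple is a placeholder, not formatting, so it is reported as none.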
Expected Output
Line 3: VULNERABLE - string formatting in SQL
Line 7: SAFE - parameterized query
Line 11: VULNERABLE - f-string in SQL
Line 15: SAFE - ORM query
Hints
Hint 1: Look for f-strings, % formatting, and .format() calls that produce SQL strings.
Hint 2: ORM calls like .filter(name=value) are safe — the ORM generates parameterized SQL.
Hint 3: A query function that accepts a raw string and passes it to execute() is dangerous if callers pass user input.
Medium
Demonstrate second-order SQL injection where data is stored safely but used dangerously later.
import sqlite3

def setup_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, username TEXT, role TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'admin', 'superadmin')")
    conn.commit()
    return conn

def register_user(conn, username: str):
    """Safely stores username with parameterized query."""
    conn.execute("INSERT INTO users VALUES (2, ?, 'user')", (username,))
    conn.commit()

def vulnerable_get_profile(conn, username: str):
    """BUG: reads stored username and uses it in a new query WITHOUT parameterization."""
    # First: look up the stored username (safe: no user input in this query)
    row = conn.execute("SELECT username FROM users WHERE id = 2").fetchone()
    if row:
        stored_username = row[0]
        # SECOND ORDER: uses stored value unsafely
        query = f"SELECT * FROM users WHERE username = '{stored_username}'"
        return conn.execute(query).fetchall()
    return []

def safe_get_profile(conn, user_id: int):
    """Always use parameterized queries, even for data from the database."""
    return conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchall()

conn = setup_db()
malicious_username = "admin'--"
register_user(conn, malicious_username)
print(f"User registered: {malicious_username}")
vuln_rows = vulnerable_get_profile(conn, malicious_username)
print(f"Vulnerable profile fetch: {'returns admin user (injected)' if any(r[2] == 'superadmin' for r in vuln_rows) else 'safe'}")
safe_rows = safe_get_profile(conn, 2)
print(f"Safe profile fetch: {'returns correct user only' if safe_rows and safe_rows[0][1] == malicious_username else 'unexpected result'}")
conn.close()
Solution
import sqlite3

def setup_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, username TEXT, role TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'admin', 'superadmin')")
    conn.commit()
    return conn

def register_user(conn, username: str):
    conn.execute("INSERT INTO users VALUES (2, ?, 'user')", (username,))
    conn.commit()

def vulnerable_get_profile(conn, username: str):
    row = conn.execute("SELECT username FROM users WHERE id = 2").fetchone()
    if row:
        stored_username = row[0]
        # The stored value "admin'--" closes the literal and comments out the rest
        query = f"SELECT * FROM users WHERE username = '{stored_username}'"
        return conn.execute(query).fetchall()
    return []

def safe_get_profile(conn, user_id: int):
    return conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchall()

conn = setup_db()
malicious_username = "admin'--"
register_user(conn, malicious_username)
print(f"User registered: {malicious_username}")
vuln_rows = vulnerable_get_profile(conn, malicious_username)
print(f"Vulnerable profile fetch: {'returns admin user (injected)' if any(r[2] == 'superadmin' for r in vuln_rows) else 'safe'}")
safe_rows = safe_get_profile(conn, 2)
# Safe means: exactly the registered user's row comes back, with the name intact
is_safe = len(safe_rows) == 1 and safe_rows[0][1] == malicious_username
print(f"Safe profile fetch: {'returns correct user only' if is_safe else 'unexpected result'}")
conn.close()
Why second-order is insidious: The registration code uses parameterized queries, so a review of the registration function alone would pass it as safe. The payload sits dormant in the database. Months later, a different developer writes a profile-fetch function that builds its query with string formatting, and the stored value detonates. The rule: always parameterize, even for data read from your own database.
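The rule in the paragraph above can be sketched as a profile lookup that re-uses the stored username but binds it as a parameter. get_profile_by_stored_name is a hypothetical function for this illustration; the table mirrors the exercise.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, username TEXT, role TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'admin', 'superadmin')")
conn.execute("INSERT INTO users VALUES (2, ?, 'user')", ("admin'--",))

def get_profile_by_stored_name(user_id: int):
    row = conn.execute(
        "SELECT username FROM users WHERE id = ?", (user_id,)
    ).fetchone()
    if row is None:
        return []
    stored_username = row[0]
    # A value read from the database is bound like any other untrusted input
    return conn.execute(
        "SELECT id, username, role FROM users WHERE username = ?",
        (stored_username,),
    ).fetchall()

profile = get_profile_by_stored_name(2)
print(profile)
```

The lookup returns only the row whose username is literally admin'--, and the admin row stays out of reach.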
Expected Output
User registered: admin'--
Vulnerable profile fetch: returns admin user (injected)
Safe profile fetch: returns correct user only
Hints
Hint 1: Second-order injection: malicious input is stored safely (unmodified, via a parameterized INSERT), then later used unsafely in another query.
Hint 2: The stored value looks safe because it was parameterized at storage time. The danger comes when it is read back and used in another query without parameterization.
Hint 3: Always use parameterized queries when using data FROM the database in a query — not just user input.
Demonstrate the difference between safe ORM queries, safe raw text() queries, and vulnerable raw SQL in SQLAlchemy.
from sqlalchemy import create_engine, text, Column, Integer, String
from sqlalchemy.orm import DeclarativeBase, Session

engine = create_engine("sqlite:///:memory:")

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    role = Column(String)

Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        User(id=1, name="alice", role="user"),
        User(id=2, name="admin", role="superadmin"),
    ])
    session.commit()

search_name = "alice"
attack = "' OR '1'='1"

with Session(engine) as session:
    # 1. ORM query: values are always bound as parameters
    orm_results = session.query(User).filter(User.name == search_name).all()
    print(f"ORM query safe: {len(orm_results)} result")
    # 2. text() with named bind params: safe
    raw_results = session.execute(
        text("SELECT * FROM users WHERE name = :name"),
        {"name": search_name}
    ).fetchall()
    print(f"Raw text() with bindparams safe: {len(raw_results)} result")
    # 3. text() with f-string: vulnerable. SQLAlchemy executes the string as
    #    written; it cannot tell interpolated input from intended SQL.
    try:
        vuln = session.execute(
            text(f"SELECT * FROM users WHERE name = '{attack}'")
        ).fetchall()
        print(f"Vulnerable text() allowed: INJECTED ({len(vuln)} results)")
    except Exception as e:
        print(f"Vulnerable text() raised: {str(e)[:40]}")
Solution
from sqlalchemy import create_engine, text, Column, Integer, String
from sqlalchemy.orm import DeclarativeBase, Session

engine = create_engine("sqlite:///:memory:")

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    role = Column(String)

Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        User(id=1, name="alice", role="user"),
        User(id=2, name="admin", role="superadmin"),
    ])
    session.commit()

search_name = "alice"
attack = "' OR '1'='1"

with Session(engine) as session:
    orm_results = session.query(User).filter(User.name == search_name).all()
    print(f"ORM query safe: {len(orm_results)} result")
    raw_results = session.execute(
        text("SELECT * FROM users WHERE name = :name"),
        {"name": search_name}
    ).fetchall()
    print(f"Raw text() with bindparams safe: {len(raw_results)} result")
    try:
        # The interpolated SQL is valid, so it runs and matches every row
        vuln = session.execute(
            text(f"SELECT * FROM users WHERE name = '{attack}'")
        ).fetchall()
        count = len(vuln)
        msg = f"INJECTED ({count} results)" if count > 1 else "no injection this time"
        print(f"Vulnerable text() allowed: {msg}")
    except Exception as e:
        # Only reached if the payload happens to produce invalid SQL
        print(f"Vulnerable text() raised: {str(e)[:40]}")
SQLAlchemy safety hierarchy:
- ORM queries (safest): .filter(User.name == name) → always parameterized, no raw SQL.
- text() with bindparams (safe): text("WHERE name = :name") → named parameters, driver handles escaping.
- text() with f-string (vulnerable): text(f"WHERE name = '{name}'") → identical to raw string concatenation.
- connection.execute(raw_string) (most dangerous): direct SQL string, no protection. SQLAlchemy 2.0 rejects plain strings here, so the equivalent risk there is exec_driver_sql() or a raw DBAPI cursor fed a concatenated string.
Expected Output
ORM query safe: 1 result
Raw text() with bindparams safe: 1 result
Vulnerable text() allowed: INJECTED (2 results)
Hints
Hint 1: SQLAlchemy ORM queries like session.query(User).filter(User.name == name) are always parameterized.
Hint 2: session.execute(text("SELECT * FROM users WHERE name = :name"), {"name": name}) is safe — use named bind params.
Hint 3: session.execute(text(f"SELECT * FROM users WHERE name = '{name}'")) is vulnerable — avoid f-strings with text().
Simulate blind SQL injection probe detection and demonstrate boolean-based information extraction.
import sqlite3
import re

def setup_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, username TEXT, password_hash TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'admin', 'abc123hash')")
    conn.commit()
    return conn

def detect_sqli_probe(query: str) -> bool:
    """Detect common blind SQL injection patterns."""
    patterns = [
        r"\bAND\s+\d+=\d+",             # AND 1=1, AND 1=2
        r"\bOR\s+\d+=\d+",              # OR 1=1
        r"\bSLEEP\s*\(",                # MySQL SLEEP()
        r"\bWAITFOR\s+DELAY",           # MSSQL WAITFOR
        r"\bBENCHMARK\s*\(",            # MySQL BENCHMARK
        r";\s*--",                      # comment after statement
        r"'\s*AND\s*'[^']*'='[^']*'",   # ' AND 'x'='x style
    ]
    for p in patterns:
        if re.search(p, query, re.IGNORECASE):
            return True
    return False

conn = setup_db()
# Simulate attacker probing: does username='admin' AND 1=1 return a row?
probe_true = conn.execute("SELECT * FROM users WHERE username='admin' AND 1=1").fetchall()
probe_false = conn.execute("SELECT * FROM users WHERE username='admin' AND 1=2").fetchall()
print(f"Boolean-based probe 1 (true condition): {len(probe_true)} row")
print(f"Boolean-based probe 2 (false condition): {len(probe_false)} rows")
time_based_query = "SELECT * FROM users WHERE username='admin' AND SLEEP(5)--"
print(f"Time-based probe detected: {detect_sqli_probe(time_based_query)} (query contained SLEEP/WAITFOR)")
conn.close()
Solution
import sqlite3
import re

def setup_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, username TEXT, password_hash TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'admin', 'abc123hash')")
    conn.commit()
    return conn

def detect_sqli_probe(query: str) -> bool:
    patterns = [
        r"\bAND\s+\d+=\d+",
        r"\bOR\s+\d+=\d+",
        r"\bSLEEP\s*\(",
        r"\bWAITFOR\s+DELAY",
        r"\bBENCHMARK\s*\(",
        r";\s*--",
        r"'\s*AND\s*'[^']*'='[^']*'",
    ]
    for p in patterns:
        if re.search(p, query, re.IGNORECASE):
            return True
    return False

conn = setup_db()
probe_true = conn.execute("SELECT * FROM users WHERE username='admin' AND 1=1").fetchall()
probe_false = conn.execute("SELECT * FROM users WHERE username='admin' AND 1=2").fetchall()
print(f"Boolean-based probe 1 (true condition): {len(probe_true)} row")
print(f"Boolean-based probe 2 (false condition): {len(probe_false)} rows")
time_based_query = "SELECT * FROM users WHERE username='admin' AND SLEEP(5)--"
print(f"Time-based probe detected: {detect_sqli_probe(time_based_query)} (query contained SLEEP/WAITFOR)")
conn.close()
Blind injection techniques attackers use:
- Boolean-based: username=admin' AND SUBSTRING(password,1,1)='a'-- (if true, the page shows content; if false, the page is empty; the attacker extracts data one character at a time).
- Time-based: username=admin' AND SLEEP(5)-- (a 5-second delay confirms injection is possible).
- Out-of-band: username=admin' AND LOAD_FILE('/etc/passwd') INTO OUTFILE '/var/www/out'-- (data exfiltrated via the file system).
Parameterized queries prevent all three: the injection payload is never parsed as SQL.
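A small sketch of why the boolean-based technique stops working once the lookup is parameterized. The two existence checks are hypothetical stand-ins for a page that shows or hides content, and the probes use SQLite's substr() in place of SUBSTRING.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, username TEXT, password_hash TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'admin', 'abc123hash')")

def vulnerable_exists(username: str) -> bool:
    # Concatenation: the probe's condition becomes part of the WHERE clause
    query = "SELECT 1 FROM users WHERE username = '" + username + "'"
    return conn.execute(query).fetchone() is not None

def safe_exists(username: str) -> bool:
    # Bound value: the whole probe is compared against the username column
    return conn.execute(
        "SELECT 1 FROM users WHERE username = ?", (username,)
    ).fetchone() is not None

true_probe = "admin' AND substr(password_hash, 1, 1) = 'a"
false_probe = "admin' AND substr(password_hash, 1, 1) = 'z"

vulnerable_answers = (vulnerable_exists(true_probe), vulnerable_exists(false_probe))
safe_answers = (safe_exists(true_probe), safe_exists(false_probe))
print(vulnerable_answers, safe_answers)
```

The vulnerable check answers differently for the two probes, which is the one-bit leak an attacker repeats per character. The parameterized check gives the same answer for both, so there is nothing to observe.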
Expected Output
Boolean-based probe 1 (true condition): 1 row
Boolean-based probe 2 (false condition): 0 rows
Time-based probe detected: True (query contained SLEEP/WAITFOR)
Hints
Hint 1: Boolean-based blind injection: attacker submits conditions that are TRUE or FALSE and observes the row count change.
Hint 2: Time-based blind injection: attacker uses SLEEP(N) or WAITFOR DELAY to cause observable delays.
Hint 3: WAFs detect these by looking for SLEEP, WAITFOR, AND 1=1, AND 1=2, BENCHMARK patterns.
Implement a safe dynamic query builder that handles variable filter combinations.
import sqlite3

ALLOWED_FILTERS = {
    "category": ("category", "=", str),
    "max_price": ("price", "<=", float),
    "min_price": ("price", ">=", float),
    "in_stock": ("in_stock", "=", int),
}
ALLOWED_ORDER = {"price", "name", "created_at"}

def build_product_query(filters: dict, order_by: str = "price", direction: str = "ASC"):
    """Safely build a dynamic product search query."""
    conditions = []
    params = []
    for filter_key, value in filters.items():
        if filter_key not in ALLOWED_FILTERS:
            raise ValueError(f"unknown filter: {filter_key}")
        col, op, type_fn = ALLOWED_FILTERS[filter_key]
        conditions.append(f"{col} {op} ?")
        params.append(type_fn(value))
    if order_by not in ALLOWED_ORDER:
        raise ValueError(f"invalid order_by: {order_by}")
    if direction not in ("ASC", "DESC"):
        raise ValueError(f"invalid direction: {direction}")
    where_clause = " AND ".join(conditions)
    sql = "SELECT * FROM products"
    if where_clause:
        sql += f" WHERE {where_clause}"
    sql += f" ORDER BY {order_by} {direction}"
    return sql, tuple(params)

# Setup
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER, name TEXT, category TEXT, price REAL, in_stock INTEGER)")
conn.executemany("INSERT INTO products VALUES (?,?,?,?,?)", [
    (1, "Laptop", "electronics", 999.0, 1),
    (2, "Phone", "electronics", 299.0, 1),
    (3, "Book", "books", 15.0, 1),
    (4, "Tablet", "electronics", 599.0, 0),
])
conn.commit()

filters = {"category": "electronics", "max_price": 500}
sql, params = build_product_query(filters, order_by="price", direction="ASC")
results = conn.execute(sql, params).fetchall()
print(f"Query: {sql}")
print(f"Params: {params}")
print(f"Results: {len(results)} products found")
conn.close()
Solution
import sqlite3

ALLOWED_FILTERS = {
    "category": ("category", "=", str),
    "max_price": ("price", "<=", float),
    "min_price": ("price", ">=", float),
    "in_stock": ("in_stock", "=", int),
}
ALLOWED_ORDER = {"price", "name", "created_at"}

def build_product_query(filters: dict, order_by: str = "price", direction: str = "ASC"):
    conditions = []
    params = []
    for filter_key, value in filters.items():
        if filter_key not in ALLOWED_FILTERS:
            raise ValueError(f"unknown filter: {filter_key}")
        # Column and operator come from the whitelist; only the value is bound
        col, op, type_fn = ALLOWED_FILTERS[filter_key]
        conditions.append(f"{col} {op} ?")
        params.append(type_fn(value))
    if order_by not in ALLOWED_ORDER:
        raise ValueError(f"invalid order_by: {order_by}")
    if direction not in ("ASC", "DESC"):
        raise ValueError(f"invalid direction: {direction}")
    where_clause = " AND ".join(conditions)
    sql = "SELECT * FROM products"
    if where_clause:
        sql += f" WHERE {where_clause}"
    sql += f" ORDER BY {order_by} {direction}"
    return sql, tuple(params)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER, name TEXT, category TEXT, price REAL, in_stock INTEGER)")
conn.executemany("INSERT INTO products VALUES (?,?,?,?,?)", [
    (1, "Laptop", "electronics", 999.0, 1),
    (2, "Phone", "electronics", 299.0, 1),
    (3, "Book", "books", 15.0, 1),
    (4, "Tablet", "electronics", 599.0, 0),
])
conn.commit()

filters = {"category": "electronics", "max_price": 500}
sql, params = build_product_query(filters, order_by="price", direction="ASC")
results = conn.execute(sql, params).fetchall()
print(f"Query: {sql}")
print(f"Params: {params}")
print(f"Results: {len(results)} products found")
conn.close()
The pattern: separate structure from data. SQL structure (column names, operators, ORDER BY) is built from a validated whitelist — the attacker cannot influence it. Data values (category="electronics", max_price=500) are passed as parameters — the database treats them as literals, not SQL. This pattern scales to any dynamic query builder.
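The same separation extends to IN lists, where the number of placeholders depends on the data. The sketch below is one possible way to do it; build_in_clause and its allowed_columns argument are hypothetical names, not part of the exercise.

```python
import sqlite3

def build_in_clause(column: str, values: list, allowed_columns: set):
    """Return ('col IN (?, ?, ...)', params) with one placeholder per value."""
    if column not in allowed_columns:
        raise ValueError(f"unknown column: {column}")
    if not values:
        raise ValueError("IN list must not be empty")
    # Structure: only "?" marks are generated, never the values themselves
    placeholders = ", ".join("?" for _ in values)
    return f"{column} IN ({placeholders})", tuple(values)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER, name TEXT, category TEXT)")
conn.executemany("INSERT INTO products VALUES (?, ?, ?)", [
    (1, "Laptop", "electronics"),
    (2, "Book", "books"),
    (3, "Mug", "kitchen"),
])

clause, params = build_in_clause("category", ["books", "kitchen"], {"category"})
names = [row[0] for row in conn.execute(
    f"SELECT name FROM products WHERE {clause} ORDER BY id", params
)]
print(clause, params, names)
```

The clause text depends only on how many values there are, so a hostile value in the list changes the parameters but never the SQL.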
Expected Output
Query: SELECT * FROM products WHERE category = ? AND price <= ? ORDER BY price ASC
Params: ('electronics', 500.0)
Results: 1 products found
Hints
Hint 1: Build the WHERE clause dynamically using a list of conditions and a list of params.
Hint 2: Column names in WHERE must come from a whitelist — never directly from user input.
Hint 3: Collect all "col = ?" fragments and join with AND; pass the params tuple to execute().
Hard
Build a WAF rule that detects common SQL injection bypass techniques.
import re
from urllib.parse import unquote

def normalize_for_waf(text: str) -> str:
    """Normalize input before pattern matching."""
    # Step 1: URL decode (handle double encoding too)
    for _ in range(3):
        decoded = unquote(text)
        if decoded == text:
            break
        text = decoded
    # Step 2: Lowercase
    text = text.lower()
    # Step 3: Remove SQL comments (/**/, --, #)
    text = re.sub(r"/\*.*?\*/", " ", text, flags=re.DOTALL)
    text = re.sub(r"--[^\n]*", " ", text)
    text = re.sub(r"#[^\n]*", " ", text)
    # Step 4: Normalize whitespace (tabs, newlines → space)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def waf_check(input_text: str) -> bool:
    """Return True if input is BLOCKED (suspected injection)."""
    normalized = normalize_for_waf(input_text)
    sqli_patterns = [
        r"\bselect\b.*\bfrom\b",
        r"\bunion\b.*\bselect\b",
        r"\binsert\b.*\binto\b",
        r"\bdrop\b.*\btable\b",
        r"\bor\b\s+\d+=\d+",
        r"\band\b\s+\d+=\d+",
        r"'\s*(or|and)\s*'",
        r"\bsleep\s*\(",
        r"\bwaitfor\b",
    ]
    for pattern in sqli_patterns:
        if re.search(pattern, normalized):
            return True
    return False

attacks = [
    ("' OR 1=1 --", "Standard injection"),
    ("' OR/**/1=1/**/--", "Comment obfuscation"),
    ("' oR 1=1 --", "Case variation"),
    ("'\tOR\n1=1\r--", "Whitespace bypass"),
    # %25 is an encoded "%", so %2527 decodes to %27 and then to a single quote
    ("%2527%2520OR%25201%253D1%2520--", "Double encoding"),
]
for payload, label in attacks:
    blocked = waf_check(payload)
    print(f"{label}: {'blocked' if blocked else 'BYPASSED (fix needed)'}")
Solution
import re
from urllib.parse import unquote

def normalize_for_waf(text: str) -> str:
    # Decode up to three layers of URL encoding
    for _ in range(3):
        decoded = unquote(text)
        if decoded == text:
            break
        text = decoded
    text = text.lower()
    # Strip /* */, -- and # comments, then collapse whitespace
    text = re.sub(r"/\*.*?\*/", " ", text, flags=re.DOTALL)
    text = re.sub(r"--[^\n]*", " ", text)
    text = re.sub(r"#[^\n]*", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def waf_check(input_text: str) -> bool:
    normalized = normalize_for_waf(input_text)
    sqli_patterns = [
        r"\bselect\b.*\bfrom\b",
        r"\bunion\b.*\bselect\b",
        r"\binsert\b.*\binto\b",
        r"\bdrop\b.*\btable\b",
        r"\bor\b\s+\d+=\d+",
        r"\band\b\s+\d+=\d+",
        r"'\s*(or|and)\s*'",
        r"\bsleep\s*\(",
        r"\bwaitfor\b",
    ]
    for pattern in sqli_patterns:
        if re.search(pattern, normalized):
            return True
    return False

attacks = [
    ("' OR 1=1 --", "Standard injection"),
    ("' OR/**/1=1/**/--", "Comment obfuscation"),
    ("' oR 1=1 --", "Case variation"),
    ("'\tOR\n1=1\r--", "Whitespace bypass"),
    # %25 is an encoded "%", so %2527 decodes to %27 and then to a single quote
    ("%2527%2520OR%25201%253D1%2520--", "Double encoding"),
]
for payload, label in attacks:
    blocked = waf_check(payload)
    print(f"{label}: {'blocked' if blocked else 'BYPASSED (fix needed)'}")
WAF limitations: Even a well-written WAF can be bypassed by sufficiently creative attackers. WAFs are a detection and delay mechanism — not a substitute for parameterized queries. A defense-in-depth strategy: WAF blocks obvious attacks and logs suspicious activity; parameterized queries prevent all injection regardless of what bypasses the WAF.
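To make the limitation concrete, the sketch below runs a payload that slips past a simplified pattern list and shows that the parameterized query is unaffected either way. PATTERNS is a reduced copy of the exercise's rules and simple_waf_blocks is a hypothetical helper; neither models a production WAF.

```python
import re
import sqlite3

# Simplified subset of the exercise's patterns, copied so the sketch runs alone
PATTERNS = [
    r"\bor\b\s+\d+=\d+",
    r"\band\b\s+\d+=\d+",
    r"'\s*(or|and)\s*'",
    r"\bunion\b.*\bselect\b",
]

def simple_waf_blocks(text: str) -> bool:
    lowered = re.sub(r"\s+", " ", text.lower())
    return any(re.search(p, lowered) for p in PATTERNS)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.executemany("INSERT INTO users VALUES (?)", [("alice",), ("bob",)])

# "2>1" is always true but matches none of the digit=digit patterns
payload = "' OR 2>1 --"
waf_blocked = simple_waf_blocks(payload)

leaked = conn.execute(
    "SELECT name FROM users WHERE name = '" + payload + "'"
).fetchall()
safe = conn.execute(
    "SELECT name FROM users WHERE name = ?", (payload,)
).fetchall()
print(waf_blocked, len(leaked), len(safe))
```

Adding a pattern for 2>1 would only invite the next variant, whereas the bound query returns no rows for any of them.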
Expected Output
Standard injection: blocked
Comment obfuscation: blocked
Case variation: blocked
Whitespace bypass: blocked
Double encoding: blocked
Hints
Hint 1: WAF bypasses include: comment insertion (SE/**/LECT), case variation (SeLeCt), whitespace variations (tab, newline), and double URL encoding (%2527 for single quote).
Hint 2: A robust WAF normalizes input before checking: lowercase, remove comments, decode URL encoding, normalize whitespace.
Hint 3: Defense in depth: WAF is layer 1, parameterized queries are layer 2 — the WAF should not be your only protection.
Demonstrate that stored procedures are not inherently safe if they use dynamic SQL internally.
import sqlite3

def setup_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, username TEXT, role TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'alice', 'user')")
    conn.execute("INSERT INTO users VALUES (2, 'admin', 'superadmin')")
    conn.commit()
    return conn

def safe_stored_proc(conn, username: str):
    """Simulates a safe stored procedure using parameterized queries."""
    return conn.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchall()

def vulnerable_stored_proc(conn, username: str):
    """Simulates a stored proc that builds dynamic SQL with concatenation (DANGEROUS)."""
    # This is what sp_executesql misuse or EXEC('SELECT...'+@var) looks like
    dynamic_sql = "SELECT * FROM users WHERE username = '" + username + "'"
    return conn.execute(dynamic_sql).fetchall()

def safe_dynamic_sp(conn, username: str):
    """Simulates safe dynamic SQL using bind parameters (like sp_executesql @params)."""
    return conn.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchall()

conn = setup_db()
attack = "' OR '1'='1"
safe_rows = safe_stored_proc(conn, "alice")
print(f"Safe SP simulation: {len(safe_rows)} user found")
vuln_rows = vulnerable_stored_proc(conn, attack)
print(f"Vulnerable SP with EXEC: {'VULNERABLE (injection succeeded)' if len(vuln_rows) > 1 else 'safe'}")
safe_dynamic_rows = safe_dynamic_sp(conn, attack)
print(f"SP with parameters only: {'safe' if len(safe_dynamic_rows) == 0 else 'vulnerable'}")
conn.close()
Solution
import sqlite3

def setup_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, username TEXT, role TEXT)")
    conn.execute("INSERT INTO users VALUES (1, 'alice', 'user')")
    conn.execute("INSERT INTO users VALUES (2, 'admin', 'superadmin')")
    conn.commit()
    return conn

def safe_stored_proc(conn, username: str):
    return conn.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchall()

def vulnerable_stored_proc(conn, username: str):
    # Stands in for EXEC('SELECT ...' + @var) inside a procedure body
    dynamic_sql = "SELECT * FROM users WHERE username = '" + username + "'"
    return conn.execute(dynamic_sql).fetchall()

def safe_dynamic_sp(conn, username: str):
    # Stands in for sp_executesql with a declared @params list
    return conn.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchall()

conn = setup_db()
attack = "' OR '1'='1"
safe_rows = safe_stored_proc(conn, "alice")
print(f"Safe SP simulation: {len(safe_rows)} user found")
vuln_rows = vulnerable_stored_proc(conn, attack)
print(f"Vulnerable SP with EXEC: {'VULNERABLE (injection succeeded)' if len(vuln_rows) > 1 else 'safe'}")
safe_dynamic_rows = safe_dynamic_sp(conn, attack)
print(f"SP with parameters only: {'safe' if len(safe_dynamic_rows) == 0 else 'vulnerable'}")
conn.close()
The stored procedure myth: Many developers believe "using stored procedures prevents SQL injection." This is false. A stored procedure that builds SQL strings internally using EXEC or string concatenation is just as vulnerable as application-level string concatenation — the injection simply happens inside the database engine instead of in the application layer. Stored procedures are safe only when they exclusively use parameterized statements.
Expected Output
Safe SP simulation: 1 user found
Vulnerable SP with EXEC: VULNERABLE (injection succeeded)
SP with parameters only: safe
Hints
Hint 1: Stored procedures are safe only if they use parameterized SQL internally — a stored proc that uses EXEC() or sp_executesql with string concatenation is just as vulnerable.
Hint 2: Dynamic SQL inside stored procedures (EXEC('SELECT * FROM ' + @table)) is a common source of injection.
Hint 3: Use sp_executesql with @params for safe dynamic SQL in MSSQL; prepared statements in MySQL stored procs.
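The dynamic table name in Hint 2 is a case that bind parameters alone cannot cover, because placeholders stand in for values, not identifiers. The following is a minimal sketch of one common workaround, not part of the exercise: check the identifier against a fixed allowlist and still bind the value. The names ALLOWED_TABLES and fetch_by_username are hypothetical, chosen only for illustration.

```python
import sqlite3

# Hypothetical allowlist: the only table names this helper will ever interpolate.
ALLOWED_TABLES = {"users", "audit_log"}

def fetch_by_username(conn, table: str, username: str):
    """Select rows from an allowlisted table; the value is still bound with ?."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"table not allowed: {table!r}")
    # Interpolating is acceptable here only because `table` matched one of our own constants.
    sql = f"SELECT * FROM {table} WHERE username = ?"
    return conn.execute(sql, (username,)).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, username TEXT, role TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'alice', 'user')")
conn.execute("INSERT INTO users VALUES (2, 'admin', 'superadmin')")

rows = fetch_by_username(conn, "users", "alice")               # normal lookup
attack_rows = fetch_by_username(conn, "users", "' OR '1'='1")  # treated as a literal name
try:
    fetch_by_username(conn, "users; DROP TABLE users", "alice")
    rejected = False
except ValueError:
    rejected = True  # the identifier never reached the SQL string

print(len(rows), len(attack_rows), rejected)
conn.close()
```

The same split applies inside a stored procedure: identifiers are validated against known names, and values go through the parameter mechanism.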
Build a static analysis tool using Python's ast module to detect SQL injection vulnerabilities in code.
import ast
CODE_SAMPLES = [
    (2, 'cursor.execute("SELECT * FROM users WHERE id = " + user_id)'),
    (3, 'cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))'),
    (4, 'cursor.execute(f"SELECT * FROM users WHERE name = \'{name}\'")'),
    (5, 'cursor.execute("SELECT * FROM users WHERE name = %s" % name)'),
    (8, 'conn.execute("SELECT * FROM users WHERE name = :name", {"name": name})'),
]
def audit_execute_call(code: str) -> str:
    """
    Returns 'safe', 'string_concat', 'fstring', 'format_method', or 'percent_format'
    ('parse_error', 'not_a_call', or 'no_args' if the snippet cannot be audited).
    """
    try:
        tree = ast.parse(code, mode="eval")
    except SyntaxError:
        return "parse_error"
    call = tree.body
    if not isinstance(call, ast.Call):
        return "not_a_call"
    if not call.args:
        return "no_args"
    first_arg = call.args[0]
    # f-string
    if isinstance(first_arg, ast.JoinedStr):
        return "fstring"
    # string concatenation: "..." + var
    if isinstance(first_arg, ast.BinOp) and isinstance(first_arg.op, ast.Add):
        return "string_concat"
    # % formatting: "..." % var
    if isinstance(first_arg, ast.BinOp) and isinstance(first_arg.op, ast.Mod):
        return "percent_format"
    # .format() call
    if (isinstance(first_arg, ast.Call) and
            isinstance(first_arg.func, ast.Attribute) and
            first_arg.func.attr == "format"):
        return "format_method"
    # No string-building operation on the SQL argument: a constant string,
    # with or without bind parameters, interpolates nothing. Heuristic limit:
    # a query passed in as a variable also lands here, because this check
    # cannot see how that variable was built.
    return "safe"
results = [audit_execute_call(code) for _, code in CODE_SAMPLES]
vulnerable = [(line, r, code) for (line, code), r in zip(CODE_SAMPLES, results) if r != "safe"]
safe_count = sum(1 for r in results if r == "safe")
print(f"Scanned {len(CODE_SAMPLES)} code snippets")
print(f"Vulnerable: {len(vulnerable)}")
print(f"Safe: {safe_count}")
print("Vulnerability report:")
for line, vuln_type, code in vulnerable:
    print(f" Line {line}: {vuln_type} — {code[:50]}...")
Solution
import ast
CODE_SAMPLES = [
    (2, 'cursor.execute("SELECT * FROM users WHERE id = " + user_id)'),
    (3, 'cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))'),
    (4, 'cursor.execute(f"SELECT * FROM users WHERE name = \'{name}\'")'),
    (5, 'cursor.execute("SELECT * FROM users WHERE name = %s" % name)'),
    (8, 'conn.execute("SELECT * FROM users WHERE name = :name", {"name": name})'),
]
def audit_execute_call(code: str) -> str:
    try:
        tree = ast.parse(code, mode="eval")
    except SyntaxError:
        return "parse_error"
    call = tree.body
    if not isinstance(call, ast.Call):
        return "not_a_call"
    if not call.args:
        return "no_args"
    first_arg = call.args[0]
    # Each branch flags one way of building the SQL string from variables
    if isinstance(first_arg, ast.JoinedStr):
        return "fstring"
    if isinstance(first_arg, ast.BinOp) and isinstance(first_arg.op, ast.Add):
        return "string_concat"
    if isinstance(first_arg, ast.BinOp) and isinstance(first_arg.op, ast.Mod):
        return "percent_format"
    if (isinstance(first_arg, ast.Call) and
            isinstance(first_arg.func, ast.Attribute) and
            first_arg.func.attr == "format"):
        return "format_method"
    # Nothing was interpolated into the SQL argument. A query passed in as a
    # variable also lands here, which is a known limit of this heuristic.
    return "safe"
results = [audit_execute_call(code) for _, code in CODE_SAMPLES]
vulnerable = [(line, r, code) for (line, code), r in zip(CODE_SAMPLES, results) if r != "safe"]
safe_count = sum(1 for r in results if r == "safe")
print(f"Scanned {len(CODE_SAMPLES)} code snippets")
print(f"Vulnerable: {len(vulnerable)}")
print(f"Safe: {safe_count}")
print("Vulnerability report:")
for line, vuln_type, code in vulnerable:
    print(f" Line {line}: {vuln_type} — {code[:50]}...")
Building a real scanner: This exercise is a simplified version of what Bandit's B608 check and Semgrep's SQL injection rules do. B608 flags SQL-looking strings that are assembled with concatenation or formatting, much like the checks above. Taint-tracking tools go further: they follow a value from where it enters the program to the execute() call, across assignments and, in some tools, across function boundaries. For CI integration, bandit -r src/ -t B608 or semgrep --config p/python-sql-injection src/ runs this kind of analysis on every pull request.
Expected Output
Scanned 5 code snippets
Vulnerable: 3
Safe: 2
Vulnerability report:
 Line 2: string_concat — cursor.execute("SELECT * FROM users WHERE id = " +...
 Line 4: fstring — cursor.execute(f"SELECT * FROM users WHERE name = ...
 Line 5: percent_format — cursor.execute("SELECT * FROM users WHERE name = %...
Hints
Hint 1: Use the ast module to parse Python code and find cursor.execute() calls.
Hint 2: Check the first argument: if it is a JoinedStr (f-string), a BinOp (string + or % formatting), or a Call with .format(), flag it.
Hint 3: A Constant string with placeholders (? or %s or :name) followed by a second argument is safe.
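The exercise audits one expression at a time. Hint 1's idea of finding execute() calls inside real code can be sketched by walking a whole module with ast.walk and reporting line numbers. This is an illustrative sketch under stated assumptions, not how Bandit or Semgrep are implemented: SOURCE, classify, and scan_source are hypothetical names, and any method named execute is treated as a database call.

```python
import ast

# Hypothetical multi-line source to scan; not part of the exercise's CODE_SAMPLES.
SOURCE = '''
def lookup(cursor, user_id, name):
    cursor.execute("SELECT * FROM users WHERE id = " + user_id)
    cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    cursor.execute("SELECT * FROM users WHERE name = '{}'".format(name))
    print("not a query: " + name)
'''

def classify(arg: ast.expr) -> str:
    """Label how the first argument of an execute() call was built."""
    if isinstance(arg, ast.JoinedStr):
        return "fstring"
    if isinstance(arg, ast.BinOp) and isinstance(arg.op, ast.Add):
        return "string_concat"
    if isinstance(arg, ast.BinOp) and isinstance(arg.op, ast.Mod):
        return "percent_format"
    if (isinstance(arg, ast.Call) and isinstance(arg.func, ast.Attribute)
            and arg.func.attr == "format"):
        return "format_method"
    return "safe"

def scan_source(source: str):
    """Return sorted (lineno, label) pairs for every flagged .execute(...) call."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        is_execute = (isinstance(node, ast.Call)
                      and isinstance(node.func, ast.Attribute)
                      and node.func.attr == "execute"
                      and bool(node.args))
        if is_execute:
            label = classify(node.args[0])
            if label != "safe":
                findings.append((node.lineno, label))
    return sorted(findings)

findings = scan_source(SOURCE)
for lineno, label in findings:
    print(f"line {lineno}: {label}")
```

Matching on the attribute name keeps the sketch short, but it would also flag unrelated objects that happen to have an execute method. The print call with concatenation is ignored because it is not an execute() call.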
