Python Database Migrations with Alembic: Practice Problems & Exercises
Practice: Database Migrations with Alembic
← Back to lesson

:::note About These Problems
Alembic is tightly coupled to a project file structure. These problems use sqlite3 directly to teach migration concepts (schema versioning, add/drop column, data backfill) that map directly to Alembic operations. Notes explain the equivalent Alembic commands throughout.
:::
Build a schema version tracking table that mirrors Alembic's alembic_version table.
Solution:
import time
def create_migration_table(conn):
    """Create the ``schema_migrations`` bookkeeping table if it is missing.

    The table has two columns: ``version`` (TEXT, primary key) and
    ``applied_at`` (REAL timestamp).

    ``IF NOT EXISTS`` makes the call safe to repeat: running it against a
    database that already has the table is a no-op instead of an
    ``OperationalError``, and any recorded versions are left untouched.

    Args:
        conn: An open ``sqlite3`` connection.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS schema_migrations (
            version TEXT PRIMARY KEY,
            applied_at REAL
        )
    """)
    conn.commit()
def apply_migration(conn, version):
    """Record ``version`` in ``schema_migrations`` with the current time.

    The insert uses ``INSERT OR IGNORE``, so recording a version that is
    already present leaves the existing row (and its timestamp) unchanged.

    Args:
        conn: An open ``sqlite3`` connection.
        version: Version identifier to record.
    """
    stamp = time.time()
    insert_sql = "INSERT OR IGNORE INTO schema_migrations (version, applied_at) VALUES (?,?)"
    conn.execute(insert_sql, (version, stamp))
    conn.commit()
def get_current_version(conn):
    """Return the most recently applied version, or ``None`` if none exist.

    Rows are ordered by ``applied_at`` descending. Two migrations applied in
    quick succession can receive the same timestamp, which would make
    ``LIMIT 1`` pick an arbitrary row, so ``rowid`` (insertion order) is used
    as a tie-breaker.

    Args:
        conn: An open ``sqlite3`` connection with a ``schema_migrations`` table.

    Returns:
        The latest version string, or ``None`` when the table is empty.
    """
    row = conn.execute(
        "SELECT version FROM schema_migrations "
        "ORDER BY applied_at DESC, rowid DESC LIMIT 1"
    ).fetchone()
    return row[0] if row else None
import sqlite3
# Alembic tracks migrations in a table called 'alembic_version'
# with a single column: version_num (the current revision hash).
# This problem simulates that bookkeeping manually.
conn = sqlite3.connect(':memory:')
# TODO: Create a schema_migrations table with columns:
# (version TEXT PRIMARY KEY, applied_at REAL)
# Implement apply_migration(conn, version) and get_current_version(conn).
import time
def create_migration_table(conn):
pass
def apply_migration(conn, version):
pass
def get_current_version(conn):
pass
create_migration_table(conn)
apply_migration(conn, '001_create_users')
apply_migration(conn, '002_add_email_index')
print(get_current_version(conn))
Expected Output
002_add_email_index
Hints
Hint 1: CREATE TABLE schema_migrations (version TEXT PRIMARY KEY, applied_at REAL)
Hint 2: INSERT the version and current timestamp; use INSERT OR IGNORE for idempotency.
Hint 3: SELECT version FROM schema_migrations ORDER BY applied_at DESC LIMIT 1 gets the latest.
Write upgrade (ADD COLUMN) and downgrade (remove column) migration functions.
Solution:
def upgrade(conn):
    """Add a ``phone`` TEXT column to ``users`` and commit.

    Args:
        conn: An open ``sqlite3`` connection with a ``users`` table.
    """
    add_phone = "ALTER TABLE users ADD COLUMN phone TEXT"
    conn.execute(add_phone)
    conn.commit()
def downgrade(conn):
    """Remove the ``phone`` column from ``users`` by rebuilding the table.

    Creates ``users_new`` with only ``id``, ``name`` and ``email``, copies
    those columns across, drops the old table, renames the new one into
    place, and commits.

    Args:
        conn: An open ``sqlite3`` connection with a ``users`` table.
    """
    rebuild_steps = (
        """
        CREATE TABLE users_new (
            id INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT
        )
        """,
        "INSERT INTO users_new SELECT id, name, email FROM users",
        "DROP TABLE users",
        "ALTER TABLE users_new RENAME TO users",
    )
    for step in rebuild_steps:
        conn.execute(step)
    conn.commit()
import sqlite3
# Alembic upgrade: op.add_column('users', sa.Column('phone', sa.String()))
# Alembic downgrade: op.drop_column('users', 'phone')
# SQLite equivalent: ALTER TABLE users ADD COLUMN phone TEXT
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.executemany("INSERT INTO users VALUES (?,?,?)", [
(1,'Alice','[email protected]'),
(2,'Bob','[email protected]'),
])
conn.commit()
# TODO: Implement upgrade() that adds a 'phone' column (nullable TEXT).
# Implement downgrade() that removes it again. Note: SQLite can't drop columns
# before 3.35, so for downgrade, recreate the table without the column.
def upgrade(conn):
pass
def downgrade(conn):
pass
def get_columns(conn, table):
    """Return the column names of ``table`` in declaration order.

    The table name is wrapped in double quotes (with embedded quotes doubled)
    before being placed in the PRAGMA, so names containing spaces or quote
    characters work and cannot alter the statement.

    Args:
        conn: An open ``sqlite3`` connection.
        table: Name of the table to inspect.

    Returns:
        A list of column names; empty if the table does not exist.
    """
    quoted = '"' + table.replace('"', '""') + '"'
    cur = conn.execute(f"PRAGMA table_info({quoted})")
    return [row[1] for row in cur.fetchall()]
upgrade(conn)
print('After upgrade:', get_columns(conn, 'users'))
downgrade(conn)
print('After downgrade:', get_columns(conn, 'users'))
Expected Output
After upgrade: ['id', 'name', 'email', 'phone']
After downgrade: ['id', 'name', 'email']
Hints
Hint 1: ALTER TABLE users ADD COLUMN phone TEXT adds the column.
Hint 2: SQLite drop column workaround: CREATE new table, INSERT SELECT, DROP old, RENAME new.
Hint 3: In Alembic, op.drop_column() expresses this; on SQLite, run it inside op.batch_alter_table() so Alembic performs the table recreation for you.
Rename a column using ALTER TABLE ... RENAME COLUMN and verify the data is intact.
Solution:
def rename_price_column(conn):
    """Rename ``products.price`` to ``unit_price`` and commit.

    Args:
        conn: An open ``sqlite3`` connection with a ``products`` table.
    """
    rename_sql = "ALTER TABLE products RENAME COLUMN price TO unit_price"
    conn.execute(rename_sql)
    conn.commit()
import sqlite3
# SQLite 3.25+ supports: ALTER TABLE t RENAME COLUMN old TO new
# Alembic: op.alter_column('products', 'price', new_column_name='unit_price')
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL)")
conn.executemany("INSERT INTO products VALUES (?,?,?)", [
(1, 'Widget', 9.99), (2, 'Gadget', 49.99)
])
conn.commit()
# TODO: Rename column 'price' to 'unit_price'
# Verify existing data is intact.
def rename_price_column(conn):
pass
rename_price_column(conn)
cols = [r[1] for r in conn.execute("PRAGMA table_info(products)").fetchall()]
data = conn.execute("SELECT id, name, unit_price FROM products").fetchall()
print('Columns:', cols)
print('Data:', data)
Expected Output
Columns: ['id', 'name', 'unit_price']
Data: [(1, 'Widget', 9.99), (2, 'Gadget', 49.99)]
Hints
Hint 1: ALTER TABLE products RENAME COLUMN price TO unit_price works in SQLite 3.25+.
Hint 2: The data is preserved — only the column name changes.
Hint 3: In Alembic: op.alter_column('products', 'price', new_column_name='unit_price').
Build a migration runner that applies pending migrations sequentially and supports rolling back.
Solution:
class MigrationRunner:
    """Apply and roll back an ordered list of SQL migrations.

    Each migration is a dict with three keys: ``'version'`` (identifier),
    ``'up'`` (SQL script that applies it) and ``'down'`` (SQL script that
    reverts it). Applied versions are persisted in the ``_migrations`` table,
    so a new runner on the same database picks up where the last one stopped.
    """

    def __init__(self, conn, migrations):
        """Create the tracking table if needed and load the applied state.

        Args:
            conn: An open ``sqlite3`` connection.
            migrations: Migration dicts in the order they should be applied.
        """
        self.conn = conn
        self.migrations = migrations
        conn.execute("CREATE TABLE IF NOT EXISTS _migrations (version TEXT PRIMARY KEY)")
        conn.commit()
        rows = conn.execute("SELECT version FROM _migrations").fetchall()
        applied_versions = {r[0] for r in rows}
        # Keep the applied list in declaration order so downgrade() can
        # revert from the end.
        self.applied = [m for m in migrations if m['version'] in applied_versions]

    def upgrade_all(self):
        """Apply every migration that is not yet recorded, in list order."""
        applied_versions = {m['version'] for m in self.applied}
        for migration in self.migrations:
            version = migration['version']
            if version in applied_versions:
                continue
            self.conn.executescript(migration['up'])
            self.conn.execute("INSERT INTO _migrations VALUES (?)", (version,))
            self.conn.commit()
            self.applied.append(migration)
            # Guards against the same version appearing twice in the list.
            applied_versions.add(version)

    def downgrade(self, n=1):
        """Revert up to ``n`` of the most recently applied migrations.

        The migration is removed from the in-memory list only after its
        ``'down'`` script ran and the tracking row was deleted. If the script
        raises, ``current_version()`` therefore still agrees with the
        ``_migrations`` table.

        Args:
            n: Maximum number of migrations to revert.
        """
        for _ in range(min(n, len(self.applied))):
            migration = self.applied[-1]
            self.conn.executescript(migration['down'])
            self.conn.execute(
                "DELETE FROM _migrations WHERE version = ?", (migration['version'],)
            )
            self.conn.commit()
            self.applied.pop()

    def current_version(self):
        """Return the last applied version, or ``None`` if nothing is applied."""
        return self.applied[-1]['version'] if self.applied else None
import sqlite3
# Alembic builds a directed graph of revisions.
# Each revision has a 'down_revision' pointing to its parent.
# Running 'alembic upgrade head' applies all pending migrations in order.
conn = sqlite3.connect(':memory:')
# Simulate 3 migrations in sequence
MIGRATIONS = [
{
'version': 'v001',
'up': "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
'down': "DROP TABLE users",
},
{
'version': 'v002',
'up': "ALTER TABLE users ADD COLUMN email TEXT",
'down': "CREATE TABLE users_bak AS SELECT id, name FROM users; DROP TABLE users; ALTER TABLE users_bak RENAME TO users",
},
{
'version': 'v003',
'up': "CREATE INDEX idx_email ON users(email)",
'down': "DROP INDEX idx_email",
},
]
# TODO: Implement MigrationRunner with:
# - upgrade_all(): apply all pending migrations
# - downgrade(n=1): roll back n migrations
# - current_version(): return current version string
class MigrationRunner:
def __init__(self, conn, migrations):
pass
def upgrade_all(self):
pass
def downgrade(self, n=1):
pass
def current_version(self):
pass
runner = MigrationRunner(conn, MIGRATIONS)
runner.upgrade_all()
print('After upgrade_all:', runner.current_version())
runner.downgrade(1)
print('After downgrade(1):', runner.current_version())
Expected Output
After upgrade_all: v003
After downgrade(1): v002
Hints
Hint 1: Track applied migrations in a list (or an in-memory table).
Hint 2: upgrade_all() applies migrations whose version is not yet in the applied list.
Hint 3: downgrade(n) pops the last n applied migrations and runs their 'down' SQL.
Write a data backfill migration that computes a derived column's value for all existing rows.
Solution:
def backfill_totals(conn):
    """Backfill ``orders.total`` and then enforce NOT NULL on it.

    Steps, all inside one transaction:

    1. Set ``total = ROUND(subtotal * (1 + tax_rate), 2)`` for every row
       whose ``total`` is NULL.
    2. Rebuild ``orders`` with ``total REAL NOT NULL`` (create new table,
       copy rows, drop old, rename).

    If any step fails (for example a row whose ``subtotal`` or ``tax_rate``
    is NULL, so its total cannot be computed), the transaction is rolled back
    and the exception is re-raised. The table is then left as it was before
    the call. Note that the rollback also discards any uncommitted work the
    caller had pending on ``conn``.

    Args:
        conn: An open ``sqlite3`` connection with an ``orders`` table.

    Returns:
        The number of rows updated by the backfill.
    """
    try:
        # The UPDATE opens the implicit transaction; the DDL below joins it.
        cur = conn.execute("""
            UPDATE orders
            SET total = ROUND(subtotal * (1 + tax_rate), 2)
            WHERE total IS NULL
        """)
        updated = cur.rowcount
        conn.execute("""
            CREATE TABLE orders_new (
                id INTEGER PRIMARY KEY,
                subtotal REAL,
                tax_rate REAL,
                total REAL NOT NULL
            )
        """)
        conn.execute(
            "INSERT INTO orders_new SELECT id, subtotal, tax_rate, total FROM orders"
        )
        conn.execute("DROP TABLE orders")
        conn.execute("ALTER TABLE orders_new RENAME TO orders")
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return updated
import sqlite3
# A common migration pattern: add a column, then backfill it from existing data.
# Alembic: add_column in upgrade, then op.execute("UPDATE ...") to backfill.
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
subtotal REAL,
tax_rate REAL,
total REAL -- currently NULL, needs backfill
)
""")
conn.executemany("INSERT INTO orders VALUES (?,?,?,?)", [
(1, 100.0, 0.08, None),
(2, 250.0, 0.10, None),
(3, 75.0, 0.08, None),
(4, 500.0, 0.05, None),
])
conn.commit()
# TODO: Write a data migration that:
# 1. Computes total = subtotal * (1 + tax_rate) for all rows where total IS NULL
# 2. Updates each row
# 3. Adds a NOT NULL constraint by recreating the table
# Return count of rows updated.
def backfill_totals(conn):
pass
count = backfill_totals(conn)
print('Rows updated:', count)
rows = conn.execute("SELECT id, total FROM orders ORDER BY id").fetchall()
for row in rows:
print(f' Order {row[0]}: {row[1]:.2f}')
Expected Output
Rows updated: 4
Order 1: 108.00
Order 2: 275.00
Order 3: 81.00
Order 4: 525.00
Hints
Hint 1: UPDATE orders SET total = subtotal * (1 + tax_rate) WHERE total IS NULL
Hint 2: cursor.rowcount gives the number of rows affected.
Hint 3: For the NOT NULL constraint: recreate the table with total REAL NOT NULL and INSERT SELECT.
Implement the expand-contract pattern for a zero-downtime column rename across three migration phases.
Solution:
def expand(conn):
    """Expand phase: add a ``display_name`` TEXT column to ``users``.

    The existing ``username`` column is left in place.

    Args:
        conn: An open ``sqlite3`` connection with a ``users`` table.
    """
    add_display_name = "ALTER TABLE users ADD COLUMN display_name TEXT"
    conn.execute(add_display_name)
    conn.commit()
def migrate_data(conn):
    """Migrate phase: copy ``username`` into ``display_name`` where it is NULL.

    Rows that already have a ``display_name`` are not touched.

    Args:
        conn: An open ``sqlite3`` connection with a ``users`` table.
    """
    backfill_sql = "UPDATE users SET display_name = username WHERE display_name IS NULL"
    conn.execute(backfill_sql)
    conn.commit()
def contract(conn):
    """Contract phase: drop ``username`` from ``users`` by rebuilding the table.

    Creates ``users_new`` with ``id``, ``email`` and ``display_name``, copies
    those columns across, drops the old table, renames the new one into
    place, and commits.

    Args:
        conn: An open ``sqlite3`` connection with a ``users`` table.
    """
    rebuild_steps = (
        """
        CREATE TABLE users_new (
            id INTEGER PRIMARY KEY,
            email TEXT,
            display_name TEXT
        )
        """,
        "INSERT INTO users_new SELECT id, email, display_name FROM users",
        "DROP TABLE users",
        "ALTER TABLE users_new RENAME TO users",
    )
    for step in rebuild_steps:
        conn.execute(step)
    conn.commit()
import sqlite3
# The expand-contract pattern makes schema changes without downtime:
# Phase 1 (Expand): Add new column, keep old column, dual-write to both
# Phase 2 (Migrate): Backfill new column from old
# Phase 3 (Contract): Remove old column once all readers use the new one
# Scenario: rename 'username' to 'display_name' with zero downtime
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, email TEXT)")
conn.executemany("INSERT INTO users VALUES (?,?,?)", [
(1,'alice','[email protected]'),
(2,'bob','[email protected]'),
])
conn.commit()
# TODO:
# Phase 1: expand() — add display_name column (nullable)
# Phase 2: migrate_data() — copy username -> display_name for NULL rows
# Phase 3: contract() — remove username column
# Return column names after each phase.
def expand(conn):
pass
def migrate_data(conn):
pass
def contract(conn):
pass
def get_columns(conn):
    """Return the column names of the ``users`` table in declaration order."""
    names = []
    for info in conn.execute("PRAGMA table_info(users)").fetchall():
        # Index 1 of each table_info row is the column name.
        names.append(info[1])
    return names
print('Start:', get_columns(conn))
expand(conn)
print('After expand:', get_columns(conn))
migrate_data(conn)
data_mid = conn.execute("SELECT id, username, display_name FROM users ORDER BY id").fetchall()
print('Data mid:', data_mid)
contract(conn)
print('After contract:', get_columns(conn))
Expected Output
Start: ['id', 'username', 'email']
After expand: ['id', 'username', 'email', 'display_name']
Data mid: [(1, 'alice', 'alice'), (2, 'bob', 'bob')]
After contract: ['id', 'email', 'display_name']
Hints
Hint 1: expand(): ALTER TABLE users ADD COLUMN display_name TEXT
Hint 2: migrate_data(): UPDATE users SET display_name = username WHERE display_name IS NULL
Hint 3: contract(): recreate table with only (id, email, display_name).
Write an idempotent migration that can be safely applied multiple times without errors or data corruption.
Solution:
def migration_001(conn):
    """Bring the ``events`` schema to its target state; safe to re-run.

    Creates the ``events`` table if absent, adds the ``processed`` column
    only when ``PRAGMA table_info`` does not already list it, creates the
    ``idx_events_ts`` index if absent, and commits.

    Args:
        conn: An open ``sqlite3`` connection.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY,
            name TEXT,
            ts INTEGER
        )
    """)
    # ADD COLUMN has no IF NOT EXISTS form, so check for the column first.
    column_rows = conn.execute("PRAGMA table_info(events)").fetchall()
    has_processed = any(info[1] == 'processed' for info in column_rows)
    if not has_processed:
        conn.execute("ALTER TABLE events ADD COLUMN processed INTEGER DEFAULT 0")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_events_ts ON events(ts)")
    conn.commit()
import sqlite3
# Migrations should be idempotent — running them twice should not fail or corrupt data.
# Alembic tracks applied migrations, but the SQL itself should also be safe to re-run.
conn = sqlite3.connect(':memory:')
# TODO: Write migration_001(conn) that is fully idempotent:
# 1. Creates table 'events' if it doesn't exist
# 2. Adds column 'processed' if it doesn't exist
# 3. Creates index 'idx_events_ts' if it doesn't exist
# Running it twice should produce no errors and same end state.
def migration_001(conn):
pass
# Run twice — both should succeed
migration_001(conn)
migration_001(conn)
cols = [r[1] for r in conn.execute("PRAGMA table_info(events)").fetchall()]
indexes = [r[1] for r in conn.execute("PRAGMA index_list(events)").fetchall()]
print('Columns:', sorted(cols))
print('Indexes:', sorted(indexes))
Expected Output
Columns: ['id', 'name', 'processed', 'ts']
Indexes: ['idx_events_ts']
Hints
Hint 1: CREATE TABLE IF NOT EXISTS and CREATE INDEX IF NOT EXISTS are idempotent.
Hint 2: For ADD COLUMN: check PRAGMA table_info first, only add if column doesn't exist.
Hint 3: Wrap in a try/except OperationalError as a fallback if IF NOT EXISTS isn't supported.
Add a foreign key constraint to an existing table by recreating it, after cleaning up orphaned rows.
Solution:
def add_foreign_key_constraint(conn):
    """Rebuild ``items`` with a foreign key on ``category_id``.

    First deletes rows whose ``category_id`` is not present in
    ``categories``. Then creates ``items_new`` with the FOREIGN KEY clause,
    copies the remaining rows, drops the old table, renames the new one into
    place, and commits.

    Args:
        conn: An open ``sqlite3`` connection with ``items`` and ``categories``.

    Returns:
        A ``(count, fk_ok)`` tuple: the row count of ``items`` after the
        rebuild, and the result of ``test_fk_enforcement(conn)``.
    """
    migration_steps = (
        # Orphans must go first, or they would violate the new constraint.
        """
        DELETE FROM items
        WHERE category_id NOT IN (SELECT id FROM categories)
        """,
        """
        CREATE TABLE items_new (
            id INTEGER PRIMARY KEY,
            name TEXT,
            category_id INTEGER,
            FOREIGN KEY(category_id) REFERENCES categories(id)
        )
        """,
        "INSERT INTO items_new SELECT id, name, category_id FROM items",
        "DROP TABLE items",
        "ALTER TABLE items_new RENAME TO items",
    )
    for step in migration_steps:
        conn.execute(step)
    conn.commit()
    # The count is taken before the enforcement probe runs.
    (count,) = conn.execute("SELECT COUNT(*) FROM items").fetchone()
    fk_ok = test_fk_enforcement(conn)
    return count, fk_ok
import sqlite3
# SQLite doesn't support adding foreign keys to existing tables via ALTER TABLE.
# The only way is to recreate the table with the constraint.
# This is the same workaround Alembic uses for SQLite.
conn = sqlite3.connect(':memory:')
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript("""
CREATE TABLE categories (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, category_id INTEGER);
INSERT INTO categories VALUES (1,'Hardware'),(2,'Software');
INSERT INTO items VALUES (1,'Widget',1),(2,'App',2),(3,'Orphan',99);
""")
# TODO: Migrate items table to add a proper FOREIGN KEY constraint on category_id.
# First clean up orphaned rows (category_id not in categories).
# Then recreate the table with the FK constraint.
# Return final row count and whether FK is enforced.
def add_foreign_key_constraint(conn):
pass
def test_fk_enforcement(conn):
try:
conn.execute("INSERT INTO items VALUES (100, 'Bad', 999)")
conn.commit()
return False
except Exception:
return True
count, fk_ok = add_foreign_key_constraint(conn)
print(f'Rows after migration: {count}')
print(f'FK enforced: {fk_ok}')
Expected Output
Rows after migration: 2
FK enforced: True
Hints
Hint 1: DELETE FROM items WHERE category_id NOT IN (SELECT id FROM categories) removes orphans.
Hint 2: Recreate: CREATE TABLE items_new (..., FOREIGN KEY(category_id) REFERENCES categories(id)).
Hint 3: INSERT SELECT from old to new, DROP old, RENAME new.
Build a schema diff tool that compares the live database schema against an expected definition, reporting discrepancies.
Solution:
def schema_diff(conn, expected_schema):
    """Compare the live schema against ``expected_schema``.

    Args:
        conn: An open ``sqlite3`` connection.
        expected_schema: Dict mapping table name to a list of column names.

    Returns:
        A dict with four keys:

        * ``'missing_tables'``: expected tables absent from the database.
        * ``'extra_tables'``: database tables not in ``expected_schema``.
        * ``'missing_columns'``: ``{table: [columns]}`` expected but absent.
        * ``'extra_columns'``: ``{table: [columns]}`` present but unexpected.

        All lists are sorted so the result is deterministic. Tables with no
        column differences do not appear in the per-table dicts.
    """
    # '_' is a single-character wildcard in LIKE; escape it so that only
    # names with the literal 'sqlite_' prefix (SQLite internals) are skipped.
    actual_tables = {
        row[0] for row in conn.execute(
            r"SELECT name FROM sqlite_master "
            r"WHERE type='table' AND name NOT LIKE 'sqlite\_%' ESCAPE '\'"
        ).fetchall()
    }
    expected_tables = set(expected_schema)

    missing_columns = {}
    extra_columns = {}
    for table in sorted(expected_tables & actual_tables):
        # Quote the identifier so names with spaces or quotes are handled.
        quoted = '"' + table.replace('"', '""') + '"'
        actual_cols = {
            row[1] for row in conn.execute(f"PRAGMA table_info({quoted})").fetchall()
        }
        expected_cols = set(expected_schema[table])
        missing = expected_cols - actual_cols
        extra = actual_cols - expected_cols
        if missing:
            missing_columns[table] = sorted(missing)
        if extra:
            extra_columns[table] = sorted(extra)

    return {
        'missing_tables': sorted(expected_tables - actual_tables),
        'extra_tables': sorted(actual_tables - expected_tables),
        'missing_columns': missing_columns,
        'extra_columns': extra_columns,
    }
import sqlite3
# Alembic's autogenerate compares SQLAlchemy models to the live DB schema.
# This problem simulates a simple schema diff.
# TODO: Implement schema_diff(conn, expected_schema) that:
# - expected_schema is a dict: {table_name: [column_names]}
# - Returns a dict with 'missing_tables', 'extra_tables',
# 'missing_columns' (per table), 'extra_columns' (per table)
def schema_diff(conn, expected_schema):
pass
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT);
CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, total REAL);
CREATE TABLE legacy_table (id INTEGER PRIMARY KEY, old_col TEXT);
""")
expected = {
'users': ['id', 'name', 'email', 'phone'], # missing: phone
'orders': ['id', 'user_id', 'total', 'status'], # missing: status
'products': ['id', 'name', 'price'], # missing table
# legacy_table not in expected -> extra table
}
diff = schema_diff(conn, expected)
print('Missing tables:', sorted(diff['missing_tables']))
print('Extra tables:', sorted(diff['extra_tables']))
print('Missing columns:', dict(sorted(diff['missing_columns'].items())))
Expected Output
Missing tables: ['products']
Extra tables: ['legacy_table']
Missing columns: {'orders': ['status'], 'users': ['phone']}
Hints
Hint 1: SELECT name FROM sqlite_master WHERE type='table' lists actual tables.
Hint 2: PRAGMA table_info(table_name) lists columns for a given table.
Hint 3: Set differences: expected_set - actual_set gives missing; actual_set - expected_set gives extra.
Execute a multi-step migration atomically — rolling back all steps if any single one fails.
Solution:
def run_migration(conn, steps):
    """Run ``steps`` inside one transaction, rolling back on any failure.

    Args:
        conn: An open ``sqlite3`` connection with no transaction in progress.
            The explicit ``BEGIN`` raises if one is already open, and that
            error is propagated so the caller's pending work is not touched.
        steps: Iterable of SQL statements to execute in order.

    Returns:
        ``(True, None)`` if every step ran and the transaction was committed,
        otherwise ``(False, message)`` with the text of the error.
    """
    conn.execute("BEGIN")
    try:
        for step in steps:
            conn.execute(step)
        # commit()/rollback() are no-ops when no transaction is active. The
        # raw "COMMIT"/"ROLLBACK" statements raise in that case, which would
        # hide the original error if a step ended the transaction itself.
        conn.commit()
        return True, None
    except Exception as e:
        conn.rollback()
        return False, str(e)
import sqlite3
# A complex migration may have multiple steps.
# If any step fails, the entire migration should roll back.
conn = sqlite3.connect(':memory:')
conn.executescript("""
CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL);
INSERT INTO products VALUES (1,'Widget',10.0),(2,'Gadget',50.0),(3,'Doohickey',25.0);
""")
# TODO: Implement run_migration(conn, steps) where steps is a list of SQL strings.
# All steps run inside a single transaction.
# If any step fails, roll back everything and return (False, error_message).
# If all succeed, commit and return (True, None).
def run_migration(conn, steps):
pass
# Test 1: Successful migration
ok, err = run_migration(conn, [
"ALTER TABLE products ADD COLUMN category TEXT",
"UPDATE products SET category = 'hardware' WHERE id IN (1, 3)",
"UPDATE products SET category = 'electronics' WHERE id = 2",
"CREATE INDEX idx_category ON products(category)",
])
print('Migration 1:', ok, err)
# Test 2: Failed migration (invalid SQL)
ok2, err2 = run_migration(conn, [
"ALTER TABLE products ADD COLUMN weight REAL",
"UPDATE products SET weight = 1.0",
"THIS IS NOT VALID SQL", # will fail
])
print('Migration 2:', ok2, err2 is not None)
# Verify the failed migration left no trace
cols = [r[1] for r in conn.execute("PRAGMA table_info(products)").fetchall()]
print('weight column exists:', 'weight' in cols)
Expected Output
Migration 1: True None
Migration 2: False True
weight column exists: False
Hints
Hint 1: Use BEGIN/COMMIT/ROLLBACK directly, or a SAVEPOINT, to wrap all steps.
Hint 2: Loop through steps; on sqlite3.OperationalError, call conn.rollback() and return (False, str(e)).
Hint 3: Python's sqlite3 executescript() commits any pending transaction before running the script — use execute() for each step to keep manual transaction control.
Build a migration safety validator that flags dangerous SQL patterns before they reach production.
Solution:
import re
def validate_migration(sql):
    """Check ``sql`` line by line against every rule in ``SAFETY_RULES``.

    For each rule, only the first matching line is reported.

    Args:
        sql: Migration SQL text, possibly spanning several lines.

    Returns:
        A list of dicts with keys ``'rule'``, ``'severity'``, ``'message'``
        and ``'line'`` (1-based). The list is ordered by severity (ERROR,
        then WARNING, then anything else) and by rule name within a severity.
    """
    numbered_lines = list(enumerate(sql.splitlines(), 1))
    flags = re.IGNORECASE | re.MULTILINE
    findings = []
    for rule in SAFETY_RULES:
        matcher = re.compile(rule['pattern'], flags)
        first_hit = next(
            (number for number, text in numbered_lines if matcher.search(text)),
            None,
        )
        if first_hit is None:
            continue
        findings.append({
            'rule': rule['name'],
            'severity': rule['severity'],
            'message': rule['message'],
            'line': first_hit,
        })
    severity_rank = {'ERROR': 0, 'WARNING': 1}

    def sort_key(finding):
        return (severity_rank.get(finding['severity'], 2), finding['rule'])

    return sorted(findings, key=sort_key)
import sqlite3
import re
# Before running a migration in production, validate it against safety rules:
# 1. No DROP TABLE without a backup check
# 2. No truncate/DELETE without WHERE
# 3. All new columns should be nullable or have a DEFAULT
# 4. Detect potential lock-heavy operations
# Each rule: a case-insensitive regex applied to one line of SQL, plus the
# severity and message reported when it matches. The patterns use \s and \w
# regex classes; without the backslashes they would only match the literal
# letters "s" and "w" and never fire on real SQL.
SAFETY_RULES = [
    {
        'name': 'drop_table_without_backup',
        # DROP TABLE not followed by IF EXISTS.
        'pattern': r'DROP\s+TABLE(?!\s+IF\s+EXISTS)',
        'severity': 'ERROR',
        'message': 'DROP TABLE without IF EXISTS — data loss risk',
    },
    {
        'name': 'delete_without_where',
        # DELETE FROM <table> terminated immediately by ';'.
        'pattern': r'DELETE\s+FROM\s+\w+\s*;',
        'severity': 'ERROR',
        'message': 'DELETE without WHERE — will delete all rows',
    },
    {
        'name': 'add_not_null_no_default',
        # ADD COLUMN <name> <type> NOT NULL not followed by DEFAULT.
        'pattern': r'ADD\s+COLUMN\s+\w+\s+\w+\s+NOT\s+NULL(?!\s+DEFAULT)',
        'severity': 'WARNING',
        'message': 'ADD COLUMN NOT NULL without DEFAULT — fails on non-empty table',
    },
    {
        'name': 'drop_index',
        'pattern': r'DROP\s+INDEX',
        'severity': 'WARNING',
        'message': 'Dropping an index — may cause query slowdown',
    },
]
# TODO: Implement validate_migration(sql) that checks the SQL against all rules.
# Return a list of dicts: {rule, severity, message, line}.
def validate_migration(sql):
pass
migration_sql = """
ALTER TABLE users ADD COLUMN age INTEGER NOT NULL;
DELETE FROM sessions;
DROP TABLE temp_data;
DROP INDEX idx_old;
"""
issues = validate_migration(migration_sql)
for issue in issues:
print(f"[{issue['severity']}] {issue['rule']}: {issue['message']}")
Expected Output
[ERROR] delete_without_where: DELETE without WHERE — will delete all rows
[ERROR] drop_table_without_backup: DROP TABLE without IF EXISTS — data loss risk
[WARNING] add_not_null_no_default: ADD COLUMN NOT NULL without DEFAULT — fails on non-empty table
[WARNING] drop_index: Dropping an index — may cause query slowdown
Hints
Hint 1: Use re.search(rule['pattern'], sql, re.IGNORECASE) to check each rule.
Hint 2: To find the line number: iterate sql.splitlines() and check each line.
Hint 3: Sort results by severity (ERROR before WARNING) for consistent output.
