
SQLite with Python

Reading time: ~35 minutes | Level: Intermediate → Engineering

Before reading further, predict what happens when this code runs:

import sqlite3
import threading

conn = sqlite3.connect("tasks.db")

def worker():
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM tasks")  # uses connection from main thread
    print(cursor.fetchall())

t = threading.Thread(target=worker)
t.start()
t.join()

If you guessed it crashes with:

ProgrammingError: SQLite objects created in a thread can only be used
in that same thread.

you are right. But do you know why? And do you know the three different ways to fix it correctly - each with different tradeoffs?

Understanding that requires understanding how SQLite's threading model works and how Python's sqlite3 module enforces it. That is what this lesson covers.

What You Will Learn

  • The Python DB-API 2.0 standard that all database drivers follow
  • The three-layer model: connection, cursor, result
  • How to execute queries safely using parameterized statements
  • Context managers for automatic transaction management
  • Row factories for named-column access
  • In-memory databases for fast, isolated testing
  • SQLite's threading model and how to work with it correctly
  • When SQLite is the right tool and when to move to PostgreSQL

Prerequisites

  • Python Foundation: functions, context managers (with), exceptions
  • Module 07, Lesson 01: SQL Fundamentals (SELECT, INSERT, UPDATE, DELETE, transactions)
  • Basic understanding of Python threads (Module 08) is helpful but not required

Part 1 - Python DB-API 2.0: The Interface All Drivers Follow

Python database drivers do not each invent their own API. They follow PEP 249 - Python Database API Specification 2.0 (commonly called DB-API 2). Every driver you will use - sqlite3, psycopg2, psycopg3, mysql-connector-python - implements the same interface.

This matters for a concrete reason: once you learn the DB-API 2.0 pattern, switching databases mostly means changing the import, the connection call, and the placeholder style (plus any dialect-specific SQL), not rewriting the entire codebase.
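PEP 249 also asks every driver to publish a few module-level constants, which is a quick way to see where two drivers differ. A minimal sketch using only the standard library (the variable names `api_level` and `param_style` are introduced here for illustration):

```python
import sqlite3

# Module-level attributes defined by PEP 249.
api_level = sqlite3.apilevel      # DB-API version string
param_style = sqlite3.paramstyle  # placeholder style this driver expects

print(api_level)             # 2.0
print(param_style)           # qmark
print(sqlite3.threadsafety)  # 0-3, depends on how the SQLite library was built
```

A driver that reports a different `paramstyle` needs its SQL placeholders rewritten when code moves between databases, so this is worth checking before assuming a query is portable.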

The Three-Layer Model

  • Connection - represents the link to the database file (or server). Manages transactions.
  • Cursor - a stateful object used to execute SQL and retrieve results. One connection can spawn many cursors.
  • Results - returned as Python tuples (or custom row objects) via fetch methods.

import sqlite3

# Layer 1: Connection
conn = sqlite3.connect("tasks.db")

# Layer 2: Cursor
cursor = conn.cursor()

# Layer 3: Execute + Fetch
cursor.execute("SELECT sqlite_version()")
version = cursor.fetchone()
print(version) # ('3.43.2',)

conn.close()

What DB-API 2.0 Guarantees

| Component | What it defines |
| --- | --- |
| connect(**kwargs) | How to obtain a connection |
| connection.cursor() | How to create a cursor |
| cursor.execute(sql, params) | How to run a statement |
| cursor.executemany(sql, seq) | How to run a statement with many param sets |
| cursor.fetchone() | Get next row, or None |
| cursor.fetchall() | Get all remaining rows as a list |
| cursor.fetchmany(size) | Get next N rows |
| cursor.rowcount | Rows affected by last DML |
| cursor.lastrowid | ID of last inserted row |
| connection.commit() | Commit the current transaction |
| connection.rollback() | Abort the current transaction |
| connection.close() | Close the connection |

Part 2 - Connecting to SQLite

sqlite3.connect() - File vs Memory

import sqlite3

# Connect to a file - creates it if it doesn't exist
conn = sqlite3.connect("myapp.db")

# In-memory - lives only as long as this connection stays open
conn = sqlite3.connect(":memory:")

# Shared-memory URI - advanced use case
conn = sqlite3.connect("file:mydb?mode=memory&cache=shared", uri=True)

SQLite creates the file on first connect if it does not exist. No server. No user. No password. The file is the database.
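One consequence of plain ":memory:" connections is easy to miss: each one is its own private database. A small sketch that shows this; `table_names` is a helper introduced here for illustration:

```python
import sqlite3

# Two separate ":memory:" connections are two separate, empty databases.
conn_a = sqlite3.connect(":memory:")
conn_b = sqlite3.connect(":memory:")

conn_a.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")

def table_names(conn):
    """List the tables visible through this connection."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    return [name for (name,) in rows]

tables_a = table_names(conn_a)
tables_b = table_names(conn_b)
print(tables_a)  # ['tasks']
print(tables_b)  # []

conn_a.close()
conn_b.close()
```

This is why the shared-cache URI form exists: it is the way to let more than one connection in the same process see the same in-memory data.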

Creating a Schema

import sqlite3

conn = sqlite3.connect("tasks.db")
cursor = conn.cursor()

cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
done INTEGER NOT NULL DEFAULT 0,
created TEXT NOT NULL DEFAULT (datetime('now'))
)
""")

conn.commit()
conn.close()

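:::note INTEGER PRIMARY KEY AUTOINCREMENT In SQLite, INTEGER PRIMARY KEY is an alias for the internal rowid. Without AUTOINCREMENT, a new row normally gets one more than the largest rowid currently in the table, so an ID can be handed out again only after the row holding the highest ID has been deleted. Adding AUTOINCREMENT guarantees that an ID is never reused, at the cost of extra bookkeeping on every insert. Use it when IDs must stay unique for the life of the table (for example, when they appear in external logs); otherwise plain INTEGER PRIMARY KEY is enough. :::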
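The difference in ID reuse can be checked directly. A minimal sketch with two throwaway tables, `plain` and `auto` (both names are assumptions made for this example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE plain (id INTEGER PRIMARY KEY, title TEXT);
    CREATE TABLE auto  (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT);
""")

# Fixed SQL text per table; only values are passed as parameters.
STATEMENTS = {
    "plain": ("INSERT INTO plain (title) VALUES (?)", "DELETE FROM plain WHERE id = ?"),
    "auto": ("INSERT INTO auto (title) VALUES (?)", "DELETE FROM auto WHERE id = ?"),
}

def insert_delete_insert(table):
    """Insert three rows, delete the newest, insert again; return the new ID."""
    insert_sql, delete_sql = STATEMENTS[table]
    for title in ("a", "b", "c"):
        conn.execute(insert_sql, (title,))
    conn.execute(delete_sql, (3,))
    return conn.execute(insert_sql, ("d",)).lastrowid

plain_id = insert_delete_insert("plain")
auto_id = insert_delete_insert("auto")
print(plain_id)  # 3 - the highest ID was free again, so it is handed out again
print(auto_id)   # 4 - AUTOINCREMENT never hands out an ID twice
conn.close()
```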

Part 3 - Executing Queries

execute() - Single Statement

cursor.execute("SELECT id, title, done FROM tasks")

fetchone(), fetchall(), fetchmany()

# fetchone - returns one row as a tuple, or None if no more rows
row = cursor.fetchone()
if row is not None:
    print(row)  # (1, 'Write tests', 0)

# fetchall - returns all remaining rows as a list of tuples
rows = cursor.fetchall()
for row in rows:
    print(row)

# fetchmany(n) - returns up to n rows; useful for processing results in batches
batch = cursor.fetchmany(100)

:::tip Iterating a Cursor Directly A cursor is iterable. Instead of fetchall() (which loads everything into memory), you can iterate the cursor for memory-efficient row processing:

cursor.execute("SELECT * FROM tasks")
for row in cursor:  # streams rows one at a time
    process(row)

This matters when queries return millions of rows. :::
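When rows should be handled in chunks rather than one at a time, fetchmany() can be wrapped in a small generator. A sketch under that assumption; `iter_batches` is a hypothetical helper, not part of sqlite3:

```python
import sqlite3

def iter_batches(cursor, size=2):
    """Yield lists of up to `size` rows until the result set is exhausted."""
    while True:
        batch = cursor.fetchmany(size)
        if not batch:  # fetchmany returns an empty list when no rows remain
            break
        yield batch

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany(
    "INSERT INTO tasks (title) VALUES (?)",
    [(f"task {n}",) for n in range(5)],
)

cursor = conn.execute("SELECT id, title FROM tasks ORDER BY id")
batch_sizes = [len(batch) for batch in iter_batches(cursor, size=2)]
print(batch_sizes)  # [2, 2, 1]
conn.close()
```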

executemany() - Batch Inserts

tasks = [
("Write unit tests", 0),
("Deploy to staging", 0),
("Review pull request", 1),
]

cursor.executemany(
"INSERT INTO tasks (title, done) VALUES (?, ?)",
tasks
)
conn.commit()

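executemany() is usually faster than calling execute() in a Python loop: the statement is prepared once and reused for every parameter set, and the loop runs inside the driver rather than in Python. SQLite runs in-process, so there are no network round-trips to save; on bulk inserts the largest win comes from committing the whole batch as one transaction instead of once per row.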

cursor.rowcount and cursor.lastrowid

cursor.execute("INSERT INTO tasks (title, done) VALUES (?, ?)", ("New task", 0))
print(cursor.lastrowid) # e.g. 4 - ID of the inserted row
conn.commit()

cursor.execute("UPDATE tasks SET done = 1 WHERE done = 0")
print(cursor.rowcount) # number of rows updated
conn.commit()

Part 4 - Parameterized Queries: The Only Safe Way

:::danger Never Use f-strings or String Formatting to Build SQL This is SQL injection, one of the most common and destructive vulnerabilities in software:

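# NEVER DO THIS
user_input = "x' OR '1'='1"
cursor.execute(f"SELECT * FROM tasks WHERE title = '{user_input}'")
# The WHERE clause is now always true, so every row comes back.
# (sqlite3's execute() refuses stacked statements such as "'; DROP TABLE tasks; --",
# but executescript() and some other drivers will run them.)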

The sqlite3 module provides a safe mechanism. Use it - always, without exception. :::

SQLite Placeholder: ?

SQLite's DB-API driver uses ? as the positional placeholder. Pass parameters as a tuple (or list) as the second argument to execute():

# Safe: parameters are bound as values, separately from the SQL text
cursor.execute(
"SELECT * FROM tasks WHERE title = ? AND done = ?",
("Write tests", 0)
)

# Single parameter - still needs a tuple (note the trailing comma)
cursor.execute("SELECT * FROM tasks WHERE id = ?", (task_id,))

# Named placeholders - use :name style
cursor.execute(
"SELECT * FROM tasks WHERE title = :title AND done = :done",
{"title": "Write tests", "done": 0}
)

The database driver sends the SQL text and the parameters separately to the database engine. The engine never concatenates them into a string. No injection is possible.
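The difference between splicing input into the SQL text and binding it can be seen side by side. A minimal sketch on an in-memory table; the malicious-looking `user_input` value is made up for the demonstration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany(
    "INSERT INTO tasks (title) VALUES (?)",
    [("Write tests",), ("Deploy",)],
)

user_input = "nothing' OR '1'='1"

# Unsafe: the input becomes part of the SQL text and changes its meaning.
unsafe_rows = conn.execute(
    f"SELECT title FROM tasks WHERE title = '{user_input}'"
).fetchall()

# Safe: the input is bound as a value and compared literally.
safe_rows = conn.execute(
    "SELECT title FROM tasks WHERE title = ?", (user_input,)
).fetchall()

print(len(unsafe_rows))  # 2 - the injected OR clause matched every row
print(len(safe_rows))    # 0 - no task has that literal title
conn.close()
```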

:::note %s Is Not For sqlite3 Some PostgreSQL drivers use %s as the placeholder. SQLite uses ?. Do not mix them up - cursor.execute("SELECT * FROM tasks WHERE id = %s", (1,)) binds nothing in sqlite3; SQLite tries to parse the % as an operator and raises sqlite3.OperationalError with a syntax error. :::
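What sqlite3 does with a %s placeholder is easy to verify. A short sketch that records the outcome in a variable named `outcome` (a name chosen for this example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")

try:
    conn.execute("SELECT * FROM tasks WHERE id = %s", (1,))
    outcome = "no error"
except sqlite3.Error as exc:
    # SQLite fails while parsing the statement, before any binding happens.
    outcome = type(exc).__name__
    print(f"{outcome}: {exc}")

conn.close()
```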

Part 5 - Context Managers and Transactions

with conn: - Automatic Commit / Rollback

The sqlite3.Connection object is a context manager. Using with conn: gives you automatic transaction management:

import sqlite3

conn = sqlite3.connect("tasks.db")

try:
    with conn:  # commits on success, rolls back on exception
        # sqlite3 opens the transaction implicitly before the first INSERT
        conn.execute("INSERT INTO tasks (title, done) VALUES (?, ?)", ("Task A", 0))
        conn.execute("INSERT INTO tasks (title, done) VALUES (?, ?)", ("Task B", 0))
    # If we reach here, the context manager has committed
    # If any exception was raised inside, it rolled back instead
except sqlite3.Error as e:
    print(f"Transaction failed: {e}")
    # conn is still open and usable - just the transaction was rolled back
finally:
    conn.close()

:::warning with conn: Does Not Close the Connection with conn: manages the transaction - commit on success, rollback on exception. It does not close the connection when the with block exits. You must call conn.close() separately, or use a separate context manager pattern. :::
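Both behaviours, rollback on exception and the connection staying open, can be checked in a few lines. A minimal sketch that uses contextlib.closing from the standard library to handle the close, with a deliberately raised ValueError standing in for a real failure:

```python
import sqlite3
from contextlib import closing

with closing(sqlite3.connect(":memory:")) as conn:  # closes the connection on exit
    conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")

    try:
        with conn:  # commit or rollback only
            conn.execute("INSERT INTO tasks (title) VALUES (?)", ("Task A",))
            raise ValueError("simulated failure")
    except ValueError:
        pass

    # The insert was rolled back, and the connection is still usable.
    count_after_rollback = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]

    with conn:
        conn.execute("INSERT INTO tasks (title) VALUES (?)", ("Task B",))
    count_after_commit = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]

print(count_after_rollback)  # 0
print(count_after_commit)    # 1
```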

Clean Pattern: Full Connection Lifecycle

import sqlite3
import contextlib

@contextlib.contextmanager
def get_db(path: str):
    """Context manager that opens a connection and closes it on exit."""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # covered in Part 6
    try:
        yield conn
    finally:
        conn.close()

# Usage
with get_db("tasks.db") as conn:
    with conn:  # transaction
        conn.execute("INSERT INTO tasks (title, done) VALUES (?, ?)", ("New task", 0))

SQLite's Implicit Transaction Behaviour

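SQLite in Python has a nuance: in its default (legacy) transaction mode, the sqlite3 module implicitly opens a transaction before INSERT, UPDATE, DELETE, and REPLACE statements, and you must call commit() to keep those changes. Other statements, including DDL such as CREATE TABLE and DROP TABLE, do not open a transaction: if none is already open they take effect immediately, and if one is open they become part of it.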

conn = sqlite3.connect("tasks.db")

# DDL - no transaction is open here, so it takes effect immediately
conn.execute("CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY, title TEXT)")

# DML - you must commit or it's rolled back when the connection closes
conn.execute("INSERT INTO tasks (title) VALUES (?)", ("My task",))
conn.commit() # required!

In Python 3.12+, sqlite3.connect(autocommit=True) gives you standard SQL autocommit behaviour where every statement is committed immediately.
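Whether a transaction is currently open can be observed through the connection's in_transaction attribute. A minimal sketch, assuming the default transaction handling (no autocommit argument passed to connect()):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # default (legacy) transaction handling

conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")
after_ddl = conn.in_transaction     # False: the DDL did not open a transaction

conn.execute("INSERT INTO tasks (title) VALUES (?)", ("My task",))
after_dml = conn.in_transaction     # True: a transaction was opened for the INSERT

conn.commit()
after_commit = conn.in_transaction  # False again

print(after_ddl, after_dml, after_commit)  # False True False
conn.close()
```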

Part 6 - Row Factories: Column Access by Name

By default, rows come back as plain tuples - you access columns by index, which is fragile and unreadable.

cursor.execute("SELECT id, title, done, created FROM tasks")
row = cursor.fetchone()
print(row[0]) # id - what is index 0 again?
print(row[1]) # title

sqlite3.Row solves this. Set it once on the connection:

conn = sqlite3.connect("tasks.db")
conn.row_factory = sqlite3.Row # set before creating cursors

cursor = conn.cursor()
cursor.execute("SELECT id, title, done, created FROM tasks")
row = cursor.fetchone()

# Access by column name
print(row["id"])
print(row["title"])
print(row["done"])

# Still supports index access
print(row[0]) # id

# Convert to dict
print(dict(row)) # {'id': 1, 'title': 'Write tests', 'done': 0, 'created': '...'}

# Get column names
print(row.keys()) # ['id', 'title', 'done', 'created']

:::tip Always Use row_factory = sqlite3.Row There is almost no reason not to use sqlite3.Row. The named access makes code dramatically more readable and resilient to column order changes in queries. Set it immediately after connect() as a matter of habit. :::

Custom Row Factory

For more control - say, you want rows as dicts or as dataclass instances - you can assign any callable that takes (cursor, row):

def dict_factory(cursor, row):
    fields = [col[0] for col in cursor.description]
    return dict(zip(fields, row))

conn.row_factory = dict_factory

cursor.execute("SELECT id, title FROM tasks")
print(cursor.fetchone()) # {'id': 1, 'title': 'Write tests'}
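The same hook can return attribute-style rows. A sketch of a namedtuple-based factory; it assumes every selected column name is a valid Python identifier, which holds for the schema used here but not for arbitrary expressions:

```python
import sqlite3
from collections import namedtuple

def namedtuple_factory(cursor, row):
    """Build a namedtuple from the column names of the current query."""
    fields = [col[0] for col in cursor.description]
    Row = namedtuple("Row", fields)
    return Row(*row)

conn = sqlite3.connect(":memory:")
conn.row_factory = namedtuple_factory
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")
conn.execute("INSERT INTO tasks (title) VALUES (?)", ("Write tests",))

task = conn.execute("SELECT id, title FROM tasks").fetchone()
print(task.id, task.title)  # 1 Write tests
conn.close()
```

Creating a new namedtuple type for every row is simple but not free; for hot paths, sqlite3.Row is likely the cheaper choice.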

Part 7 - In-Memory Databases for Testing

":memory:" creates a database that lives entirely in RAM, is never written to disk, and is destroyed when the connection closes. This makes it perfect for tests.

import sqlite3
import unittest

def create_schema(conn):
    conn.executescript("""
        CREATE TABLE tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            done INTEGER NOT NULL DEFAULT 0
        );
    """)

def add_task(conn, title):
    with conn:
        conn.execute("INSERT INTO tasks (title) VALUES (?)", (title,))

def get_tasks(conn):
    conn.row_factory = sqlite3.Row
    return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()


class TestTaskDB(unittest.TestCase):

    def setUp(self):
        """Each test gets its own fresh in-memory database."""
        self.conn = sqlite3.connect(":memory:")
        self.conn.row_factory = sqlite3.Row
        create_schema(self.conn)

    def tearDown(self):
        self.conn.close()

    def test_add_task(self):
        add_task(self.conn, "Write tests")
        tasks = get_tasks(self.conn)
        self.assertEqual(len(tasks), 1)
        self.assertEqual(tasks[0]["title"], "Write tests")
        self.assertEqual(tasks[0]["done"], 0)

    def test_multiple_tasks(self):
        add_task(self.conn, "Task A")
        add_task(self.conn, "Task B")
        tasks = get_tasks(self.conn)
        self.assertEqual(len(tasks), 2)

:::tip In-Memory DBs Often Beat Mocking When testing database-layer code, an in-memory SQLite database is often better than mocking. Mocks test that you called the right methods - an in-memory DB tests that your SQL is actually correct. Tests still run quickly because nothing touches the disk. :::

executescript() for Multi-Statement Setup

conn.executescript("""
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
);

CREATE TABLE posts (
id INTEGER PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
body TEXT NOT NULL
);

INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com');
""")

executescript() commits any pending transaction first, then runs all statements separated by semicolons. Use it for schema setup only - it bypasses parameterized query safety, so never pass user input to it.

Part 8 - Thread Safety and the Opening Puzzle

Why the Crash Happens

The SQLite library itself is normally compiled in a thread-safe mode, but a connection still carries state (the open transaction, pending results) that is easy to break logically when several threads use it at once. Python's sqlite3 module therefore enforces a conservative rule by default: a connection, and any cursor created from it, may only be used from the thread that created it. If you try to use it from another thread, it raises ProgrammingError.

This is the crash in the opening puzzle:

conn = sqlite3.connect("tasks.db") # created in main thread

def worker():
    cursor = conn.cursor()  # used in worker thread → CRASH
    cursor.execute("SELECT * FROM tasks")
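To watch the guard fire without a traceback on the console, the error can be caught inside the worker. A small sketch using an in-memory database; the `errors` list is a name introduced here for illustration:

```python
import sqlite3
import threading

conn = sqlite3.connect(":memory:")  # created in the main thread
errors = []

def worker():
    try:
        conn.cursor()               # touched from a second thread
    except sqlite3.ProgrammingError as exc:
        errors.append(exc)

t = threading.Thread(target=worker)
t.start()
t.join()

print(len(errors))  # 1
print(errors[0])    # the "same thread" message from the opening puzzle
conn.close()
```

Note that the exception is raised in the worker thread only; the main thread keeps running, which is why an uncaught version of this bug can go unnoticed in logs.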

Solution 1: One Connection Per Thread

Each thread creates and owns its own connection:

import sqlite3
import threading

DB_PATH = "tasks.db"

def worker(worker_id):
    # Each thread gets its own connection
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM tasks")
        count = cursor.fetchone()[0]
        print(f"Worker {worker_id}: {count} tasks")
    finally:
        conn.close()

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Solution 2: check_same_thread=False (Use with Extreme Care)

Disabling the thread check is allowed but requires you to protect the connection with a lock:

import sqlite3
import threading

conn = sqlite3.connect("tasks.db", check_same_thread=False)
lock = threading.Lock()

def worker(worker_id):
    with lock:  # serialize access - only one thread at a time
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM tasks")
        count = cursor.fetchone()[0]
        print(f"Worker {worker_id}: {count} tasks")

:::warning check_same_thread=False Without a Lock Is Unsafe If you disable the thread check without a lock, two threads can interleave statements on the same connection. They share one transaction, so one thread's commit() or rollback() silently applies to the other thread's half-finished work, and results become unpredictable. The lock removes most of the concurrency benefit anyway - for real multi-threaded applications, use one connection per thread or a connection pool. :::

Solution 3: threading.local() - Thread-Local Connections

import sqlite3
import threading

_local = threading.local()

def get_conn():
    """Returns a per-thread connection, creating it on first call."""
    if not hasattr(_local, "conn"):
        _local.conn = sqlite3.connect("tasks.db")
        _local.conn.row_factory = sqlite3.Row
    return _local.conn

def worker(worker_id):
    conn = get_conn()  # returns THIS thread's connection
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM tasks")
    count = cursor.fetchone()[0]
    print(f"Worker {worker_id}: {count} tasks")

threading.local() stores a separate value per thread - each thread gets its own connection without explicit locking.

Part 9 - When to Use SQLite vs PostgreSQL

SQLite is excellent for specific scenarios and wrong for others. Here is the decision framework:

| Criteria | SQLite | PostgreSQL |
| --- | --- | --- |
| Concurrent writers | 1 at a time (WAL mode lets readers proceed during a write) | Many - row-level locking |
| Multiple processes | File locking only | Full client-server |
| Network access | No - file system only | Yes - TCP/IP |
| Data size | Hundreds of TB in theory; practical limit far lower | Unlimited |
| Setup complexity | Zero - no server | Requires server/Docker |
| Development / CI | Excellent - instant setup | Possible but heavier |
| Embedded / mobile apps | Ideal | Not applicable |
| Full-text search | Basic (FTS5) | Advanced (tsvector/tsquery) |
| JSON / JSONB | JSON1 extension (limited) | Full JSONB with indexing |
| Replication / HA | Not built-in | Streaming replication, logical replication |

Use SQLite when:

  • You are building a desktop application, CLI tool, or mobile app
  • You are writing tests and want zero-infrastructure test isolation
  • You have one writer at a time (or can tolerate WAL-mode limitations)
  • You are prototyping and want to defer infrastructure decisions
  • The database is embedded in an application (not shared across processes)
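For the single-writer case, WAL mode and a lock timeout are the two settings most often adjusted. A minimal sketch on a temporary file database (WAL does not apply to ":memory:"); the 10-second timeout is an arbitrary value chosen for illustration:

```python
import os
import sqlite3
import tempfile

tmp_dir = tempfile.TemporaryDirectory()
db_path = os.path.join(tmp_dir.name, "app.db")

# timeout: how many seconds a connection waits for a lock before raising
# "database is locked" (sqlite3.connect's default is 5.0).
conn = sqlite3.connect(db_path, timeout=10.0)

# The PRAGMA returns the journal mode that is now in effect.
journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(journal_mode)  # wal

conn.close()
tmp_dir.cleanup()
```

WAL still allows only one writer at a time; what changes is that readers no longer block while a write is in progress.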

Move to PostgreSQL when:

  • Multiple application instances write concurrently
  • You need network access from multiple machines
  • Data volume exceeds ~10 GB with complex queries
  • You need advanced features: JSONB indexes, full-text search, row-level security, replication

:::note SQLite in Production Is Not Automatically Wrong SQLite runs production workloads at Notion, Expensify, and thousands of embedded/edge applications. The question is not "is it production-ready?" but "does this workload fit SQLite's model?" Read-heavy workloads with rare writes often run perfectly on SQLite at scale. :::

Full Working Example: Task Manager

This example brings together everything in this lesson: connection management, parameterized queries, row factory, context managers, and in-memory testing.

"""
task_manager.py - A complete SQLite-backed task manager.
Demonstrates: connection management, parameterized queries,
row_factory, context managers, and in-memory testing.
"""
import sqlite3
import contextlib
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------

@dataclass
class Task:
    id: Optional[int]
    title: str
    done: bool
    created: str

    @classmethod
    def from_row(cls, row: sqlite3.Row) -> "Task":
        return cls(
            id=row["id"],
            title=row["title"],
            done=bool(row["done"]),
            created=row["created"],
        )


# ---------------------------------------------------------------------------
# Database layer
# ---------------------------------------------------------------------------

SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
done INTEGER NOT NULL DEFAULT 0,
created TEXT NOT NULL DEFAULT (datetime('now'))
);
"""


@contextlib.contextmanager
def open_db(path: str):
    """Open a connection with row_factory, yield it, close on exit."""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")  # better concurrent reads
    conn.execute("PRAGMA foreign_keys=ON")
    try:
        yield conn
    finally:
        conn.close()


def init_db(conn: sqlite3.Connection) -> None:
    """Create schema if it doesn't already exist."""
    with conn:
        conn.executescript(SCHEMA)


def add_task(conn: sqlite3.Connection, title: str) -> int:
    """Insert a new task. Returns the new row's ID."""
    with conn:
        cursor = conn.execute(
            "INSERT INTO tasks (title) VALUES (?)",
            (title,),
        )
    return cursor.lastrowid


def get_task(conn: sqlite3.Connection, task_id: int) -> Optional[Task]:
    """Fetch a single task by ID. Returns None if not found."""
    row = conn.execute(
        "SELECT id, title, done, created FROM tasks WHERE id = ?",
        (task_id,),
    ).fetchone()
    return Task.from_row(row) if row else None


def list_tasks(conn: sqlite3.Connection, done: Optional[bool] = None) -> list[Task]:
    """Return all tasks, optionally filtered by completion status."""
    if done is None:
        rows = conn.execute(
            "SELECT id, title, done, created FROM tasks ORDER BY id"
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT id, title, done, created FROM tasks WHERE done = ? ORDER BY id",
            (int(done),),
        ).fetchall()
    return [Task.from_row(row) for row in rows]


def complete_task(conn: sqlite3.Connection, task_id: int) -> bool:
    """Mark a task as done. Returns True if a row was updated."""
    with conn:
        cursor = conn.execute(
            "UPDATE tasks SET done = 1 WHERE id = ? AND done = 0",
            (task_id,),
        )
    return cursor.rowcount > 0


def delete_task(conn: sqlite3.Connection, task_id: int) -> bool:
    """Delete a task. Returns True if a row was deleted."""
    with conn:
        cursor = conn.execute(
            "DELETE FROM tasks WHERE id = ?",
            (task_id,),
        )
    return cursor.rowcount > 0


def bulk_add_tasks(conn: sqlite3.Connection, titles: list[str]) -> None:
    """Insert multiple tasks in a single transaction."""
    with conn:
        conn.executemany(
            "INSERT INTO tasks (title) VALUES (?)",
            [(title,) for title in titles],
        )


# ---------------------------------------------------------------------------
# CLI demo
# ---------------------------------------------------------------------------

def main():
    with open_db("tasks.db") as conn:
        init_db(conn)

        # Add some tasks
        bulk_add_tasks(conn, [
            "Write unit tests",
            "Deploy to staging",
            "Review pull requests",
            "Update documentation",
        ])

        # Show all tasks
        print("=== All Tasks ===")
        for task in list_tasks(conn):
            status = "[x]" if task.done else "[ ]"
            print(f" {status} [{task.id}] {task.title}")

        # Complete a task
        complete_task(conn, 1)

        # Show pending only
        print("\n=== Pending Tasks ===")
        for task in list_tasks(conn, done=False):
            print(f" [ ] [{task.id}] {task.title}")

        # Show completed
        print("\n=== Completed Tasks ===")
        for task in list_tasks(conn, done=True):
            print(f" [x] [{task.id}] {task.title}")

        # Fetch one
        task = get_task(conn, 2)
        if task:
            print("\n=== Task #2 Detail ===")
            print(f" Title: {task.title}")
            print(f" Done: {task.done}")
            print(f" Created: {task.created}")


if __name__ == "__main__":
    main()

Unit tests using in-memory database

"""
test_task_manager.py - Tests using :memory: for zero-infrastructure isolation.
"""
import sqlite3
import unittest
from task_manager import (
open_db, init_db, add_task, get_task, list_tasks,
complete_task, delete_task, bulk_add_tasks,
)


class TestTaskManager(unittest.TestCase):
    """All tests use :memory: - fast, isolated, no disk I/O."""

    def setUp(self):
        self.conn = sqlite3.connect(":memory:")
        self.conn.row_factory = sqlite3.Row
        init_db(self.conn)

    def tearDown(self):
        self.conn.close()

    def test_add_and_retrieve_task(self):
        task_id = add_task(self.conn, "Write tests")
        task = get_task(self.conn, task_id)
        self.assertIsNotNone(task)
        self.assertEqual(task.title, "Write tests")
        self.assertFalse(task.done)

    def test_list_tasks_empty(self):
        self.assertEqual(list_tasks(self.conn), [])

    def test_list_tasks_all(self):
        bulk_add_tasks(self.conn, ["Task A", "Task B", "Task C"])
        tasks = list_tasks(self.conn)
        self.assertEqual(len(tasks), 3)

    def test_list_tasks_filtered(self):
        id1 = add_task(self.conn, "Task A")
        id2 = add_task(self.conn, "Task B")
        complete_task(self.conn, id1)

        pending = list_tasks(self.conn, done=False)
        done = list_tasks(self.conn, done=True)

        self.assertEqual(len(pending), 1)
        self.assertEqual(len(done), 1)
        self.assertEqual(pending[0].title, "Task B")
        self.assertEqual(done[0].title, "Task A")

    def test_complete_task(self):
        task_id = add_task(self.conn, "To complete")
        result = complete_task(self.conn, task_id)
        self.assertTrue(result)
        task = get_task(self.conn, task_id)
        self.assertTrue(task.done)

    def test_complete_already_done_returns_false(self):
        task_id = add_task(self.conn, "Already done")
        complete_task(self.conn, task_id)
        result = complete_task(self.conn, task_id)  # second time
        self.assertFalse(result)

    def test_delete_task(self):
        task_id = add_task(self.conn, "To delete")
        result = delete_task(self.conn, task_id)
        self.assertTrue(result)
        self.assertIsNone(get_task(self.conn, task_id))

    def test_get_nonexistent_task(self):
        self.assertIsNone(get_task(self.conn, 9999))

    def test_bulk_add_is_atomic(self):
        """All or nothing - if one fails, none should be inserted."""
        try:
            with self.conn:
                self.conn.executemany(
                    "INSERT INTO tasks (title) VALUES (?)",
                    [("Valid",), (None,)],  # None violates NOT NULL
                )
        except sqlite3.IntegrityError:
            pass
        # Transaction rolled back - no tasks added
        self.assertEqual(list_tasks(self.conn), [])


if __name__ == "__main__":
    unittest.main()

Graded Practice Challenges

Beginner - Keyword Search

Write a function search_tasks(conn, keyword) that returns all tasks whose title contains the keyword (case-insensitive). Use a parameterized query with the SQL LIKE operator.

Requirements:

  • Must use a parameterized query - no f-strings
  • Must be case-insensitive
  • Returns a list of Task objects

Solution

def search_tasks(conn: sqlite3.Connection, keyword: str) -> list[Task]:
    """
    Search tasks by title keyword (case-insensitive).
    LIKE is case-insensitive for ASCII in SQLite by default.
    """
    pattern = f"%{keyword}%"  # build the pattern in Python, not in SQL
    rows = conn.execute(
        "SELECT id, title, done, created FROM tasks WHERE title LIKE ?",
        (pattern,),
    ).fetchall()
    return [Task.from_row(row) for row in rows]


# Usage
with open_db("tasks.db") as conn:
    results = search_tasks(conn, "test")
    for task in results:
        print(task.title)

Key point: the % wildcard characters are added in Python string concatenation (f"%{keyword}%"), not inside the SQL. The entire pattern string is passed as a parameter. The SQL text itself contains only ? - this is safe.
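Binding the pattern prevents injection, but a keyword that itself contains % or _ is still treated as a wildcard by LIKE. One way to match it literally is to escape those characters and add an ESCAPE clause. A sketch under that assumption; `like_pattern` is a hypothetical helper and the backslash is an arbitrary choice of escape character:

```python
import sqlite3

def like_pattern(keyword: str) -> str:
    """Escape LIKE wildcards so the keyword is matched literally."""
    escaped = (
        keyword.replace("\\", "\\\\")
               .replace("%", "\\%")
               .replace("_", "\\_")
    )
    return f"%{escaped}%"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany(
    "INSERT INTO tasks (title) VALUES (?)",
    [("Reach 100% coverage",), ("Write 100 tests",)],
)

# ESCAPE '\' tells SQLite which character marks a literal % or _ in the pattern.
titles = [
    title
    for (title,) in conn.execute(
        "SELECT title FROM tasks WHERE title LIKE ? ESCAPE '\\' ORDER BY id",
        (like_pattern("100%"),),
    )
]
print(titles)  # ['Reach 100% coverage']
conn.close()
```

Without the escaping step, the keyword "100%" would turn into the pattern "%100%%" and match both rows.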

Intermediate - Schema Migration

Your tasks table needs a new priority column (1 = high, 2 = medium, 3 = low) with a default of 2. Write a migration function that:

  1. Checks whether the priority column already exists (to make the migration idempotent)
  2. Adds the column if it does not exist
  3. Updates all existing rows to have priority = 2

Solution

def migrate_add_priority(conn: sqlite3.Connection) -> None:
    """
    Idempotent migration: add 'priority' column to tasks if it doesn't exist.
    Safe to run multiple times - does nothing on the second run.
    """
    # Check existing columns via PRAGMA
    columns = {
        row[1]  # column name is at index 1
        for row in conn.execute("PRAGMA table_info(tasks)").fetchall()
    }

    if "priority" in columns:
        print("Migration already applied - 'priority' column exists.")
        return

    with conn:
        # ADD COLUMN with a DEFAULT already makes existing rows read back as 2
        conn.execute(
            "ALTER TABLE tasks ADD COLUMN priority INTEGER NOT NULL DEFAULT 2"
        )
        # Explicit backfill, kept because the exercise asks for it
        cursor = conn.execute("UPDATE tasks SET priority = 2")
        print(f"Migration applied - updated {cursor.rowcount} existing rows.")


# Run on startup (idempotent - safe every time)
with open_db("tasks.db") as conn:
    migrate_add_priority(conn)

PRAGMA table_info(table_name) returns one row per column with fields: (cid, name, type, notnull, dflt_value, pk). Using it to check for existing columns is the standard SQLite approach since SQLite does not support IF NOT EXISTS for ALTER TABLE.
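Both the PRAGMA check and the effect of a column DEFAULT on existing rows can be confirmed on a scratch database. A minimal sketch using a two-column version of the tasks table (a simplification made for this example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")
conn.execute("INSERT INTO tasks (title) VALUES (?)", ("Existing task",))
conn.commit()

conn.execute("ALTER TABLE tasks ADD COLUMN priority INTEGER NOT NULL DEFAULT 2")

# Index 1 of each table_info row is the column name.
columns = [row[1] for row in conn.execute("PRAGMA table_info(tasks)")]
priority = conn.execute("SELECT priority FROM tasks").fetchone()[0]

print(columns)   # ['id', 'title', 'priority']
print(priority)  # 2 - the pre-existing row picks up the DEFAULT without an UPDATE
conn.close()
```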

Advanced - Connection Pool Emulation with Thread-Local Storage

Build a ConnectionPool class for SQLite that:

  1. Uses threading.local() to give each thread its own connection
  2. Exposes a get(path) class method that returns the thread's connection, creating it if needed
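  3. Exposes a close_current_thread() method that closes and removes the calling thread's connections (thread-local storage gives a thread no access to other threads' connections, so a global close_all() is not possible with this design)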
  4. Tracks how many connections are currently active (across all threads)
  5. Has a configurable maximum connection count with a hard error if exceeded

Solution

import sqlite3
import threading
import contextlib


class SQLiteConnectionPool:
    """
    Thread-local SQLite connection pool.
    Each thread gets exactly one connection to a given database path.
    """
    _local = threading.local()
    _lock = threading.Lock()
    _active_count = 0
    _max_connections: int = 20

    @classmethod
    def configure(cls, max_connections: int = 20) -> None:
        cls._max_connections = max_connections

    @classmethod
    def get(cls, path: str) -> sqlite3.Connection:
        """
        Return the calling thread's connection to `path`.
        Creates a new connection if none exists for this thread.
        Raises RuntimeError if max connections exceeded.
        """
        connections = getattr(cls._local, "connections", None)
        if connections is None:
            cls._local.connections = {}
            connections = cls._local.connections

        if path not in connections:
            with cls._lock:
                if cls._active_count >= cls._max_connections:
                    raise RuntimeError(
                        f"Connection pool exhausted: max {cls._max_connections} connections"
                    )
                cls._active_count += 1  # reserve a slot before connecting

            try:
                # Thread-local connections never cross threads, so the default
                # check_same_thread=True guard can stay on.
                conn = sqlite3.connect(path)
                conn.row_factory = sqlite3.Row
                conn.execute("PRAGMA journal_mode=WAL")
                conn.execute("PRAGMA foreign_keys=ON")
            except Exception:
                with cls._lock:
                    cls._active_count -= 1  # release the reserved slot
                raise
            connections[path] = conn

        return connections[path]

    @classmethod
    def close_current_thread(cls) -> None:
        """Close all connections for the current thread."""
        connections = getattr(cls._local, "connections", {})
        for path, conn in connections.items():
            conn.close()
            with cls._lock:
                cls._active_count -= 1
        cls._local.connections = {}

    @classmethod
    def active_count(cls) -> int:
        """How many connections are currently open across all threads."""
        with cls._lock:
            return cls._active_count

    @classmethod
    @contextlib.contextmanager
    def connection(cls, path: str):
        """Context manager that returns the thread's connection."""
        conn = cls.get(path)
        try:
            yield conn
        except Exception:
            conn.rollback()
            raise


# --- Demo ---
def db_worker(worker_id: int, path: str):
    with SQLiteConnectionPool.connection(path) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS log (msg TEXT)")
        with conn:
            conn.execute("INSERT INTO log (msg) VALUES (?)", (f"worker-{worker_id}",))
        rows = conn.execute("SELECT COUNT(*) FROM log").fetchone()
        print(f"Worker {worker_id}: {rows[0]} log entries, "
              f"pool size={SQLiteConnectionPool.active_count()}")
    SQLiteConnectionPool.close_current_thread()


if __name__ == "__main__":
    SQLiteConnectionPool.configure(max_connections=10)
    # With ":memory:", every thread gets its own private database,
    # so each worker sees exactly one log entry.
    threads = [
        threading.Thread(target=db_worker, args=(i, ":memory:"))
        for i in range(5)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Final pool count: {SQLiteConnectionPool.active_count()}")

Key Takeaways

  • Python DB-API 2.0 is the standard interface all drivers follow - learn it once, apply everywhere
  • The three layers are: Connection (session), Cursor (execution state), Results (rows)
  • Always use parameterized queries with ? placeholders - never f-strings or % formatting in SQL
  • with conn: manages transactions - commit on success, rollback on exception - but does not close the connection
  • Set conn.row_factory = sqlite3.Row immediately after connecting to access columns by name
  • ":memory:" databases are the right tool for database-layer unit tests - fast, isolated, zero infrastructure
  • By default, a sqlite3 connection may only be used from the thread that created it - the correct fix is one connection per thread (not check_same_thread=False without a lock)
  • Use SQLite for embedded apps, CLIs, testing, and single-writer workloads; move to PostgreSQL for concurrent multi-writer production services

What's Next

Lesson 03 covers PostgreSQL with Python - connecting via psycopg2 and psycopg3, connection pooling, PostgreSQL-specific types (JSONB, arrays, UUID), bulk COPY operations, and production connection management.

PostgreSQL lifts every limitation that makes you hesitate to use SQLite in production: concurrent writers, network access, replication, and advanced query features. The DB-API 2.0 patterns you learned here transfer directly - the driver changes, the interface stays the same.

© 2026 EngineersOfAI. All rights reserved.