Python SQLite with Python Practice Problems & Exercises
Practice: SQLite with Python
← Back to lessonCreate an in-memory SQLite database, insert three notes, and return all rows.
Solution:
import sqlite3
def create_and_query():
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE notes (id INTEGER PRIMARY KEY, content TEXT)")
conn.executemany("INSERT INTO notes VALUES (?,?)", [
(1, 'Hello SQLite'),
(2, 'In-memory is fast'),
(3, 'No disk needed'),
])
conn.commit()
cur = conn.execute("SELECT * FROM notes")
return cur.fetchall()
print(create_and_query())
import sqlite3
# TODO: Create an in-memory database, insert 3 rows into a 'notes' table
# with columns (id INTEGER PRIMARY KEY, content TEXT),
# then return all rows as a list of tuples.
def create_and_query():
pass
print(create_and_query())
Expected Output
[(1, 'Hello SQLite'), (2, 'In-memory is fast'), (3, 'No disk needed')]Hints
Hint 1: sqlite3.connect(':memory:') creates an in-memory database.
Hint 2: Use conn.execute() for DDL and DML, or create a cursor with conn.cursor().
Hint 3: conn.commit() is required before fetchall() returns inserted rows.
Insert a user with a parameterized query, then fetch and return that user by username.
Solution:
def add_and_find_user(conn, username, email):
conn.execute(
"INSERT INTO users (username, email) VALUES (?, ?)",
(username, email)
)
conn.commit()
cur = conn.execute(
"SELECT * FROM users WHERE username = ?",
(username,)
)
return cur.fetchone()
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE,
email TEXT
)
""")
conn.commit()
# TODO: Insert a user safely using parameterized query.
# Then retrieve the user by username and return the row.
# Never use string formatting for values.
def add_and_find_user(conn, username, email):
pass
conn.execute("INSERT INTO users (username, email) VALUES (?,?)", ('alice', '[email protected]'))
conn.commit()
result = add_and_find_user(conn, 'bob', '[email protected]')
print(result)
Expected Output
(2, 'bob', '[email protected]')Hints
Hint 1: Use (?, ?) placeholders — never f-strings — for parameterized queries.
Hint 2: conn.execute(sql, (username, email)) inserts safely.
Hint 3: fetchone() returns one row or None.
Configure a row factory and return query results as a list of dictionaries.
Solution:
def modern_books(conn):
conn.row_factory = sqlite3.Row
cur = conn.execute(
"SELECT title, author, year FROM books WHERE year > 2000 ORDER BY year"
)
return [dict(row) for row in cur.fetchall()]
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE books (
id INTEGER PRIMARY KEY,
title TEXT,
author TEXT,
year INTEGER
)
""")
conn.executemany("INSERT INTO books VALUES (?,?,?,?)", [
(1, 'Clean Code', 'Robert Martin', 2008),
(2, 'The Pragmatic Programmer', 'Hunt & Thomas', 1999),
(3, 'Designing Data-Intensive Applications', 'Martin Kleppmann', 2017),
])
conn.commit()
# TODO: Set the row_factory so rows are accessible by column name.
# Return a list of dicts with keys: title, author, year
# for books published after 2000.
def modern_books(conn):
pass
for book in modern_books(conn):
print(book)
Expected Output
{'title': 'Clean Code', 'author': 'Robert Martin', 'year': 2008}
{'title': 'Designing Data-Intensive Applications', 'author': 'Martin Kleppmann', 'year': 2017}Hints
Hint 1: Set conn.row_factory = sqlite3.Row before executing queries.
Hint 2: sqlite3.Row supports both index-based and name-based access.
Hint 3: dict(row) converts a sqlite3.Row to a plain dictionary.
Use cursor.lastrowid to capture the auto-assigned ID of an INSERT, and cursor.rowcount to count rows affected by an UPDATE.
Solution:
def insert_task(conn, title):
cur = conn.execute(
"INSERT INTO tasks (title) VALUES (?)", (title,)
)
conn.commit()
return cur.lastrowid
def mark_all_done(conn):
cur = conn.execute("UPDATE tasks SET done = 1")
conn.commit()
return cur.rowcount
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
done INTEGER DEFAULT 0
)
""")
conn.commit()
# TODO: Insert a task and return its auto-generated id.
# Then mark all tasks as done and return the number of rows updated.
def insert_task(conn, title):
# Return the new row's id
pass
def mark_all_done(conn):
# Return the count of updated rows
pass
tid = insert_task(conn, 'Write tests')
insert_task(conn, 'Deploy app')
insert_task(conn, 'Write docs')
print('New task id:', tid)
print('Rows updated:', mark_all_done(conn))
Expected Output
New task id: 1
Rows updated: 3Hints
Hint 1: cursor.lastrowid gives the rowid of the most recently inserted row.
Hint 2: cursor.rowcount gives the number of rows affected by the last UPDATE/DELETE.
Hint 3: Use a cursor object (conn.cursor() then cur.execute()) to access these attributes.
Implement a fund transfer using the SQLite connection as a context manager to ensure atomic commit or rollback.
Solution:
def transfer(conn, sender_id, receiver_id, amount):
with conn:
cur = conn.execute(
"SELECT balance FROM accounts WHERE id = ?", (sender_id,)
)
sender_balance = cur.fetchone()[0]
if sender_balance < amount:
raise ValueError("Insufficient funds")
conn.execute(
"UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, sender_id)
)
conn.execute(
"UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, receiver_id)
)
cur = conn.execute(
"SELECT balance FROM accounts WHERE id IN (?, ?) ORDER BY id",
(sender_id, receiver_id)
)
rows = cur.fetchall()
return (rows[0][0], rows[1][0])
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE accounts (
id INTEGER PRIMARY KEY,
owner TEXT,
balance REAL
)
""")
conn.executemany("INSERT INTO accounts VALUES (?,?,?)", [
(1, 'Alice', 1000.0),
(2, 'Bob', 500.0),
])
conn.commit()
# TODO: Transfer amount from sender_id to receiver_id using
# sqlite3 connection as context manager (with conn:).
# If sender has insufficient funds, raise ValueError WITHOUT
# modifying the database. Return new balances as (sender_bal, receiver_bal).
def transfer(conn, sender_id, receiver_id, amount):
pass
print(transfer(conn, 1, 2, 300)) # Alice sends 300 to Bob
try:
print(transfer(conn, 2, 1, 1000)) # Bob can't afford this
except ValueError as e:
print(f"Transfer failed: {e}")
Expected Output
(700.0, 800.0)
Transfer failed: Insufficient fundsHints
Hint 1: Use 'with conn:' as a transaction context — it commits on success, rolls back on exception.
Hint 2: Check balance INSIDE the with block so a ValueError triggers rollback.
Hint 3: After the transfer, query both balances to return them.
Use executemany to batch-insert 10,000 log entries, and return the row count.
Solution:
def bulk_insert(conn, entries):
with conn:
conn.executemany(
"INSERT INTO log_entries (level, message, ts) VALUES (?,?,?)",
entries
)
cur = conn.execute("SELECT COUNT(*) FROM log_entries")
return cur.fetchone()[0]
import sqlite3
import time
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE log_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
level TEXT,
message TEXT,
ts REAL
)
""")
conn.commit()
# TODO: Insert a list of (level, message, ts) tuples using executemany.
# Return the total count of inserted rows.
# Measure and print the time taken.
def bulk_insert(conn, entries):
pass
entries = [(f'INFO', f'Log entry {i}', float(i)) for i in range(10000)]
start = time.time()
count = bulk_insert(conn, entries)
elapsed = time.time() - start
print(f"Inserted {count} rows in {elapsed:.3f}s")
Expected Output
Inserted 10000 rows in 0.XXXsHints
Hint 1: executemany(sql, list_of_tuples) inserts multiple rows efficiently.
Hint 2: conn.execute('SELECT COUNT(*) FROM log_entries').fetchone()[0] counts rows.
Hint 3: Wrapping executemany in a transaction (with conn:) is significantly faster.
Register a custom SQLite adapter and converter to transparently store and retrieve Python dicts as JSON.
Solution:
import sqlite3
import json
def setup_json_column():
sqlite3.register_adapter(dict, lambda d: json.dumps(d).encode())
sqlite3.register_converter('JSON', lambda b: json.loads(b))
conn = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
conn.execute("CREATE TABLE data (id INTEGER, payload JSON)")
conn.execute("INSERT INTO data VALUES (?, ?)", (1, {'name': 'Alice', 'score': 42}))
conn.commit()
cur = conn.execute("SELECT payload FROM data WHERE id = 1")
return cur.fetchone()[0]
result = setup_json_column()
print(type(result), result)
import sqlite3
import json
from datetime import date
# TODO: Register an adapter that stores Python dicts as JSON TEXT.
# Register a converter that turns that JSON TEXT back into a dict.
# Then insert and retrieve a row that contains a dict in a column.
def setup_json_column():
# Return the retrieved dict value from the database
pass
result = setup_json_column()
print(type(result), result)
Expected Output
<class 'dict'> {'name': 'Alice', 'score': 42}Hints
Hint 1: sqlite3.register_adapter(dict, lambda d: json.dumps(d).encode()) converts dict to bytes.
Hint 2: sqlite3.register_converter('JSON', lambda b: json.loads(b)) converts back.
Hint 3: Pass detect_types=sqlite3.PARSE_COLNAMES or PARSE_DECLTYPES to connect().
Implement a cache upsert that inserts a new entry or updates an existing one on key conflict.
Solution:
import time
def upsert_cache(conn, key, value):
conn.execute("""
INSERT INTO cache (key, value, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
""", (key, value, time.time()))
conn.commit()
cur = conn.execute("SELECT value FROM cache WHERE key = ?", (key,))
return cur.fetchone()[0]
import sqlite3
conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE cache (
key TEXT PRIMARY KEY,
value TEXT,
updated_at REAL
)
""")
conn.commit()
import time
# TODO: Implement an upsert function that inserts a new key-value pair
# or updates it if the key already exists.
# Use INSERT OR REPLACE or INSERT...ON CONFLICT DO UPDATE.
# Return the final value stored for the given key.
def upsert_cache(conn, key, value):
pass
upsert_cache(conn, 'user:1', 'Alice')
upsert_cache(conn, 'user:2', 'Bob')
upsert_cache(conn, 'user:1', 'Alice Updated') # update existing
print(upsert_cache(conn, 'user:1', 'Alice Updated'))
print(upsert_cache(conn, 'user:3', 'Carol'))
Expected Output
Alice Updated
CarolHints
Hint 1: INSERT OR REPLACE INTO replaces an existing row when the PRIMARY KEY conflicts.
Hint 2: Alternatively: INSERT INTO ... ON CONFLICT(key) DO UPDATE SET value=excluded.value.
Hint 3: After upsert, SELECT the value to return it.
Configure SQLite WAL mode and demonstrate concurrent reads alongside a writer thread.
Solution:
import sqlite3
import threading
import tempfile
import os
def wal_concurrent_test():
db_path = tempfile.mktemp(suffix='.db')
try:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("CREATE TABLE counter (val INTEGER)")
conn.execute("INSERT INTO counter VALUES (0)")
conn.commit()
conn.close()
reader_results = []
lock = threading.Lock()
def reader():
c = sqlite3.connect(db_path)
row = c.execute("SELECT val FROM counter").fetchone()
with lock:
reader_results.append(row[0])
c.close()
def writer():
c = sqlite3.connect(db_path)
for _ in range(100):
c.execute("UPDATE counter SET val = val + 1")
c.commit()
c.close()
threads = [threading.Thread(target=reader) for _ in range(5)]
wt = threading.Thread(target=writer)
wt.start()
for t in threads:
t.start()
wt.join()
for t in threads:
t.join()
conn = sqlite3.connect(db_path)
final = conn.execute("SELECT val FROM counter").fetchone()[0]
conn.close()
return reader_results, final
finally:
if os.path.exists(db_path):
os.remove(db_path)
for ext in ['-wal', '-shm']:
p = db_path + ext
if os.path.exists(p):
os.remove(p)
import sqlite3
import threading
import tempfile
import os
# TODO: Create a file-based SQLite database in WAL mode.
# Spawn 5 reader threads and 1 writer thread that run concurrently.
# Verify all reads succeed and the final count is correct.
# Return (reader_results, final_count).
def wal_concurrent_test():
pass
reader_results, final_count = wal_concurrent_test()
print('All readers succeeded:', all(r >= 0 for r in reader_results))
print('Final count:', final_count)
Expected Output
All readers succeeded: True
Final count: 100Hints
Hint 1: PRAGMA journal_mode=WAL enables Write-Ahead Logging.
Hint 2: Each thread needs its own connection — do NOT share a single connection across threads.
Hint 3: WAL allows concurrent readers and one writer without blocking.
Build an FTS5 virtual table for full-text article search, returning results ordered by relevance.
Solution:
def setup_fts(conn, articles):
conn.execute("CREATE VIRTUAL TABLE articles USING fts5(title, body)")
conn.executemany("INSERT INTO articles VALUES (?, ?)", articles)
conn.commit()
def search_articles(conn, query):
cur = conn.execute(
"SELECT title FROM articles WHERE articles MATCH ? ORDER BY rank",
(query,)
)
return [row[0] for row in cur.fetchall()]
import sqlite3
conn = sqlite3.connect(':memory:')
# TODO: Create an FTS5 virtual table for article search.
# Insert 5 articles with (title, body) columns.
# Implement a search function that returns matching titles
# sorted by relevance (bm25 rank).
articles = [
('Python Performance Tips', 'Profile before optimizing. Use numpy for numerical work. Avoid global lookups.'),
('Database Indexing Guide', 'B-tree indexes speed up range queries. Composite indexes need careful column order.'),
('Async Python with asyncio', 'Asyncio uses an event loop. Coroutines are cheap. Use gather for concurrency.'),
('SQL Window Functions', 'Window functions operate on a result set partition. ROW_NUMBER and RANK differ.'),
('Python Profiling Tools', 'cProfile, line_profiler, and py-spy are essential performance profiling tools.'),
]
def setup_fts(conn, articles):
pass
def search_articles(conn, query):
pass
setup_fts(conn, articles)
print(search_articles(conn, 'python performance'))
print(search_articles(conn, 'indexing'))
Expected Output
['Python Profiling Tools', 'Python Performance Tips']
['Database Indexing Guide']Hints
Hint 1: CREATE VIRTUAL TABLE articles USING fts5(title, body) creates the FTS table.
Hint 2: SELECT title FROM articles WHERE articles MATCH ? searches the index.
Hint 3: ORDER BY rank uses the built-in BM25 relevance scoring.
Implement a thread-safe SQLite connection pool using a queue, supporting context-manager-style acquisition.
Solution:
import sqlite3
import threading
import queue
import tempfile
import os
from contextlib import contextmanager
class SQLitePool:
def __init__(self, db_path, size=5):
self._pool = queue.Queue(maxsize=size)
for _ in range(size):
conn = sqlite3.connect(db_path, check_same_thread=False)
self._pool.put(conn)
@contextmanager
def acquire(self):
conn = self._pool.get()
try:
yield conn
finally:
self._pool.put(conn)
def close_all(self):
while not self._pool.empty():
self._pool.get().close()
def test_pool():
db_path = tempfile.mktemp(suffix='.db')
try:
setup = sqlite3.connect(db_path)
setup.execute("CREATE TABLE test (val INTEGER)")
setup.execute("INSERT INTO test VALUES (42)")
setup.commit()
setup.close()
pool = SQLitePool(db_path, size=3)
results = []
errors = []
lock = threading.Lock()
def worker():
try:
with pool.acquire() as conn:
val = conn.execute("SELECT val FROM test").fetchone()[0]
with lock:
results.append(val)
except Exception as e:
with lock:
errors.append(str(e))
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
pool.close_all()
assert len(errors) == 0
assert len(results) == 10
print("All 10 threads completed successfully")
finally:
if os.path.exists(db_path):
os.remove(db_path)
test_pool()
import sqlite3
import threading
import queue
import tempfile
import os
# TODO: Implement a simple SQLite connection pool.
# - Pool holds N connections to a database file
# - get() acquires a connection (blocks if none available)
# - release(conn) returns it to the pool
# - Use as a context manager (with pool.acquire() as conn:)
# - Test with 10 threads each doing a read query
class SQLitePool:
def __init__(self, db_path, size=5):
pass
def acquire(self):
pass
def release(self, conn):
pass
# Test
def test_pool():
pass
test_pool()
Expected Output
All 10 threads completed successfullyHints
Hint 1: Use queue.Queue(maxsize=size) to hold available connections.
Hint 2: get() calls self._pool.get() (blocking); release() calls self._pool.put(conn).
Hint 3: A context manager class needs __enter__ (returns conn) and __exit__ (calls release).
