Skip to main content

Python SQLite with Python Practice Problems & Exercises

Practice: SQLite with Python

11 problems4 Easy4 Medium3 Hard45–65 min
← Back to lesson

#1Open and Query an In-Memory DatabaseEasy
sqlite3connectin-memory

Create an in-memory SQLite database, insert three notes, and return all rows.

Solution:

import sqlite3

def create_and_query():
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE notes (id INTEGER PRIMARY KEY, content TEXT)")
conn.executemany("INSERT INTO notes VALUES (?,?)", [
(1, 'Hello SQLite'),
(2, 'In-memory is fast'),
(3, 'No disk needed'),
])
conn.commit()
cur = conn.execute("SELECT * FROM notes")
return cur.fetchall()

print(create_and_query())
import sqlite3

# TODO: Create an in-memory database, insert 3 rows into a 'notes' table
# with columns (id INTEGER PRIMARY KEY, content TEXT),
# then return all rows as a list of tuples.

def create_and_query():
  pass

print(create_and_query())
Expected Output
[(1, 'Hello SQLite'), (2, 'In-memory is fast'), (3, 'No disk needed')]
Hints

Hint 1: sqlite3.connect(':memory:') creates an in-memory database.

Hint 2: Use conn.execute() for DDL and DML, or create a cursor with conn.cursor().

Hint 3: conn.commit() is required before fetchall() returns inserted rows.


#2Parameterized INSERT and SELECTEasy
parameterized querySQL injection?

Insert a user with a parameterized query, then fetch and return that user by username.

Solution:

def add_and_find_user(conn, username, email):
conn.execute(
"INSERT INTO users (username, email) VALUES (?, ?)",
(username, email)
)
conn.commit()
cur = conn.execute(
"SELECT * FROM users WHERE username = ?",
(username,)
)
return cur.fetchone()
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE users (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  username TEXT UNIQUE,
  email TEXT
)
""")
conn.commit()

# TODO: Insert a user safely using parameterized query.
# Then retrieve the user by username and return the row.
# Never use string formatting for values.

def add_and_find_user(conn, username, email):
  pass

conn.execute("INSERT INTO users (username, email) VALUES (?,?)", ('alice', '[email protected]'))
conn.commit()

result = add_and_find_user(conn, 'bob', '[email protected]')
print(result)
Expected Output
(2, 'bob', '[email protected]')
Hints

Hint 1: Use (?, ?) placeholders — never f-strings — for parameterized queries.

Hint 2: conn.execute(sql, (username, email)) inserts safely.

Hint 3: fetchone() returns one row or None.


#3Row Factory — Access Columns by NameEasy
row_factorysqlite3.Rowcolumn access

Configure a row factory and return query results as a list of dictionaries.

Solution:

def modern_books(conn):
conn.row_factory = sqlite3.Row
cur = conn.execute(
"SELECT title, author, year FROM books WHERE year > 2000 ORDER BY year"
)
return [dict(row) for row in cur.fetchall()]
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE books (
  id INTEGER PRIMARY KEY,
  title TEXT,
  author TEXT,
  year INTEGER
)
""")
conn.executemany("INSERT INTO books VALUES (?,?,?,?)", [
  (1, 'Clean Code', 'Robert Martin', 2008),
  (2, 'The Pragmatic Programmer', 'Hunt & Thomas', 1999),
  (3, 'Designing Data-Intensive Applications', 'Martin Kleppmann', 2017),
])
conn.commit()

# TODO: Set the row_factory so rows are accessible by column name.
# Return a list of dicts with keys: title, author, year
# for books published after 2000.

def modern_books(conn):
  pass

for book in modern_books(conn):
  print(book)
Expected Output
{'title': 'Clean Code', 'author': 'Robert Martin', 'year': 2008}
{'title': 'Designing Data-Intensive Applications', 'author': 'Martin Kleppmann', 'year': 2017}
Hints

Hint 1: Set conn.row_factory = sqlite3.Row before executing queries.

Hint 2: sqlite3.Row supports both index-based and name-based access.

Hint 3: dict(row) converts a sqlite3.Row to a plain dictionary.


#4lastrowid and rowcountEasy
lastrowidrowcountcursor

Use cursor.lastrowid to capture the auto-assigned ID of an INSERT, and cursor.rowcount to count rows affected by an UPDATE.

Solution:

def insert_task(conn, title):
cur = conn.execute(
"INSERT INTO tasks (title) VALUES (?)", (title,)
)
conn.commit()
return cur.lastrowid

def mark_all_done(conn):
cur = conn.execute("UPDATE tasks SET done = 1")
conn.commit()
return cur.rowcount
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE tasks (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  title TEXT,
  done INTEGER DEFAULT 0
)
""")
conn.commit()

# TODO: Insert a task and return its auto-generated id.
# Then mark all tasks as done and return the number of rows updated.

def insert_task(conn, title):
  # Return the new row's id
  pass

def mark_all_done(conn):
  # Return the count of updated rows
  pass

tid = insert_task(conn, 'Write tests')
insert_task(conn, 'Deploy app')
insert_task(conn, 'Write docs')
print('New task id:', tid)
print('Rows updated:', mark_all_done(conn))
Expected Output
New task id: 1
Rows updated: 3
Hints

Hint 1: cursor.lastrowid gives the rowid of the most recently inserted row.

Hint 2: cursor.rowcount gives the number of rows affected by the last UPDATE/DELETE.

Hint 3: Use a cursor object (conn.cursor() then cur.execute()) to access these attributes.


#5Context Manager — Automatic Commit/RollbackMedium
context managerwith connrollback

Implement a fund transfer using the SQLite connection as a context manager to ensure atomic commit or rollback.

Solution:

def transfer(conn, sender_id, receiver_id, amount):
with conn:
cur = conn.execute(
"SELECT balance FROM accounts WHERE id = ?", (sender_id,)
)
sender_balance = cur.fetchone()[0]
if sender_balance < amount:
raise ValueError("Insufficient funds")
conn.execute(
"UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, sender_id)
)
conn.execute(
"UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, receiver_id)
)
cur = conn.execute(
"SELECT balance FROM accounts WHERE id IN (?, ?) ORDER BY id",
(sender_id, receiver_id)
)
rows = cur.fetchall()
return (rows[0][0], rows[1][0])
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE accounts (
  id INTEGER PRIMARY KEY,
  owner TEXT,
  balance REAL
)
""")
conn.executemany("INSERT INTO accounts VALUES (?,?,?)", [
  (1, 'Alice', 1000.0),
  (2, 'Bob', 500.0),
])
conn.commit()

# TODO: Transfer amount from sender_id to receiver_id using
# sqlite3 connection as context manager (with conn:).
# If sender has insufficient funds, raise ValueError WITHOUT
# modifying the database. Return new balances as (sender_bal, receiver_bal).

def transfer(conn, sender_id, receiver_id, amount):
  pass

print(transfer(conn, 1, 2, 300))  # Alice sends 300 to Bob

try:
  print(transfer(conn, 2, 1, 1000))  # Bob can't afford this
except ValueError as e:
  print(f"Transfer failed: {e}")
Expected Output
(700.0, 800.0)
Transfer failed: Insufficient funds
Hints

Hint 1: Use 'with conn:' as a transaction context — it commits on success, rolls back on exception.

Hint 2: Check balance INSIDE the with block so a ValueError triggers rollback.

Hint 3: After the transfer, query both balances to return them.


#6Batch INSERT with executemanyMedium
executemanybatch insertperformance

Use executemany to batch-insert 10,000 log entries, and return the row count.

Solution:

def bulk_insert(conn, entries):
with conn:
conn.executemany(
"INSERT INTO log_entries (level, message, ts) VALUES (?,?,?)",
entries
)
cur = conn.execute("SELECT COUNT(*) FROM log_entries")
return cur.fetchone()[0]
import sqlite3
import time

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE log_entries (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  level TEXT,
  message TEXT,
  ts REAL
)
""")
conn.commit()

# TODO: Insert a list of (level, message, ts) tuples using executemany.
# Return the total count of inserted rows.
# Measure and print the time taken.

def bulk_insert(conn, entries):
  pass

entries = [(f'INFO', f'Log entry {i}', float(i)) for i in range(10000)]
start = time.time()
count = bulk_insert(conn, entries)
elapsed = time.time() - start
print(f"Inserted {count} rows in {elapsed:.3f}s")
Expected Output
Inserted 10000 rows in 0.XXXs
Hints

Hint 1: executemany(sql, list_of_tuples) inserts multiple rows efficiently.

Hint 2: conn.execute('SELECT COUNT(*) FROM log_entries').fetchone()[0] counts rows.

Hint 3: Wrapping executemany in a transaction (with conn:) is significantly faster.


#7Custom sqlite3 Adapter and ConverterMedium
adapterconverterdetect_typescustom type

Register a custom SQLite adapter and converter to transparently store and retrieve Python dicts as JSON.

Solution:

import sqlite3
import json

def setup_json_column():
sqlite3.register_adapter(dict, lambda d: json.dumps(d).encode())
sqlite3.register_converter('JSON', lambda b: json.loads(b))

conn = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
conn.execute("CREATE TABLE data (id INTEGER, payload JSON)")
conn.execute("INSERT INTO data VALUES (?, ?)", (1, {'name': 'Alice', 'score': 42}))
conn.commit()

cur = conn.execute("SELECT payload FROM data WHERE id = 1")
return cur.fetchone()[0]

result = setup_json_column()
print(type(result), result)
import sqlite3
import json
from datetime import date

# TODO: Register an adapter that stores Python dicts as JSON TEXT.
# Register a converter that turns that JSON TEXT back into a dict.
# Then insert and retrieve a row that contains a dict in a column.

def setup_json_column():
  # Return the retrieved dict value from the database
  pass

result = setup_json_column()
print(type(result), result)
Expected Output
<class 'dict'> {'name': 'Alice', 'score': 42}
Hints

Hint 1: sqlite3.register_adapter(dict, lambda d: json.dumps(d).encode()) converts dict to bytes.

Hint 2: sqlite3.register_converter('JSON', lambda b: json.loads(b)) converts back.

Hint 3: Pass detect_types=sqlite3.PARSE_COLNAMES or PARSE_DECLTYPES to connect().


#8Upsert with INSERT OR REPLACEMedium
upsertINSERT OR REPLACEON CONFLICT

Implement a cache upsert that inserts a new entry or updates an existing one on key conflict.

Solution:

import time

def upsert_cache(conn, key, value):
conn.execute("""
INSERT INTO cache (key, value, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
""", (key, value, time.time()))
conn.commit()
cur = conn.execute("SELECT value FROM cache WHERE key = ?", (key,))
return cur.fetchone()[0]
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("""
CREATE TABLE cache (
  key TEXT PRIMARY KEY,
  value TEXT,
  updated_at REAL
)
""")
conn.commit()

import time

# TODO: Implement an upsert function that inserts a new key-value pair
# or updates it if the key already exists.
# Use INSERT OR REPLACE or INSERT...ON CONFLICT DO UPDATE.
# Return the final value stored for the given key.

def upsert_cache(conn, key, value):
  pass

upsert_cache(conn, 'user:1', 'Alice')
upsert_cache(conn, 'user:2', 'Bob')
upsert_cache(conn, 'user:1', 'Alice Updated')  # update existing

print(upsert_cache(conn, 'user:1', 'Alice Updated'))
print(upsert_cache(conn, 'user:3', 'Carol'))
Expected Output
Alice Updated
Carol
Hints

Hint 1: INSERT OR REPLACE INTO replaces an existing row when the PRIMARY KEY conflicts.

Hint 2: Alternatively: INSERT INTO ... ON CONFLICT(key) DO UPDATE SET value=excluded.value.

Hint 3: After upsert, SELECT the value to return it.


#9WAL Mode and Multi-Reader SetupHard
WALjournal modePRAGMAconcurrent reads

Configure SQLite WAL mode and demonstrate concurrent reads alongside a writer thread.

Solution:

import sqlite3
import threading
import tempfile
import os

def wal_concurrent_test():
db_path = tempfile.mktemp(suffix='.db')
try:
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("CREATE TABLE counter (val INTEGER)")
conn.execute("INSERT INTO counter VALUES (0)")
conn.commit()
conn.close()

reader_results = []
lock = threading.Lock()

def reader():
c = sqlite3.connect(db_path)
row = c.execute("SELECT val FROM counter").fetchone()
with lock:
reader_results.append(row[0])
c.close()

def writer():
c = sqlite3.connect(db_path)
for _ in range(100):
c.execute("UPDATE counter SET val = val + 1")
c.commit()
c.close()

threads = [threading.Thread(target=reader) for _ in range(5)]
wt = threading.Thread(target=writer)
wt.start()
for t in threads:
t.start()
wt.join()
for t in threads:
t.join()

conn = sqlite3.connect(db_path)
final = conn.execute("SELECT val FROM counter").fetchone()[0]
conn.close()
return reader_results, final
finally:
if os.path.exists(db_path):
os.remove(db_path)
for ext in ['-wal', '-shm']:
p = db_path + ext
if os.path.exists(p):
os.remove(p)
import sqlite3
import threading
import tempfile
import os

# TODO: Create a file-based SQLite database in WAL mode.
# Spawn 5 reader threads and 1 writer thread that run concurrently.
# Verify all reads succeed and the final count is correct.
# Return (reader_results, final_count).

def wal_concurrent_test():
  pass

reader_results, final_count = wal_concurrent_test()
print('All readers succeeded:', all(r >= 0 for r in reader_results))
print('Final count:', final_count)
Expected Output
All readers succeeded: True
Final count: 100
Hints

Hint 1: PRAGMA journal_mode=WAL enables Write-Ahead Logging.

Hint 2: Each thread needs its own connection — do NOT share a single connection across threads.

Hint 3: WAL allows concurrent readers and one writer without blocking.


#10Full-Text Search with FTS5Hard
FTS5full-text searchvirtual table

Build an FTS5 virtual table for full-text article search, returning results ordered by relevance.

Solution:

def setup_fts(conn, articles):
conn.execute("CREATE VIRTUAL TABLE articles USING fts5(title, body)")
conn.executemany("INSERT INTO articles VALUES (?, ?)", articles)
conn.commit()

def search_articles(conn, query):
cur = conn.execute(
"SELECT title FROM articles WHERE articles MATCH ? ORDER BY rank",
(query,)
)
return [row[0] for row in cur.fetchall()]
import sqlite3

conn = sqlite3.connect(':memory:')

# TODO: Create an FTS5 virtual table for article search.
# Insert 5 articles with (title, body) columns.
# Implement a search function that returns matching titles
# sorted by relevance (bm25 rank).

articles = [
  ('Python Performance Tips', 'Profile before optimizing. Use numpy for numerical work. Avoid global lookups.'),
  ('Database Indexing Guide', 'B-tree indexes speed up range queries. Composite indexes need careful column order.'),
  ('Async Python with asyncio', 'Asyncio uses an event loop. Coroutines are cheap. Use gather for concurrency.'),
  ('SQL Window Functions', 'Window functions operate on a result set partition. ROW_NUMBER and RANK differ.'),
  ('Python Profiling Tools', 'cProfile, line_profiler, and py-spy are essential performance profiling tools.'),
]

def setup_fts(conn, articles):
  pass

def search_articles(conn, query):
  pass

setup_fts(conn, articles)
print(search_articles(conn, 'python performance'))
print(search_articles(conn, 'indexing'))
Expected Output
['Python Profiling Tools', 'Python Performance Tips']
['Database Indexing Guide']
Hints

Hint 1: CREATE VIRTUAL TABLE articles USING fts5(title, body) creates the FTS table.

Hint 2: SELECT title FROM articles WHERE articles MATCH ? searches the index.

Hint 3: ORDER BY rank uses the built-in BM25 relevance scoring.


#11SQLite Connection Pool PatternHard
connection poolthreadingqueuecontext manager

Implement a thread-safe SQLite connection pool using a queue, supporting context-manager-style acquisition.

Solution:

import sqlite3
import threading
import queue
import tempfile
import os
from contextlib import contextmanager

class SQLitePool:
def __init__(self, db_path, size=5):
self._pool = queue.Queue(maxsize=size)
for _ in range(size):
conn = sqlite3.connect(db_path, check_same_thread=False)
self._pool.put(conn)

@contextmanager
def acquire(self):
conn = self._pool.get()
try:
yield conn
finally:
self._pool.put(conn)

def close_all(self):
while not self._pool.empty():
self._pool.get().close()

def test_pool():
db_path = tempfile.mktemp(suffix='.db')
try:
setup = sqlite3.connect(db_path)
setup.execute("CREATE TABLE test (val INTEGER)")
setup.execute("INSERT INTO test VALUES (42)")
setup.commit()
setup.close()

pool = SQLitePool(db_path, size=3)
results = []
errors = []
lock = threading.Lock()

def worker():
try:
with pool.acquire() as conn:
val = conn.execute("SELECT val FROM test").fetchone()[0]
with lock:
results.append(val)
except Exception as e:
with lock:
errors.append(str(e))

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()

pool.close_all()
assert len(errors) == 0
assert len(results) == 10
print("All 10 threads completed successfully")
finally:
if os.path.exists(db_path):
os.remove(db_path)

test_pool()
import sqlite3
import threading
import queue
import tempfile
import os

# TODO: Implement a simple SQLite connection pool.
# - Pool holds N connections to a database file
# - get() acquires a connection (blocks if none available)
# - release(conn) returns it to the pool
# - Use as a context manager (with pool.acquire() as conn:)
# - Test with 10 threads each doing a read query

class SQLitePool:
  def __init__(self, db_path, size=5):
      pass

  def acquire(self):
      pass

  def release(self, conn):
      pass

# Test
def test_pool():
  pass

test_pool()
Expected Output
All 10 threads completed successfully
Hints

Hint 1: Use queue.Queue(maxsize=size) to hold available connections.

Hint 2: get() calls self._pool.get() (blocking); release() calls self._pool.put(conn).

Hint 3: A context manager class needs __enter__ (returns conn) and __exit__ (calls release).

© 2026 EngineersOfAI. All rights reserved.