Mock Coding Problems

20 full mock coding problems simulating real AI interview questions - a mix of DSA, NumPy/Pandas, SQL, and ML implementation. Each comes with a problem statement, constraints, hints, a full solution, and complexity analysis.

Reading time: ~60 min | Interview relevance: Critical | Roles: MLE, AI Engineer, Data Scientist, Research Engineer

The Real Interview Moment

You are in the final coding round at a top AI company. You have sixty minutes and four problems. The interviewer says: "These problems are designed to test the range of skills we need on the team. You will see a data structures problem, a NumPy problem, a SQL problem, and an ML implementation problem. Prioritize correctness over speed - a working solution with clear explanations beats a fast but buggy one."

You take a breath. You know the drill: read the problem carefully, clarify constraints, think before coding, and talk through your approach. You have practiced exactly this scenario dozens of times.

This chapter is your practice arena. Work through these problems under timed conditions. For each one, try to solve it independently before reading the hints or solution. Track your time - if a Medium takes you more than 25 minutes, revisit the underlying pattern.

How to Use This Chapter

Mock Problem Practice Workflow

60-Second Answer

Interview pacing strategy: In a 4-problem, 60-minute interview, spend at most 15 minutes per problem. If you are stuck after 5 minutes of coding, step back and re-examine your approach. A clear explanation of a brute-force solution is better than a confused attempt at an optimal one. Always start by stating your approach and time complexity before writing code.

Problem 1: Two Sum with Model Predictions (DSA - Easy)

Category: Hash Map | Time limit: 15 minutes | Difficulty: Easy

You have an array of model prediction scores and a target sum. Find two predictions whose scores sum to exactly the target. Return their indices.

Constraints:

  • 2 <= len(predictions) <= 10^5
  • -10^9 <= predictions[i] <= 10^9
  • Exactly one solution exists
  • You may not use the same element twice
# Scores are shown as integers, matching the constraints above:
# Input: predictions = [2, 7, 11, 15], target = 9
# Output: [0, 1] (2 + 7 = 9)
Hint 1
For each number, the complement is `target - number`. Can you check if the complement has already been seen?
Hint 2
Use a hash map to store `{value: index}` as you iterate. For each new element, check if `target - element` exists in the map.
Solution
def two_sum(nums, target):
    """Find two indices whose values sum to target.

    Time: O(n), Space: O(n)
    """
    seen = {}  # value -> index
    for i, num in enumerate(nums):
        complement = target - num
        if complement in seen:
            return [seen[complement], i]
        seen[num] = i
    return []

Complexity: O(n) time (single pass), O(n) space (hash map). The brute force O(n^2) approach checks all pairs - the hash map reduces each complement lookup from O(n) to O(1) on average.
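To make the comparison concrete, here is a minimal sketch that cross-checks the hash map approach against the O(n^2) brute force. The names `two_sum_brute` and `two_sum_hash` are illustrative, not part of the original solution.

```python
def two_sum_brute(nums, target):
    """O(n^2) reference: try every pair (i, j) with i < j."""
    for i in range(len(nums)):
        for j in range(i + 1, len(nums)):
            if nums[i] + nums[j] == target:
                return [i, j]
    return []

def two_sum_hash(nums, target):
    """O(n) single pass with a value -> index map."""
    seen = {}
    for i, num in enumerate(nums):
        complement = target - num
        if complement in seen:
            return [seen[complement], i]
        seen[num] = i
    return []

nums, target = [2, 7, 11, 15], 9
brute_result = two_sum_brute(nums, target)
hash_result = two_sum_hash(nums, target)
```

Because exactly one solution is guaranteed, both functions return the same pair of indices, which makes the brute force a convenient test oracle.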

Problem 2: Batch Matrix Multiplication Check (NumPy - Easy)

Category: NumPy | Time limit: 15 minutes | Difficulty: Easy

Given two 3D arrays A of shape (batch, m, k) and B of shape (batch, k, n), compute the batch matrix multiplication and return the result of shape (batch, m, n). Also verify that the result matches a naive implementation.

Constraints:

  • batch <= 1000
  • m, k, n <= 100
  • Use NumPy operations only (no explicit loops for the matrix multiplication)
# Input: A.shape = (32, 4, 8), B.shape = (32, 8, 3)
# Output: C.shape = (32, 4, 3)
# C[i] = A[i] @ B[i] for each i in batch
Hint 1
NumPy's `@` operator and `np.matmul` already handle batch dimensions.
Hint 2
For the verification, use `np.einsum('bij,bjk->bik', A, B)` as an independent check, or loop and compare per-sample.
Solution
import numpy as np

def batch_matmul(A, B):
    """Batch matrix multiplication using NumPy.

    Time: O(batch * m * k * n) - same as naive, but parallelized
    Space: O(batch * m * n)
    """
    # Method 1: Direct - handles batch dimensions automatically
    C = A @ B  # or np.matmul(A, B)

    return C

def verify_batch_matmul(A, B, C):
    """Verify batch matmul result against a naive loop.

    Returns True if results match within tolerance.
    """
    batch = A.shape[0]
    for i in range(batch):
        expected = A[i] @ B[i]
        if not np.allclose(C[i], expected, atol=1e-6):
            return False
    return True

def batch_matmul_einsum(A, B):
    """Alternative using Einstein summation - more explicit about indices.

    'bij,bjk->bik' means:
    - b = batch dimension (shared)
    - j = contraction dimension (summed over)
    - i, k = output dimensions
    """
    return np.einsum('bij,bjk->bik', A, B)

# Example usage:
# A = np.random.randn(32, 4, 8)
# B = np.random.randn(32, 8, 3)
# C = batch_matmul(A, B)
# assert C.shape == (32, 4, 3)
# assert verify_batch_matmul(A, B, C)

Complexity: O(batch * m * k * n) arithmetic operations. NumPy executes this in optimized C/BLAS, making it orders of magnitude faster than a Python loop.
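If the interviewer wants the check to avoid `@` entirely, a fully naive reference can be written with explicit loops. The sketch below is an illustration (the name `naive_batch_matmul` is hypothetical) and is only practical for small shapes.

```python
import numpy as np

def naive_batch_matmul(A, B):
    """Reference implementation with explicit loops (slow, for checking only)."""
    batch, m, k = A.shape
    batch_b, k_b, n = B.shape
    assert batch == batch_b and k == k_b, "incompatible shapes"
    C = np.zeros((batch, m, n), dtype=np.result_type(A, B))
    for b in range(batch):
        for i in range(m):
            for j in range(n):
                total = 0.0
                for p in range(k):
                    total += A[b, i, p] * B[b, p, j]
                C[b, i, j] = total
    return C

rng = np.random.default_rng(0)
A = rng.standard_normal((4, 3, 5))
B = rng.standard_normal((4, 5, 2))
reference = naive_batch_matmul(A, B)
fast = A @ B
via_einsum = np.einsum('bij,bjk->bik', A, B)
```

All three results should agree within floating-point tolerance, so `np.allclose` is the right comparison rather than exact equality.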

Problem 3: Top-K Frequent Words in Documents (DSA - Medium)

Category: Hash Map + Heap | Time limit: 25 minutes | Difficulty: Medium

Given a list of words from a document corpus, return the K most frequent words sorted by frequency (descending). If two words have the same frequency, sort them alphabetically.

Constraints:

  • 1 <= len(words) <= 10^6
  • 1 <= k <= number of unique words
  • Words are lowercase alphabetic strings
# Input: words = ["neural", "network", "deep", "neural", "deep", "deep",
# "learning", "network", "neural"], k = 2
# Output: ["deep", "neural"]
# Frequencies: deep=3, neural=3, network=2, learning=1
# Tie broken alphabetically: "deep" before "neural"
Hint 1
Count frequencies with a hash map, then use a heap to find the top K.
Hint 2
To handle the tie-breaking (alphabetical for same frequency), use a min-heap with entries `(-freq, word)`. The negation makes higher frequencies "smaller" in the heap, and Python's tuple comparison falls back to the word when frequencies are equal, which gives alphabetical order for ties.
Hint 3
Use `heapq.nsmallest(k, items, key=...)` or build a custom heap. Alternatively, just sort - O(n log n) is often acceptable.
Solution
from collections import Counter
import heapq

def top_k_frequent_words(words, k):
    """Find k most frequent words with alphabetical tie-breaking.

    Time: O(n + m log k) where m = unique words
    Space: O(m)
    """
    counts = Counter(words)

    # Rank by (-frequency, word) - nsmallest gives us the k smallest keys,
    # which are the k highest frequencies (due to negation)
    # and alphabetically first for ties
    return heapq.nsmallest(k, counts.keys(),
                           key=lambda w: (-counts[w], w))

# Alternative: bucket approach, grouping words by frequency
def top_k_frequent_bucket(words, k):
    """Bucket approach: O(n) to count and bucket, plus sorting per bucket.

    Create buckets indexed by frequency. Each bucket contains words
    with that frequency. Iterate from highest frequency down, sorting
    each visited bucket alphabetically to break ties.
    """
    counts = Counter(words)
    max_freq = max(counts.values())

    # Buckets: index = frequency, value = list of words
    buckets = [[] for _ in range(max_freq + 1)]
    for word, freq in counts.items():
        buckets[freq].append(word)

    result = []
    for freq in range(max_freq, 0, -1):
        bucket = sorted(buckets[freq])  # Alphabetical within same freq
        for word in bucket:
            result.append(word)
            if len(result) == k:
                return result

    return result

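Complexity: Heap approach: O(n + m log k), where m is the number of unique words. Bucket approach: O(n) for counting and bucketing, plus the cost of sorting each visited bucket alphabetically, so O(n + m log m) in the worst case (all words share one frequency). The bucket approach is most attractive when k is close to m.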
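Hint 3 notes that a plain sort is often acceptable. A minimal sketch of that variant, using the hypothetical name `top_k_frequent_sorted`, runs in O(n + m log m):

```python
from collections import Counter

def top_k_frequent_sorted(words, k):
    """Sort all unique words by (-frequency, word) and keep the first k."""
    counts = Counter(words)
    ranked = sorted(counts, key=lambda w: (-counts[w], w))
    return ranked[:k]

words = ["neural", "network", "deep", "neural", "deep", "deep",
         "learning", "network", "neural"]
top_two = top_k_frequent_sorted(words, 2)
```

This is the easiest version to get right under time pressure; the heap only pays off when k is much smaller than the number of unique words.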

Problem 4: Rolling Window Feature Engineering (Pandas - Medium)

Category: Pandas | Time limit: 25 minutes | Difficulty: Medium

Given a DataFrame with columns timestamp, user_id, and purchase_amount, compute for each row:

  1. The user's rolling 7-day total purchase amount (including the current row)
  2. The user's rolling 30-day average purchase amount
  3. The number of purchases the user made in the last 7 days
# Input DataFrame:
# | timestamp | user_id | purchase_amount |
# |------------|---------|-----------------|
# | 2024-01-01 | A | 50 |
# | 2024-01-03 | A | 30 |
# | 2024-01-05 | B | 100 |
# | 2024-01-06 | A | 20 |
# | 2024-01-08 | A | 40 |

# Output (additional columns for user A, row 2024-01-08):
# rolling_7d_sum = 30 + 20 + 40 = 90 (Jan 3, 6, and 8; Jan 1 falls outside the 7-day window)
# rolling_30d_avg = (50 + 30 + 20 + 40) / 4 = 35.0
# rolling_7d_count = 3
Hint 1
Use `groupby('user_id')` with `.rolling('7D')` on a DatetimeIndex. Make sure to sort by timestamp first and set it as the index.
Hint 2
Pandas rolling with a time offset like '7D' requires a DatetimeIndex. Use `set_index('timestamp').sort_index()` within each group.
Hint 3
For rolling count, use `.rolling('7D')['purchase_amount'].count()`.
Solution
import pandas as pd

def compute_rolling_features(df):
    """Compute rolling window features per user.

    Time: O(n log n) for sorting + O(n) amortized for rolling
    Space: O(n)
    """
    df = df.copy()
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df = df.sort_values(['user_id', 'timestamp'])

    # Set timestamp as index for time-based rolling
    df = df.set_index('timestamp')

    # Compute rolling features per user
    grouped = df.groupby('user_id')['purchase_amount']

    df['rolling_7d_sum'] = grouped.transform(
        lambda x: x.rolling('7D').sum()
    )
    df['rolling_30d_avg'] = grouped.transform(
        lambda x: x.rolling('30D').mean()
    )
    df['rolling_7d_count'] = grouped.transform(
        lambda x: x.rolling('7D').count()
    )

    return df.reset_index()

# Alternative using explicit masks for complex window logic:
def compute_rolling_features_manual(df):
    """Manual approach without time-based rolling - more control.

    For each row, filter to same user + within time window + before/equal
    current timestamp, then aggregate.
    """
    df = df.copy()
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    results = []

    for idx, row in df.iterrows():
        user_mask = df['user_id'] == row['user_id']
        # Match rolling('7D'): the window is (t - 7 days, t]
        time_mask_7d = (
            (df['timestamp'] > row['timestamp'] - pd.Timedelta(days=7)) &
            (df['timestamp'] <= row['timestamp'])
        )
        # Match rolling('30D'): the window is (t - 30 days, t]
        time_mask_30d = (
            (df['timestamp'] > row['timestamp'] - pd.Timedelta(days=30)) &
            (df['timestamp'] <= row['timestamp'])
        )

        window_7d = df[user_mask & time_mask_7d]['purchase_amount']
        window_30d = df[user_mask & time_mask_30d]['purchase_amount']

        results.append({
            'rolling_7d_sum': window_7d.sum(),
            'rolling_30d_avg': window_30d.mean(),
            'rolling_7d_count': len(window_7d),
        })

    return pd.concat([df, pd.DataFrame(results, index=df.index)], axis=1)

Complexity: Pandas rolling approach: O(n log n) for sorting, O(n) for rolling (amortized). Manual approach: O(n^2) - only use for verification or small data. In interviews, always mention you would use the Pandas rolling approach in production.
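As a quick check of the window semantics, the sketch below runs the sample data through `groupby(...).rolling(...)` directly. It assumes pandas' default for offset windows, which is closed on the right only, so a 7-day window ending on Jan 8 excludes Jan 1.

```python
import pandas as pd

df = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2024-01-01", "2024-01-03", "2024-01-05", "2024-01-06", "2024-01-08",
    ]),
    "user_id": ["A", "A", "B", "A", "A"],
    "purchase_amount": [50, 30, 100, 20, 40],
})

# Time-based rolling needs a sorted DatetimeIndex within each group.
indexed = df.sort_values(["user_id", "timestamp"]).set_index("timestamp")
amounts = indexed.groupby("user_id")["purchase_amount"]

# Each result is indexed by (user_id, timestamp).
features = pd.DataFrame({
    "rolling_7d_sum": amounts.rolling("7D").sum(),
    "rolling_30d_avg": amounts.rolling("30D").mean(),
    "rolling_7d_count": amounts.rolling("7D").count(),
})

row = features.loc[("A", pd.Timestamp("2024-01-08"))]
```

The selected row corresponds to user A on 2024-01-08 and can be compared against the hand-computed values in the problem statement.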

Problem 5: LRU Cache for Model Predictions (DSA - Medium)

Category: Hash Map + Doubly Linked List | Time limit: 25 minutes | Difficulty: Medium

Design a Least Recently Used (LRU) cache for storing model prediction results. The cache has a fixed capacity. When the cache is full and a new entry is added, evict the least recently used entry. Both get and put must be O(1).

Constraints:

  • 1 <= capacity <= 3000
  • 0 <= key <= 10^4
  • 0 <= value <= 10^5
  • At most 2 * 10^5 calls to get and put
# ML context: Caching inference results for repeated inputs.
# If the same input has been seen recently, return the cached prediction
# instead of running the model again.

# cache = LRUCache(2)
# cache.put(1, 0.95) # cache: {1: 0.95}
# cache.put(2, 0.87) # cache: {1: 0.95, 2: 0.87}
# cache.get(1) # returns 0.95, cache: {2: 0.87, 1: 0.95}
# cache.put(3, 0.92) # evicts key 2, cache: {1: 0.95, 3: 0.92}
# cache.get(2) # returns -1 (not found)
Hint 1
You need O(1) lookup (hash map) and O(1) removal/insertion of the LRU element (doubly linked list). Combine both data structures.
Hint 2
Use a hash map mapping key to a node in a doubly linked list. The list maintains access order - most recently used at the tail, least recently used at the head.
Hint 3
On `get`: move the node to the tail. On `put`: if key exists, update and move to tail. If new and at capacity, remove the head node, then add the new node at the tail.
Solution
class ListNode:
    def __init__(self, key=0, val=0):
        self.key = key
        self.val = val
        self.prev = None
        self.next = None

class LRUCache:
    """LRU Cache with O(1) get and put.

    Uses a hash map + doubly linked list.

    Time: O(1) for both get and put
    Space: O(capacity)
    """
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = {}  # key -> ListNode

        # Dummy head and tail to simplify edge cases
        self.head = ListNode()
        self.tail = ListNode()
        self.head.next = self.tail
        self.tail.prev = self.head

    def _remove(self, node):
        """Remove a node from the doubly linked list."""
        node.prev.next = node.next
        node.next.prev = node.prev

    def _add_to_tail(self, node):
        """Add a node right before the tail (most recently used)."""
        node.prev = self.tail.prev
        node.next = self.tail
        self.tail.prev.next = node
        self.tail.prev = node

    def get(self, key):
        if key not in self.cache:
            return -1
        node = self.cache[key]
        self._remove(node)
        self._add_to_tail(node)
        return node.val

    def put(self, key, value):
        if key in self.cache:
            node = self.cache[key]
            node.val = value
            self._remove(node)
            self._add_to_tail(node)
        else:
            if len(self.cache) >= self.capacity:
                # Evict LRU (head.next)
                lru = self.head.next
                self._remove(lru)
                del self.cache[lru.key]

            new_node = ListNode(key, value)
            self.cache[key] = new_node
            self._add_to_tail(new_node)

Complexity: O(1) for both get and put. Space is O(capacity). Python's collections.OrderedDict provides this functionality built-in, but interviewers want to see the manual implementation.

Common Trap

Do not use Python's OrderedDict as your primary solution unless the interviewer explicitly allows it. The point of this problem is to test your understanding of hash maps and linked lists working together. Mention OrderedDict as a follow-up: "In production, I would use OrderedDict.move_to_end() and popitem(last=False)."
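For the follow-up, a minimal sketch of the `OrderedDict` version might look like this. The class name `LRUCacheOrdered` is illustrative; `move_to_end` and `popitem(last=False)` are the standard-library methods mentioned above.

```python
from collections import OrderedDict

class LRUCacheOrdered:
    """LRU cache backed by OrderedDict: oldest entry first, newest last."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        self.cache.move_to_end(key)  # mark as most recently used
        return self.cache[key]

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict least recently used

cache = LRUCacheOrdered(2)
cache.put(1, 0.95)
cache.put(2, 0.87)
first_lookup = cache.get(1)   # touches key 1, so key 2 becomes the LRU entry
cache.put(3, 0.92)            # evicts key 2
missing_lookup = cache.get(2)
```

The behavior mirrors the walkthrough in the problem statement: key 2 is evicted because key 1 was read more recently.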

Problem 6: Cosine Similarity Matrix (NumPy - Medium)

Category: NumPy | Time limit: 25 minutes | Difficulty: Medium

Given a matrix of document embeddings (shape: n_docs x embed_dim), compute the pairwise cosine similarity matrix without using explicit Python loops.

Constraints:

  • 1 <= n_docs <= 10000
  • 1 <= embed_dim <= 768
  • Output shape: (n_docs, n_docs)
  • Output[i][j] = cosine similarity between document i and document j
# Cosine similarity: cos(A, B) = (A . B) / (||A|| * ||B||)
# Input: embeddings.shape = (1000, 768)
# Output: similarity_matrix.shape = (1000, 1000)
Hint 1
Normalize each row to unit length first. Then cosine similarity becomes just dot product.
Hint 2
Norm of each row: `norms = np.linalg.norm(embeddings, axis=1, keepdims=True)`. Normalize: `normalized = embeddings / norms`. Similarity: `normalized @ normalized.T`.
Hint 3
Handle zero vectors (zero norm) by clamping the norms to a small epsilon to avoid division by zero: `norms = np.maximum(norms, 1e-8)`.
Solution
import numpy as np

def cosine_similarity_matrix(embeddings):
    """Compute pairwise cosine similarity matrix.

    Time: O(n^2 * d) for the matrix multiply
    Space: O(n^2) for the output + O(n * d) for normalized copy
    """
    # Compute L2 norms
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    norms = np.maximum(norms, 1e-8)  # Avoid division by zero

    # Normalize to unit vectors
    normalized = embeddings / norms

    # Cosine similarity = dot product of unit vectors
    similarity = normalized @ normalized.T

    # Clip to [-1, 1] to handle floating point errors
    return np.clip(similarity, -1.0, 1.0)

def top_k_similar(embeddings, query_idx, k):
    """Find the k most similar documents to a query document.

    More efficient than computing the full matrix when you only
    need similarities for one query.

    Time: O(n * d), Space: O(n)
    """
    query = embeddings[query_idx]
    query_norm = np.linalg.norm(query)
    if query_norm < 1e-8:
        return []

    # Compute similarities with all documents
    norms = np.linalg.norm(embeddings, axis=1)
    norms = np.maximum(norms, 1e-8)

    similarities = (embeddings @ query) / (norms * query_norm)

    # Get top-k indices (excluding the query itself)
    similarities[query_idx] = -np.inf
    top_indices = np.argpartition(similarities, -k)[-k:]
    top_indices = top_indices[np.argsort(-similarities[top_indices])]

    return list(zip(top_indices, similarities[top_indices]))

Complexity: Full matrix: O(n^2 * d). Single query top-K: O(n * d) for the similarities, plus O(n) for argpartition and O(K log K) to sort the K winners. The key NumPy insight is that normalizing first converts cosine similarity to a single matrix multiplication.
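A short sanity check helps confirm that normalize-then-multiply really matches the definition. This sketch compares one entry of the matrix against the pairwise formula; `cosine_pair` is a hypothetical helper written only for the comparison.

```python
import numpy as np

def cosine_pair(a, b, eps=1e-8):
    """Cosine similarity of two vectors straight from the definition."""
    denom = max(np.linalg.norm(a), eps) * max(np.linalg.norm(b), eps)
    return float(a @ b / denom)

rng = np.random.default_rng(0)
E = rng.standard_normal((5, 16))

# Vectorized version: normalize rows, then one matrix multiplication.
norms = np.maximum(np.linalg.norm(E, axis=1, keepdims=True), 1e-8)
unit = E / norms
S = unit @ unit.T
```

Three properties are worth stating aloud in an interview: the diagonal is 1 for non-zero vectors, the matrix is symmetric, and every entry agrees with the pairwise definition.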

Problem 7: SQL - User Retention Analysis

Category: SQL | Time limit: 25 minutes | Difficulty: Medium

Given tables users(user_id, signup_date) and logins(user_id, login_date), write a SQL query to compute the 7-day retention rate: the percentage of users who signed up on each date and logged in again within 7 days of signup (not counting the signup day itself).

-- Expected output:
-- | signup_date | total_signups | retained_users | retention_rate |
-- |-------------|---------------|----------------|----------------|
-- | 2024-01-01 | 100 | 42 | 0.42 |
-- | 2024-01-02 | 85 | 38 | 0.447 |
Hint 1
Join users with logins where the login date is between signup_date + 1 and signup_date + 7.
Hint 2
Use a LEFT JOIN so that users who never logged in again still appear in the count. Use COUNT(DISTINCT) on the login table's user_id to count retained users.
Hint 3
Group by signup_date and compute the ratio. Use CAST or multiply by 1.0 to get a decimal result.
Solution
SELECT
    u.signup_date,
    COUNT(DISTINCT u.user_id) AS total_signups,
    COUNT(DISTINCT l.user_id) AS retained_users,
    ROUND(
        COUNT(DISTINCT l.user_id) * 1.0 / COUNT(DISTINCT u.user_id),
        3
    ) AS retention_rate
FROM users u
LEFT JOIN logins l
    ON u.user_id = l.user_id
    AND l.login_date > u.signup_date
    AND l.login_date <= u.signup_date + INTERVAL '7 days'
GROUP BY u.signup_date
ORDER BY u.signup_date;

Alternative using a CTE (more explicit):

WITH retained AS (
    SELECT DISTINCT u.user_id, u.signup_date
    FROM users u
    INNER JOIN logins l
        ON u.user_id = l.user_id
        AND l.login_date > u.signup_date
        AND l.login_date <= u.signup_date + INTERVAL '7 days'
)
SELECT
    u.signup_date,
    COUNT(DISTINCT u.user_id) AS total_signups,
    COUNT(DISTINCT r.user_id) AS retained_users,
    ROUND(
        COUNT(DISTINCT r.user_id) * 1.0 / COUNT(DISTINCT u.user_id),
        3
    ) AS retention_rate
FROM users u
LEFT JOIN retained r
    ON u.user_id = r.user_id AND u.signup_date = r.signup_date
GROUP BY u.signup_date
ORDER BY u.signup_date;

Complexity: O(n * m) in the worst case for the join, where n = users and m = logins. Indexes on user_id and login_date reduce this significantly.
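To test the query logic locally, one option is Python's built-in `sqlite3` module. The sketch below is an adaptation, not the original query: SQLite has no `INTERVAL` syntax, so the upper bound is written as `date(u.signup_date, '+7 days')`, and dates are stored as ISO-8601 text. The sample rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (user_id INTEGER, signup_date TEXT);
CREATE TABLE logins (user_id INTEGER, login_date TEXT);
INSERT INTO users VALUES (1, '2024-01-01'), (2, '2024-01-01'), (3, '2024-01-02');
INSERT INTO logins VALUES
    (1, '2024-01-01'),  -- signup-day login: does not count
    (1, '2024-01-04'),  -- day 3: user 1 is retained
    (2, '2024-01-09'),  -- day 8: too late, user 2 is not retained
    (3, '2024-01-09');  -- day 7: user 3 is retained
""")

query = """
SELECT
    u.signup_date,
    COUNT(DISTINCT u.user_id) AS total_signups,
    COUNT(DISTINCT l.user_id) AS retained_users,
    ROUND(
        COUNT(DISTINCT l.user_id) * 1.0 / COUNT(DISTINCT u.user_id),
        3
    ) AS retention_rate
FROM users u
LEFT JOIN logins l
    ON u.user_id = l.user_id
    AND l.login_date > u.signup_date
    AND l.login_date <= date(u.signup_date, '+7 days')
GROUP BY u.signup_date
ORDER BY u.signup_date;
"""
rows = conn.execute(query).fetchall()
conn.close()
```

The three boundary cases (signup day, day 7, day 8) are the ones interviewers tend to probe, so it is worth building test rows around them.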

Problem 8: Implement Softmax from Scratch (ML - Medium)

Category: ML Implementation | Time limit: 25 minutes | Difficulty: Medium

Implement the softmax function and its numerically stable variant. Then implement the cross-entropy loss. Do not use any ML framework - only NumPy.

Constraints:

  • Input is a 2D array of logits (batch_size, num_classes)
  • Must handle numerical overflow/underflow
  • Cross-entropy loss takes logits and integer labels
# Input: logits = np.array([[2.0, 1.0, 0.1], [1.0, 2.0, 3.0]])
# labels = np.array([0, 2])
# softmax(logits[0]) = [0.659, 0.242, 0.099]
# softmax(logits[1]) = [0.090, 0.245, 0.665]
# loss = (-log(0.659) + -log(0.665)) / 2 = (0.417 + 0.408) / 2 ≈ 0.412
Hint 1
Naive softmax: `exp(x_i) / sum(exp(x_j))`. This overflows for large values. Fix: subtract `max(x)` from all elements before exponentiating.
Hint 2
For cross-entropy loss with integer labels, you need: `-log(softmax(logits)[i, label[i]])` averaged over the batch. Use the log-sum-exp trick for numerical stability.
Hint 3
Log-softmax is more numerically stable than taking log of softmax: `log_softmax(x_i) = x_i - max(x) - log(sum(exp(x_j - max(x))))`.
Solution
import numpy as np

def softmax(logits):
    """Numerically stable softmax.

    Subtracting max prevents overflow in exp().
    softmax(x) = softmax(x - c) for any constant c.

    Input: (batch_size, num_classes)
    Output: (batch_size, num_classes) - probabilities summing to 1
    """
    # Subtract max for numerical stability (per row)
    shifted = logits - np.max(logits, axis=1, keepdims=True)
    exp_vals = np.exp(shifted)
    return exp_vals / np.sum(exp_vals, axis=1, keepdims=True)

def log_softmax(logits):
    """Numerically stable log-softmax.

    log_softmax(x_i) = x_i - log(sum(exp(x_j)))
    With stability: x_i - max(x) - log(sum(exp(x_j - max(x))))
    """
    max_vals = np.max(logits, axis=1, keepdims=True)
    shifted = logits - max_vals
    log_sum_exp = np.log(np.sum(np.exp(shifted), axis=1, keepdims=True))
    return shifted - log_sum_exp

def cross_entropy_loss(logits, labels):
    """Cross-entropy loss from logits and integer labels.

    Input: logits (batch_size, num_classes), labels (batch_size,)
    Output: scalar loss (mean over batch)

    Uses log-softmax for numerical stability.
    """
    batch_size = logits.shape[0]
    log_probs = log_softmax(logits)

    # Select the log probability of the correct class for each sample
    correct_log_probs = log_probs[np.arange(batch_size), labels]

    # Negative log likelihood, averaged over batch
    return -np.mean(correct_log_probs)

# Verification:
# logits = np.array([[2.0, 1.0, 0.1], [1.0, 2.0, 3.0]])
# labels = np.array([0, 2])
# probs = softmax(logits)
# print(probs)  # [[0.659, 0.242, 0.099], [0.090, 0.245, 0.665]]
# print(cross_entropy_loss(logits, labels))  # (0.417 + 0.408) / 2 ≈ 0.412

Complexity: O(batch_size * num_classes) time and space. The key insight tested here is numerical stability - interviewers want to see the max-subtraction trick without being prompted.

Instant Rejection

If you implement softmax as np.exp(x) / np.sum(np.exp(x)) without the max-subtraction stability trick, it is an immediate red flag for any ML role. For logits above ~710, exp() overflows to inf in float64, and the division inf / inf then yields nan. Every production ML system uses the stabilized version. If you cannot explain why softmax(x) = softmax(x - max(x)), you will fail this question.
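The failure is easy to reproduce. In the sketch below, `softmax_naive` and `softmax_stable` are illustrative names; the identity softmax(x) = softmax(x - c) holds because the factor exp(-c) appears in both numerator and denominator and cancels.

```python
import numpy as np

def softmax_naive(x):
    """Direct formula: overflows once any logit exceeds roughly 710."""
    e = np.exp(x)
    return e / np.sum(e, axis=1, keepdims=True)

def softmax_stable(x):
    """Shift each row by its max so the largest exponent is exp(0) = 1."""
    shifted = x - np.max(x, axis=1, keepdims=True)
    e = np.exp(shifted)
    return e / np.sum(e, axis=1, keepdims=True)

logits = np.array([[1000.0, 999.0, 998.0]])

# Silence the overflow / invalid-value warnings the naive version triggers.
with np.errstate(over="ignore", invalid="ignore"):
    naive = softmax_naive(logits)

stable = softmax_stable(logits)
small = softmax_stable(np.array([[2.0, 1.0, 0.0]]))
```

The naive result is all nan, while the stable result equals the softmax of `[2, 1, 0]`, since only the differences between logits matter.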

Problem 9: Graph - Course Prerequisites (DSA - Medium)

Category: Graph / Topological Sort | Time limit: 25 minutes | Difficulty: Medium

There are n ML courses numbered 0 to n-1. Some courses have prerequisites. Given a list of prerequisite pairs [course, prerequisite], determine if it is possible to complete all courses (detect if there is a cycle).

Constraints:

  • 1 <= n <= 2000
  • 0 <= prerequisites.length <= 5000
  • No duplicate prerequisite pairs
# Input: n = 4, prerequisites = [[1, 0], [2, 1], [3, 2]]
# Output: True (order: 0 -> 1 -> 2 -> 3)

# Input: n = 2, prerequisites = [[1, 0], [0, 1]]
# Output: False (cycle: 0 requires 1, 1 requires 0)
Hint 1
This is cycle detection in a directed graph. If the prerequisite graph has a cycle, it is impossible to complete all courses.
Hint 2
Use Kahn's algorithm (BFS-based topological sort): start with nodes that have no prerequisites (in-degree 0), process them, reduce in-degrees of their neighbors, and repeat.
Hint 3
If the topological sort processes all n nodes, there is no cycle. If it processes fewer, a cycle exists.
Solution
from collections import deque, defaultdict

def can_finish(num_courses, prerequisites):
    """Detect if all courses can be completed (no cycles).

    Uses Kahn's algorithm for topological sorting.

    Time: O(V + E), Space: O(V + E)
    """
    # Build adjacency list and in-degree count
    graph = defaultdict(list)
    in_degree = [0] * num_courses

    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1

    # Start with all courses that have no prerequisites
    queue = deque()
    for i in range(num_courses):
        if in_degree[i] == 0:
            queue.append(i)

    completed = 0

    while queue:
        course = queue.popleft()
        completed += 1

        for next_course in graph[course]:
            in_degree[next_course] -= 1
            if in_degree[next_course] == 0:
                queue.append(next_course)

    return completed == num_courses

def find_course_order(num_courses, prerequisites):
    """Return a valid course order, or empty list if impossible."""
    graph = defaultdict(list)
    in_degree = [0] * num_courses

    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1

    queue = deque(i for i in range(num_courses) if in_degree[i] == 0)
    order = []

    while queue:
        course = queue.popleft()
        order.append(course)
        for next_course in graph[course]:
            in_degree[next_course] -= 1
            if in_degree[next_course] == 0:
                queue.append(next_course)

    return order if len(order) == num_courses else []

Complexity: O(V + E) time, O(V + E) space. V = number of courses, E = number of prerequisite pairs. This is optimal since we must examine every node and edge at least once.
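Interviewers sometimes ask for a DFS alternative to Kahn's algorithm. The sketch below uses three-color marking with an explicit stack, which avoids Python's recursion limit on long prerequisite chains. The function name `can_finish_dfs` is illustrative.

```python
def can_finish_dfs(num_courses, prerequisites):
    """Cycle detection via iterative DFS with three-color marking."""
    graph = [[] for _ in range(num_courses)]
    for course, prereq in prerequisites:
        graph[prereq].append(course)

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on current path, finished
    state = [WHITE] * num_courses

    for start in range(num_courses):
        if state[start] != WHITE:
            continue
        state[start] = GRAY
        stack = [(start, iter(graph[start]))]
        while stack:
            node, neighbors = stack[-1]
            descended = False
            for nxt in neighbors:
                if state[nxt] == GRAY:
                    return False  # back edge to the current path: cycle
                if state[nxt] == WHITE:
                    state[nxt] = GRAY
                    stack.append((nxt, iter(graph[nxt])))
                    descended = True
                    break
            if not descended:
                state[node] = BLACK  # all descendants explored
                stack.pop()
    return True

chain_ok = can_finish_dfs(4, [[1, 0], [2, 1], [3, 2]])
cycle_ok = can_finish_dfs(2, [[1, 0], [0, 1]])
```

Both approaches run in O(V + E); Kahn's algorithm yields a valid order as a by-product, while the DFS version pinpoints the back edge that closes a cycle.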

Problem 10: Implement K-Means Clustering (ML - Medium)

Category: ML Implementation | Time limit: 25 minutes | Difficulty: Medium

Implement K-Means clustering from scratch using only NumPy. The function should take a data matrix and number of clusters, and return cluster assignments and centroids.

Constraints:

  • Data: (n_samples, n_features)
  • Use Euclidean distance
  • Run for a fixed number of iterations or until convergence
  • Initialize centroids by randomly selecting K data points
# Input: X.shape = (100, 2), k = 3
# Output: labels (100,) with values in {0, 1, 2}, centroids (3, 2)
Hint 1
K-Means alternates between two steps: (1) Assign each point to the nearest centroid. (2) Update centroids to be the mean of assigned points.
Hint 2
For efficient distance computation, use broadcasting: `distances = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)`. This gives a (n_samples, k) matrix.
Hint 3
Handle empty clusters by reinitializing the centroid to a random data point.
Solution
import numpy as np

def kmeans(X, k, max_iters=100, tol=1e-6):
    """K-Means clustering implementation.

    Args:
        X: Data matrix (n_samples, n_features)
        k: Number of clusters
        max_iters: Maximum iterations
        tol: Convergence tolerance for centroid movement

    Returns:
        labels: Cluster assignments (n_samples,)
        centroids: Final centroids (k, n_features)

    Time: O(max_iters * n * k * d)
    Space: O(n * k * d) for the broadcast difference tensor,
           O(n * k) for the distance matrix
    """
    n_samples, n_features = X.shape

    # Initialize centroids by randomly selecting k data points
    indices = np.random.choice(n_samples, k, replace=False)
    centroids = X[indices].copy()

    for iteration in range(max_iters):
        # Step 1: Assign each point to the nearest centroid
        # distances[i][j] = distance from point i to centroid j
        distances = np.linalg.norm(
            X[:, np.newaxis, :] - centroids[np.newaxis, :, :],
            axis=2
        )  # Shape: (n_samples, k)
        labels = np.argmin(distances, axis=1)

        # Step 2: Update centroids to mean of assigned points
        new_centroids = np.zeros_like(centroids)
        for j in range(k):
            cluster_points = X[labels == j]
            if len(cluster_points) > 0:
                new_centroids[j] = cluster_points.mean(axis=0)
            else:
                # Empty cluster - reinitialize to random point
                new_centroids[j] = X[np.random.randint(n_samples)]

        # Check convergence
        centroid_shift = np.linalg.norm(new_centroids - centroids)
        centroids = new_centroids

        if centroid_shift < tol:
            break

    return labels, centroids

def kmeans_inertia(X, labels, centroids):
    """Compute inertia (sum of squared distances to centroids).

    Used for the elbow method to choose k.
    """
    distances = np.linalg.norm(
        X - centroids[labels], axis=1
    )
    return np.sum(distances ** 2)

# Usage:
# X = np.random.randn(200, 2)
# labels, centroids = kmeans(X, k=3)
# print(f"Inertia: {kmeans_inertia(X, labels, centroids):.2f}")

Complexity: O(iterations * n * k * d) time, where n = samples, k = clusters, d = features. The main optimization opportunity is using scipy.spatial.distance.cdist instead of broadcasting for large datasets.
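If SciPy is not available, the same memory saving can be had in pure NumPy by expanding the squared distance: ||x - c||^2 = ||x||^2 - 2 x.c + ||c||^2. The sketch below (with the hypothetical name `pairwise_sq_dists`) never materializes the (n, k, d) difference tensor. Since argmin is unchanged by taking a square root, squared distances are enough for the assignment step.

```python
import numpy as np

def pairwise_sq_dists(X, centroids):
    """Squared Euclidean distances, shape (n_samples, k), via one matmul."""
    x_sq = np.sum(X ** 2, axis=1, keepdims=True)   # (n, 1)
    c_sq = np.sum(centroids ** 2, axis=1)          # (k,)
    sq = x_sq - 2.0 * (X @ centroids.T) + c_sq     # broadcasts to (n, k)
    # Rounding can push tiny distances slightly below zero; clamp them.
    return np.maximum(sq, 0.0)

rng = np.random.default_rng(0)
X = rng.standard_normal((50, 3))
C = rng.standard_normal((4, 3))

broadcast_sq = np.linalg.norm(X[:, None, :] - C[None, :, :], axis=2) ** 2
expanded_sq = pairwise_sq_dists(X, C)
labels = np.argmin(expanded_sq, axis=1)
```

The trade-off is a small loss of precision when points are very close to a centroid, which is why the result is clamped at zero.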

Problem 11: SQL - Model A/B Test Analysis

Category: SQL | Time limit: 25 minutes | Difficulty: Medium

Given tables experiments(experiment_id, user_id, variant, timestamp) and conversions(user_id, conversion_timestamp, revenue), write a query to compute per-variant: total users, total conversions, conversion rate, and average revenue per converting user. Only count conversions that happened within 7 days of experiment assignment.

-- Expected output:
-- | variant | total_users | conversions | conv_rate | avg_revenue |
-- |---------|-------------|-------------|-----------|-------------|
-- | control | 5000 | 250 | 0.050 | 42.30 |
-- | treatment | 5000 | 310 | 0.062 | 45.10 |
Hint 1
LEFT JOIN experiments with conversions where the conversion happened after assignment and within 7 days. Group by variant.
Hint 2
Use COUNT(DISTINCT e.user_id) for total users, COUNT(DISTINCT c.user_id) for conversions. Conversion rate is the ratio.
Hint 3
Average revenue per converting user means AVG of revenue only for users who converted - use a conditional average or subquery.
Solution
SELECT
    e.variant,
    COUNT(DISTINCT e.user_id) AS total_users,
    COUNT(DISTINCT c.user_id) AS conversions,
    ROUND(
        COUNT(DISTINCT c.user_id) * 1.0 / COUNT(DISTINCT e.user_id),
        3
    ) AS conv_rate,
    ROUND(AVG(c.revenue), 2) AS avg_revenue
FROM experiments e
LEFT JOIN conversions c
    ON e.user_id = c.user_id
    AND c.conversion_timestamp > e.timestamp
    AND c.conversion_timestamp <= e.timestamp + INTERVAL '7 days'
GROUP BY e.variant
ORDER BY e.variant;

-- Note: the LEFT JOIN leaves c.revenue NULL for non-converting users,
-- and AVG() in SQL ignores NULLs, so avg_revenue averages over
-- conversion rows only. This equals average revenue per converting user
-- when each user has at most one conversion in the window. If users can
-- convert more than once, use SUM(c.revenue) / COUNT(DISTINCT c.user_id).

-- If you want to be explicit about the NULL handling:
-- AVG(CASE WHEN c.user_id IS NOT NULL THEN c.revenue END) AS avg_revenue

Complexity: Depends on indexes. With indexes on user_id and timestamp columns, this runs in O(n * log m) where n = experiment assignments and m = conversions.

Problem 12: Binary Tree Level-Order Traversal (DSA - Medium)

Category: Trees / BFS | Time limit: 20 minutes | Difficulty: Medium

Given the root of a binary tree, return the level-order traversal as a list of lists (each sublist contains the values at that level).

Constraints:

  • 0 <= number of nodes <= 2000
  • -1000 <= node.val <= 1000
# Input tree:
#       3
#      / \
#     9   20
#        /  \
#       15   7
# Output: [[3], [9, 20], [15, 7]]

# ML context: Level-order traversal of decision trees for visualization,
# or processing hierarchical feature representations layer by layer.
Hint 1
Use BFS with a queue. Process one level at a time by tracking the current level size.
Hint 2
At each level, `level_size = len(queue)`. Pop exactly `level_size` nodes, add their children to the queue, and collect values into a level list.
Solution
from collections import deque

class TreeNode:
    def __init__(self, val=0, left=None, right=None):
        self.val = val
        self.left = left
        self.right = right

def level_order(root):
    """Level-order traversal of a binary tree.

    Time: O(n), Space: O(n)
    """
    if not root:
        return []

    result = []
    queue = deque([root])

    while queue:
        level_size = len(queue)
        level = []

        for _ in range(level_size):
            node = queue.popleft()
            level.append(node.val)

            if node.left:
                queue.append(node.left)
            if node.right:
                queue.append(node.right)

        result.append(level)

    return result

def zigzag_level_order(root):
    """Zigzag (alternating direction) level-order traversal.

    Even levels left-to-right, odd levels right-to-left.
    """
    if not root:
        return []

    result = []
    queue = deque([root])
    left_to_right = True

    while queue:
        level_size = len(queue)
        level = deque()

        for _ in range(level_size):
            node = queue.popleft()
            if left_to_right:
                level.append(node.val)
            else:
                level.appendleft(node.val)

            if node.left:
                queue.append(node.left)
            if node.right:
                queue.append(node.right)

        result.append(list(level))
        left_to_right = not left_to_right

    return result

Complexity: O(n) time and space. Each node is visited exactly once.
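A common follow-up is to produce the same output with DFS. One way, sketched here under the hypothetical name `level_order_dfs`, is to pass the depth down and append each value to the list for that depth. Note that a degenerate 2000-node chain would exceed Python's default recursion limit of 1000, so the BFS version remains the safer default.

```python
class TreeNode:
    def __init__(self, val=0, left=None, right=None):
        self.val = val
        self.left = left
        self.right = right

def level_order_dfs(root):
    """Pre-order DFS that groups values by depth."""
    levels = []

    def visit(node, depth):
        if node is None:
            return
        if depth == len(levels):
            levels.append([])  # first node seen at this depth
        levels[depth].append(node.val)
        visit(node.left, depth + 1)   # left before right keeps
        visit(node.right, depth + 1)  # left-to-right order per level

    visit(root, 0)
    return levels

# The example tree from the problem statement.
root = TreeNode(3, TreeNode(9), TreeNode(20, TreeNode(15), TreeNode(7)))
levels = level_order_dfs(root)
```

Visiting the left child before the right one is what preserves left-to-right order within each level.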

Problem 13: Feature Scaling Pipeline (NumPy - Medium)

Category: NumPy / ML | Time limit: 25 minutes | Difficulty: Medium

Implement StandardScaler and MinMaxScaler from scratch using NumPy. Each should have fit, transform, and fit_transform methods. Handle edge cases: zero variance features, single-sample inputs.

# StandardScaler: z = (x - mean) / std
# MinMaxScaler: z = (x - min) / (max - min), scaled to [0, 1]
Hint 1
`fit` computes statistics (mean/std or min/max) from the training data. `transform` applies them to new data. `fit_transform` does both.
Hint 2
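For zero-variance features (std = 0 or max = min), avoid division by zero by replacing the zero divisor with 1. The scaled value then comes out as 0 under StandardScaler and as the lower bound of the target range (0 by default) under MinMaxScaler.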
Hint 3
Use `axis=0` to compute per-feature statistics. Store the statistics as instance attributes so they can be applied to test data.
Solution
import numpy as np

class StandardScaler:
    """Zero mean, unit variance scaling.

    z = (x - mean) / std
    """
    def __init__(self):
        self.mean_ = None
        self.std_ = None

    def fit(self, X):
        """Compute mean and std from training data.

        Time: O(n * d), Space: O(d)
        """
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        # Replace zero std with 1 to avoid division by zero
        self.std_ = np.where(self.std_ == 0, 1.0, self.std_)
        return self

    def transform(self, X):
        """Apply scaling using stored mean and std.

        Time: O(n * d), Space: O(n * d)
        """
        return (X - self.mean_) / self.std_

    def fit_transform(self, X):
        return self.fit(X).transform(X)

    def inverse_transform(self, X_scaled):
        return X_scaled * self.std_ + self.mean_


class MinMaxScaler:
    """Scale features to [0, 1] range.

    z = (x - min) / (max - min)
    """
    def __init__(self, feature_range=(0, 1)):
        self.feature_range = feature_range
        self.min_ = None
        self.scale_ = None

    def fit(self, X):
        """Compute min and range from training data.

        Time: O(n * d), Space: O(d)
        """
        self.min_ = np.min(X, axis=0)
        data_range = np.max(X, axis=0) - self.min_
        # Replace zero range with 1 to avoid division by zero
        self.scale_ = np.where(data_range == 0, 1.0, data_range)
        return self

    def transform(self, X):
        """Apply scaling using stored min and range.

        Time: O(n * d), Space: O(n * d)
        """
        low, high = self.feature_range
        X_std = (X - self.min_) / self.scale_
        return X_std * (high - low) + low

    def fit_transform(self, X):
        return self.fit(X).transform(X)

    def inverse_transform(self, X_scaled):
        low, high = self.feature_range
        X_std = (X_scaled - low) / (high - low)
        return X_std * self.scale_ + self.min_

# Verification:
# X_train = np.array([[1, 2], [3, 4], [5, 6]], dtype=float)
# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X_train)
# assert np.allclose(X_scaled.mean(axis=0), 0)
# assert np.allclose(X_scaled.std(axis=0), 1)
# assert np.allclose(scaler.inverse_transform(X_scaled), X_train)

Complexity: O(n * d) for both fit and transform. The key interview insight: always fit on training data, then transform both training and test data using the training statistics. Never fit on test data.

Common Trap

A common interview mistake is fitting the scaler on the entire dataset (train + test) before splitting. This causes data leakage - the model "sees" test data statistics during training. Always fit on training data only, then transform the test data with the same fitted scaler.
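
To make the leakage concrete, the sketch below compares the statistics you get from the training split alone with those from train + test combined. The toy arrays are hypothetical and chosen so that the test point lies far outside the training range.

```python
import numpy as np

# Hypothetical toy data: one feature, test value far outside the training range.
X_train = np.array([[1.0], [2.0], [3.0]])
X_test = np.array([[10.0]])

# Correct: statistics come from the training split only.
mean_train = X_train.mean(axis=0)
std_train = X_train.std(axis=0)
test_scaled_ok = (X_test - mean_train) / std_train

# Leaky: statistics computed on train + test together.
X_all = np.vstack([X_train, X_test])
mean_all = X_all.mean(axis=0)
std_all = X_all.std(axis=0)
test_scaled_leaky = (X_test - mean_all) / std_all

print(mean_train, mean_all)  # [2.] [4.]
print(test_scaled_ok, test_scaled_leaky)
```

With the leaky statistics the test point looks far less extreme than it really is relative to the training distribution, which is exactly the information a model should not have during training.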

Problem 14: Trie-Based Autocomplete (DSA - Medium)

Category: Trie | Time limit: 25 minutes | Difficulty: Medium

Implement a trie that supports inserting words and returning the top-3 autocomplete suggestions for a given prefix, sorted by frequency.

Constraints:

  • Words are lowercase English letters only
  • Insert may be called with the same word multiple times (increasing its frequency)
# autocomplete = Autocomplete()
# autocomplete.insert("transformer", 100)
# autocomplete.insert("transfer", 50)
# autocomplete.insert("translate", 30)
# autocomplete.insert("tree", 80)
# autocomplete.search("tr") # ["transformer", "tree", "transfer"]
# autocomplete.search("tra") # ["transformer", "transfer", "translate"]
Hint 1
Build a trie where each node stores its children. When searching, navigate to the prefix node, then collect all words in the subtree.
Hint 2
Use DFS from the prefix node to find all words. Maintain a heap of size 3 to keep only the top-3 by frequency.
Hint 3
For efficiency, store the frequency at the terminal node. During DFS, collect (frequency, word) pairs and use a min-heap of size 3.
Solution
import heapq

class TrieNode:
    def __init__(self):
        self.children = {}
        self.is_end = False
        self.frequency = 0

class Autocomplete:
    """Trie-based autocomplete with frequency-ranked suggestions.

    Insert: O(L) per word, L = word length
    Search: O(P + N * L) where P = prefix length, N = words in subtree
    Space: O(total characters across all words)
    """
    def __init__(self):
        self.root = TrieNode()

    def insert(self, word, frequency=1):
        """Insert a word with its frequency (or increment)."""
        node = self.root
        for char in word:
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]
        node.is_end = True
        node.frequency += frequency

    def search(self, prefix, top_k=3):
        """Return top-k words matching the prefix, by frequency."""
        # Navigate to prefix node
        node = self.root
        for char in prefix:
            if char not in node.children:
                return []
            node = node.children[char]

        # DFS to collect all words in subtree
        results = []
        self._dfs(node, prefix, results)

        # Keep only the top-k by frequency (heap-based selection, as in the hints)
        top = heapq.nlargest(top_k, results, key=lambda x: x[0])
        return [word for freq, word in top]

    def _dfs(self, node, current_word, results):
        """Collect all words in the subtree."""
        if node.is_end:
            results.append((node.frequency, current_word))

        for char, child in node.children.items():
            self._dfs(child, current_word + char, results)

Complexity: Insert is O(L). Search is O(P + N * L_avg) where N is the number of words matching the prefix. For a production system, you would precompute and cache the top-K at each prefix node.
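
The production idea mentioned above (caching the top-K at each prefix node) could look roughly like the following sketch. `CachedTrieNode`, `CachedAutocomplete`, and the alphabetical tie-break are assumptions made for illustration. The approach relies on frequencies only ever increasing, so the word being inserted is the only one that can newly enter a cached list.

```python
class CachedTrieNode:
    def __init__(self):
        self.children = {}
        self.top = []  # up to k words sharing this prefix, best first

class CachedAutocomplete:
    """Hypothetical variant: every node caches its own top-k list."""
    def __init__(self, k=3):
        self.k = k
        self.root = CachedTrieNode()
        self.freq = {}  # word -> accumulated frequency

    def _update(self, node, word):
        if word not in node.top:
            node.top.append(word)
        # Highest frequency first; ties broken alphabetically (an assumption)
        node.top.sort(key=lambda w: (-self.freq[w], w))
        del node.top[self.k:]

    def insert(self, word, frequency=1):
        self.freq[word] = self.freq.get(word, 0) + frequency
        node = self.root
        self._update(node, word)
        for char in word:
            node = node.children.setdefault(char, CachedTrieNode())
            self._update(node, word)

    def search(self, prefix):
        node = self.root
        for char in prefix:
            if char not in node.children:
                return []
            node = node.children[char]
        return list(node.top)  # no subtree traversal needed

ac = CachedAutocomplete(k=3)
for word, count in [("transformer", 100), ("transfer", 50),
                    ("translate", 30), ("tree", 80)]:
    ac.insert(word, count)

print(ac.search("tr"))   # ['transformer', 'tree', 'transfer']
print(ac.search("tra"))  # ['transformer', 'transfer', 'translate']
```

The trade-off: search drops to O(P), while each insert pays O(L * k log k) to refresh the cached lists along its path, plus O(k) extra memory per node.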

Problem 15: SQL - Feature Store Query

Category: SQL | Time limit: 25 minutes | Difficulty: Medium

Given tables features(entity_id, feature_name, feature_value, timestamp) and prediction_requests(request_id, entity_id, request_timestamp), write a query to retrieve the most recent feature value for each feature name at the time of each prediction request (point-in-time join).

-- Features table (slowly changing features):
-- | entity_id | feature_name | feature_value | timestamp |
-- |-----------|-------------|---------------|---------------------|
-- | user_1 | age | 25 | 2024-01-01 00:00:00 |
-- | user_1 | age | 26 | 2024-06-15 00:00:00 |
-- | user_1 | score | 0.8 | 2024-03-01 00:00:00 |

-- Prediction request at 2024-04-01 for user_1 should get:
-- age = 25 (not 26, which is in the future), score = 0.8
Hint 1
This is a point-in-time join. For each prediction request, find the most recent feature value that was recorded before (or at) the request timestamp.
Hint 2
Use ROW_NUMBER() or LATERAL JOIN. Partition by (request_id, feature_name) and order by feature timestamp descending, then take the first row.
Hint 3
Join features with prediction_requests on entity_id where feature.timestamp <= request.timestamp. Then use a window function to keep only the most recent feature per (request, feature_name).
Solution
-- Approach: Window function to rank features per (request, feature)
WITH ranked_features AS (
    SELECT
        pr.request_id,
        pr.entity_id,
        pr.request_timestamp,
        f.feature_name,
        f.feature_value,
        f.timestamp AS feature_timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY pr.request_id, f.feature_name
            ORDER BY f.timestamp DESC
        ) AS rn
    FROM prediction_requests pr
    INNER JOIN features f
        ON pr.entity_id = f.entity_id
        AND f.timestamp <= pr.request_timestamp
)
SELECT
    request_id,
    entity_id,
    request_timestamp,
    feature_name,
    feature_value,
    feature_timestamp
FROM ranked_features
WHERE rn = 1
ORDER BY request_id, feature_name;

-- To pivot features into columns (if feature names are known):
WITH ranked_features AS (
    SELECT
        pr.request_id,
        pr.entity_id,
        f.feature_name,
        f.feature_value,
        ROW_NUMBER() OVER (
            PARTITION BY pr.request_id, f.feature_name
            ORDER BY f.timestamp DESC
        ) AS rn
    FROM prediction_requests pr
    INNER JOIN features f
        ON pr.entity_id = f.entity_id
        AND f.timestamp <= pr.request_timestamp
)
SELECT
    request_id,
    entity_id,
    MAX(CASE WHEN feature_name = 'age' THEN feature_value END) AS age,
    MAX(CASE WHEN feature_name = 'score' THEN feature_value END) AS score
FROM ranked_features
WHERE rn = 1
GROUP BY request_id, entity_id;

Complexity: O(R * F * log F) where R = number of requests and F = number of feature records per entity. The point-in-time join pattern is fundamental to feature stores (Feast, Tecton) and is frequently asked at ML platform companies.

Company Variation

Point-in-time joins are a signature interview question at ML platform companies (Tecton, Databricks, Uber ML Platform). At product companies (Meta, Netflix), the same concept appears as "What was the user's state at the time of prediction?" Getting this wrong causes training-serving skew, which is one of the most common production ML bugs.
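
For intuition, the same "state at the time of prediction" lookup can be sketched in plain Python with a binary search over time-sorted feature rows. The in-memory `index` layout and the `features_as_of` helper are hypothetical; the rows mirror the sample features table from the problem statement.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import datetime

# Rows mirror the sample table: (entity_id, feature_name, value, timestamp).
feature_rows = [
    ("user_1", "age", 25, datetime(2024, 1, 1)),
    ("user_1", "age", 26, datetime(2024, 6, 15)),
    ("user_1", "score", 0.8, datetime(2024, 3, 1)),
]

# Index: (entity, feature) -> parallel, time-sorted lists of timestamps and values.
index = defaultdict(lambda: ([], []))
for entity, name, value, ts in sorted(feature_rows, key=lambda r: r[3]):
    times, values = index[(entity, name)]
    times.append(ts)
    values.append(value)

def features_as_of(entity_id, request_ts):
    """Latest value per feature with timestamp <= request_ts."""
    snapshot = {}
    for (entity, name), (times, values) in index.items():
        if entity != entity_id:
            continue
        pos = bisect_right(times, request_ts)  # number of rows at or before request_ts
        if pos:
            snapshot[name] = values[pos - 1]
    return snapshot

print(features_as_of("user_1", datetime(2024, 4, 1)))  # {'age': 25, 'score': 0.8}
```

A feature with no row at or before the request time is simply absent from the snapshot, which matches the behavior of the INNER JOIN in the SQL solution.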

Problem 16: Implement Gradient Descent (ML - Medium)

Category: ML Implementation | Time limit: 25 minutes | Difficulty: Medium

Implement batch gradient descent for linear regression from scratch. Given training data X (n_samples, n_features) and y (n_samples,), find the optimal weights.

Constraints:

  • Use NumPy only - no sklearn or ML frameworks
  • Implement both the loss function and gradient computation
  • Include a bias term
# Model: y_pred = X @ w + b
# Loss: MSE = (1/n) * sum((y_pred - y)^2)
# Gradient w.r.t. w: (2/n) * X.T @ (y_pred - y)
# Gradient w.r.t. b: (2/n) * sum(y_pred - y)
Hint 1
Initialize weights to zeros or small random values. At each step, compute predictions, loss, gradients, and update weights.
Hint 2
The gradient of MSE w.r.t. weights w is: `(2/n) * X.T @ (X @ w + b - y)`. For b: `(2/n) * sum(X @ w + b - y)`.
Hint 3
A simpler approach: augment X with a column of ones to absorb the bias into the weight vector. Then you only have one gradient to compute.
Solution
import numpy as np

class LinearRegression:
    """Linear regression via batch gradient descent.

    Time per iteration: O(n * d)
    Space: O(d) for weights
    """
    def __init__(self, learning_rate=0.01, n_iterations=1000, tol=1e-6):
        self.lr = learning_rate
        self.n_iterations = n_iterations
        self.tol = tol
        self.weights = None
        self.bias = None
        self.loss_history = []

    def fit(self, X, y):
        """Fit linear regression using gradient descent."""
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0.0
        self.loss_history = []

        for i in range(self.n_iterations):
            # Forward pass
            y_pred = X @ self.weights + self.bias

            # Compute MSE loss
            residuals = y_pred - y
            loss = np.mean(residuals ** 2)
            self.loss_history.append(loss)

            # Check convergence
            if i > 0 and abs(self.loss_history[-2] - loss) < self.tol:
                break

            # Compute gradients
            dw = (2.0 / n_samples) * (X.T @ residuals)
            db = (2.0 / n_samples) * np.sum(residuals)

            # Update parameters
            self.weights -= self.lr * dw
            self.bias -= self.lr * db

        return self

    def predict(self, X):
        return X @ self.weights + self.bias

    def score(self, X, y):
        """R-squared score."""
        y_pred = self.predict(X)
        ss_res = np.sum((y - y_pred) ** 2)
        ss_tot = np.sum((y - np.mean(y)) ** 2)
        return 1 - ss_res / ss_tot

# Also implement the closed-form solution for comparison:
def linear_regression_closed_form(X, y):
    """Normal equation: w = (X^T X)^(-1) X^T y

    Time: O(d^2 * n + d^3), Space: O(d^2)
    Only practical when d (features) is small.
    """
    # Add bias column
    X_aug = np.column_stack([X, np.ones(len(X))])
    # Normal equation
    w = np.linalg.solve(X_aug.T @ X_aug, X_aug.T @ y)
    return w[:-1], w[-1]  # weights, bias

# Verification:
# X = np.random.randn(100, 3)
# true_w = np.array([2.0, -1.0, 0.5])
# y = X @ true_w + 0.5 + np.random.randn(100) * 0.1
# model = LinearRegression(learning_rate=0.01, n_iterations=5000)
# model.fit(X, y)
# print(f"Learned weights: {model.weights}")  # Should be close to [2, -1, 0.5]
# print(f"Learned bias: {model.bias}")  # Should be close to 0.5

Complexity: O(iterations * n * d) for gradient descent. O(n * d^2 + d^3) for the closed-form solution. Gradient descent is preferred when n and d are both large.
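
Hint 3's alternative (absorbing the bias into the weight vector) can be sketched as follows. The function name `fit_gd_augmented`, the learning rate, the iteration count, and the noise-free synthetic data are all assumptions chosen for illustration; the result is compared against NumPy's least-squares solver as a reference.

```python
import numpy as np

def fit_gd_augmented(X, y, lr=0.05, n_iterations=2000):
    """Batch gradient descent with the bias folded into the weight vector."""
    X_aug = np.column_stack([X, np.ones(len(X))])  # last column multiplies the bias
    theta = np.zeros(X_aug.shape[1])
    n = len(y)
    for _ in range(n_iterations):
        residuals = X_aug @ theta - y
        theta -= lr * (2.0 / n) * (X_aug.T @ residuals)  # single MSE gradient
    return theta[:-1], theta[-1]

rng = np.random.default_rng(0)  # hypothetical synthetic data, no noise
X = rng.normal(size=(200, 3))
y = X @ np.array([2.0, -1.0, 0.5]) + 0.5

w, b = fit_gd_augmented(X, y)

# Least-squares reference on the same augmented design matrix
X_aug = np.column_stack([X, np.ones(len(X))])
theta_ls, *_ = np.linalg.lstsq(X_aug, y, rcond=None)

print(w, b)
print(np.allclose(np.append(w, b), theta_ls, atol=1e-6))
```

With one parameter vector there is only one gradient expression to get right, which reduces the chance of a sign or scaling slip under interview pressure.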

Problem 17: Merge Intervals for GPU Scheduling (DSA - Medium)

Category: Sorting + Intervals | Time limit: 20 minutes | Difficulty: Medium

Given a list of GPU time slot reservations (start, end), merge all overlapping reservations and return the consolidated schedule.

Constraints:

  • 1 <= intervals.length <= 10^4
  • 0 <= start_i < end_i <= 10^4
# Input: [[1, 3], [2, 6], [8, 10], [15, 18]]
# Output: [[1, 6], [8, 10], [15, 18]]
# [1,3] and [2,6] overlap -> merge to [1,6]
Hint 1
Sort intervals by start time. Then iterate: if the current interval overlaps with the last merged interval, extend the end. Otherwise, add a new interval.
Hint 2
Two intervals [a, b] and [c, d] overlap if c <= b (assuming a <= c after sorting).
Solution
def merge_intervals(intervals):
    """Merge overlapping intervals.

    Time: O(n log n), Space: O(n)
    """
    if not intervals:
        return []

    # Sort a copy so the caller's list is not reordered
    intervals = sorted(intervals, key=lambda x: x[0])
    # Copy the first interval so extending it does not mutate the input
    merged = [list(intervals[0])]

    for start, end in intervals[1:]:
        if start <= merged[-1][1]:
            # Overlap - extend the current interval
            merged[-1][1] = max(merged[-1][1], end)
        else:
            # No overlap - start a new interval
            merged.append([start, end])

    return merged

def total_gpu_time(intervals):
    """Compute total GPU time after merging overlapping reservations."""
    merged = merge_intervals(intervals)
    return sum(end - start for start, end in merged)

Complexity: O(n log n) for sorting, O(n) for the merge pass. Total: O(n log n).
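
Three edge cases are worth running by hand before you declare the solution done: touching endpoints, nested intervals, and unsorted input. The compact `merge_sketch` below is a hypothetical restatement of the same sort-and-scan idea, used only to show those cases.

```python
def merge_sketch(intervals):
    """Sort-and-scan merge that leaves the caller's lists untouched."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            # max() matters when the new interval is nested inside the last one
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return merged

touching = merge_sketch([[1, 3], [3, 5]])
nested = merge_sketch([[1, 10], [2, 3], [4, 5]])
unsorted_input = merge_sketch([[8, 10], [1, 3], [15, 18], [2, 6]])

print(touching)        # [[1, 5]]
print(nested)          # [[1, 10]]
print(unsorted_input)  # [[1, 6], [8, 10], [15, 18]]
```

Whether touching reservations such as [1, 3] and [3, 5] should merge is a good clarifying question; the `<=` comparison here treats them as overlapping.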

Problem 18: Implement Attention Score Computation (ML - Hard)

Category: ML Implementation | Time limit: 35 minutes | Difficulty: Hard

Implement the scaled dot-product attention mechanism from the "Attention Is All You Need" paper using only NumPy. Handle the optional causal mask.

# Attention(Q, K, V) = softmax(Q @ K^T / sqrt(d_k)) @ V
# Q: (batch, seq_len_q, d_k)
# K: (batch, seq_len_k, d_k)
# V: (batch, seq_len_k, d_v)
# Output: (batch, seq_len_q, d_v)

# With causal mask: set upper triangle of attention scores to -inf
# before softmax (prevents attending to future positions)
Hint 1
Compute `scores = Q @ K.transpose(0, 2, 1) / sqrt(d_k)`. Apply mask if needed. Apply softmax. Multiply by V.
Hint 2
For the causal mask, create an upper triangular matrix of -inf values and add it to the scores before softmax. Use `np.triu(np.ones(...), k=1) * -1e9`.
Hint 3
The softmax must be applied along the last axis (axis=-1) and must be numerically stable (subtract max before exp). The output shape should be (batch, seq_len_q, d_v).
Solution
import numpy as np

def softmax_stable(x, axis=-1):
    """Numerically stable softmax along specified axis."""
    x_max = np.max(x, axis=axis, keepdims=True)
    exp_x = np.exp(x - x_max)
    return exp_x / np.sum(exp_x, axis=axis, keepdims=True)

def scaled_dot_product_attention(Q, K, V, mask=None):
    """Scaled dot-product attention.

    Args:
        Q: Query tensor (batch, seq_q, d_k)
        K: Key tensor (batch, seq_k, d_k)
        V: Value tensor (batch, seq_k, d_v)
        mask: Optional boolean mask (seq_q, seq_k), True = mask out

    Returns:
        output: (batch, seq_q, d_v)
        attention_weights: (batch, seq_q, seq_k)

    Time: O(batch * seq_q * seq_k * d_k)
    Space: O(batch * seq_q * seq_k)
    """
    d_k = Q.shape[-1]

    # Compute attention scores
    # Q @ K^T -> (batch, seq_q, seq_k)
    scores = np.matmul(Q, K.transpose(0, 2, 1)) / np.sqrt(d_k)

    # Apply mask (set masked positions to large negative value)
    if mask is not None:
        scores = np.where(mask, -1e9, scores)

    # Softmax over key dimension
    attention_weights = softmax_stable(scores, axis=-1)

    # Weighted sum of values
    output = np.matmul(attention_weights, V)

    return output, attention_weights

def causal_mask(seq_len):
    """Create a causal (autoregressive) mask.

    Upper triangle is True (masked), diagonal and below is False.
    Prevents attending to future positions.
    """
    return np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)

def multi_head_attention(Q, K, V, W_q, W_k, W_v, W_o, n_heads, mask=None):
    """Multi-head attention.

    Args:
        Q, K, V: (batch, seq_len, d_model)
        W_q, W_k, W_v: (d_model, d_model) projection weights
        W_o: (d_model, d_model) output projection
        n_heads: number of attention heads

    Returns:
        output: (batch, seq_len, d_model)
    """
    batch, seq_len, d_model = Q.shape
    d_k = d_model // n_heads

    # Linear projections
    Q_proj = Q @ W_q  # (batch, seq_q, d_model)
    K_proj = K @ W_k
    V_proj = V @ W_v

    # Split into heads: (batch, n_heads, seq_len, d_k)
    Q_heads = Q_proj.reshape(batch, seq_len, n_heads, d_k).transpose(0, 2, 1, 3)
    K_heads = K_proj.reshape(batch, seq_len, n_heads, d_k).transpose(0, 2, 1, 3)
    V_heads = V_proj.reshape(batch, seq_len, n_heads, d_k).transpose(0, 2, 1, 3)

    # Reshape for batch attention: (batch * n_heads, seq_len, d_k)
    Q_flat = Q_heads.reshape(batch * n_heads, seq_len, d_k)
    K_flat = K_heads.reshape(batch * n_heads, seq_len, d_k)
    V_flat = V_heads.reshape(batch * n_heads, seq_len, d_k)

    # Apply attention
    attn_output, _ = scaled_dot_product_attention(Q_flat, K_flat, V_flat, mask)

    # Reshape back: (batch, n_heads, seq_len, d_k) -> (batch, seq_len, d_model)
    attn_output = attn_output.reshape(batch, n_heads, seq_len, d_k)
    attn_output = attn_output.transpose(0, 2, 1, 3).reshape(batch, seq_len, d_model)

    # Output projection
    output = attn_output @ W_o

    return output

# Verification:
# batch, seq_len, d_model = 2, 10, 64
# Q = K = V = np.random.randn(batch, seq_len, d_model)
# mask = causal_mask(seq_len)
# output, weights = scaled_dot_product_attention(Q, K, V, mask)
# assert output.shape == (batch, seq_len, d_model)
# assert weights.shape == (batch, seq_len, seq_len)
# assert np.allclose(weights.sum(axis=-1), 1.0)  # Rows sum to 1
# Check causal: weights[:, 0, 1:] should be ~0 (first token doesn't attend to future)

Complexity: O(batch * n^2 * d) where n = sequence length, d = dimension. This quadratic cost in sequence length is why efficient attention variants (FlashAttention, linear attention) exist.
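
One way to convince yourself (and the interviewer) that the causal mask works is a perturbation test: change only the last token and check that outputs at earlier positions do not move. The compact `attention` helper below restates the same math as the solution; the shapes, seed, and perturbation size are arbitrary assumptions.

```python
import numpy as np

def attention(Q, K, V, mask=None):
    """Compact scaled dot-product attention with a stable softmax."""
    scores = Q @ K.transpose(0, 2, 1) / np.sqrt(Q.shape[-1])
    if mask is not None:
        scores = np.where(mask, -1e9, scores)  # True = masked out
    scores = scores - scores.max(axis=-1, keepdims=True)
    weights = np.exp(scores)
    weights /= weights.sum(axis=-1, keepdims=True)
    return weights @ V, weights

rng = np.random.default_rng(0)
batch, seq_len, d = 2, 5, 8
X = rng.normal(size=(batch, seq_len, d))
mask = np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)

out, weights = attention(X, X, X, mask)

# Perturb only the last position; earlier outputs should be unaffected.
X_changed = X.copy()
X_changed[:, -1, :] += 10.0
out_changed, _ = attention(X_changed, X_changed, X_changed, mask)

past_unchanged = np.allclose(out[:, :-1], out_changed[:, :-1])
future_weights_zero = np.allclose(np.triu(weights, k=1), 0.0)
print(past_unchanged, future_weights_zero)
```

Without the mask the same test fails, because every position would attend to the modified final token.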

Problem 19: Sliding Window Maximum for Model Monitoring (DSA - Hard)

Category: Monotonic Deque | Time limit: 30 minutes | Difficulty: Hard

Given an array of model latency measurements and a window size K, return the maximum latency in each window. Solve it in O(n) time.

Constraints:

  • 1 <= len(latencies) <= 10^5
  • 1 <= k <= len(latencies)
# Input: latencies = [1, 3, -1, -3, 5, 3, 6, 7], k = 3
# Output: [3, 3, 5, 5, 6, 7]
# Windows: [1,3,-1] [3,-1,-3] [-1,-3,5] [-3,5,3] [5,3,6] [3,6,7]
# Max: [3, 3, 5, 5, 6, 7]
Hint 1
A brute force approach checks all K elements per window - O(nK). A heap can do O(n log n). But a monotonic deque achieves O(n).
Hint 2
Maintain a deque of indices in decreasing order of their values. The front of the deque is always the maximum of the current window. When a new element arrives: (1) remove from front if outside window, (2) remove from back if smaller than new element, (3) add new element.
Hint 3
Each element is pushed and popped at most once across the entire algorithm, giving O(n) amortized time.
Solution
from collections import deque

def sliding_window_max(arr, k):
    """Maximum in each sliding window of size k.

    Uses a monotonic decreasing deque.

    Time: O(n) amortized - each element pushed/popped at most once
    Space: O(k) for the deque
    """
    if not arr or k == 0:
        return []

    dq = deque()  # Stores indices, values in decreasing order
    result = []

    for i in range(len(arr)):
        # Remove indices outside the window
        while dq and dq[0] < i - k + 1:
            dq.popleft()

        # Remove indices whose values are less than or equal to arr[i]
        # They can never be the max while arr[i] is in the window
        while dq and arr[dq[-1]] <= arr[i]:
            dq.pop()

        dq.append(i)

        # Window is complete starting at index k-1
        if i >= k - 1:
            result.append(arr[dq[0]])

    return result

# Extension: sliding window min (change <= to >=)
def sliding_window_min(arr, k):
    """Minimum in each sliding window of size k."""
    dq = deque()
    result = []

    for i in range(len(arr)):
        while dq and dq[0] < i - k + 1:
            dq.popleft()
        while dq and arr[dq[-1]] >= arr[i]:
            dq.pop()
        dq.append(i)
        if i >= k - 1:
            result.append(arr[dq[0]])

    return result

Complexity: O(n) time, O(k) space. The amortized argument: each of the n elements is pushed onto the deque exactly once and popped at most once, so total operations across all iterations is at most 2n.

Instant Rejection

If you solve sliding window maximum with a nested loop - O(nk) - and cannot optimize to O(n) when prompted, this is a strong negative signal for ML infrastructure roles. The monotonic deque is a fundamental technique for streaming data problems, which are central to production ML monitoring. You must be able to explain why the deque approach is O(n) amortized, not just memorize the code.
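
To back up the amortized argument with something you can show, the sketch below instruments the deque solution with push and pop counters and checks the result against the brute-force answer. The function name `sliding_max_counted` is hypothetical; the input is the example from the problem statement.

```python
from collections import deque

def sliding_max_counted(arr, k):
    """Sliding-window max that also counts deque pushes and pops."""
    dq, result = deque(), []
    pushes = pops = 0
    for i, value in enumerate(arr):
        while dq and dq[0] < i - k + 1:     # expired index leaves from the front
            dq.popleft()
            pops += 1
        while dq and arr[dq[-1]] <= value:  # dominated indices leave from the back
            dq.pop()
            pops += 1
        dq.append(i)
        pushes += 1
        if i >= k - 1:
            result.append(arr[dq[0]])
    return result, pushes, pops

latencies = [1, 3, -1, -3, 5, 3, 6, 7]
result, pushes, pops = sliding_max_counted(latencies, 3)
brute = [max(latencies[i:i + 3]) for i in range(len(latencies) - 2)]

print(result == brute)  # True
print(pushes, pops)     # 8 7
```

Every index is pushed once and can only be popped after it was pushed, so pops never exceed pushes and the total work is bounded by 2n regardless of k.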

Problem 20: Design a Mini Batch Data Loader (ML + DSA - Hard)

Category: ML System Design + Implementation | Time limit: 35 minutes | Difficulty: Hard

Implement a DataLoader class that takes a dataset and yields shuffled mini-batches. Support features: shuffling, dropping the last incomplete batch, and a simple collate function.

Constraints:

  • Dataset is a list of (features, label) tuples
  • Batches should be returned as NumPy arrays
  • Shuffling should be reproducible with a seed
  • Each epoch should see all data exactly once (when not dropping last batch)
# dataset = [(np.array([1, 2]), 0), (np.array([3, 4]), 1), ...]
# loader = DataLoader(dataset, batch_size=32, shuffle=True, drop_last=False)
# for features, labels in loader:
#     # features.shape = (32, 2) or smaller for last batch
#     # labels.shape = (32,) or smaller for last batch
#     pass
Hint 1
Create an index array [0, 1, ..., n-1]. If shuffling, shuffle the indices. Then yield slices of size batch_size.
Hint 2
The collate function combines a list of (features, label) tuples into a batch: stack features into an array and labels into an array.
Hint 3
Implement `__iter__` to make the DataLoader iterable with `for batch in loader`. Use a generator (yield) for memory efficiency.
Solution
import numpy as np

class DataLoader:
    """Mini-batch data loader with shuffling.

    Mimics PyTorch's DataLoader API for tabular / array data.

    Time per epoch: O(n) for shuffling + O(n) for iteration
    Space: O(batch_size) per batch + O(n) for indices
    """
    def __init__(self, dataset, batch_size=32, shuffle=False,
                 drop_last=False, seed=None):
        """
        Args:
            dataset: List of (features, label) tuples
            batch_size: Number of samples per batch
            shuffle: Whether to shuffle at each epoch
            drop_last: Drop the last batch if it is smaller than batch_size
            seed: Random seed for reproducibility
        """
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.drop_last = drop_last
        self.rng = np.random.RandomState(seed)

    def __len__(self):
        """Number of batches per epoch."""
        n = len(self.dataset)
        if self.drop_last:
            return n // self.batch_size
        return (n + self.batch_size - 1) // self.batch_size

    def __iter__(self):
        """Yield batches of (features_array, labels_array)."""
        n = len(self.dataset)
        indices = np.arange(n)

        if self.shuffle:
            self.rng.shuffle(indices)

        for start in range(0, n, self.batch_size):
            end = min(start + self.batch_size, n)

            # Drop last incomplete batch if requested
            if self.drop_last and (end - start) < self.batch_size:
                break

            batch_indices = indices[start:end]
            batch = [self.dataset[i] for i in batch_indices]

            # Collate: separate features and labels, stack into arrays
            features = np.stack([item[0] for item in batch])
            labels = np.array([item[1] for item in batch])

            yield features, labels

    def on_epoch_end(self):
        """Called at end of epoch - useful for custom logic."""
        pass

# Advanced: DataLoader with multiple workers (simplified)
class PrefetchDataLoader:
    """DataLoader with prefetching for overlapping data loading and compute.

    In production, this would use multiprocessing. Here we simulate
    with a simple buffer.
    """
    def __init__(self, dataset, batch_size=32, shuffle=False,
                 prefetch_factor=2, seed=None):
        self.base_loader = DataLoader(dataset, batch_size, shuffle, seed=seed)
        self.prefetch_factor = prefetch_factor

    def __iter__(self):
        """Prefetch batches into a buffer."""
        buffer = []
        iterator = iter(self.base_loader)

        # Fill initial buffer
        for _ in range(self.prefetch_factor):
            try:
                buffer.append(next(iterator))
            except StopIteration:
                break

        while buffer:
            batch = buffer.pop(0)
            # Refill buffer
            try:
                buffer.append(next(iterator))
            except StopIteration:
                pass
            yield batch

# Verification:
# dataset = [(np.array([i, i+1], dtype=float), i % 3) for i in range(100)]
# loader = DataLoader(dataset, batch_size=32, shuffle=True, seed=42)
#
# all_indices = []
# for features, labels in loader:
#     assert features.shape[1] == 2
#     all_indices.extend(features[:, 0].astype(int).tolist())
#
# assert len(all_indices) == 100  # All samples seen
# assert sorted(all_indices) == list(range(100))  # No duplicates

Complexity: O(n) per epoch for shuffling and iteration. O(batch_size) memory per batch. The key design insight: shuffling indices rather than data avoids copying the entire dataset.
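
The index-shuffling idea can be isolated in a few lines. The `batch_indices` generator below is a hypothetical, standard-library-only sketch: it shuffles a list of positions with a seeded generator and yields slices of it, so the dataset itself is never copied or reordered. Unlike the solution's persistent `RandomState`, it re-seeds on every call, so the same seed always yields the same order.

```python
import random

def batch_indices(n, batch_size, seed=None, drop_last=False):
    """Yield lists of dataset indices; the data itself is never moved."""
    order = list(range(n))
    random.Random(seed).shuffle(order)  # seeded, so the order is reproducible
    for start in range(0, n, batch_size):
        batch = order[start:start + batch_size]
        if drop_last and len(batch) < batch_size:
            break
        yield batch

first = list(batch_indices(10, 4, seed=42))
second = list(batch_indices(10, 4, seed=42))
dropped = list(batch_indices(10, 4, seed=42, drop_last=True))

print(first == second)                      # True
print([len(b) for b in first])              # [4, 4, 2]
print([len(b) for b in dropped])            # [4, 4]
print(sorted(i for b in first for i in b))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

A loader built on top of this would look up `dataset[i]` for each index in a batch and collate the results, exactly as the `__iter__` method in the solution does.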

Spaced Repetition Checkpoints

| Day | Review Activity | Time |
|-----|-----------------|------|
| Day 0 | Solve Problems 1, 8, and 12 without hints. Time yourself. | 45 min |
| Day 3 | Solve Problems 5 (LRU Cache) and 9 (Topological Sort) from scratch. These are the most commonly asked. | 40 min |
| Day 7 | Solve the NumPy problems (2, 6, 13) and one SQL problem (7 or 11). Focus on vectorized thinking. | 40 min |
| Day 14 | Do a full mock: set a 60-minute timer and solve Problems 3, 10, 16, and 17 (one from each category). | 60 min |
| Day 21 | Implement attention (Problem 18) and the DataLoader (Problem 20) from memory. These test deep ML understanding. | 50 min |
| Day 28 | Final mock: pick 4 problems you found hardest. Solve under time pressure. Review any patterns you missed. | 60 min |

Interview Cheat Sheet

| Problem Type | Pattern to Recognize | Key Data Structure | Time Target |
|--------------|----------------------|--------------------|-------------|
| Two Sum variants | "Find pair with property X" | Hash Map | 10 min |
| Top-K / Frequency | "Most common / highest K" | Heap + Hash Map | 15 min |
| Sliding Window | "Window of size K", "subarray" | Deque (monotonic) | 15 min |
| Cache Design | "LRU / LFU / TTL cache" | Hash Map + Linked List | 20 min |
| Graph / Dependencies | "Prerequisites", "order" | Adjacency List + BFS | 20 min |
| Tree Traversal | "Level by level", "zigzag" | Queue (BFS) | 15 min |
| Interval Problems | "Merge / schedule overlapping" | Sort + Scan | 15 min |
| Binary Search | "Sorted", "minimum that satisfies" | Two pointers | 15 min |
| Trie | "Prefix search", "autocomplete" | Trie nodes | 20 min |
| NumPy Vectorization | "Without loops", "batch" | Broadcasting, einsum | 15 min |
| SQL Joins/Windows | "Per user", "rolling", "retention" | JOIN + Window functions | 20 min |
| ML From Scratch | "Implement softmax / attention / KMeans" | NumPy arrays | 25 min |

Next Steps

You have worked through 20 problems spanning all major categories in AI/ML coding interviews. Your next steps:

  1. Track your times. If any Medium problem takes you more than 25 minutes, revisit the underlying pattern chapter (Arrays, Trees, DP, etc.).
  2. Practice under pressure. Do timed mock interviews with a partner or use platforms like Interviewing.io.
  3. Review the pattern chapters. Go back to DSA Foundations through Complexity Analysis for any patterns where you feel weak.
  4. Build muscle memory. The Spaced Repetition Checkpoints above are designed to move these solutions from short-term to long-term memory. Follow the schedule.
© 2026 EngineersOfAI. All rights reserved.