Mock Coding Problems
20 full mock coding problems simulating real AI interview questions - a mix of DSA, NumPy/Pandas, SQL, and ML implementation. Each with problem statement, constraints, hints, full solution, and complexity analysis.
Reading time: ~60 min | Interview relevance: Critical | Roles: MLE, AI Engineer, Data Scientist, Research Engineer
The Real Interview Moment
You are in the final coding round at a top AI company. You have sixty minutes and four problems. The interviewer says: "These problems are designed to test the range of skills we need on the team. You will see a data structures problem, a NumPy problem, a SQL problem, and an ML implementation problem. Prioritize correctness over speed - a working solution with clear explanations beats a fast but buggy one."
You take a breath. You know the drill: read the problem carefully, clarify constraints, think before coding, and talk through your approach. You have practiced exactly this scenario dozens of times.
This chapter is your practice arena. Work through these problems under timed conditions. For each one, try to solve it independently before reading the hints or solution. Track your time - if a Medium takes you more than 25 minutes, revisit the underlying pattern.
How to Use This Chapter
Interview pacing strategy: In a 4-problem, 60-minute interview, spend at most 15 minutes per problem. If you are stuck after 5 minutes of coding, step back and re-examine your approach. A clear explanation of a brute-force solution is better than a confused attempt at an optimal one. Always start by stating your approach and time complexity before writing code.
Problem 1: Two Sum with Model Predictions (DSA - Easy)
Category: Hash Map | Time limit: 15 minutes | Difficulty: Easy
You have an array of model prediction scores and a target sum. Find two predictions whose scores sum to exactly the target. Return their indices.
Constraints:
- 2 <= len(predictions) <= 10^5
- -10^9 <= predictions[i] <= 10^9
- Exactly one solution exists
- You may not use the same element twice
# Input: predictions = [0.2, 0.7, 0.45, 0.3], target = 1.0
# Output: [1, 2] (0.7 + 0.3... wait, 0.7 + 0.45 = 1.15, 0.2 + 0.3 = 0.5)
# Let's use integers for clarity:
# Input: predictions = [2, 7, 11, 15], target = 9
# Output: [0, 1] (2 + 7 = 9)
Hint 1
Hint 2
Solution
def two_sum(nums, target):
"""Find two indices whose values sum to target.
Time: O(n), Space: O(n)
"""
seen = {} # value -> index
for i, num in enumerate(nums):
complement = target - num
if complement in seen:
return [seen[complement], i]
seen[num] = i
return []
Complexity: O(n) time (single pass), O(n) space (hash map). The brute force O(n^2) approach checks all pairs - the hash map reduces lookup from O(n) to O(1).
Problem 2: Batch Matrix Multiplication Check (NumPy - Easy)
Category: NumPy | Time limit: 15 minutes | Difficulty: Easy
Given two 3D arrays A of shape (batch, m, k) and B of shape (batch, k, n), compute the batch matrix multiplication and return the result of shape (batch, m, n). Also verify that the result matches a naive implementation.
Constraints:
- batch <= 1000
- m, k, n <= 100
- Use NumPy operations only (no explicit loops for the matrix multiplication)
# Input: A.shape = (32, 4, 8), B.shape = (32, 8, 3)
# Output: C.shape = (32, 4, 3)
# C[i] = A[i] @ B[i] for each i in batch
Hint 1
Hint 2
Solution
import numpy as np
def batch_matmul(A, B):
"""Batch matrix multiplication using NumPy.
Time: O(batch * m * k * n) - same as naive, but parallelized
Space: O(batch * m * n)
"""
# Method 1: Direct - handles batch dimensions automatically
C = A @ B # or np.matmul(A, B)
return C
def verify_batch_matmul(A, B, C):
"""Verify batch matmul result against a naive loop.
Returns True if results match within tolerance.
"""
batch = A.shape[0]
for i in range(batch):
expected = A[i] @ B[i]
if not np.allclose(C[i], expected, atol=1e-6):
return False
return True
def batch_matmul_einsum(A, B):
"""Alternative using Einstein summation - more explicit about indices.
'bij,bjk->bik' means:
- b = batch dimension (shared)
- j = contraction dimension (summed over)
- i, k = output dimensions
"""
return np.einsum('bij,bjk->bik', A, B)
# Example usage:
# A = np.random.randn(32, 4, 8)
# B = np.random.randn(32, 8, 3)
# C = batch_matmul(A, B)
# assert C.shape == (32, 4, 3)
# assert verify_batch_matmul(A, B, C)
Complexity: O(batch * m * k * n) arithmetic operations. NumPy executes this in optimized C/BLAS, making it orders of magnitude faster than a Python loop.
Problem 3: Top-K Frequent Words in Documents (DSA - Medium)
Category: Hash Map + Heap | Time limit: 25 minutes | Difficulty: Medium
Given a list of words from a document corpus, return the K most frequent words sorted by frequency (descending). If two words have the same frequency, sort them alphabetically.
Constraints:
- 1 <= len(words) <= 10^6
- 1 <= k <= number of unique words
- Words are lowercase alphabetic strings
# Input: words = ["neural", "network", "deep", "neural", "deep", "deep",
# "learning", "network", "neural"], k = 2
# Output: ["deep", "neural"]
# Frequencies: deep=3, neural=3, network=2, learning=1
# Tie broken alphabetically: "deep" before "neural"
Hint 1
Hint 2
Hint 3
Solution
from collections import Counter
import heapq
def top_k_frequent_words(words, k):
"""Find k most frequent words with alphabetical tie-breaking.
Time: O(n + m log k) where m = unique words
Space: O(m)
"""
counts = Counter(words)
# Sort by (-frequency, word) - nsmallest gives us the k smallest,
# which are the k highest frequencies (due to negation)
# and alphabetically first for ties
return heapq.nsmallest(k, counts.keys(),
key=lambda w: (-counts[w], w))
# Alternative: bucket sort approach for O(n) when k is large
def top_k_frequent_bucket(words, k):
"""O(n) approach using bucket sort.
Create buckets indexed by frequency. Each bucket contains words
with that frequency. Iterate from highest frequency down.
"""
counts = Counter(words)
max_freq = max(counts.values())
# Buckets: index = frequency, value = list of words
buckets = [[] for _ in range(max_freq + 1)]
for word, freq in counts.items():
buckets[freq].append(word)
result = []
for freq in range(max_freq, 0, -1):
bucket = sorted(buckets[freq]) # Alphabetical within same freq
for word in bucket:
result.append(word)
if len(result) == k:
return result
return result
Complexity: Heap approach: O(n + m log k). Bucket sort approach: O(n) amortized. The bucket approach is better when k is close to m.
Problem 4: Rolling Window Feature Engineering (Pandas - Medium)
Category: Pandas | Time limit: 25 minutes | Difficulty: Medium
Given a DataFrame with columns timestamp, user_id, and purchase_amount, compute for each row:
- The user's rolling 7-day total purchase amount (including the current row)
- The user's rolling 30-day average purchase amount
- The number of purchases the user made in the last 7 days
# Input DataFrame:
# | timestamp | user_id | purchase_amount |
# |------------|---------|-----------------|
# | 2024-01-01 | A | 50 |
# | 2024-01-03 | A | 30 |
# | 2024-01-05 | B | 100 |
# | 2024-01-06 | A | 20 |
# | 2024-01-08 | A | 40 |
# Output (additional columns for user A, row 2024-01-08):
# rolling_7d_sum = 20 + 40 = 60 (Jan 6 and Jan 8, within 7 days of Jan 8)
# Actually: 30 + 20 + 40 = 90 (Jan 3, 6, 8 all within 7 days)
# rolling_30d_avg = (50 + 30 + 20 + 40) / 4 = 35.0
# rolling_7d_count = 3
Hint 1
Hint 2
Hint 3
Solution
import pandas as pd
def compute_rolling_features(df):
"""Compute rolling window features per user.
Time: O(n log n) for sorting + O(n * window) for rolling
Space: O(n)
"""
df = df.copy()
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values(['user_id', 'timestamp'])
# Set timestamp as index for time-based rolling
df = df.set_index('timestamp')
# Compute rolling features per user
grouped = df.groupby('user_id')['purchase_amount']
df['rolling_7d_sum'] = grouped.transform(
lambda x: x.rolling('7D').sum()
)
df['rolling_30d_avg'] = grouped.transform(
lambda x: x.rolling('30D').mean()
)
df['rolling_7d_count'] = grouped.transform(
lambda x: x.rolling('7D').count()
)
return df.reset_index()
# Alternative using explicit merge for complex window logic:
def compute_rolling_features_manual(df):
"""Manual approach without time-based rolling - more control.
For each row, filter to same user + within time window + before/equal
current timestamp, then aggregate.
"""
df = df.copy()
df['timestamp'] = pd.to_datetime(df['timestamp'])
results = []
for idx, row in df.iterrows():
user_mask = df['user_id'] == row['user_id']
time_mask_7d = (
(df['timestamp'] >= row['timestamp'] - pd.Timedelta(days=6)) &
(df['timestamp'] <= row['timestamp'])
)
time_mask_30d = (
(df['timestamp'] >= row['timestamp'] - pd.Timedelta(days=29)) &
(df['timestamp'] <= row['timestamp'])
)
window_7d = df[user_mask & time_mask_7d]['purchase_amount']
window_30d = df[user_mask & time_mask_30d]['purchase_amount']
results.append({
'rolling_7d_sum': window_7d.sum(),
'rolling_30d_avg': window_30d.mean(),
'rolling_7d_count': len(window_7d),
})
return pd.concat([df, pd.DataFrame(results, index=df.index)], axis=1)
Complexity: Pandas rolling approach: O(n log n) for sorting, O(n) for rolling (amortized). Manual approach: O(n^2) - only use for verification or small data. In interviews, always mention you would use the Pandas rolling approach in production.
Problem 5: LRU Cache for Model Predictions (DSA - Medium)
Category: Hash Map + Doubly Linked List | Time limit: 25 minutes | Difficulty: Medium
Design a Least Recently Used (LRU) cache for storing model prediction results. The cache has a fixed capacity. When the cache is full and a new entry is added, evict the least recently used entry. Both get and put must be O(1).
Constraints:
- 1 <= capacity <= 3000
- 0 <= key <= 10^4
- 0 <= value <= 10^5
- At most 2 * 10^5 calls to get and put
# ML context: Caching inference results for repeated inputs.
# If the same input has been seen recently, return the cached prediction
# instead of running the model again.
# cache = LRUCache(2)
# cache.put(1, 0.95) # cache: {1: 0.95}
# cache.put(2, 0.87) # cache: {1: 0.95, 2: 0.87}
# cache.get(1) # returns 0.95, cache: {2: 0.87, 1: 0.95}
# cache.put(3, 0.92) # evicts key 2, cache: {1: 0.95, 3: 0.92}
# cache.get(2) # returns -1 (not found)
Hint 1
Hint 2
Hint 3
Solution
class ListNode:
def __init__(self, key=0, val=0):
self.key = key
self.val = val
self.prev = None
self.next = None
class LRUCache:
"""LRU Cache with O(1) get and put.
Uses a hash map + doubly linked list.
Time: O(1) for both get and put
Space: O(capacity)
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = {} # key -> ListNode
# Dummy head and tail to simplify edge cases
self.head = ListNode()
self.tail = ListNode()
self.head.next = self.tail
self.tail.prev = self.head
def _remove(self, node):
"""Remove a node from the doubly linked list."""
node.prev.next = node.next
node.next.prev = node.prev
def _add_to_tail(self, node):
"""Add a node right before the tail (most recently used)."""
node.prev = self.tail.prev
node.next = self.tail
self.tail.prev.next = node
self.tail.prev = node
def get(self, key):
if key not in self.cache:
return -1
node = self.cache[key]
self._remove(node)
self._add_to_tail(node)
return node.val
def put(self, key, value):
if key in self.cache:
node = self.cache[key]
node.val = value
self._remove(node)
self._add_to_tail(node)
else:
if len(self.cache) >= self.capacity:
# Evict LRU (head.next)
lru = self.head.next
self._remove(lru)
del self.cache[lru.key]
new_node = ListNode(key, value)
self.cache[key] = new_node
self._add_to_tail(new_node)
Complexity: O(1) for both get and put. Space is O(capacity). Python's collections.OrderedDict provides this functionality built-in, but interviewers want to see the manual implementation.
Do not use Python's OrderedDict as your primary solution unless the interviewer explicitly allows it. The point of this problem is to test your understanding of hash maps and linked lists working together. Mention OrderedDict as a follow-up: "In production, I would use OrderedDict.move_to_end() and popitem(last=False)."
Problem 6: Cosine Similarity Matrix (NumPy - Medium)
Category: NumPy | Time limit: 25 minutes | Difficulty: Medium
Given a matrix of document embeddings (shape: n_docs x embed_dim), compute the pairwise cosine similarity matrix without using explicit Python loops.
Constraints:
- 1 <= n_docs <= 10000
- 1 <= embed_dim <= 768
- Output shape: (n_docs, n_docs)
- Output[i][j] = cosine similarity between document i and document j
# Cosine similarity: cos(A, B) = (A . B) / (||A|| * ||B||)
# Input: embeddings.shape = (1000, 768)
# Output: similarity_matrix.shape = (1000, 1000)
Hint 1
Hint 2
Hint 3
Solution
import numpy as np
def cosine_similarity_matrix(embeddings):
"""Compute pairwise cosine similarity matrix.
Time: O(n^2 * d) for the matrix multiply
Space: O(n^2) for the output + O(n * d) for normalized copy
"""
# Compute L2 norms
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms = np.maximum(norms, 1e-8) # Avoid division by zero
# Normalize to unit vectors
normalized = embeddings / norms
# Cosine similarity = dot product of unit vectors
similarity = normalized @ normalized.T
# Clip to [-1, 1] to handle floating point errors
return np.clip(similarity, -1.0, 1.0)
def top_k_similar(embeddings, query_idx, k):
"""Find the k most similar documents to a query document.
More efficient than computing the full matrix when you only
need similarities for one query.
Time: O(n * d), Space: O(n)
"""
query = embeddings[query_idx]
query_norm = np.linalg.norm(query)
if query_norm < 1e-8:
return []
# Compute similarities with all documents
norms = np.linalg.norm(embeddings, axis=1)
norms = np.maximum(norms, 1e-8)
similarities = (embeddings @ query) / (norms * query_norm)
# Get top-k indices (excluding the query itself)
similarities[query_idx] = -np.inf
top_indices = np.argpartition(similarities, -k)[-k:]
top_indices = top_indices[np.argsort(-similarities[top_indices])]
return list(zip(top_indices, similarities[top_indices]))
Complexity: Full matrix: O(n^2 * d). Single query top-K: O(n * d + n log K) using argpartition. The key NumPy insight is that normalizing first converts cosine similarity to a single matrix multiplication.
Problem 7: SQL - User Retention Analysis
Category: SQL | Time limit: 25 minutes | Difficulty: Medium
Given tables users(user_id, signup_date) and logins(user_id, login_date), write a SQL query to compute the 7-day retention rate: the percentage of users who signed up on each date and logged in again within 7 days of signup (not counting the signup day itself).
-- Expected output:
-- | signup_date | total_signups | retained_users | retention_rate |
-- |-------------|---------------|----------------|----------------|
-- | 2024-01-01 | 100 | 42 | 0.42 |
-- | 2024-01-02 | 85 | 38 | 0.447 |
Hint 1
Hint 2
Hint 3
Solution
SELECT
u.signup_date,
COUNT(DISTINCT u.user_id) AS total_signups,
COUNT(DISTINCT l.user_id) AS retained_users,
ROUND(
COUNT(DISTINCT l.user_id) * 1.0 / COUNT(DISTINCT u.user_id),
3
) AS retention_rate
FROM users u
LEFT JOIN logins l
ON u.user_id = l.user_id
AND l.login_date > u.signup_date
AND l.login_date <= u.signup_date + INTERVAL '7 days'
GROUP BY u.signup_date
ORDER BY u.signup_date;
Alternative using a subquery (more explicit):
WITH retained AS (
SELECT DISTINCT u.user_id, u.signup_date
FROM users u
INNER JOIN logins l
ON u.user_id = l.user_id
AND l.login_date > u.signup_date
AND l.login_date <= u.signup_date + INTERVAL '7 days'
)
SELECT
u.signup_date,
COUNT(DISTINCT u.user_id) AS total_signups,
COUNT(DISTINCT r.user_id) AS retained_users,
ROUND(
COUNT(DISTINCT r.user_id) * 1.0 / COUNT(DISTINCT u.user_id),
3
) AS retention_rate
FROM users u
LEFT JOIN retained r
ON u.user_id = r.user_id AND u.signup_date = r.signup_date
GROUP BY u.signup_date
ORDER BY u.signup_date;
Complexity: O(n * m) in the worst case for the join, where n = users and m = logins. Indexes on user_id and login_date reduce this significantly.
Problem 8: Implement Softmax from Scratch (ML - Medium)
Category: ML Implementation | Time limit: 25 minutes | Difficulty: Medium
Implement the softmax function and its numerically stable variant. Then implement the cross-entropy loss. Do not use any ML framework - only NumPy.
Constraints:
- Input is a 2D array of logits (batch_size, num_classes)
- Must handle numerical overflow/underflow
- Cross-entropy loss takes logits and integer labels
# Input: logits = np.array([[2.0, 1.0, 0.1], [1.0, 2.0, 3.0]])
# labels = np.array([0, 2])
# softmax(logits[0]) = [0.659, 0.242, 0.099]
# loss = -log(0.659) + (-log(0.950)) / 2 = (0.417 + 0.051) / 2 = 0.234
Hint 1
Hint 2
Hint 3
Solution
import numpy as np
def softmax(logits):
"""Numerically stable softmax.
Subtracting max prevents overflow in exp().
softmax(x) = softmax(x - c) for any constant c.
Input: (batch_size, num_classes)
Output: (batch_size, num_classes) - probabilities summing to 1
"""
# Subtract max for numerical stability (per row)
shifted = logits - np.max(logits, axis=1, keepdims=True)
exp_vals = np.exp(shifted)
return exp_vals / np.sum(exp_vals, axis=1, keepdims=True)
def log_softmax(logits):
"""Numerically stable log-softmax.
log_softmax(x_i) = x_i - log(sum(exp(x_j)))
With stability: x_i - max(x) - log(sum(exp(x_j - max(x))))
"""
max_vals = np.max(logits, axis=1, keepdims=True)
shifted = logits - max_vals
log_sum_exp = np.log(np.sum(np.exp(shifted), axis=1, keepdims=True))
return shifted - log_sum_exp
def cross_entropy_loss(logits, labels):
"""Cross-entropy loss from logits and integer labels.
Input: logits (batch_size, num_classes), labels (batch_size,)
Output: scalar loss (mean over batch)
Uses log-softmax for numerical stability.
"""
batch_size = logits.shape[0]
log_probs = log_softmax(logits)
# Select the log probability of the correct class for each sample
correct_log_probs = log_probs[np.arange(batch_size), labels]
# Negative log likelihood, averaged over batch
return -np.mean(correct_log_probs)
# Verification:
# logits = np.array([[2.0, 1.0, 0.1], [1.0, 2.0, 3.0]])
# labels = np.array([0, 2])
# probs = softmax(logits)
# print(probs) # [[0.659, 0.242, 0.099], [0.090, 0.245, 0.665]]
# print(cross_entropy_loss(logits, labels)) # ~0.417 + 0.407 / 2 ≈ some value
Complexity: O(batch_size * num_classes) time and space. The key insight tested here is numerical stability - interviewers want to see the max-subtraction trick without being prompted.
If you implement softmax as np.exp(x) / np.sum(np.exp(x)) without the max-subtraction stability trick, it is an immediate red flag for any ML role. This naive implementation produces inf for logits above ~710 (float64 exp overflow). Every production ML system uses the stabilized version. If you cannot explain why softmax(x) = softmax(x - max(x)), you will fail this question.
Problem 9: Graph - Course Prerequisites (DSA - Medium)
Category: Graph / Topological Sort | Time limit: 25 minutes | Difficulty: Medium
There are n ML courses numbered 0 to n-1. Some courses have prerequisites. Given a list of prerequisite pairs [course, prerequisite], determine if it is possible to complete all courses (detect if there is a cycle).
Constraints:
- 1 <= n <= 2000
- 0 <= prerequisites.length <= 5000
- No duplicate prerequisite pairs
# Input: n = 4, prerequisites = [[1, 0], [2, 1], [3, 2]]
# Output: True (order: 0 -> 1 -> 2 -> 3)
# Input: n = 2, prerequisites = [[1, 0], [0, 1]]
# Output: False (cycle: 0 requires 1, 1 requires 0)
Hint 1
Hint 2
Hint 3
Solution
from collections import deque, defaultdict
def can_finish(num_courses, prerequisites):
"""Detect if all courses can be completed (no cycles).
Uses Kahn's algorithm for topological sorting.
Time: O(V + E), Space: O(V + E)
"""
# Build adjacency list and in-degree count
graph = defaultdict(list)
in_degree = [0] * num_courses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
# Start with all courses that have no prerequisites
queue = deque()
for i in range(num_courses):
if in_degree[i] == 0:
queue.append(i)
completed = 0
while queue:
course = queue.popleft()
completed += 1
for next_course in graph[course]:
in_degree[next_course] -= 1
if in_degree[next_course] == 0:
queue.append(next_course)
return completed == num_courses
def find_course_order(num_courses, prerequisites):
"""Return a valid course order, or empty list if impossible."""
graph = defaultdict(list)
in_degree = [0] * num_courses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
queue = deque(i for i in range(num_courses) if in_degree[i] == 0)
order = []
while queue:
course = queue.popleft()
order.append(course)
for next_course in graph[course]:
in_degree[next_course] -= 1
if in_degree[next_course] == 0:
queue.append(next_course)
return order if len(order) == num_courses else []
Complexity: O(V + E) time, O(V + E) space. V = number of courses, E = number of prerequisite pairs. This is optimal since we must examine every node and edge at least once.
Problem 10: Implement K-Means Clustering (ML - Medium)
Category: ML Implementation | Time limit: 25 minutes | Difficulty: Medium
Implement K-Means clustering from scratch using only NumPy. The function should take a data matrix and number of clusters, and return cluster assignments and centroids.
Constraints:
- Data: (n_samples, n_features)
- Use Euclidean distance
- Run for a fixed number of iterations or until convergence
- Initialize centroids by randomly selecting K data points
# Input: X.shape = (100, 2), k = 3
# Output: labels (100,) with values in {0, 1, 2}, centroids (3, 2)
Hint 1
Hint 2
Hint 3
Solution
import numpy as np
def kmeans(X, k, max_iters=100, tol=1e-6):
"""K-Means clustering implementation.
Args:
X: Data matrix (n_samples, n_features)
k: Number of clusters
max_iters: Maximum iterations
tol: Convergence tolerance for centroid movement
Returns:
labels: Cluster assignments (n_samples,)
centroids: Final centroids (k, n_features)
Time: O(max_iters * n * k * d)
Space: O(n * k) for distance matrix
"""
n_samples, n_features = X.shape
# Initialize centroids by randomly selecting k data points
indices = np.random.choice(n_samples, k, replace=False)
centroids = X[indices].copy()
for iteration in range(max_iters):
# Step 1: Assign each point to the nearest centroid
# distances[i][j] = distance from point i to centroid j
distances = np.linalg.norm(
X[:, np.newaxis, :] - centroids[np.newaxis, :, :],
axis=2
) # Shape: (n_samples, k)
labels = np.argmin(distances, axis=1)
# Step 2: Update centroids to mean of assigned points
new_centroids = np.zeros_like(centroids)
for j in range(k):
cluster_points = X[labels == j]
if len(cluster_points) > 0:
new_centroids[j] = cluster_points.mean(axis=0)
else:
# Empty cluster - reinitialize to random point
new_centroids[j] = X[np.random.randint(n_samples)]
# Check convergence
centroid_shift = np.linalg.norm(new_centroids - centroids)
centroids = new_centroids
if centroid_shift < tol:
break
return labels, centroids
def kmeans_inertia(X, labels, centroids):
"""Compute inertia (sum of squared distances to centroids).
Used for the elbow method to choose k.
"""
distances = np.linalg.norm(
X - centroids[labels], axis=1
)
return np.sum(distances ** 2)
# Usage:
# X = np.random.randn(200, 2)
# labels, centroids = kmeans(X, k=3)
# print(f"Inertia: {kmeans_inertia(X, labels, centroids):.2f}")
Complexity: O(iterations * n * k * d) time, where n = samples, k = clusters, d = features. The main optimization opportunity is using scipy.spatial.distance.cdist instead of broadcasting for large datasets.
Problem 11: SQL - Model A/B Test Analysis
Category: SQL | Time limit: 25 minutes | Difficulty: Medium
Given tables experiments(experiment_id, user_id, variant, timestamp) and conversions(user_id, conversion_timestamp, revenue), write a query to compute per-variant: total users, total conversions, conversion rate, and average revenue per converting user. Only count conversions that happened within 7 days of experiment assignment.
-- Expected output:
-- | variant | total_users | conversions | conv_rate | avg_revenue |
-- |---------|-------------|-------------|-----------|-------------|
-- | control | 5000 | 250 | 0.050 | 42.30 |
-- | treatment| 5000 | 310 | 0.062 | 45.10 |
Hint 1
Hint 2
Hint 3
Solution
SELECT
e.variant,
COUNT(DISTINCT e.user_id) AS total_users,
COUNT(DISTINCT c.user_id) AS conversions,
ROUND(
COUNT(DISTINCT c.user_id) * 1.0 / COUNT(DISTINCT e.user_id),
3
) AS conv_rate,
ROUND(AVG(c.revenue), 2) AS avg_revenue
FROM experiments e
LEFT JOIN conversions c
ON e.user_id = c.user_id
AND c.conversion_timestamp > e.timestamp
AND c.conversion_timestamp <= e.timestamp + INTERVAL '7 days'
GROUP BY e.variant
ORDER BY e.variant;
-- Note: avg_revenue here includes NULL for non-converting users
-- since it is a LEFT JOIN. AVG() in SQL ignores NULLs, so this
-- correctly computes average revenue per converting user.
-- If you want to be explicit:
-- AVG(CASE WHEN c.user_id IS NOT NULL THEN c.revenue END) AS avg_revenue
Complexity: Depends on indexes. With indexes on user_id and timestamp columns, this runs in O(n * log m) where n = experiment assignments and m = conversions.
Problem 12: Binary Tree Level-Order Traversal (DSA - Medium)
Category: Trees / BFS | Time limit: 20 minutes | Difficulty: Medium
Given the root of a binary tree, return the level-order traversal as a list of lists (each sublist contains the values at that level).
Constraints:
- 0 <= number of nodes <= 2000
- -1000 <= node.val <= 1000
# Input: Tree: 3
# / \
# 9 20
# / \
# 15 7
# Output: [[3], [9, 20], [15, 7]]
# ML context: Level-order traversal of decision trees for visualization,
# or processing hierarchical feature representations layer by layer.
Hint 1
Hint 2
Solution
from collections import deque
class TreeNode:
def __init__(self, val=0, left=None, right=None):
self.val = val
self.left = left
self.right = right
def level_order(root):
"""Level-order traversal of a binary tree.
Time: O(n), Space: O(n)
"""
if not root:
return []
result = []
queue = deque([root])
while queue:
level_size = len(queue)
level = []
for _ in range(level_size):
node = queue.popleft()
level.append(node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
result.append(level)
return result
def zigzag_level_order(root):
"""Zigzag (alternating direction) level-order traversal.
Even levels left-to-right, odd levels right-to-left.
"""
if not root:
return []
result = []
queue = deque([root])
left_to_right = True
while queue:
level_size = len(queue)
level = deque()
for _ in range(level_size):
node = queue.popleft()
if left_to_right:
level.append(node.val)
else:
level.appendleft(node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
result.append(list(level))
left_to_right = not left_to_right
return result
Complexity: O(n) time and space. Each node is visited exactly once.
Problem 13: Feature Scaling Pipeline (NumPy - Medium)
Category: NumPy / ML | Time limit: 25 minutes | Difficulty: Medium
Implement StandardScaler and MinMaxScaler from scratch using NumPy. Each should have fit, transform, and fit_transform methods. Handle edge cases: zero variance features, single-sample inputs.
# StandardScaler: z = (x - mean) / std
# MinMaxScaler: z = (x - min) / (max - min), scaled to [0, 1]
Hint 1
Hint 2
Hint 3
Solution
import numpy as np
class StandardScaler:
"""Zero mean, unit variance scaling.
z = (x - mean) / std
"""
def __init__(self):
self.mean_ = None
self.std_ = None
def fit(self, X):
"""Compute mean and std from training data.
Time: O(n * d), Space: O(d)
"""
self.mean_ = np.mean(X, axis=0)
self.std_ = np.std(X, axis=0)
# Replace zero std with 1 to avoid division by zero
self.std_ = np.where(self.std_ == 0, 1.0, self.std_)
return self
def transform(self, X):
"""Apply scaling using stored mean and std.
Time: O(n * d), Space: O(n * d)
"""
return (X - self.mean_) / self.std_
def fit_transform(self, X):
return self.fit(X).transform(X)
def inverse_transform(self, X_scaled):
return X_scaled * self.std_ + self.mean_
class MinMaxScaler:
"""Scale features to [0, 1] range.
z = (x - min) / (max - min)
"""
def __init__(self, feature_range=(0, 1)):
self.feature_range = feature_range
self.min_ = None
self.scale_ = None
def fit(self, X):
"""Compute min and range from training data.
Time: O(n * d), Space: O(d)
"""
self.min_ = np.min(X, axis=0)
data_range = np.max(X, axis=0) - self.min_
# Replace zero range with 1 to avoid division by zero
self.scale_ = np.where(data_range == 0, 1.0, data_range)
return self
def transform(self, X):
"""Apply scaling using stored min and range.
Time: O(n * d), Space: O(n * d)
"""
low, high = self.feature_range
X_std = (X - self.min_) / self.scale_
return X_std * (high - low) + low
def fit_transform(self, X):
return self.fit(X).transform(X)
def inverse_transform(self, X_scaled):
low, high = self.feature_range
X_std = (X_scaled - low) / (high - low)
return X_std * self.scale_ + self.min_
# Verification:
# X_train = np.array([[1, 2], [3, 4], [5, 6]], dtype=float)
# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X_train)
# assert np.allclose(X_scaled.mean(axis=0), 0)
# assert np.allclose(X_scaled.std(axis=0), 1)
# assert np.allclose(scaler.inverse_transform(X_scaled), X_train)
Complexity: O(n * d) for both fit and transform. The key interview insight: always fit on training data, then transform both training and test data using the training statistics. Never fit on test data.
A common interview mistake is fitting the scaler on the entire dataset (train + test) before splitting. This causes data leakage - the model "sees" test data statistics during training. Always fit on training data only, then transform the test data with the same fitted scaler.
Problem 14: Trie-Based Autocomplete (DSA - Medium)
Category: Trie | Time limit: 25 minutes | Difficulty: Medium
Implement a trie that supports inserting words and returning the top-3 autocomplete suggestions for a given prefix, sorted by frequency.
Constraints:
- Words are lowercase English letters only
- Insert may be called with the same word multiple times (increasing its frequency)
# autocomplete = Autocomplete()
# autocomplete.insert("transformer", 100)
# autocomplete.insert("transfer", 50)
# autocomplete.insert("translate", 30)
# autocomplete.insert("tree", 80)
# autocomplete.search("tr") # ["transformer", "tree", "transfer"]
# autocomplete.search("tra") # ["transformer", "transfer", "translate"]
Hint 1
Hint 2
Hint 3
Solution
import heapq
class TrieNode:
def __init__(self):
self.children = {}
self.is_end = False
self.frequency = 0
class Autocomplete:
"""Trie-based autocomplete with frequency-ranked suggestions.
Insert: O(L) per word, L = word length
Search: O(P + N * L) where P = prefix length, N = words in subtree
Space: O(total characters across all words)
"""
def __init__(self):
self.root = TrieNode()
def insert(self, word, frequency=1):
"""Insert a word with its frequency (or increment)."""
node = self.root
for char in word:
if char not in node.children:
node.children[char] = TrieNode()
node = node.children[char]
node.is_end = True
node.frequency += frequency
def search(self, prefix, top_k=3):
"""Return top-k words matching the prefix, by frequency."""
# Navigate to prefix node
node = self.root
for char in prefix:
if char not in node.children:
return []
node = node.children[char]
# DFS to collect all words in subtree
results = []
self._dfs(node, prefix, results)
# Return top-k by frequency
results.sort(key=lambda x: -x[0])
return [word for freq, word in results[:top_k]]
def _dfs(self, node, current_word, results):
"""Collect all words in the subtree."""
if node.is_end:
results.append((node.frequency, current_word))
for char, child in node.children.items():
self._dfs(child, current_word + char, results)
Complexity: Insert is O(L). Search is O(P + N * L_avg) where N is the number of words matching the prefix. For a production system, you would precompute and cache the top-K at each prefix node.
Problem 15: SQL - Feature Store Query
Category: SQL | Time limit: 25 minutes | Difficulty: Medium
Given tables features(entity_id, feature_name, feature_value, timestamp) and prediction_requests(request_id, entity_id, request_timestamp), write a query to retrieve the most recent feature value for each feature name at the time of each prediction request (point-in-time join).
-- Features table (slowly changing features):
-- | entity_id | feature_name | feature_value | timestamp |
-- |-----------|-------------|---------------|---------------------|
-- | user_1 | age | 25 | 2024-01-01 00:00:00 |
-- | user_1 | age | 26 | 2024-06-15 00:00:00 |
-- | user_1 | score | 0.8 | 2024-03-01 00:00:00 |
-- Prediction request at 2024-04-01 for user_1 should get:
-- age = 25 (not 26, which is in the future), score = 0.8
Hint 1
Hint 2
Hint 3
Solution
-- Approach: Window function to rank features per (request, feature)
WITH ranked_features AS (
SELECT
pr.request_id,
pr.entity_id,
pr.request_timestamp,
f.feature_name,
f.feature_value,
f.timestamp AS feature_timestamp,
ROW_NUMBER() OVER (
PARTITION BY pr.request_id, f.feature_name
ORDER BY f.timestamp DESC
) AS rn
FROM prediction_requests pr
INNER JOIN features f
ON pr.entity_id = f.entity_id
AND f.timestamp <= pr.request_timestamp
)
SELECT
request_id,
entity_id,
request_timestamp,
feature_name,
feature_value,
feature_timestamp
FROM ranked_features
WHERE rn = 1
ORDER BY request_id, feature_name;
-- To pivot features into columns (if feature names are known):
WITH ranked_features AS (
SELECT
pr.request_id,
pr.entity_id,
f.feature_name,
f.feature_value,
ROW_NUMBER() OVER (
PARTITION BY pr.request_id, f.feature_name
ORDER BY f.timestamp DESC
) AS rn
FROM prediction_requests pr
INNER JOIN features f
ON pr.entity_id = f.entity_id
AND f.timestamp <= pr.request_timestamp
)
SELECT
request_id,
entity_id,
MAX(CASE WHEN feature_name = 'age' THEN feature_value END) AS age,
MAX(CASE WHEN feature_name = 'score' THEN feature_value END) AS score
FROM ranked_features
WHERE rn = 1
GROUP BY request_id, entity_id;
Complexity: O(R * F * log F) where R = number of requests and F = number of feature records per entity. The point-in-time join pattern is fundamental to feature stores (Feast, Tecton) and is frequently asked at ML platform companies.
Point-in-time joins are a signature interview question at ML platform companies (Tecton, Databricks, Uber ML Platform). At product companies (Meta, Netflix), the same concept appears as "What was the user's state at the time of prediction?" Getting this wrong causes training-serving skew, which is one of the most common production ML bugs.
Problem 16: Implement Gradient Descent (ML - Medium)
Category: ML Implementation | Time limit: 25 minutes | Difficulty: Medium
Implement batch gradient descent for linear regression from scratch. Given training data X (n_samples, n_features) and y (n_samples,), find the optimal weights.
Constraints:
- Use NumPy only - no sklearn or ML frameworks
- Implement both the loss function and gradient computation
- Include a bias term
# Model: y_pred = X @ w + b
# Loss: MSE = (1/n) * sum((y_pred - y)^2)
# Gradient w.r.t. w: (2/n) * X.T @ (y_pred - y)
# Gradient w.r.t. b: (2/n) * sum(y_pred - y)
Hint 1
Hint 2
Hint 3
Solution
import numpy as np
class LinearRegression:
"""Linear regression via batch gradient descent.
Time per iteration: O(n * d)
Space: O(d) for weights
"""
def __init__(self, learning_rate=0.01, n_iterations=1000, tol=1e-6):
self.lr = learning_rate
self.n_iterations = n_iterations
self.tol = tol
self.weights = None
self.bias = None
self.loss_history = []
def fit(self, X, y):
"""Fit linear regression using gradient descent."""
n_samples, n_features = X.shape
self.weights = np.zeros(n_features)
self.bias = 0.0
self.loss_history = []
for i in range(self.n_iterations):
# Forward pass
y_pred = X @ self.weights + self.bias
# Compute MSE loss
residuals = y_pred - y
loss = np.mean(residuals ** 2)
self.loss_history.append(loss)
# Check convergence
if i > 0 and abs(self.loss_history[-2] - loss) < self.tol:
break
# Compute gradients
dw = (2.0 / n_samples) * (X.T @ residuals)
db = (2.0 / n_samples) * np.sum(residuals)
# Update parameters
self.weights -= self.lr * dw
self.bias -= self.lr * db
return self
def predict(self, X):
return X @ self.weights + self.bias
def score(self, X, y):
"""R-squared score."""
y_pred = self.predict(X)
ss_res = np.sum((y - y_pred) ** 2)
ss_tot = np.sum((y - np.mean(y)) ** 2)
return 1 - ss_res / ss_tot
# Also implement the closed-form solution for comparison:
def linear_regression_closed_form(X, y):
"""Normal equation: w = (X^T X)^(-1) X^T y
Time: O(d^2 * n + d^3), Space: O(d^2)
Only practical when d (features) is small.
"""
# Add bias column
X_aug = np.column_stack([X, np.ones(len(X))])
# Normal equation
w = np.linalg.solve(X_aug.T @ X_aug, X_aug.T @ y)
return w[:-1], w[-1] # weights, bias
# Verification:
# X = np.random.randn(100, 3)
# true_w = np.array([2.0, -1.0, 0.5])
# y = X @ true_w + 0.5 + np.random.randn(100) * 0.1
# model = LinearRegression(learning_rate=0.01, n_iterations=5000)
# model.fit(X, y)
# print(f"Learned weights: {model.weights}") # Should be close to [2, -1, 0.5]
# print(f"Learned bias: {model.bias}") # Should be close to 0.5
Complexity: O(iterations * n * d) for gradient descent. O(n * d^2 + d^3) for the closed-form solution. Gradient descent is preferred when n and d are both large.
Problem 17: Merge Intervals for GPU Scheduling (DSA - Medium)
Category: Sorting + Intervals | Time limit: 20 minutes | Difficulty: Medium
Given a list of GPU time slot reservations (start, end), merge all overlapping reservations and return the consolidated schedule.
Constraints:
- 1 <= intervals.length <= 10^4
- 0 <= start_i < end_i <= 10^4
# Input: [[1, 3], [2, 6], [8, 10], [15, 18]]
# Output: [[1, 6], [8, 10], [15, 18]]
# [1,3] and [2,6] overlap -> merge to [1,6]
Hint 1
Hint 2
Solution
def merge_intervals(intervals):
"""Merge overlapping intervals.
Time: O(n log n), Space: O(n)
"""
if not intervals:
return []
intervals.sort(key=lambda x: x[0])
merged = [intervals[0]]
for start, end in intervals[1:]:
if start <= merged[-1][1]:
# Overlap - extend the current interval
merged[-1][1] = max(merged[-1][1], end)
else:
# No overlap - start a new interval
merged.append([start, end])
return merged
def total_gpu_time(intervals):
"""Compute total GPU time after merging overlapping reservations."""
merged = merge_intervals(intervals)
return sum(end - start for start, end in merged)
Complexity: O(n log n) for sorting, O(n) for the merge pass. Total: O(n log n).
Problem 18: Implement Attention Score Computation (ML - Hard)
Category: ML Implementation | Time limit: 35 minutes | Difficulty: Hard
Implement the scaled dot-product attention mechanism from the "Attention Is All You Need" paper using only NumPy. Handle the optional causal mask.
# Attention(Q, K, V) = softmax(Q @ K^T / sqrt(d_k)) @ V
# Q: (batch, seq_len_q, d_k)
# K: (batch, seq_len_k, d_k)
# V: (batch, seq_len_k, d_v)
# Output: (batch, seq_len_q, d_v)
# With causal mask: set upper triangle of attention scores to -inf
# before softmax (prevents attending to future positions)
Hint 1
Hint 2
Hint 3
Solution
import numpy as np
def softmax_stable(x, axis=-1):
"""Numerically stable softmax along specified axis."""
x_max = np.max(x, axis=axis, keepdims=True)
exp_x = np.exp(x - x_max)
return exp_x / np.sum(exp_x, axis=axis, keepdims=True)
def scaled_dot_product_attention(Q, K, V, mask=None):
"""Scaled dot-product attention.
Args:
Q: Query tensor (batch, seq_q, d_k)
K: Key tensor (batch, seq_k, d_k)
V: Value tensor (batch, seq_k, d_v)
mask: Optional boolean mask (seq_q, seq_k), True = mask out
Returns:
output: (batch, seq_q, d_v)
attention_weights: (batch, seq_q, seq_k)
Time: O(batch * seq_q * seq_k * d_k)
Space: O(batch * seq_q * seq_k)
"""
d_k = Q.shape[-1]
# Compute attention scores
# Q @ K^T -> (batch, seq_q, seq_k)
scores = np.matmul(Q, K.transpose(0, 2, 1)) / np.sqrt(d_k)
# Apply mask (set masked positions to large negative value)
if mask is not None:
scores = np.where(mask, -1e9, scores)
# Softmax over key dimension
attention_weights = softmax_stable(scores, axis=-1)
# Weighted sum of values
output = np.matmul(attention_weights, V)
return output, attention_weights
def causal_mask(seq_len):
"""Create a causal (autoregressive) mask.
Upper triangle is True (masked), diagonal and below is False.
Prevents attending to future positions.
"""
return np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)
def multi_head_attention(Q, K, V, W_q, W_k, W_v, W_o, n_heads, mask=None):
"""Multi-head attention.
Args:
Q, K, V: (batch, seq_len, d_model)
W_q, W_k, W_v: (d_model, d_model) projection weights
W_o: (d_model, d_model) output projection
n_heads: number of attention heads
Returns:
output: (batch, seq_len, d_model)
"""
batch, seq_len, d_model = Q.shape
d_k = d_model // n_heads
# Linear projections
Q_proj = Q @ W_q # (batch, seq_q, d_model)
K_proj = K @ W_k
V_proj = V @ W_v
# Split into heads: (batch, n_heads, seq_len, d_k)
Q_heads = Q_proj.reshape(batch, seq_len, n_heads, d_k).transpose(0, 2, 1, 3)
K_heads = K_proj.reshape(batch, seq_len, n_heads, d_k).transpose(0, 2, 1, 3)
V_heads = V_proj.reshape(batch, seq_len, n_heads, d_k).transpose(0, 2, 1, 3)
# Reshape for batch attention: (batch * n_heads, seq_len, d_k)
Q_flat = Q_heads.reshape(batch * n_heads, seq_len, d_k)
K_flat = K_heads.reshape(batch * n_heads, seq_len, d_k)
V_flat = V_heads.reshape(batch * n_heads, seq_len, d_k)
# Apply attention
attn_output, _ = scaled_dot_product_attention(Q_flat, K_flat, V_flat, mask)
# Reshape back: (batch, n_heads, seq_len, d_k) -> (batch, seq_len, d_model)
attn_output = attn_output.reshape(batch, n_heads, seq_len, d_k)
attn_output = attn_output.transpose(0, 2, 1, 3).reshape(batch, seq_len, d_model)
# Output projection
output = attn_output @ W_o
return output
# Verification:
# batch, seq_len, d_model = 2, 10, 64
# Q = K = V = np.random.randn(batch, seq_len, d_model)
# mask = causal_mask(seq_len)
# output, weights = scaled_dot_product_attention(Q, K, V, mask)
# assert output.shape == (batch, seq_len, d_model)
# assert weights.shape == (batch, seq_len, seq_len)
# assert np.allclose(weights.sum(axis=-1), 1.0) # Rows sum to 1
# Check causal: weights[:, 0, 1:] should be ~0 (first token doesn't attend to future)
Complexity: O(batch * n^2 * d) where n = sequence length, d = dimension. This quadratic cost in sequence length is why efficient attention variants (FlashAttention, linear attention) exist.
Problem 19: Sliding Window Maximum for Model Monitoring (DSA - Hard)
Category: Monotonic Deque | Time limit: 30 minutes | Difficulty: Hard
Given an array of model latency measurements and a window size K, return the maximum latency in each window. Solve it in O(n) time.
Constraints:
- 1 <= len(latencies) <= 10^5
- 1 <= k <= len(latencies)
# Input: latencies = [1, 3, -1, -3, 5, 3, 6, 7], k = 3
# Output: [3, 3, 5, 5, 6, 7]
# Windows: [1,3,-1] [3,-1,-3] [-1,-3,5] [-3,5,3] [5,3,6] [3,6,7]
# Max: [3, 3, 5, 5, 6, 7]
Hint 1
Hint 2
Hint 3
Solution
from collections import deque
def sliding_window_max(arr, k):
"""Maximum in each sliding window of size k.
Uses a monotonic decreasing deque.
Time: O(n) amortized - each element pushed/popped at most once
Space: O(k) for the deque
"""
if not arr or k == 0:
return []
dq = deque() # Stores indices, values in decreasing order
result = []
for i in range(len(arr)):
# Remove indices outside the window
while dq and dq[0] < i - k + 1:
dq.popleft()
# Remove indices whose values are less than arr[i]
# They can never be the max while arr[i] is in the window
while dq and arr[dq[-1]] <= arr[i]:
dq.pop()
dq.append(i)
# Window is complete starting at index k-1
if i >= k - 1:
result.append(arr[dq[0]])
return result
# Extension: sliding window min (change <= to >=)
def sliding_window_min(arr, k):
"""Minimum in each sliding window of size k."""
dq = deque()
result = []
for i in range(len(arr)):
while dq and dq[0] < i - k + 1:
dq.popleft()
while dq and arr[dq[-1]] >= arr[i]:
dq.pop()
dq.append(i)
if i >= k - 1:
result.append(arr[dq[0]])
return result
Complexity: O(n) time, O(k) space. The amortized argument: each of the n elements is pushed onto the deque exactly once and popped at most once, so total operations across all iterations is at most 2n.
If you solve sliding window maximum with a nested loop - O(nk) - and cannot optimize to O(n) when prompted, this is a strong negative signal for ML infrastructure roles. The monotonic deque is a fundamental technique for streaming data problems, which are central to production ML monitoring. You must be able to explain why the deque approach is O(n) amortized, not just memorize the code.
Problem 20: Design a Mini Batch Data Loader (ML + DSA - Hard)
Category: ML System Design + Implementation | Time limit: 35 minutes | Difficulty: Hard
Implement a DataLoader class that takes a dataset and yields shuffled mini-batches. Support features: shuffling, dropping the last incomplete batch, and a simple collate function.
Constraints:
- Dataset is a list of (features, label) tuples
- Batches should be returned as NumPy arrays
- Shuffling should be reproducible with a seed
- Each epoch should see all data exactly once (when not dropping last batch)
# dataset = [(np.array([1, 2]), 0), (np.array([3, 4]), 1), ...]
# loader = DataLoader(dataset, batch_size=32, shuffle=True, drop_last=False)
# for features, labels in loader:
# # features.shape = (32, 2) or smaller for last batch
# # labels.shape = (32,) or smaller for last batch
# pass
Hint 1
Hint 2
Hint 3
Solution
import numpy as np
class DataLoader:
"""Mini-batch data loader with shuffling.
Mimics PyTorch's DataLoader API for tabular / array data.
Time per epoch: O(n) for shuffling + O(n) for iteration
Space: O(batch_size) per batch + O(n) for indices
"""
def __init__(self, dataset, batch_size=32, shuffle=False,
drop_last=False, seed=None):
"""
Args:
dataset: List of (features, label) tuples
batch_size: Number of samples per batch
shuffle: Whether to shuffle at each epoch
drop_last: Drop the last batch if it is smaller than batch_size
seed: Random seed for reproducibility
"""
self.dataset = dataset
self.batch_size = batch_size
self.shuffle = shuffle
self.drop_last = drop_last
self.rng = np.random.RandomState(seed)
def __len__(self):
"""Number of batches per epoch."""
n = len(self.dataset)
if self.drop_last:
return n // self.batch_size
return (n + self.batch_size - 1) // self.batch_size
def __iter__(self):
"""Yield batches of (features_array, labels_array)."""
n = len(self.dataset)
indices = np.arange(n)
if self.shuffle:
self.rng.shuffle(indices)
for start in range(0, n, self.batch_size):
end = min(start + self.batch_size, n)
# Drop last incomplete batch if requested
if self.drop_last and (end - start) < self.batch_size:
break
batch_indices = indices[start:end]
batch = [self.dataset[i] for i in batch_indices]
# Collate: separate features and labels, stack into arrays
features = np.stack([item[0] for item in batch])
labels = np.array([item[1] for item in batch])
yield features, labels
def on_epoch_end(self):
"""Called at end of epoch - useful for custom logic."""
pass
# Advanced: DataLoader with multiple workers (simplified)
class PrefetchDataLoader:
"""DataLoader with prefetching for overlapping data loading and compute.
In production, this would use multiprocessing. Here we simulate
with a simple buffer.
"""
def __init__(self, dataset, batch_size=32, shuffle=False,
prefetch_factor=2, seed=None):
self.base_loader = DataLoader(dataset, batch_size, shuffle, seed=seed)
self.prefetch_factor = prefetch_factor
def __iter__(self):
"""Prefetch batches into a buffer."""
buffer = []
iterator = iter(self.base_loader)
# Fill initial buffer
for _ in range(self.prefetch_factor):
try:
buffer.append(next(iterator))
except StopIteration:
break
while buffer:
batch = buffer.pop(0)
# Refill buffer
try:
buffer.append(next(iterator))
except StopIteration:
pass
yield batch
# Verification:
# dataset = [(np.array([i, i+1], dtype=float), i % 3) for i in range(100)]
# loader = DataLoader(dataset, batch_size=32, shuffle=True, seed=42)
#
# all_indices = []
# for features, labels in loader:
# assert features.shape[1] == 2
# all_indices.extend(features[:, 0].astype(int).tolist())
#
# assert len(all_indices) == 100 # All samples seen
# assert sorted(all_indices) == list(range(100)) # No duplicates
Complexity: O(n) per epoch for shuffling and iteration. O(batch_size) memory per batch. The key design insight: shuffling indices rather than data avoids copying the entire dataset.
Spaced Repetition Checkpoints
| Day | Review Activity | Time |
|---|---|---|
| Day 0 | Solve Problems 1, 8, and 12 without hints. Time yourself. | 45 min |
| Day 3 | Solve Problems 5 (LRU Cache) and 9 (Topological Sort) from scratch. These are the most commonly asked. | 40 min |
| Day 7 | Solve the NumPy problems (2, 6, 13) and one SQL problem (7 or 11). Focus on vectorized thinking. | 40 min |
| Day 14 | Do a full mock: set a 60-minute timer and solve Problems 3, 10, 16, and 17 (one from each category). | 60 min |
| Day 21 | Implement attention (Problem 18) and the DataLoader (Problem 20) from memory. These test deep ML understanding. | 50 min |
| Day 28 | Final mock: pick 4 problems you found hardest. Solve under time pressure. Review any patterns you missed. | 60 min |
Interview Cheat Sheet
| Problem Type | Pattern to Recognize | Key Data Structure | Time Target |
|---|---|---|---|
| Two Sum variants | "Find pair with property X" | Hash Map | 10 min |
| Top-K / Frequency | "Most common / highest K" | Heap + Hash Map | 15 min |
| Sliding Window | "Window of size K", "subarray" | Deque (monotonic) | 15 min |
| Cache Design | "LRU / LFU / TTL cache" | Hash Map + Linked List | 20 min |
| Graph / Dependencies | "Prerequisites", "order" | Adjacency List + BFS | 20 min |
| Tree Traversal | "Level by level", "zigzag" | Queue (BFS) | 15 min |
| Interval Problems | "Merge / schedule overlapping" | Sort + Scan | 15 min |
| Binary Search | "Sorted", "minimum that satisfies" | Two pointers | 15 min |
| Trie | "Prefix search", "autocomplete" | Trie nodes | 20 min |
| NumPy Vectorization | "Without loops", "batch" | Broadcasting, einsum | 15 min |
| SQL Joins/Windows | "Per user", "rolling", "retention" | JOIN + Window functions | 20 min |
| ML From Scratch | "Implement softmax / attention / KMeans" | NumPy arrays | 25 min |
Next Steps
You have worked through 20 problems spanning all major categories in AI/ML coding interviews. Your next steps:
- Track your times. If any Medium problem takes you more than 25 minutes, revisit the underlying pattern chapter (Arrays, Trees, DP, etc.).
- Practice under pressure. Do timed mock interviews with a partner or use platforms like Interviewing.io.
- Review the pattern chapters. Go back to DSA Foundations through Complexity Analysis for any patterns where you feel weak.
- Build muscle memory. The Spaced Repetition Checkpoints above are designed to move these solutions from short-term to long-term memory. Follow the schedule.
