
# Dynamic Programming for ML

## The Hidden DP in Everything You Deploy

A speech recognition engineer at a major tech company was debugging why their model occasionally produced sequences with repeated characters that made no phonetic sense. The model was trained with CTC loss - Connectionist Temporal Classification - and was supposed to handle the alignment between audio frames and text characters automatically. But something in the decoding was wrong.

The root cause took two days to find: the team had replaced the CTC greedy decoder with a custom "faster" implementation that picked the most likely character at each timestep without applying the CTC blank-token rules. Correct CTC decoding rests on a dynamic programming view of how frame-level alignments map to text: repeated frame predictions are collapsed, blank tokens are removed, and a genuinely doubled character can appear in the output only when a blank separates the repeats in the alignment. The custom decoder ignored all of this structure, producing hallucinated character sequences.

Dynamic programming is not just a technique for solving textbook problems about coins and knapsacks. It is embedded in the core algorithms of modern ML: the Viterbi algorithm that powers sequence labeling in NLP, the CTC decoding algorithm in every speech recognition and OCR system, the beam search decoder in every language model, the Bellman equation that defines value functions in reinforcement learning, and the edit distance that underlies WER and TER evaluation metrics.

The engineer who does not understand DP treats these as black boxes. The engineer who does understand it can debug the CTC decoder, implement a custom beam search with constraints, tune the Bellman backup operators for their RL environment, and understand why value iteration converges. This lesson builds that understanding - from the principles to the production code.


## Why This Exists

The fundamental problem DP solves is optimization over structures with shared substructure. Many problems can be decomposed into subproblems, but naive recursion solves the same subproblem exponentially many times. DP solves each subproblem once and stores the result.

Two properties must hold:

  1. Optimal substructure: the optimal solution to the whole problem can be built from optimal solutions to subproblems.
  2. Overlapping subproblems: subproblems recur, so caching their solutions avoids redundant computation.

Without DP, edit distance between two strings of length $n$ and $m$ costs $O(3^{n+m})$ via naive recursion. With DP: $O(nm)$. For comparing two 100-word strings: $3^{200}$ operations vs $10,000$ operations - a factor of roughly $10^{91}$.
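To see the overlapping-subproblem blowup concretely, here is a small counting sketch (illustrative, not from the original - the function name is mine): it runs the naive edit-distance recursion and tallies how often each $(i, j)$ subproblem is revisited.

```python
from collections import Counter

def naive_edit_distance_counted(a: str, b: str):
    """Naive recursion; `visits` records how often each (i, j) subproblem recurs."""
    visits = Counter()

    def rec(i: int, j: int) -> int:
        visits[(i, j)] += 1
        if i == 0:
            return j
        if j == 0:
            return i
        if a[i-1] == b[j-1]:
            return rec(i-1, j-1)
        return 1 + min(rec(i-1, j), rec(i, j-1), rec(i-1, j-1))

    return rec(len(a), len(b)), visits

dist, visits = naive_edit_distance_counted("kitten", "sitting")
print(f"distance = {dist}")                               # 3
print(f"distinct subproblems = {len(visits)}")            # at most 7 * 8 = 56
print(f"total recursive calls = {sum(visits.values())}")  # far more than 56
```

A DP table solves each distinct subproblem exactly once; the gap between distinct subproblems and total recursive calls is the entire speedup.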


## Historical Context

Dynamic programming was invented by Richard Bellman in the 1950s at the RAND Corporation. The name is deliberately obscure - Bellman chose it specifically to hide the mathematical nature of his work from hostile government funders who were suspicious of mathematics. "Dynamic" sounded operational; "programming" meant planning (in the military sense, not computer sense). By the time anyone realized it was mathematics, the name had stuck.

Bellman's original application was optimal control theory - finding the optimal policy for a system over time. The Bellman equation, which defines the value of a state as the immediate reward plus the discounted value of the next state under the optimal policy, is the foundation of modern reinforcement learning.

The Viterbi algorithm was developed by Andrew Viterbi in 1967 for decoding convolutional codes in digital communications. It was independently discovered for HMM decoding in the 1970s and became the dominant sequence decoding algorithm for speech recognition in the 1980s and 1990s. Modern CRF models still use Viterbi decoding.

CTC (Connectionist Temporal Classification) was introduced by Alex Graves et al. in 2006, extending DP-based alignment ideas from HMMs to neural networks. It enabled end-to-end training of speech recognition models without pre-aligned training data.


## Core Concepts

### The Two Implementations: Memoization vs Tabulation

DP can be implemented in two equivalent ways:

Memoization (top-down): implement the natural recursion, but cache the result of each unique subproblem. When the recursive call encounters a subproblem it has already solved, return the cached result. Implemented with a dictionary or array of results.

Tabulation (bottom-up): identify the subproblem ordering (small problems before large), allocate a table to store all subproblem results, and fill in the table from smallest to largest. No recursion - iterative loops over the table.

Memoization is easier to implement (just add a cache to the naive recursion) but has function-call overhead and risks hitting Python's recursion limit on deep problems. Tabulation is faster in practice (no function-call overhead, better cache locality) and handles large problems without stack issues. For ML applications, tabulation is almost always preferred.

```python
# Classic example: Fibonacci
# O(2^n) without caching, O(n) with DP

def fib_naive(n: int) -> int:
    """O(2^n) - exponential without caching."""
    if n <= 1:
        return n
    return fib_naive(n-1) + fib_naive(n-2)

# Memoization (top-down)
from functools import lru_cache

@lru_cache(maxsize=None)
def fib_memo(n: int) -> int:
    """O(n) time, O(n) space."""
    if n <= 1:
        return n
    return fib_memo(n-1) + fib_memo(n-2)

# Tabulation (bottom-up) - preferred for ML applications
def fib_dp(n: int) -> int:
    """O(n) time, O(1) space (rolling window)."""
    if n <= 1:
        return n
    prev, curr = 0, 1
    for _ in range(2, n+1):
        prev, curr = curr, prev + curr
    return curr
```

### Edit Distance: The Foundation of NLP Evaluation

Edit distance (Levenshtein distance) is the minimum number of single-character edits (insertions, deletions, substitutions) needed to transform one string into another. It is the basis for:

  - TER (Translation Edit Rate) scoring in MT evaluation (word-level edit operations)
  - WER (Word Error Rate) in ASR evaluation
  - Fuzzy string matching in data processing pipelines
  - Approximate string matching in feature engineering

Subproblem: let $\text{dp}[i][j]$ = minimum edit distance to transform the first $i$ characters of string $A$ into the first $j$ characters of string $B$.

Recurrence:

$$\text{dp}[i][j] = \begin{cases} i & \text{if } j = 0 \text{ (delete all of } A\text{)} \\ j & \text{if } i = 0 \text{ (insert all of } B\text{)} \\ \text{dp}[i-1][j-1] & \text{if } A[i] = B[j] \text{ (no edit needed)} \\ 1 + \min\begin{pmatrix}\text{dp}[i-1][j] \\ \text{dp}[i][j-1] \\ \text{dp}[i-1][j-1]\end{pmatrix} & \text{otherwise (delete, insert, or substitute)} \end{cases}$$

Time: $O(nm)$, Space: $O(nm)$ or $O(\min(n,m))$ with the rolling-row optimization.

```python
import numpy as np
from typing import List, Tuple, Optional

def edit_distance(
    a: str,
    b: str,
    return_alignment: bool = False
) -> Tuple[int, Optional[List[Tuple[str, str, str]]]]:
    """
    Levenshtein edit distance between strings a and b.
    O(nm) time, O(nm) space (for alignment traceback).

    Args:
        a, b: input strings
        return_alignment: if True, return the alignment operations

    Returns:
        (distance, operations) where operations is a list of
        (op, char_from_a, char_from_b) tuples if return_alignment=True
    """
    n, m = len(a), len(b)
    # dp[i][j] = edit distance between a[:i] and b[:j]
    dp = np.zeros((n+1, m+1), dtype=np.int32)

    # Base cases
    for i in range(n+1):
        dp[i][0] = i  # delete all of a[:i]
    for j in range(m+1):
        dp[0][j] = j  # insert all of b[:j]

    # Fill DP table
    for i in range(1, n+1):
        for j in range(1, m+1):
            if a[i-1] == b[j-1]:
                dp[i][j] = dp[i-1][j-1]
            else:
                dp[i][j] = 1 + min(
                    dp[i-1][j],    # delete a[i-1]
                    dp[i][j-1],    # insert b[j-1]
                    dp[i-1][j-1]   # substitute a[i-1] -> b[j-1]
                )

    distance = int(dp[n][m])
    if not return_alignment:
        return distance, None

    # Traceback to find alignment operations
    operations = []
    i, j = n, m
    while i > 0 or j > 0:
        if i > 0 and j > 0 and a[i-1] == b[j-1]:
            operations.append(("match", a[i-1], b[j-1]))
            i -= 1
            j -= 1
        elif i > 0 and j > 0 and dp[i][j] == dp[i-1][j-1] + 1:
            operations.append(("substitute", a[i-1], b[j-1]))
            i -= 1
            j -= 1
        elif i > 0 and dp[i][j] == dp[i-1][j] + 1:
            operations.append(("delete", a[i-1], "-"))
            i -= 1
        else:
            operations.append(("insert", "-", b[j-1]))
            j -= 1
    operations.reverse()
    return distance, operations

# Word-level edit distance for WER (Word Error Rate)
def word_error_rate(reference: str, hypothesis: str) -> float:
    """
    Compute WER for speech recognition evaluation.
    WER = edit_distance(ref_words, hyp_words) / len(ref_words)
    """
    ref_words = reference.lower().split()
    hyp_words = hypothesis.lower().split()

    # Same DP as edit_distance, but over word tokens instead of characters
    n, m = len(ref_words), len(hyp_words)
    dp = np.zeros((n+1, m+1), dtype=np.int32)
    for i in range(n+1):
        dp[i][0] = i
    for j in range(m+1):
        dp[0][j] = j
    for i in range(1, n+1):
        for j in range(1, m+1):
            if ref_words[i-1] == hyp_words[j-1]:
                dp[i][j] = dp[i-1][j-1]
            else:
                dp[i][j] = 1 + min(dp[i-1][j], dp[i][j-1], dp[i-1][j-1])
    return dp[n][m] / max(len(ref_words), 1)

# Demo
ref = "the quick brown fox jumps over the lazy dog"
hyp = "the quick brown fox jumped over lazy dog"
wer = word_error_rate(ref, hyp)

dist, ops = edit_distance("intention", "execution", return_alignment=True)
print(f"Edit distance('intention', 'execution') = {dist}")
for op, ca, cb in ops:
    print(f"  {op:12s}: '{ca}' -> '{cb}'")
print(f"\nWER: {wer:.3f} ({wer*100:.1f}%)")
```

### Viterbi Algorithm: Optimal Sequence Decoding

The Viterbi algorithm finds the most probable sequence of hidden states given an observed sequence, in a model where each state emits an observation and transitions to the next state according to learned probabilities.
This is HMM (Hidden Markov Model) decoding, but the same algorithm applies to CRF (Conditional Random Field) sequence labeling used in modern NLP. In NER (Named Entity Recognition) or POS tagging, you want:

$$\hat{y} = \arg\max_y P(y | x) = \arg\max_y \prod_t P(y_t | y_{t-1}, x_t)$$

Naive approach: enumerate all $K^T$ possible label sequences where $K$ = number of labels and $T$ = sequence length. For $K=10$ labels and $T=100$ tokens: $10^{100}$ sequences. Completely infeasible.

Viterbi uses DP: $\text{dp}[t][k]$ = the maximum probability of any path that ends in state $k$ at time $t$.

$$\text{dp}[t][k] = \max_{k'} \left( \text{dp}[t-1][k'] \cdot P(\text{transition: } k' \to k) \right) \cdot P(\text{emission: observation}_t | k)$$

Time: $O(TK^2)$ instead of $O(K^T)$. For $K=10$, $T=100$: $10,000$ operations vs $10^{100}$.

```python
import numpy as np
from typing import List, Tuple

class ViterbiDecoder:
    """
    Viterbi algorithm for sequence labeling.

    Used in:
    - NER (Named Entity Recognition): O-PER-ORG-LOC labels
    - POS tagging: NN-VB-DT labels
    - HMM speech recognition
    - CRF decoding (uses the same DP structure)

    Complexity: O(T * K^2) where T = sequence length, K = number of states
    """

    def __init__(self, states: List[str]):
        self.states = states
        self.K = len(states)
        self.state_to_idx = {s: i for i, s in enumerate(states)}

    def decode(
        self,
        emission_scores: np.ndarray,    # (T, K): log P(observation_t | state_k)
        transition_scores: np.ndarray,  # (K, K): log P(state_j | state_i)
        start_scores: np.ndarray,       # (K,): log P(initial state = k)
    ) -> Tuple[List[str], float]:
        """
        Find the most likely state sequence given observation scores.

        Args:
            emission_scores: (T, K) log-probabilities from the model
            transition_scores: (K, K) log-probabilities of transitioning
                from state i to j
            start_scores: (K,) log-probabilities of starting in each state

        Returns:
            (best_sequence, best_score)
        """
        T, K = emission_scores.shape
        assert K == self.K

        # dp[t][k] = max log-prob of any path ending at state k at time t
        dp = np.full((T, K), -np.inf)
        # backpointer[t][k] = which state at t-1 led to the best path ending at k at t
        backptr = np.zeros((T, K), dtype=np.int32)

        # Initialization: t=0
        dp[0] = start_scores + emission_scores[0]

        # Forward pass: t = 1, ..., T-1
        for t in range(1, T):
            for k in range(K):
                # For each possible current state k, find the best previous state k'
                # dp[t-1][k'] + transition[k'][k] + emission[t][k]
                scores = dp[t-1] + transition_scores[:, k]
                backptr[t][k] = np.argmax(scores)
                dp[t][k] = scores[backptr[t][k]] + emission_scores[t][k]

        # Backtrack: find the best final state
        best_last_state = int(np.argmax(dp[T-1]))
        best_score = float(dp[T-1][best_last_state])

        # Trace back the best path
        path = [best_last_state]
        for t in range(T-1, 0, -1):
            path.append(int(backptr[t][path[-1]]))
        path.reverse()

        best_sequence = [self.states[k] for k in path]
        return best_sequence, best_score

# Demo: NER sequence labeling
# BIO tagging scheme: B-PER=begin person, I-PER=inside person, O=outside
states = ["O", "B-PER", "I-PER", "B-ORG", "I-ORG", "B-LOC", "I-LOC"]
decoder = ViterbiDecoder(states)

K = len(states)
T = 8  # "Barack Obama works at OpenAI in San Francisco"

# Mock emission scores (from a neural network, log-softmax output)
np.random.seed(42)
emission_scores = np.log(np.random.dirichlet(np.ones(K), size=T))

# Manually set some plausible emissions for demonstration
# "Barack" -> B-PER likely
emission_scores[0, 1] = -0.1  # B-PER
# "Obama" -> I-PER likely
emission_scores[1, 2] = -0.1  # I-PER
# "OpenAI" -> B-ORG likely
emission_scores[4, 3] = -0.1  # B-ORG ("OpenAI" is token index 4)
# "San" -> B-LOC likely
emission_scores[6, 5] = -0.1  # B-LOC
# "Francisco" -> I-LOC likely
emission_scores[7, 6] = -0.1  # I-LOC

# Transition scores (log-probabilities)
# Encode: I-X can only follow B-X or I-X
transition_log = np.log(np.ones((K, K)) / K)  # uniform baseline
# Penalize invalid BIO transitions heavily
transition_log[0, 2] = -10.0  # O -> I-PER invalid
transition_log[0, 4] = -10.0  # O -> I-ORG invalid
transition_log[0, 6] = -10.0  # O -> I-LOC invalid
transition_log[1, 4] = -5.0   # B-PER -> I-ORG unlikely
transition_log[3, 2] = -5.0   # B-ORG -> I-PER unlikely

start_scores = np.log(np.array([0.5, 0.2, 0.01, 0.15, 0.01, 0.12, 0.01]))

tokens = ["Barack", "Obama", "works", "at", "OpenAI", "in", "San", "Francisco"]
sequence, score = decoder.decode(emission_scores, transition_log, start_scores)

print("Viterbi NER decoding:")
for token, label in zip(tokens, sequence):
    print(f"  {token:12s} {label}")
print(f"Best path log-score: {score:.3f}")
```

### CTC: Dynamic Programming for Sequence Alignment

CTC (Connectionist Temporal Classification) is the training criterion and decoding algorithm for sequence-to-sequence models where the input (e.g., audio frames) is longer than the output (e.g., character sequence) and the alignment is unknown.

The key insight: define an extended label set with a special blank token $\epsilon$. An output sequence "cat" can be generated by many frame-level sequences: "cccaaattt", "c-a-t", "cc--at", etc. (where - represents $\epsilon$). CTC computes the probability of the output as the sum over all valid alignments.

The forward algorithm (similar to Viterbi but summing instead of maximizing): Let $\alpha(t, s)$ = probability of having emitted the first $s$ labels of the output (including blanks) after processing $t$ input frames.

$$\alpha(t, s) = \begin{cases} \alpha(t-1, s) + \alpha(t-1, s-1) & \text{if } l_s = \epsilon \text{ or } l_s = l_{s-2} \\ \alpha(t-1, s) + \alpha(t-1, s-1) + \alpha(t-1, s-2) & \text{if } l_s \neq \epsilon \text{ and } l_s \neq l_{s-2} \end{cases}$$

where multiplication at each step is by the emission probability of the current label from the model.

```python
import numpy as np
from typing import List

def ctc_greedy_decode(log_probs: np.ndarray, blank_id: int = 0) -> List[int]:
    """
    CTC greedy decode: take argmax at each timestep, collapse repeats,
    then remove blanks.

    Args:
        log_probs: (T, vocab_size) log-probabilities from the model
        blank_id: index of the blank token

    Returns:
        decoded token sequence
    """
    # Step 1: argmax at each timestep
    best_path = np.argmax(log_probs, axis=1)  # (T,)

    # Step 2: collapse consecutive repeats
    # "aaabbbccc" -> "abc"
    collapsed = [best_path[0]]
    for t in range(1, len(best_path)):
        if best_path[t] != best_path[t-1]:
            collapsed.append(best_path[t])

    # Step 3: remove blank tokens
    decoded = [token for token in collapsed if token != blank_id]
    return decoded

def ctc_forward_log_prob(
    log_probs: np.ndarray,
    targets: List[int],
    blank_id: int = 0
) -> float:
    """
    Compute log P(targets | log_probs) summing over all valid CTC alignments.
    This is what CTC loss uses during training.
    Args:
        log_probs: (T, vocab_size) log-probabilities from model
        targets: target token sequence (without blanks)
        blank_id: index of blank token

    Returns:
        log probability of the target sequence
    """
    T = len(log_probs)

    # Extended label sequence: insert blank between each label and at start/end
    # "cat" -> [blank, c, blank, a, blank, t, blank]
    extended = [blank_id]
    for token in targets:
        extended.append(token)
        extended.append(blank_id)
    S = len(extended)  # 2*len(targets) + 1

    NEG_INF = -1e30

    # alpha[s] = log probability of being at position s in extended at time t
    alpha = np.full(S, NEG_INF)
    alpha[0] = log_probs[0, blank_id]         # start at first blank
    if S > 1:
        alpha[1] = log_probs[0, extended[1]]  # or first real token

    def log_sum_exp(a, b):
        """Numerically stable log(exp(a) + exp(b))."""
        if a == NEG_INF:
            return b
        if b == NEG_INF:
            return a
        if a > b:
            return a + np.log1p(np.exp(b - a))
        return b + np.log1p(np.exp(a - b))

    # Forward pass over time
    for t in range(1, T):
        new_alpha = np.full(S, NEG_INF)
        for s in range(S):
            l = extended[s]
            # Can stay at same position
            new_alpha[s] = log_sum_exp(new_alpha[s], alpha[s])
            # Can advance from previous position
            if s > 0:
                new_alpha[s] = log_sum_exp(new_alpha[s], alpha[s-1])
            # Can skip the blank if current and two positions back are different labels
            if s > 1 and extended[s] != blank_id and extended[s] != extended[s-2]:
                new_alpha[s] = log_sum_exp(new_alpha[s], alpha[s-2])
            # Multiply by emission probability
            if new_alpha[s] != NEG_INF:
                new_alpha[s] += log_probs[t, l]
        alpha = new_alpha

    # Final probability: sum of ending at last blank or last real token
    if S >= 2:
        log_p = log_sum_exp(alpha[S-1], alpha[S-2])
    else:
        log_p = alpha[S-1]
    return float(log_p)

# Demo
T, vocab_size = 20, 28  # 20 frames, 26 letters + blank + space
blank_id = 0

# Simulate model outputs (log-probabilities)
np.random.seed(42)
raw_probs = np.random.dirichlet(np.ones(vocab_size), size=T)
log_probs = np.log(raw_probs + 1e-8)

# Greedy decode
decoded = ctc_greedy_decode(log_probs, blank_id=blank_id)
print(f"CTC greedy decoded: {decoded}")

# Compute CTC log-prob for target "cat" = [3, 1, 20] (a=1, c=3, t=20)
targets = [3, 1, 20]
log_p = ctc_forward_log_prob(log_probs, targets, blank_id=blank_id)
print(f"CTC log P(cat | frames) = {log_p:.4f}")
```

### Dynamic Time Warping for Time Series Similarity

Dynamic Time Warping (DTW) measures similarity between two time series that may be misaligned in time. Instead of comparing point-to-point, DTW finds the optimal alignment (warping path) between the two sequences by stretching or compressing the time axis.

This is DP: $\text{dtw}[i][j]$ = minimum accumulated distance to align the first $i$ points of series $A$ with the first $j$ points of series $B$.

$$\text{dtw}[i][j] = d(A_i, B_j) + \min\begin{pmatrix}\text{dtw}[i-1][j] \\ \text{dtw}[i][j-1] \\ \text{dtw}[i-1][j-1]\end{pmatrix}$$

ML applications:

- Comparing multivariate sensor time series (IoT anomaly detection)
- Audio template matching (word spotting without full recognition)
- Accelerometer gesture recognition
- Financial time series similarity for market regime detection
- Comparing training loss curves across experiments

```python
import numpy as np
from typing import List, Optional, Tuple

def dtw_distance(
    series_a: np.ndarray,
    series_b: np.ndarray,
    window: Optional[int] = None,
    return_path: bool = False
) -> Tuple[float, List[Tuple[int, int]]]:
    """
    Dynamic Time Warping distance between two time series.
    Args:
        series_a: (n,) or (n, d) time series
        series_b: (m,) or (m, d) time series
        window: Sakoe-Chiba band constraint (None = no constraint).
            Using a window prevents extreme warpings and speeds up DTW
        return_path: if True, return the alignment path

    Returns:
        (dtw_distance, warping_path)

    Complexity: O(n*m) without window, O(n*w) with window w
    """
    n, m = len(series_a), len(series_b)
    INF = float('inf')

    # Cost function: element-wise squared distance
    def point_distance(a, b):
        return float(np.sum((np.array(a) - np.array(b)) ** 2))

    # DTW matrix
    dp = np.full((n+1, m+1), INF)
    dp[0][0] = 0.0

    for i in range(1, n+1):
        # Apply Sakoe-Chiba band constraint if specified
        j_start = max(1, i - window) if window else 1
        j_end = min(m+1, i + window + 1) if window else m+1
        for j in range(j_start, j_end):
            cost = point_distance(series_a[i-1], series_b[j-1])
            dp[i][j] = cost + min(dp[i-1][j], dp[i][j-1], dp[i-1][j-1])

    distance = float(dp[n][m])
    if not return_path:
        return distance, []

    # Traceback the warping path
    path = [(n, m)]
    i, j = n, m
    while i > 1 or j > 1:
        if i == 1:
            j -= 1
        elif j == 1:
            i -= 1
        else:
            best = min(dp[i-1][j], dp[i][j-1], dp[i-1][j-1])
            if dp[i-1][j-1] == best:
                i -= 1
                j -= 1
            elif dp[i-1][j] == best:
                i -= 1
            else:
                j -= 1
        path.append((i, j))
    path.reverse()
    return distance, path

# Demo: comparing training loss curves
np.random.seed(42)
n_steps = 100

# Simulate two training runs with different convergence speed
steps_a = np.arange(n_steps)
loss_a = 2.0 * np.exp(-steps_a / 30) + 0.5 + 0.05 * np.random.randn(n_steps)
# Run B converges faster but plateaus at a higher loss
loss_b = 2.0 * np.exp(-steps_a / 20) + 0.6 + 0.05 * np.random.randn(n_steps)

# Euclidean distance (point-to-point)
euclidean = np.sqrt(np.sum((loss_a - loss_b) ** 2))

# DTW distance (allows time warping)
dtw_dist, path = dtw_distance(loss_a, loss_b, window=15, return_path=True)

print(f"Euclidean distance: {euclidean:.3f}")
print(f"DTW distance: {dtw_dist:.3f}")
print(f"Path length: {len(path)} (vs {n_steps} diagonal)")
print("DTW allows comparing curves that converge at different rates")
```

### Bellman Equation and Value Iteration in RL

Reinforcement learning is, at its core, a DP problem. The agent must find the optimal policy - the mapping from states to actions that maximizes total expected reward.

The Bellman equation expresses the value of a state recursively:

$$V^*(s) = \max_a \left[ R(s, a) + \gamma \sum_{s'} P(s' | s, a) V^*(s') \right]$$

where:

- $V^*(s)$ = optimal value of state $s$ (maximum expected discounted future reward)
- $R(s, a)$ = immediate reward of taking action $a$ in state $s$
- $\gamma \in [0, 1)$ = discount factor
- $P(s' | s, a)$ = transition probability to state $s'$

The Bellman equation has optimal substructure: the value of $s$ depends on the values of successor states $s'$. Value iteration solves this by iterating the Bellman operator until convergence.

$$V_{k+1}(s) = \max_a \left[ R(s, a) + \gamma \sum_{s'} P(s' | s, a) V_k(s') \right]$$

This converges to $V^*$ in the limit because the Bellman operator is a contraction mapping with factor $\gamma < 1$.

```python
import numpy as np
from typing import Tuple

class MDPValueIteration:
    """
    Value iteration for solving Markov Decision Processes.
    Bellman backup: V_{k+1}(s) = max_a [ R(s,a) + gamma * sum_s' P(s'|s,a) * V_k(s') ]
    Converges when max |V_{k+1}(s) - V_k(s)| < epsilon (contraction mapping)

    Application: resource allocation for ML serving
    - States: server load levels (0%, 25%, 50%, 75%, 100%)
    - Actions: scale up, maintain, scale down
    - Rewards: throughput - cost
    """

    def __init__(
        self,
        n_states: int,
        n_actions: int,
        transition_probs: np.ndarray,  # (n_states, n_actions, n_states)
        rewards: np.ndarray,           # (n_states, n_actions)
        gamma: float = 0.95
    ):
        self.n_states = n_states
        self.n_actions = n_actions
        self.P = transition_probs
        self.R = rewards
        self.gamma = gamma

    def value_iteration(
        self,
        epsilon: float = 1e-6,
        max_iters: int = 1000
    ) -> Tuple[np.ndarray, np.ndarray, int]:
        """
        Run value iteration until convergence.

        Returns:
            (V_star, pi_star, n_iterations)
            V_star: optimal value function (n_states,)
            pi_star: optimal policy (n_states,) - action index for each state
        """
        V = np.zeros(self.n_states)

        for iteration in range(max_iters):
            V_new = np.zeros(self.n_states)
            for s in range(self.n_states):
                # Q(s, a) = R(s, a) + gamma * sum_{s'} P(s'|s,a) * V(s')
                Q_values = self.R[s] + self.gamma * np.dot(self.P[s], V)
                V_new[s] = np.max(Q_values)

            # Check convergence
            delta = np.max(np.abs(V_new - V))
            V = V_new
            if delta < epsilon:
                break

        # Extract optimal policy
        pi_star = np.zeros(self.n_states, dtype=np.int32)
        for s in range(self.n_states):
            Q_values = self.R[s] + self.gamma * np.dot(self.P[s], V)
            pi_star[s] = np.argmax(Q_values)

        return V, pi_star, iteration + 1

    def policy_iteration(
        self,
        max_iters: int = 100
    ) -> Tuple[np.ndarray, np.ndarray, int]:
        """
        Policy iteration: alternates between policy evaluation and improvement.
        Often converges faster than value iteration in practice.
        """
        # Initialize with arbitrary policy (all action 0)
        pi = np.zeros(self.n_states, dtype=np.int32)

        for iteration in range(max_iters):
            # Policy evaluation: compute V^pi (solve linear system)
            # V^pi(s) = R(s, pi(s)) + gamma * sum_{s'} P(s'|s,pi(s)) * V^pi(s')
            # Rearranged: (I - gamma * P^pi) * V = R^pi
            R_pi = np.array([self.R[s, pi[s]] for s in range(self.n_states)])
            P_pi = np.array([self.P[s, pi[s]] for s in range(self.n_states)])
            A = np.eye(self.n_states) - self.gamma * P_pi
            V = np.linalg.solve(A, R_pi)

            # Policy improvement
            pi_new = np.zeros(self.n_states, dtype=np.int32)
            for s in range(self.n_states):
                Q_values = self.R[s] + self.gamma * np.dot(self.P[s], V)
                pi_new[s] = np.argmax(Q_values)

            if np.all(pi_new == pi):
                break  # Policy converged
            pi = pi_new

        return V, pi, iteration + 1

# Demo: a 4-state server-load MDP
# States: [IDLE, LOW_LOAD, HIGH_LOAD, OVERLOADED]
# Actions: [SCALE_DOWN=0, MAINTAIN=1, SCALE_UP=2]
n_states, n_actions = 4, 3
states = ["IDLE", "LOW", "HIGH", "OVERLOADED"]
actions = ["scale_down", "maintain", "scale_up"]

# Transition probabilities P[s, a, s'] (must sum to 1 over s')
P = np.zeros((n_states, n_actions, n_states))
# From IDLE:
P[0, 0] = [0.9, 0.1, 0.0, 0.0]  # scale_down stays idle mostly
P[0, 1] = [0.7, 0.2, 0.1, 0.0]  # maintain, some chance of load increase
P[0, 2] = [0.8, 0.2, 0.0, 0.0]  # scale_up wasteful but stays idle
# From LOW_LOAD:
P[1, 0] = [0.3, 0.6, 0.1, 0.0]  # scale_down can become idle or stay low
P[1, 1] = [0.1, 0.5, 0.3, 0.1]  # maintain, can go high
P[1, 2] = [0.1, 0.7, 0.2, 0.0]  # scale_up, helps prevent overload
# From HIGH_LOAD:
P[2, 0] = [0.0, 0.1, 0.4, 0.5]  # scale_down risky
P[2, 1] = [0.0, 0.1, 0.6, 0.3]  # maintain, some overload risk
P[2, 2] = [0.0, 0.2, 0.7, 0.1]  # scale_up prevents overload
# From OVERLOADED:
P[3, 0] = [0.0, 0.0, 0.2, 0.8]  # scale_down makes it worse
P[3, 1] = [0.0, 0.0, 0.3, 0.7]  # maintain, still bad
P[3, 2] = [0.0, 0.1, 0.6, 0.3]  # scale_up helps

# Rewards R[s, a]: throughput - cost
# Overloaded is very bad; scaling up costs more
R = np.array([
    # scale_down  maintain  scale_up
    [  1.0,        0.5,     -0.5],  # IDLE
    [  2.0,        3.0,      2.0],  # LOW_LOAD
    [  1.0,        2.0,      3.0],  # HIGH_LOAD
    [ -5.0,       -8.0,     -3.0],  # OVERLOADED
])

mdp = MDPValueIteration(n_states, n_actions, P, R, gamma=0.95)
V_star, pi_star, n_iters = mdp.value_iteration(epsilon=1e-8)

print(f"Value iteration converged in {n_iters} iterations")
print("\nOptimal Value Function V*(s):")
for s, v in zip(states, V_star):
    print(f"  {s:15s}: {v:.3f}")
print("\nOptimal Policy pi*(s):")
for s, a in zip(states, pi_star):
    print(f"  {s:15s}: {actions[a]}")
```

### Beam Search as Approximate DP

True sequence-generation DP would maintain the exact probability of every partial sequence. With vocabulary size $V$ and sequence length $T$, this requires $O(V^T)$ space - completely infeasible.

Beam search is approximate DP: at each step, keep only the $k$ highest-probability partial sequences (the "beam"). This bounds the per-step work to scoring $O(k \cdot V)$ candidates and the stored hypotheses to $O(k \cdot T)$ tokens, at the cost of potentially missing the globally optimal sequence.

The relationship to exact search: a beam wide enough to retain every prefix degenerates to exhaustive (exact) search, while beam search with $k = 1$ is greedy decoding. Practical beam search uses $k = 4$ to $k = 50$ and achieves near-optimal quality for most tasks.

```python
import heapq
import numpy as np
from dataclasses import dataclass, field
from typing import Callable, List

@dataclass(order=True)
class Hypothesis:
    """A beam hypothesis: a partial sequence with its score."""
    neg_score: float  # negative (length-penalized) score; heapq is a min-heap
    token_ids: List[int] = field(compare=False)
    log_prob: float = field(compare=False, default=0.0)  # raw cumulative log-prob

    @property
    def score(self) -> float:
        return -self.neg_score

    @property
    def last_token(self) -> int:
        return self.token_ids[-1] if self.token_ids else -1

def beam_search(
    model_fn: Callable[[List[int]], np.ndarray],  # returns log-probs over vocab
    beam_width: int = 4,
    max_length: int = 50,
    vocab_size: int = 32000,
    eos_token_id: int = 2,
    bos_token_id: int = 1,
    length_penalty: float = 0.6,  # Google's length penalty for neural MT
) -> List[Hypothesis]:
    """
    Beam search decoder.
    Complexity: O(T * k * V * log k) where:
        T = decoding steps, k = beam width, V = vocab size

    The length penalty prevents short sequences from dominating:
        score(hyp) = log_prob / ((5 + len) / 6)^alpha
    """
    # Initialize with start token
    initial_hyp = Hypothesis(neg_score=0.0, token_ids=[bos_token_id])
    beam = [initial_hyp]
    completed = []

    for step in range(max_length):
        if not beam:
            break

        # Collect all new candidates from expanding the beam
        # Use a heap to keep only top-k candidates
        candidates = []
        for hyp in beam:
            if hyp.last_token == eos_token_id:
                completed.append(hyp)
                continue

            # Get model predictions for this partial sequence
            log_probs = model_fn(hyp.token_ids)  # (vocab_size,)

            # Only consider the top candidates per beam hypothesis
            # This bounds the candidates to O(k^2) per step (not k*V)
            top_k = np.argpartition(log_probs, -beam_width * 2)[-beam_width * 2:]

            for token_id in top_k:
                # Accumulate the raw log-prob; apply the length penalty only
                # when ranking, so the penalty does not compound across steps
                new_log_prob = hyp.log_prob + float(log_probs[token_id])
                new_ids = hyp.token_ids + [int(token_id)]
                penalized_score = new_log_prob / ((5 + len(new_ids)) / 6) ** length_penalty
                new_hyp = Hypothesis(
                    neg_score=-penalized_score,
                    token_ids=new_ids,
                    log_prob=new_log_prob
                )
                candidates.append(new_hyp)

        # Keep top beam_width candidates
        if not candidates:
            break
        beam = heapq.nsmallest(beam_width, candidates)

    # Add remaining beam hypotheses to completed
    completed.extend(beam)

    # Sort completed hypotheses by score (descending)
    completed.sort(key=lambda h: h.score, reverse=True)
    return completed

# Demo with mock model
np.random.seed(42)
vocab_size = 100

def mock_lm(token_ids: List[int]) -> np.ndarray:
    """Deterministic mock language model."""
    rng = np.random.default_rng(sum(token_ids[:5]))
    logits = rng.standard_normal(vocab_size)
    # Slight tendency to end sequence (EOS=2) as length grows
    logits[2] += 0.1 * len(token_ids)
    # log-softmax
    log_probs = logits - np.log(np.sum(np.exp(logits - logits.max()))) - logits.max()
    return log_probs.astype(np.float32)

results = beam_search(
    model_fn=mock_lm,
    beam_width=4,
    max_length=15,
    vocab_size=vocab_size,
    eos_token_id=2,
    bos_token_id=1
)

print("Beam search results (top 3 hypotheses):")
for i, hyp in enumerate(results[:3]):
    print(f"  [{i+1}] score={hyp.score:.3f}, length={len(hyp.token_ids)}, "
          f"tokens={hyp.token_ids[:8]}...")
```

### Knapsack Problem in ML Resource Allocation

The 0/1 knapsack problem appears in ML serving: given a set of model variants (or LoRA adapters, or quantization configs) with different latency costs and accuracy values, which subset should be deployed given a total latency budget?

Let items have weights $w_i$ (latency) and values $v_i$ (accuracy gain), with total capacity $W$ (latency budget). Find the subset with maximum total value within the capacity constraint.

DP: $\text{dp}[i][c]$ = maximum value using first $i$ items with capacity $c$.

$$\text{dp}[i][c] = \max(\text{dp}[i-1][c], v_i + \text{dp}[i-1][c - w_i])$$

```python
from typing import List, Tuple

def knapsack_01(weights: List[int], values: List[float], capacity: int) -> Tuple[float, List[int]]:
    """
    0/1 Knapsack: which model variants to serve within a latency budget.
    O(n * W) time, O(n * W) space.
    Example application:
        weights = [latency_ms_of_each_model_variant]
        values = [accuracy_of_each_model_variant]
        capacity = 100  # 100ms total latency budget
    """
    n = len(weights)
    dp = [[0.0] * (capacity + 1) for _ in range(n + 1)]

    for i in range(1, n + 1):
        w_i, v_i = weights[i-1], values[i-1]
        for c in range(capacity + 1):
            # Don't take item i
            dp[i][c] = dp[i-1][c]
            # Take item i if it fits
            if c >= w_i:
                dp[i][c] = max(dp[i][c], v_i + dp[i-1][c - w_i])

    # Traceback which items to include
    selected = []
    c = capacity
    for i in range(n, 0, -1):
        if dp[i][c] != dp[i-1][c]:
            selected.append(i-1)  # item i-1 was selected
            c -= weights[i-1]
    selected.reverse()
    return dp[n][capacity], selected

# Demo: selecting model variants within latency budget
models = ["tiny-7b", "base-13b", "large-30b", "xl-70b", "retrieval-reranker"]
latencies_ms = [20, 40, 80, 150, 30]
accuracy_gains = [0.60, 0.75, 0.85, 0.92, 0.10]  # relative accuracies
budget_ms = 100  # 100ms end-to-end latency budget

best_value, selected_idx = knapsack_01(latencies_ms, accuracy_gains, budget_ms)
print(f"Optimal model selection within {budget_ms}ms budget:")
total_latency = 0
for idx in selected_idx:
    print(f"  {models[idx]}: {latencies_ms[idx]}ms, accuracy={accuracy_gains[idx]:.2f}")
    total_latency += latencies_ms[idx]
print(f"  Total latency: {total_latency}ms, Total accuracy: {best_value:.2f}")
```

---

## Mermaid: DP Pattern Recognition Guide

```mermaid
flowchart TD
    Problem["New Problem"]:::blue
    Q1{"Sequence of<br/>decisions<br/>over time?"}:::purple
    Q2{"Find optimal<br/>alignment or<br/>edit sequence?"}:::purple
    Q3{"Maximize reward<br/>over future<br/>states?"}:::purple
    Q4{"Best path<br/>through labeled<br/>sequence space?"}:::purple

    RL["Bellman Equation<br/>Value Iteration<br/>Policy Gradient"]:::teal
    Align["Edit Distance<br/>CTC Forward<br/>DTW"]:::green
    Decode["Viterbi<br/>CRF Decoding<br/>HMM Decoding"]:::green
    Gen["Beam Search<br/>Top-k Sampling<br/>Greedy Decode"]:::orange

    Problem --> Q1
    Problem --> Q2
    Problem --> Q3
    Problem --> Q4
    Q1 -->|"yes"| Gen
    Q2 -->|"yes"| Align
    Q3 -->|"yes"| RL
    Q4 -->|"yes"| Decode

    classDef blue fill:#dbeafe,color:#1e293b,stroke:#2563eb
    classDef purple fill:#ede9fe,color:#4c1d95,stroke:#7c3aed
    classDef teal fill:#ccfbf1,color:#134e4a,stroke:#14b8a6
    classDef green fill:#dcfce7,color:#14532d,stroke:#16a34a
    classDef orange fill:#ffedd5,color:#7c2d12,stroke:#ea580c
```

---

## Mermaid: Viterbi Forward Pass (3-State HMM)

```mermaid
flowchart LR
    T0["Time t=0"]:::blue
    T1["Time t=1"]:::blue
    T2["Time t=2"]:::blue

    S0A["State A<br/>prob=0.6"]:::green
    S0B["State B<br/>prob=0.3"]:::teal
    S0C["State C<br/>prob=0.1"]:::orange
    S1A["State A<br/>dp=0.4"]:::green
    S1B["State B<br/>dp=0.5"]:::teal
    S1C["State C<br/>dp=0.1"]:::orange
    S2A["State A<br/>dp=0.2"]:::green
    S2B["State B<br/>dp=0.7 BEST"]:::teal
    S2C["State C<br/>dp=0.1"]:::orange

    T0 --> S0A & S0B & S0C
    T1 --> S1A & S1B & S1C
    T2 --> S2A & S2B & S2C

    S0A -->|"trans * emit"| S1A
    S0B -->|"trans * emit"| S1A
    S0A -->|"trans * emit"| S1B
    S0B -->|"trans * emit"| S1B
    S1A -->|"trans * emit"| S2B
    S1B -->|"trans * emit"| S2B

    classDef blue fill:#dbeafe,color:#1e293b,stroke:#2563eb
    classDef green fill:#dcfce7,color:#14532d,stroke:#16a34a
    classDef teal fill:#ccfbf1,color:#134e4a,stroke:#14b8a6
    classDef orange fill:#ffedd5,color:#7c2d12,stroke:#ea580c
```

---

## Production Engineering Notes
**1. Log-space arithmetic is mandatory for DP over probabilities.** When multiplying many probabilities together (as in Viterbi or CTC), the product underflows for realistic sequence lengths: $0.9^{100} \approx 2.7 \times 10^{-5}$ is still representable, but real per-step probabilities are far smaller than 0.9, and the running product quickly drops below float32's smallest normal value ($\approx 10^{-38}$) and becomes zero. Always work in log-space: replace multiplication with addition ($\log(ab) = \log a + \log b$) and use `logsumexp` for sums of probabilities ($\log(\sum_i e^{a_i})$ computed stably), as the `log_sum_exp` helper in the CTC code above does.

**2. Beam search is not guaranteed optimal - and that is fine.** Beam search finds a high-quality sequence but not necessarily the highest-probability sequence. For MT and summarization, this is acceptable and even sometimes beneficial (the global optimum under a language model may not be the human-preferred output). For structured prediction tasks where correctness matters (code generation, formal verification), consider breadth-first search or exhaustive search for short outputs.

**3. CTC greedy decode vs beam decode.** CTC greedy decoding (argmax at each frame, collapse repeats, remove blanks) is $O(T)$ but misses valid alternatives. CTC beam search decoding marginalizes over valid blank-label interleavings and can find a better transcription at the cost of $O(T \cdot k \cdot V)$. Production ASR systems typically default to greedy decoding for speed and offer beam search as an accuracy option (Whisper, for example, follows this pattern, though it is an attention-based decoder rather than CTC). Know your latency-accuracy tradeoff.

**4. Value iteration speed.** Tabular value iteration over small state spaces ($< 10^5$ states) is fast with vectorized NumPy. For large state spaces, use approximation methods: neural value functions (DQN), actor-critic methods, or sparse DP on only the reachable states. Value iteration requires the full transition matrix in memory - $O(S^2 A)$ space, which becomes infeasible beyond roughly $10^4$ states without exploiting sparsity.

**5. Edit distance optimization for long strings.** The standard $O(nm)$ edit distance is expensive for long strings. Optimizations: (1) compute only a diagonal band of width $2d+1$ if you know the edit distance is at most $d$: $O(nd)$ time (see the sketch below); (2) use Ukkonen's algorithm for $O(\min(n, m) \cdot d)$ with early termination; (3) for approximate matching with threshold $\theta$, use a BK-tree to find all corpus strings within distance $\theta$ while examining only a fraction of the $N$ corpus strings in practice, instead of scanning all of them.
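To make optimization (1) concrete, here is a minimal banded-Levenshtein sketch (illustrative, not from the original - the function name and the dictionary-based band representation are mine). It fills only the cells with $|i - j| \leq d$, giving $O(nd)$ time, and is exact whenever the true distance is at most $d$:

```python
def banded_edit_distance(a: str, b: str, d: int) -> int:
    """Levenshtein distance restricted to a diagonal band of half-width d.
    O(n * d) time. Exact when the true distance is <= d; otherwise returns
    a value > d, which callers can treat as "too far"."""
    n, m = len(a), len(b)
    if abs(n - m) > d:
        return d + 1  # length difference alone forces more than d edits
    INF = d + 1
    # Row i=0 inside the band: dp[0][j] = j
    prev = {j: j for j in range(0, min(m, d) + 1)}
    for i in range(1, n + 1):
        curr = {}
        for j in range(max(0, i - d), min(m, i + d) + 1):
            if j == 0:
                curr[j] = i
                continue
            best = min(
                prev.get(j, INF) + 1,                       # delete a[i-1]
                curr.get(j - 1, INF) + 1,                   # insert b[j-1]
                prev.get(j - 1, INF) + (a[i-1] != b[j-1]),  # match / substitute
            )
            curr[j] = min(best, INF)  # clamp anything past the band to d+1
        prev = curr
    return prev.get(m, INF)

# Within the band, this agrees with the full O(nm) algorithm
print(banded_edit_distance("intention", "execution", d=6))  # 5
print(banded_edit_distance("kitten", "sitting", d=3))       # 3
```

Any cost that strays outside the band is clamped to $d + 1$, so a result greater than $d$ simply means "more than $d$ edits", which is usually all a fuzzy-matching pipeline needs.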
:::danger Using DP When Subproblems Are Not Independent

DP requires that subproblem solutions are reusable across the full problem. If subproblems have side effects or their optimal solution depends on context outside the DP table, the algorithm gives wrong answers. Example: in CTC decoding, the blank token has specific interaction rules - a greedy decoder that ignores these rules will silently produce wrong outputs.

Always verify that your DP subproblem definition captures all relevant state. If you find you need to pass extra context into the DP table, that context must become part of the state dimensions.

:::

:::warning Beam Search Width Is a Hyperparameter That Affects Quality, Not Just Speed

Reducing beam width from 5 to 1 (greedy) reduces memory and compute but also reduces output quality - sometimes dramatically. For tasks where beam search finds meaningfully better outputs (translation, complex code generation), tuning beam width is part of model evaluation. For tasks where outputs are robust to greedy decoding (simple summarization, most chatbot responses), beam width matters less.

Always measure quality vs beam width for your specific task before deploying with a fixed beam width.

:::

:::tip Memoization First, Tabulation for Production

When prototyping a new DP-based ML component, start with memoization (add `@lru_cache` to your recursive function). It is faster to implement and easier to verify. Once the algorithm is correct, convert to tabulation for the production version: iterative tabulation avoids Python recursion limits, has better cache locality, and is easier to vectorize with NumPy.

The transition from memoized recursion to tabulation is mechanical: identify the base cases, determine the iteration order (usually small indices first), and replace recursive calls with table lookups.

:::

---

## Interview Q&A

**Q1: What are the two conditions required for DP, and give an ML example that satisfies both?**

The two conditions are: (1) Optimal substructure - the optimal solution to the problem contains optimal solutions to subproblems; (2) Overlapping subproblems - the same subproblems are solved multiple times in the naive recursive approach.

ML example: Viterbi decoding for NER. (1) Optimal substructure: the most probable label sequence ending at position $t$ in state $k$ consists of the most probable path to position $t-1$ in some state $k'$, plus the transition $k' \to k$ and emission of observation $t$. The optimal full path contains the optimal prefix. (2) Overlapping subproblems: for a sequence of length $T$ with $K$ states, the naive approach would enumerate $K^T$ paths. Each state-at-time pair $(t, k)$ is visited by $K^{t-1}$ paths - all paths that reach state $k$ at time $t$. DP computes the best path to each $(t, k)$ once, reducing $O(K^T)$ to $O(TK^2)$.

**Q2: How does the Viterbi algorithm differ from the forward algorithm in HMMs?**

Both use the same DP recurrence structure but differ in the aggregation operation.

The forward algorithm computes $\alpha(t, k) = P(o_{1..t}, \text{state}_t = k)$ by summing over all paths: $\alpha(t, k) = P(o_t | k) \sum_{k'} P(k | k') \alpha(t-1, k')$. The total likelihood is $\sum_k \alpha(T, k)$. Used for training (the E-step in Baum-Welch, the forward-backward algorithm).

The Viterbi algorithm computes $\text{dp}(t, k) = \max_{\text{paths}} P(o_{1..t}, \text{path ends in state } k)$ by taking the max: $\text{dp}(t, k) = P(o_t | k) \max_{k'} P(k | k') \text{dp}(t-1, k')$. The best total path is $\max_k \text{dp}(T, k)$ with traceback. Used for inference/decoding.

Both are $O(TK^2)$, but Viterbi requires storing the backpointer table (which previous state was optimal), while forward-backward stores both forward and backward variables for computing posteriors. A minimal code sketch of the forward pass follows.
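Here is that sketch (illustrative, not from the original - the function name is mine). It assumes the same array shapes as `ViterbiDecoder.decode` above and differs from Viterbi in exactly one place: the max over previous states becomes a log-sum-exp.

```python
import numpy as np

def forward_log_likelihood(
    emission_scores: np.ndarray,    # (T, K): log P(observation_t | state_k)
    transition_scores: np.ndarray,  # (K, K): log P(state_j | state_i)
    start_scores: np.ndarray,       # (K,): log P(initial state = k)
) -> float:
    """Forward algorithm: the Viterbi recurrence with max replaced by logsumexp.
    Returns log P(observations), marginalized over all state paths."""
    def logsumexp(x: np.ndarray) -> float:
        m = float(np.max(x))
        return m + float(np.log(np.sum(np.exp(x - m))))

    T, K = emission_scores.shape
    log_alpha = start_scores + emission_scores[0]  # t = 0
    for t in range(1, T):
        log_alpha = np.array([
            # Viterbi takes the max over previous states k'; forward sums instead
            logsumexp(log_alpha + transition_scores[:, k]) + emission_scores[t, k]
            for k in range(K)
        ])
    return logsumexp(log_alpha)  # total log-likelihood over all paths

# With the NER demo arrays from above:
# log_Z = forward_log_likelihood(emission_scores, transition_log, start_scores)
# Any single path's score, including Viterbi's best, is <= log_Z.
```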
**Q3: What is CTC loss and how does the DP make it trainable?**

CTC (Connectionist Temporal Classification) trains sequence-to-sequence models where the alignment between input frames and output tokens is unknown. The DP makes it trainable by computing the probability of the target transcript as the sum over all valid frame-level alignments (including all possible insertions of blank tokens and repetitions).

The CTC forward variable $\alpha(t, s)$ = probability of emitting the first $s$ extended-label symbols after processing $t$ frames. The recurrence is $O(T \cdot S)$ where $S = 2|\text{target}| + 1$ (extended with blanks). The total probability is $\alpha(T, S-1) + \alpha(T, S-2)$. The backward variable $\beta(t, s)$ is computed symmetrically. Together, they give $P(\hat{l}_{t,s}) = \alpha(t,s) \cdot \beta(t,s) / P(\text{target})$, the gradient signal for training.

Without this DP, training would require knowing the alignment - which is exactly the information we do not have in speech recognition (we know "cat" was said, but not which frames correspond to which phoneme).

**Q4: How does the Bellman equation lead to value iteration, and when does value iteration converge?**

The Bellman optimality equation is a fixed-point condition: $V^* = \mathcal{T} V^*$ where $\mathcal{T}$ is the Bellman operator $(\mathcal{T}V)(s) = \max_a [R(s,a) + \gamma \sum_{s'} P(s'|s,a) V(s')]$. Value iteration applies $\mathcal{T}$ repeatedly starting from any initial $V_0$.

Convergence is guaranteed because $\mathcal{T}$ is a $\gamma$-contraction in the sup-norm: $\|\mathcal{T}V - \mathcal{T}V'\|_\infty \leq \gamma \|V - V'\|_\infty$. Since $\gamma < 1$, repeated application converges geometrically to the unique fixed point $V^*$. After $k$ iterations: $\|V_k - V^*\|_\infty \leq \gamma^k \|V_0 - V^*\|_\infty$. For $\gamma = 0.99$ and initial error 10, reaching precision $\epsilon$ requires $k \geq \ln(10/\epsilon) / \ln(1/0.99) \approx 100 \ln(10/\epsilon)$ iterations - about 1,600 for $\epsilon = 10^{-6}$ (checked numerically below).

Policy iteration often converges faster in practice because each policy improvement step is more informative than a single Bellman backup. But value iteration is simpler to implement and sufficient for small MDPs.
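A quick numeric check of that bound (a sketch, not from the original): solve $\gamma^k \cdot 10 \leq \epsilon$ for $k$ and confirm the geometric error bound holds.

```python
import numpy as np

gamma, eps, initial_error = 0.99, 1e-6, 10.0

# Solve gamma^k * initial_error <= eps for k
k = int(np.ceil(np.log(initial_error / eps) / np.log(1.0 / gamma)))
print(f"iterations needed: {k}")  # 1604, i.e. about 1,600

# Sanity check: the error bound after k iterations is indeed below eps
assert initial_error * gamma ** k <= eps
```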
**Q5: Why is beam search not strict dynamic programming and what does it miss?**

Strict DP over sequence generation would maintain the exact probability of every partial sequence. Beam search prunes all but the top-$k$ partial sequences at each step - permanently discarding lower-probability prefixes even if they could lead to the globally optimal complete sequence.

Concretely: suppose the globally optimal sequence starts with the 5th most likely token at step 1. With beam width $k=4$, that token is pruned after step 1 and can never be recovered. Beam search is guaranteed to find the optimum only if the optimal sequence is never pruned, which cannot be guaranteed in general. The alternative is exact search (exponential in sequence length) or optimal search with pruning bounds (A* decoding).

In practice, beam search finds sequences that are qualitatively indistinguishable from the optimum for most generation tasks, and the approximation error is acceptable. Diverse beam search (maintaining beams that are dissimilar to each other) helps when you need multiple distinct good outputs.

**Q6: How would you use dynamic time warping to compare training loss curves from different runs?**

Training loss curves often converge at different rates across runs with different hyperparameters or hardware configurations. Point-to-point comparison (Euclidean distance) penalizes runs that converge faster - two identical runs where one just progresses faster would appear dissimilar.

DTW finds the optimal time alignment: it can match the "rapid initial descent" in the fast-converging run with the correspondingly shaped region in the slow-converging run, regardless of when it occurs. DTW distance $= 0$ for identical curves with different time scales.

Implementation: apply the DTW DP with a Sakoe-Chiba band constraint (window parameter) to prevent degenerate alignments (matching the entire fast run to a single point in the slow run). The window should be about 10-20% of the sequence length.

Practical use: cluster training runs by DTW similarity of their loss curves to find hyperparameter regions with similar learning dynamics. Use DTW distance as a feature for anomaly detection in training monitoring systems - a run with unusually high DTW distance from all similar runs may have a data loading bug or hardware issue.
© 2026 EngineersOfAI. All rights reserved.