Python: Build a Trading Signal System (Practice Problems & Exercises)

Practice: Build a Trading Signal System

11 problems · 2 Easy · 4 Medium · 5 Hard · 90–120 min

Easy

#1 OHLCV Candle Validator (Easy)
Tags: data-modeling, validation, ohlcv

Build an OHLCVCandle dataclass with a validate() method that enforces OHLCV (Open, High, Low, Close, Volume) price consistency rules.

Solution
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class OHLCVCandle:
    open: float
    high: float
    low: float
    close: float
    volume: float
    timestamp: str = ""

    def validate(self) -> Tuple[bool, List[str]]:
        errors = []
        if self.high < max(self.open, self.close, self.low):
            errors.append("high must be >= open, close, low")
        if self.low > min(self.open, self.close, self.high):
            errors.append("low must be <= open, close, high")
        if self.volume < 0:
            errors.append("volume must be non-negative")
        for field_name, val in [("open", self.open), ("high", self.high),
                                ("low", self.low), ("close", self.close)]:
            if val <= 0:
                errors.append(f"{field_name} must be positive")
        return len(errors) == 0, errors

valid = OHLCVCandle(open=100.0, high=105.0, low=98.0, close=103.0, volume=1_000_000)
ok, errs = valid.validate()
print(f"Valid: {ok}")

bad1 = OHLCVCandle(open=100.0, high=95.0, low=98.0, close=103.0, volume=1_000_000)
ok, errs = bad1.validate()
print(f"Invalid reason: {'; '.join(errs)}")

bad2 = OHLCVCandle(open=100.0, high=105.0, low=98.0, close=103.0, volume=-500)
ok, errs = bad2.validate()
print(f"Invalid reason: {'; '.join(errs)}")

OHLCV in trading systems: Every candlestick chart you have ever seen is built from OHLCV data. Validating at ingestion time keeps corrupted candles out of signal calculations, where they would produce false buy/sell signals. Bad data -> bad signals -> bad trades -> very real financial losses.
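As a minimal sketch of what "validation at ingestion time" could look like, the snippet below partitions a feed into clean and rejected candles before any indicator sees them. It assumes candles arrive as plain (open, high, low, close, volume) tuples; the names `Candle`, `is_consistent`, and `split_valid` are hypothetical and not part of the exercise.

```python
from typing import List, Tuple

# Assumed wire format for this sketch: (open, high, low, close, volume)
Candle = Tuple[float, float, float, float, float]

def is_consistent(c: Candle) -> bool:
    """Same rules as OHLCVCandle.validate(), collapsed to one boolean."""
    o, h, l, cl, v = c
    return (
        h >= max(o, cl, l)
        and l <= min(o, cl, h)
        and v >= 0
        and min(o, h, l, cl) > 0
    )

def split_valid(candles: List[Candle]) -> Tuple[List[Candle], List[Candle]]:
    """Partition a feed into (clean, rejected) before indicators run."""
    clean = [c for c in candles if is_consistent(c)]
    rejected = [c for c in candles if not is_consistent(c)]
    return clean, rejected

feed = [
    (100.0, 105.0, 98.0, 103.0, 1_000_000),  # consistent
    (100.0, 95.0, 98.0, 103.0, 1_000_000),   # high is below the close
    (100.0, 105.0, 98.0, 103.0, -500),       # negative volume
]
clean, rejected = split_valid(feed)
print(len(clean), len(rejected))  # 1 2
```

Keeping the rejected candles, rather than silently dropping them, makes it possible to log or alert on a misbehaving data source.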

Expected Output
Valid: True
Invalid reason: high must be >= open, close, low; low must be <= open, close, high
Invalid reason: volume must be non-negative
Hints

Hint 1: OHLCV constraints: high >= max(open, close, low), low <= min(open, close, high), volume >= 0, all prices > 0.

Hint 2: Return a list of violation strings rather than a single boolean — it makes debugging much easier.

#2 Simple Moving Average (Easy)
Tags: moving-average, sma, indicators

Implement simple_moving_average(prices, window) that returns a list of SMA values, with None for positions where insufficient data exists.

Solution
from typing import List, Optional

def simple_moving_average(prices: List[float], window: int) -> List[Optional[float]]:
    result = []
    for i in range(len(prices)):
        if i < window - 1:
            result.append(None)
        else:
            result.append(sum(prices[i - window + 1 : i + 1]) / window)
    return result

prices = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0]
print(f"SMA(3): {simple_moving_average(prices, 3)}")
print(f"SMA(5): {simple_moving_average(prices, 5)}")

Why SMA matters: SMA is the foundation for almost every trend-following trading strategy. The "golden cross" (50-day SMA crosses above 200-day SMA) is one of the most-watched signals in equity markets. In production, you would use pandas.Series.rolling(window).mean() for performance, but implementing it from scratch builds intuition for windowed calculations.
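The reference solution re-sums every window, which costs O(n × window). As a rough illustration of the windowed-calculation idea, here is a sketch that keeps a running sum instead, so each bar costs O(1). The function name `sma_running` is hypothetical, and the sketch assumes `window >= 1`.

```python
from typing import List, Optional

def sma_running(prices: List[float], window: int) -> List[Optional[float]]:
    """SMA via a running window sum: O(n) total instead of O(n * window)."""
    result: List[Optional[float]] = []
    window_sum = 0.0
    for i, price in enumerate(prices):
        window_sum += price
        if i >= window:
            # Drop the price that just slid out of the window
            window_sum -= prices[i - window]
        result.append(window_sum / window if i >= window - 1 else None)
    return result

print(sma_running([10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0], 3))
# [None, None, 11.0, 12.0, 13.0, 14.0, 15.0]
```

On very long series a running sum can accumulate floating-point error, which is one reason a library implementation is usually the safer choice in production.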

Expected Output
SMA(3): [None, None, 11.0, 12.0, 13.0, 14.0, 15.0]
SMA(5): [None, None, None, None, 12.0, 13.0, 14.0]
Hints

Hint 1: Return None for positions where you do not have enough data yet (i.e., index < window - 1). For valid positions, sum the window slice and divide by window.

Hint 2: Use a list comprehension: for each index i, return None if i < window - 1, else sum(prices[i-window+1 : i+1]) / window.


Medium

#3 Exponential Moving Average (Medium)
Tags: ema, indicators, smoothing

Implement exponential_moving_average(prices, window) using the standard EMA multiplier formula, seeded with the first SMA.

Solution
from typing import List, Optional

def exponential_moving_average(prices: List[float], window: int) -> List[Optional[float]]:
    multiplier = 2.0 / (window + 1)
    result: List[Optional[float]] = [None] * len(prices)

    if len(prices) < window:
        return result

    seed = sum(prices[:window]) / window
    result[window - 1] = seed

    for i in range(window, len(prices)):
        prev_ema = result[i - 1]
        result[i] = prices[i] * multiplier + prev_ema * (1 - multiplier)

    return result

prices = [10.0, 11.0, 15.0, 12.0, 13.0, 14.0, 15.0]
ema = exponential_moving_average(prices, 3)
print(f"EMA(3): {[round(v, 4) if v is not None else None for v in ema]}")

EMA vs SMA: The EMA reacts faster to recent price moves because its weights decay exponentially with age, so the newest prices count the most. For a 10-period EMA, the most recent price contributes 18.2% (multiplier = 2/11), while for a 10-period SMA it contributes only 10%. MACD (Moving Average Convergence/Divergence), one of the most popular indicators, is built from EMAs: the MACD line is the difference between a fast and a slow EMA, and its signal line is an EMA of that difference.
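To make the weighting claim concrete, the sketch below lists the weight each method puts on the price k bars back. It relies on the standard expansion of the EMA recurrence, weight = m × (1 − m)^k, and ignores the SMA seed (which carries whatever weight is left over). The helper names `ema_weights` and `sma_weights` are hypothetical.

```python
from typing import List

def ema_weights(window: int, n_terms: int) -> List[float]:
    """Weight an EMA puts on the price k bars back: m * (1 - m) ** k."""
    m = 2.0 / (window + 1)
    return [m * (1 - m) ** k for k in range(n_terms)]

def sma_weights(window: int, n_terms: int) -> List[float]:
    """An SMA weights every price inside the window equally, nothing outside."""
    return [1.0 / window if k < window else 0.0 for k in range(n_terms)]

ema_w = ema_weights(10, 12)
sma_w = sma_weights(10, 12)
print(f"most recent price: EMA {ema_w[0]:.1%}, SMA {sma_w[0]:.1%}")
# most recent price: EMA 18.2%, SMA 10.0%
print(f"11 bars back:      EMA {ema_w[11]:.1%}, SMA {sma_w[11]:.1%}")
```

The second line shows the other side of the trade-off: the SMA forgets a price completely once it leaves the window, while the EMA keeps a small, shrinking weight on it indefinitely.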

from typing import List, Optional

def exponential_moving_average(
    prices: List[float],
    window: int,
) -> List[Optional[float]]:
    """Compute EMA using the standard multiplier formula.
    
    multiplier = 2 / (window + 1)
    EMA[i] = price[i] * multiplier + EMA[i-1] * (1 - multiplier)
    
    Seed: first EMA value = SMA of first 'window' prices.
    Return None for positions before the seed.
    """
    pass
Expected Output
EMA(3): [None, None, 12.0, 12.0, 12.5, 13.25, 14.125]
Hints

Hint 1: Seed the EMA at index (window - 1) with the SMA of the first `window` prices. Then apply the multiplier formula for all subsequent prices.

Hint 2: multiplier = 2 / (window + 1). EMA reacts faster to recent prices than SMA because recent prices have exponentially higher weight.

#4 Moving Average Crossover Signal (Medium)
Tags: crossover, signals, buy-sell, strategy

Implement a moving average crossover signal generator that produces BUY/SELL/HOLD signals based on fast vs slow MA crossovers.

Solution
from typing import List, Optional, Tuple

def sma(prices: List[float], window: int) -> List[Optional[float]]:
    result = []
    for i in range(len(prices)):
        if i < window - 1:
            result.append(None)
        else:
            result.append(sum(prices[i - window + 1 : i + 1]) / window)
    return result

class CrossoverSignal:
    def __init__(self, fast_window: int, slow_window: int):
        self.fast_window = fast_window
        self.slow_window = slow_window

    def generate(self, prices: List[float]) -> List[Tuple[float, str]]:
        fast_ma = sma(prices, self.fast_window)
        slow_ma = sma(prices, self.slow_window)
        results = []
        prev_diff = None
        for i, price in enumerate(prices):
            f, s = fast_ma[i], slow_ma[i]
            if f is None or s is None:
                results.append((price, "HOLD"))
                continue
            diff = f - s
            signal = "HOLD"
            if prev_diff is not None:
                if prev_diff < 0 and diff >= 0:
                    signal = "BUY"
                elif prev_diff > 0 and diff <= 0:
                    signal = "SELL"
            prev_diff = diff
            results.append((price, signal))
        return results

prices = [100, 99, 98, 97, 96, 98, 101, 103, 108, 107, 104, 100]
cs = CrossoverSignal(fast_window=3, slow_window=5)
for price, signal in cs.generate(prices):
    print(f"Price={price:<8} Signal={signal}")

Signal lag: MA crossover signals are inherently lagging — they confirm a trend after it has already started. This is the fundamental tradeoff in technical analysis: lagging indicators (MAs) are less noisy but enter/exit later; leading indicators (RSI, oscillators) trigger earlier but produce more false signals.
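The lag can be measured directly on the test prices from this problem. The sketch below is illustrative only: it recomputes the two SMAs, finds the bar where the downtrend actually bottomed, and compares it with the bar where the fast average first crosses above the slow one. The variable names `trough` and `buy_bar` are hypothetical.

```python
from typing import List, Optional

def sma(prices: List[float], window: int) -> List[Optional[float]]:
    return [
        None if i < window - 1 else sum(prices[i - window + 1 : i + 1]) / window
        for i in range(len(prices))
    ]

prices = [100, 99, 98, 97, 96, 98, 101, 103, 108, 107, 104, 100]
fast, slow = sma(prices, 3), sma(prices, 5)
diffs = [None if f is None or s is None else f - s for f, s in zip(fast, slow)]

# Bar where the downtrend actually ended
trough = prices.index(min(prices))

# First bar where (fast - slow) flips from negative to non-negative
buy_bar = next(
    i for i in range(1, len(diffs))
    if diffs[i - 1] is not None and diffs[i - 1] < 0 and diffs[i] >= 0
)

print(f"trough at bar {trough}, BUY at bar {buy_bar}, lag = {buy_bar - trough} bars")
# trough at bar 4, BUY at bar 6, lag = 2 bars
```

Here the price has already moved from 96 to 101 by the time the BUY fires, which is the cost of waiting for confirmation.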

from typing import List, Optional, Tuple

class CrossoverSignal:
    """Generate BUY/SELL/HOLD signals from a moving average crossover.
    
    BUY  when fast MA crosses ABOVE slow MA (golden cross)
    SELL when fast MA crosses BELOW slow MA (death cross)
    HOLD otherwise
    
    Returns list of (price, signal) tuples.
    """
    def __init__(self, fast_window: int, slow_window: int):
        pass

    def generate(self, prices: List[float]) -> List[Tuple[float, str]]:
        pass
Expected Output
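Price=100      Signal=HOLD
Price=99       Signal=HOLD
Price=98       Signal=HOLD
Price=97       Signal=HOLD
Price=96       Signal=HOLD
Price=98       Signal=HOLD
Price=101      Signal=BUY
Price=103      Signal=HOLD
Price=108      Signal=HOLD
Price=107      Signal=HOLD
Price=104      Signal=HOLD
Price=100      Signal=SELL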
Hints

Hint 1: Compute both MAs first (you can reuse SMA from the earlier problem). Then iterate through positions where both MAs are valid.

Hint 2: A crossover is detected by comparing the sign of (fast - slow) at the current bar vs the previous bar. If the sign changes from negative to positive, it is a BUY.

#5 Position and P&L Tracker (Medium)
Tags: portfolio, pnl, position-tracking

Build a PositionTracker that tracks open positions with average cost basis and computes both realized and unrealized P&L.

Solution
from dataclasses import dataclass
from typing import Dict, Tuple

@dataclass
class Trade:
    symbol: str
    side: str  # 'BUY' or 'SELL'
    quantity: float
    price: float

class PositionTracker:
    def __init__(self):
        # symbol -> (quantity, average cost)
        self._positions: Dict[str, Tuple[float, float]] = {}
        self._realized: Dict[str, float] = {}

    def execute(self, trade: Trade) -> None:
        qty, avg_cost = self._positions.get(trade.symbol, (0.0, 0.0))
        if trade.side == "BUY":
            new_qty = qty + trade.quantity
            new_avg = (qty * avg_cost + trade.quantity * trade.price) / new_qty
            self._positions[trade.symbol] = (new_qty, new_avg)
        elif trade.side == "SELL":
            # Long-only: never sell more than is held
            sell_qty = min(trade.quantity, qty)
            pnl = (trade.price - avg_cost) * sell_qty
            self._realized[trade.symbol] = self._realized.get(trade.symbol, 0.0) + pnl
            new_qty = qty - sell_qty
            if new_qty > 0:
                self._positions[trade.symbol] = (new_qty, avg_cost)
            else:
                # pop() avoids a KeyError when selling a symbol with no open position
                self._positions.pop(trade.symbol, None)

    def unrealized_pnl(self, symbol: str, current_price: float) -> float:
        qty, avg_cost = self._positions.get(symbol, (0.0, 0.0))
        return (current_price - avg_cost) * qty

    def realized_pnl(self, symbol: str) -> float:
        return self._realized.get(symbol, 0.0)

    def summary(self) -> None:
        for symbol, (qty, avg) in self._positions.items():
            print(f"Position: {qty:.0f} @ avg {avg:.2f}")

tracker = PositionTracker()
tracker.execute(Trade("AAPL", "BUY", 10, 100.0))
tracker.summary()
print(f"Unrealized P&L: {tracker.unrealized_pnl('AAPL', 105.0):.2f}")
tracker.execute(Trade("AAPL", "SELL", 5, 105.0))
print(f"Realized P&L: {tracker.realized_pnl('AAPL'):.2f}")
tracker.summary()

Realized vs unrealized: Realized P&L is locked in once you close (sell) a position. Unrealized P&L fluctuates with the current market price and disappears if the price reverts before you sell. Tax authorities typically only care about realized gains. Risk systems track unrealized to monitor current exposure.
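A short worked example, using made-up numbers, shows how the two figures can point in opposite directions at the same time. It applies the same average-cost rule as the tracker: two buys at different prices, a partial sell above cost, then a mark-to-market below cost.

```python
# Two buys at different prices, tracked with a weighted average cost
qty, avg_cost = 0.0, 0.0
for buy_qty, buy_price in [(10, 100.0), (10, 110.0)]:
    avg_cost = (qty * avg_cost + buy_qty * buy_price) / (qty + buy_qty)
    qty += buy_qty
print(f"Position: {qty:.0f} @ avg {avg_cost:.2f}")   # Position: 20 @ avg 105.00

# Sell 5 shares at 120: this gain is locked in
sell_qty, sell_price = 5, 120.0
realized = (sell_price - avg_cost) * sell_qty
qty -= sell_qty
print(f"Realized P&L: {realized:.2f}")               # Realized P&L: 75.00

# Price then drops to 100: the remaining 15 shares show a paper loss
mark = 100.0
unrealized = (mark - avg_cost) * qty
print(f"Unrealized P&L: {unrealized:.2f}")           # Unrealized P&L: -75.00
```

The realized 75.00 stays fixed whatever the market does next, while the unrealized figure keeps moving until the remaining shares are sold.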

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Trade:
    symbol: str
    side: str  # 'BUY' or 'SELL'
    quantity: float
    price: float

class PositionTracker:
    """Track positions and compute realized/unrealized P&L.
    
    - execute(trade) opens/reduces positions
    - unrealized_pnl(symbol, current_price) -> float
    - realized_pnl(symbol) -> float
    - summary() prints all positions
    """
    pass
Expected Output
Position: 10 @ avg 100.00
Unrealized P&L: 50.00
Realized P&L: 25.00
Position: 5 @ avg 100.00
Hints

Hint 1: Track (quantity, avg_cost) per symbol. BUY increases quantity and updates avg_cost using weighted average. SELL reduces quantity and realizes (sell_price - avg_cost) * quantity.

Hint 2: Avg cost update on a BUY: new_avg = (old_qty * old_avg + buy_qty * buy_price) / (old_qty + buy_qty). A SELL leaves the average cost unchanged. Realized P&L accumulates separately.

#6 Relative Strength Index (RSI) (Medium)
Tags: rsi, momentum, indicators

Implement RSI (Relative Strength Index) using Wilder's smoothing method. RSI oscillates between 0 and 100; readings above 70 are conventionally called "overbought" and readings below 30 "oversold".

Solution
from typing import List, Optional
import math

def compute_rsi(prices: List[float], period: int = 14) -> List[Optional[float]]:
    if len(prices) < period + 1:
        return [None] * len(prices)

    deltas = [prices[i] - prices[i - 1] for i in range(1, len(prices))]
    gains = [max(d, 0.0) for d in deltas]
    losses = [abs(min(d, 0.0)) for d in deltas]

    # The first `period` deltas end at prices[period], so the first RSI value
    # belongs at index `period`; only the positions before it are None.
    result: List[Optional[float]] = [None] * period

    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period

    def rsi_val(ag, al):
        if al == 0:
            return 100.0
        if ag == 0:
            return 0.0
        return 100 - (100 / (1 + ag / al))

    result.append(rsi_val(avg_gain, avg_loss))

    # deltas[i] is the change into prices[i + 1]
    for i in range(period, len(deltas)):
        avg_gain = (avg_gain * (period - 1) + gains[i]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i]) / period
        result.append(rsi_val(avg_gain, avg_loss))

    return result

prices = [100 + 10 * math.sin(i / 3.0) for i in range(50)]
rsi_values = compute_rsi(prices, period=14)
valid = [(i, round(v, 1)) for i, v in enumerate(rsi_values) if v is not None]
print(f"First valid RSI at index {valid[0][0]}: {valid[0][1]}")
overbought = [(i, v) for i, v in valid if v > 70]
oversold = [(i, v) for i, v in valid if v < 30]
print(f"Overbought signals: {len(overbought)}")
print(f"Oversold signals: {len(oversold)}")

RSI interpretation: RSI above 70 suggests the asset is overbought (recent gains may be overdone, price could pull back). RSI below 30 suggests oversold. But in strong trends, RSI can stay overbought for extended periods — always use RSI as one signal among several, never in isolation.

from typing import List, Optional

def compute_rsi(prices: List[float], period: int = 14) -> List[Optional[float]]:
    """Compute RSI using the standard Wilder smoothing method.
    
    Steps:
    1. Compute price changes: delta[i] = price[i] - price[i-1]
    2. Separate into gains (positive deltas) and losses (absolute negative deltas)
    3. Seed: avg_gain = mean(first 'period' gains), avg_loss = mean(first 'period' losses)
    4. Smooth: avg_gain = (prev_avg_gain * (period-1) + current_gain) / period
    5. RS = avg_gain / avg_loss; RSI = 100 - 100 / (1 + RS)
    Return None for positions with insufficient data.
    """
    pass
Expected Output
First valid RSI at index N: XX.X
Overbought signals: N
Oversold signals: N
Hints

Hint 1: Wilder smoothing is different from a regular EMA. The formula is: avg = (prev_avg * (period - 1) + current_value) / period (no 2/(n+1) multiplier).

Hint 2: Handle division by zero: if avg_loss == 0, RSI = 100 (pure uptrend). If avg_gain == 0, RSI = 0 (pure downtrend).
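Hint 1 can be checked numerically. Rearranging Wilder's formula gives avg = prev_avg × (1 − 1/period) + value × (1/period), which is an exponential average with smoothing factor 1/period rather than 2/(period + 1). The sketch below runs both recurrences side by side on arbitrary values; the helper names `wilder_step` and `ema_step` are hypothetical.

```python
def wilder_step(prev_avg: float, value: float, period: int) -> float:
    """One step of Wilder smoothing, as used for RSI's avg_gain / avg_loss."""
    return (prev_avg * (period - 1) + value) / period

def ema_step(prev_avg: float, value: float, alpha: float) -> float:
    """One step of a generic exponential average with smoothing factor alpha."""
    return value * alpha + prev_avg * (1 - alpha)

period = 14
avg_wilder = avg_ema = 1.0          # arbitrary shared seed
for value in [0.5, 0.0, 2.0, 1.5, 0.0]:
    avg_wilder = wilder_step(avg_wilder, value, period)
    avg_ema = ema_step(avg_ema, value, 1.0 / period)

# The two recurrences agree up to floating-point rounding
print(abs(avg_wilder - avg_ema) < 1e-12)  # True

# Wilder's factor is roughly half the standard EMA multiplier, so it reacts more slowly
print(f"Wilder alpha: {1 / period:.4f}, standard EMA multiplier: {2 / (period + 1):.4f}")
# Wilder alpha: 0.0714, standard EMA multiplier: 0.1333
```

In other words, a 14-period Wilder average behaves like a standard EMA with a window of 27, since 2/(27 + 1) = 1/14.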


Hard

#7 Order Book Simulation (Hard)
Tags: order-book, matching-engine, bid-ask

Build a price-time priority order book with limit order placement, cancellation, and order matching.

Solution
from dataclasses import dataclass
from typing import List, Optional, Tuple, Dict
import heapq

@dataclass
class Order:
    order_id: int
    side: str
    price: float
    quantity: float

class OrderBook:
    def __init__(self):
        self._bids: list = []
        self._asks: list = []
        self._orders: Dict[int, Order] = {}
        self._cancelled: set = set()

    def add_order(self, order: Order) -> None:
        self._orders[order.order_id] = order
        if order.side == "BUY":
            heapq.heappush(self._bids, (-order.price, order.order_id, order))
        else:
            heapq.heappush(self._asks, (order.price, order.order_id, order))

    def cancel_order(self, order_id: int) -> bool:
        if order_id in self._orders:
            self._cancelled.add(order_id)
            return True
        return False

    def _peek_bid(self) -> Optional[Order]:
        while self._bids:
            _, oid, o = self._bids[0]
            if oid in self._cancelled or o.quantity <= 0:
                heapq.heappop(self._bids)
                continue
            return o
        return None

    def _peek_ask(self) -> Optional[Order]:
        while self._asks:
            _, oid, o = self._asks[0]
            if oid in self._cancelled or o.quantity <= 0:
                heapq.heappop(self._asks)
                continue
            return o
        return None

    def best_bid(self) -> Optional[float]:
        o = self._peek_bid()
        return o.price if o else None

    def best_ask(self) -> Optional[float]:
        o = self._peek_ask()
        return o.price if o else None

    def match(self) -> List[Tuple]:
        fills = []
        while True:
            bid = self._peek_bid()
            ask = self._peek_ask()
            if not bid or not ask or bid.price < ask.price:
                break
            fill_qty = min(bid.quantity, ask.quantity)
            fills.append((bid.order_id, ask.order_id, ask.price, fill_qty))
            bid.quantity -= fill_qty
            ask.quantity -= fill_qty
        return fills

book = OrderBook()
book.add_order(Order(1, "BUY", 101.0, 10.0))
book.add_order(Order(2, "BUY", 99.0, 5.0))
book.add_order(Order(3, "SELL", 101.0, 5.0))
book.add_order(Order(4, "SELL", 103.0, 10.0))

print(f"Best bid: {book.best_bid()}")
print(f"Best ask: {book.best_ask()}")
print(f"Spread: {book.best_ask() - book.best_bid():.1f}")

for buy_id, sell_id, price, qty in book.match():
    print(f"Fill: buy_order={buy_id} sell_order={sell_id} price={price} qty={qty}")

Lazy deletion: Instead of removing cancelled orders from the heap immediately (O(n) operation), we use a _cancelled set and skip them when peeking. This is the standard approach for heap-based priority queues with cancellation — O(1) cancel, O(log n) amortized peek.
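Stripped of the order-book details, the lazy-deletion pattern can be sketched as a small generic class. This is an illustration under the assumption that item IDs are unique and never reused; the class name `LazyHeap` and its methods are hypothetical.

```python
import heapq

class LazyHeap:
    """Min-heap of (priority, item_id) with O(1) cancellation via lazy deletion."""

    def __init__(self):
        self._heap = []
        self._cancelled = set()

    def push(self, priority, item_id):
        heapq.heappush(self._heap, (priority, item_id))

    def cancel(self, item_id):
        # O(1): only mark the entry; it stays in the heap for now
        self._cancelled.add(item_id)

    def peek(self):
        # Cancelled entries are discarded only when they surface at the top
        while self._heap and self._heap[0][1] in self._cancelled:
            _, item_id = heapq.heappop(self._heap)
            self._cancelled.discard(item_id)
        return self._heap[0] if self._heap else None

h = LazyHeap()
h.push(101.0, 3)
h.push(103.0, 4)
h.push(102.0, 5)
h.cancel(3)        # the best entry is cancelled but not yet removed
print(h.peek())    # (102.0, 5)
```

The trade-off is memory: cancelled entries linger in the heap until they reach the top, so a book with many cancellations far from the best price can grow larger than its live order count.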

from dataclasses import dataclass, field
from typing import List, Optional, Tuple
import heapq

@dataclass
class Order:
    order_id: int
    side: str  # 'BUY' or 'SELL'
    price: float
    quantity: float

class OrderBook:
    """Price-time priority order book.
    
    - add_order(order) adds a limit order
    - cancel_order(order_id) cancels an order
    - best_bid() -> Optional[float]
    - best_ask() -> Optional[float]
    - match() -> List[Tuple] of (buy_id, sell_id, price, qty) fills
    """
    pass
Expected Output
Best bid: 101.0
Best ask: 101.0
Spread: 0.0
Fill: buy_order=1 sell_order=3 price=101.0 qty=5.0
Hints

Hint 1: Use a max-heap for bids (negate prices) and a min-heap for asks. Python only has min-heaps, so store (-price, order_id, order) for bids.

Hint 2: Matching condition: if best_bid >= best_ask, a fill happens. This simplified engine always fills at the ask price; real matching engines typically fill at the price of the resting order (the one that was already in the book). Fill quantity is min(bid_qty, ask_qty).

#8 Backtesting Engine (Hard)
Tags: backtesting, strategy, performance-metrics

Build a backtesting engine that simulates a strategy on historical prices and computes Sharpe ratio, max drawdown, and win rate.

Solution
from dataclasses import dataclass
from typing import List, Callable
import math

@dataclass
class BacktestResult:
    total_return: float
    sharpe_ratio: float
    max_drawdown: float
    num_trades: int
    win_rate: float

    def __str__(self):
        return (
            f"Total return: {self.total_return:.2f}%\n"
            f"Sharpe ratio: {self.sharpe_ratio:.2f}\n"
            f"Max drawdown: {self.max_drawdown:.2f}%\n"
            f"Trades: {self.num_trades}\n"
            f"Win rate: {self.win_rate:.1f}%"
        )

class BacktestEngine:
    def __init__(self, initial_capital: float = 10_000):
        self.initial_capital = initial_capital

    def run(self, prices: List[float], strategy: Callable) -> BacktestResult:
        capital = self.initial_capital
        position = entry_price = 0.0
        equity_curve = [capital]
        trade_returns = []

        for i in range(len(prices) - 1):
            # The strategy sees bars 0..i; any trade fills at bar i + 1
            signal = strategy(prices[:i + 1], position)
            next_price = prices[i + 1]
            if signal == "BUY" and position == 0:
                position = capital / next_price
                entry_price = next_price
                # All cash is now in the position; without this, equity
                # would count the same money twice while holding.
                capital = 0.0
            elif signal == "SELL" and position > 0:
                capital = position * next_price
                trade_returns.append((next_price - entry_price) / entry_price)
                position = entry_price = 0.0
            equity_curve.append(capital + position * next_price)

        # Close any open position at the last price
        if position > 0:
            capital = position * prices[-1]
            trade_returns.append((prices[-1] - entry_price) / entry_price)

        total_return = (capital - self.initial_capital) / self.initial_capital * 100

        # Max drawdown: largest percentage fall from a running peak
        max_dd = 0.0
        peak = equity_curve[0]
        for v in equity_curve:
            peak = max(peak, v)
            max_dd = max(max_dd, (peak - v) / peak * 100)

        # Sharpe: assumes daily bars (252 per year) and a zero risk-free rate
        daily_returns = [(equity_curve[i] - equity_curve[i-1]) / equity_curve[i-1]
                         for i in range(1, len(equity_curve))]
        mean_r = sum(daily_returns) / len(daily_returns) if daily_returns else 0
        std_r = math.sqrt(sum((r - mean_r)**2 for r in daily_returns) / len(daily_returns)) if daily_returns else 0
        sharpe = (mean_r / std_r * math.sqrt(252)) if std_r > 0 else 0.0

        wins = sum(1 for r in trade_returns if r > 0)
        return BacktestResult(
            total_return=total_return,
            sharpe_ratio=sharpe,
            max_drawdown=max_dd,
            num_trades=len(trade_returns),
            win_rate=(wins / len(trade_returns) * 100) if trade_returns else 0.0,
        )

def ma_crossover_strategy(prices, position):
    if len(prices) < 10:
        return "HOLD"
    fast = sum(prices[-3:]) / 3
    slow = sum(prices[-10:]) / 10
    if fast > slow and position == 0:
        return "BUY"
    if fast < slow and position > 0:
        return "SELL"
    return "HOLD"

prices = [100 + 20 * math.sin(i / 10.0) + i * 0.1 for i in range(200)]
print(BacktestEngine(10_000).run(prices, ma_crossover_strategy))

Look-ahead bias: The engine executes trades at prices[i+1] (the next bar), not prices[i] (the current bar). This prevents look-ahead bias: in reality you cannot trade at today's close using a signal that was computed from today's close. Even a one-bar delay can be the difference between a backtest that merely looks profitable and one that is realistic.

from dataclasses import dataclass, field
from typing import List, Callable, Optional

@dataclass
class BacktestResult:
    total_return: float
    sharpe_ratio: float
    max_drawdown: float
    num_trades: int
    win_rate: float

class BacktestEngine:
    """Event-driven backtesting engine.
    
    - Takes a list of prices and a strategy function
    - Strategy: (prices_so_far, position) -> 'BUY' | 'SELL' | 'HOLD'
    - Executes trades at next bar open (no look-ahead bias)
    - Computes Sharpe, max drawdown, win rate
    """
    def __init__(self, initial_capital: float = 10_000):
        pass

    def run(self, prices: List[float], strategy: Callable) -> BacktestResult:
        pass
Expected Output
Total return: X.XX%
Sharpe ratio: X.XX
Max drawdown: X.XX%
Trades: N
Win rate: XX.X%
Hints

Hint 1: Iterate through prices. At each bar, call strategy(prices[:i], current_position). If BUY and no position, buy at prices[i+1]. If SELL and have position, sell at prices[i+1].

Hint 2: Max drawdown: track the running peak equity value. Drawdown = (peak - current) / peak. Record the maximum drawdown seen.
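The running-peak idea from Hint 2 can be sketched on its own, independent of the engine. The function name `max_drawdown_pct` and the sample equity values are assumptions chosen for illustration.

```python
from typing import List

def max_drawdown_pct(equity_curve: List[float]) -> float:
    """Largest peak-to-trough decline, as a percentage of the running peak."""
    if not equity_curve:
        return 0.0
    peak = equity_curve[0]
    max_dd = 0.0
    for value in equity_curve:
        peak = max(peak, value)  # update the running peak first
        if peak > 0:             # guard against division by zero
            max_dd = max(max_dd, (peak - value) / peak * 100)
    return max_dd

# Two declines: 120 -> 90 (25%) and 130 -> 104 (20%); the larger one is reported.
equity = [100.0, 120.0, 90.0, 110.0, 130.0, 104.0]
print(f"Max drawdown: {max_drawdown_pct(equity):.2f}%")
```

Note that the drawdown is measured against the highest value seen so far, not against the starting capital, so a later and smaller decline from a new high does not overwrite an earlier and larger one.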

#9Risk-Adjusted Position SizerHard
risk-managementposition-sizingkelly-criterion

Implement three position sizing models: fixed fractional risk, Kelly criterion, and volatility-adjusted sizing using ATR.

class PositionSizer:
    @staticmethod
    def fixed_fractional(
        capital: float,
        risk_pct: float,
        entry: float,
        stop: float,
    ) -> int:
        """Risk at most risk_pct% of capital on this trade (shares are rounded down)."""
        dollar_risk = capital * (risk_pct / 100)
        risk_per_share = abs(entry - stop)
        if risk_per_share == 0:
            return 0
        return int(dollar_risk / risk_per_share)

    @staticmethod
    def kelly_criterion(
        win_rate: float,
        avg_win: float,
        avg_loss: float,
    ) -> float:
        """Return the optimal fraction of capital to bet (capped at 25%)."""
        if avg_win == 0:
            return 0.0
        kelly = (win_rate * avg_win - (1 - win_rate) * avg_loss) / avg_win
        return max(0.0, min(kelly, 0.25))

    @staticmethod
    def volatility_adjusted(
        capital: float,
        price: float,
        atr: float,
        risk_pct: float,
        atr_multiplier: float = 2.0,
    ) -> int:
        """Size position so that a stop atr_multiplier ATRs away risks risk_pct% of capital."""
        stop_distance = atr * atr_multiplier
        dollar_risk = capital * (risk_pct / 100)
        if stop_distance == 0:
            return 0
        return int(dollar_risk / stop_distance)

# Test
capital = 10_000.0
sizer = PositionSizer()

shares_ff = sizer.fixed_fractional(capital, risk_pct=2.0, entry=100.0, stop=90.0)
print(f"Fixed fractional: {shares_ff} shares (risking ${shares_ff * 10:.0f} on ${capital:.0f})")

kelly_f = sizer.kelly_criterion(win_rate=0.55, avg_win=2.0, avg_loss=1.0)
print(f"Kelly fraction: {kelly_f:.2f} ({kelly_f*100:.0f}% of capital)")

shares_vol = sizer.volatility_adjusted(capital, price=100.0, atr=1.50, risk_pct=2.0)
print(f"Volatility adjusted: {shares_vol} shares (1 ATR = $1.50 stop)")
Solution
class PositionSizer:
    @staticmethod
    def fixed_fractional(capital, risk_pct, entry, stop) -> int:
        dollar_risk = capital * (risk_pct / 100)
        risk_per_share = abs(entry - stop)
        return int(dollar_risk / risk_per_share) if risk_per_share else 0

    @staticmethod
    def kelly_criterion(win_rate, avg_win, avg_loss) -> float:
        if avg_win == 0:
            return 0.0
        kelly = (win_rate * avg_win - (1 - win_rate) * avg_loss) / avg_win
        return max(0.0, min(kelly, 0.25))

    @staticmethod
    def volatility_adjusted(capital, price, atr, risk_pct, atr_multiplier=2.0) -> int:
        stop_distance = atr * atr_multiplier
        dollar_risk = capital * (risk_pct / 100)
        return int(dollar_risk / stop_distance) if stop_distance else 0

sizer = PositionSizer()
capital = 10_000.0

shares_ff = sizer.fixed_fractional(capital, 2.0, 100.0, 90.0)
print(f"Fixed fractional: {shares_ff} shares (risking ${shares_ff * 10:.0f} on ${capital:.0f})")

kelly_f = sizer.kelly_criterion(0.55, 2.0, 1.0)
print(f"Kelly fraction: {kelly_f:.2f} ({kelly_f*100:.0f}% of capital)")

shares_vol = sizer.volatility_adjusted(capital, 100.0, 1.50, 2.0)
print(f"Volatility adjusted: {shares_vol} shares (1 ATR = $1.50 stop)")

Half-Kelly in practice: Full Kelly sizing maximizes long-run growth in theory, but it produces very large drawdowns and is sensitive to errors in the estimated win rate and payoff. Many practitioners therefore use half-Kelly (multiply the Kelly fraction by 0.5), which substantially reduces volatility while keeping roughly three quarters of the expected growth rate. The 0.25 cap in kelly_criterion is a similar safety constraint.
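A rough way to see the trade-off is to compare expected log growth per bet at full and half Kelly. The sketch below assumes a simplified binary bet (win `payoff_ratio` per unit staked, otherwise lose the stake); the helper names `kelly_fraction` and `log_growth` are illustrative and are not part of PositionSizer.

```python
import math

def kelly_fraction(win_rate: float, payoff_ratio: float) -> float:
    """Uncapped Kelly fraction for a bet paying payoff_ratio per unit staked."""
    return (win_rate * payoff_ratio - (1 - win_rate)) / payoff_ratio

def log_growth(fraction: float, win_rate: float, payoff_ratio: float) -> float:
    """Expected log growth per bet when staking `fraction` of capital each time."""
    return (win_rate * math.log(1 + payoff_ratio * fraction)
            + (1 - win_rate) * math.log(1 - fraction))

p, b = 0.55, 2.0            # same win rate and 2:1 payoff as the test above
full = kelly_fraction(p, b)
half = 0.5 * full
g_full = log_growth(full, p, b)
g_half = log_growth(half, p, b)

print(f"Full Kelly f={full:.4f}, growth per bet={g_full:.4f}")
print(f"Half Kelly f={half:.4f}, growth per bet={g_half:.4f}")
print(f"Half Kelly keeps {g_half / g_full:.0%} of full-Kelly growth")
```

Under these assumptions, halving the stake gives up only about a quarter of the growth rate, which is the usual argument for betting below full Kelly when the inputs are uncertain.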

from typing import List

class PositionSizer:
    """Multiple position sizing models.
    
    - fixed_fractional(capital, risk_pct, entry, stop) -> shares
    - kelly_criterion(win_rate, avg_win, avg_loss) -> fraction of capital
    - volatility_adjusted(capital, price, atr, risk_pct) -> shares
    
    All methods return the number of shares (or fraction) to trade.
    """
    pass
Expected Output
Fixed fractional: 20 shares (risking $200 on $10000)
Kelly fraction: 0.25 (25% of capital)
Volatility adjusted: 66 shares (1 ATR = $1.50 stop)
Hints

Hint 1: Fixed fractional: shares = (capital * risk_pct / 100) / abs(entry - stop), rounded down to a whole number. This risks at most risk_pct% of capital if the stop is hit.

Hint 2: Kelly criterion: f = (win_rate * avg_win - (1 - win_rate) * avg_loss) / avg_win. Represents the theoretically optimal fraction of capital to risk per trade.
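As a worked check of Hint 1, the sketch below sizes a trade and then confirms what is actually lost if the stop is hit. The standalone function name `fixed_fractional_shares` is an assumption for this example; it mirrors the formula in the hint.

```python
def fixed_fractional_shares(capital: float, risk_pct: float, entry: float, stop: float) -> int:
    """Whole shares such that a stop-out loses at most risk_pct% of capital."""
    dollar_risk = capital * (risk_pct / 100)  # risk_pct is a percentage, e.g. 2.0 for 2%
    risk_per_share = abs(entry - stop)
    if risk_per_share == 0:
        return 0
    return int(dollar_risk / risk_per_share)  # round down so the budget is never exceeded

capital = 10_000.0
for stop in (90.0, 93.0):
    shares = fixed_fractional_shares(capital, risk_pct=2.0, entry=100.0, stop=stop)
    loss_at_stop = shares * abs(100.0 - stop)
    print(f"stop={stop:.0f}: {shares} shares, loss if stopped out ${loss_at_stop:.2f} "
          f"({loss_at_stop / capital:.2%} of capital)")
```

With a $10 stop distance the $200 budget buys exactly 20 shares. With a $7 stop distance the division is not exact, so rounding down to 28 shares risks $196, slightly under the budget.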

#10VWAP Calculator with Streaming UpdatesHard
vwapstreamingprice-analysis

Build a streaming VWAP calculator that updates in O(1) per tick and tracks session statistics.

from typing import Optional

class StreamingVWAP:
    def __init__(self):
        self._cumulative_pv: float = 0.0
        self._cumulative_volume: float = 0.0
        self._session_high: Optional[float] = None
        self._session_low: Optional[float] = None

    def update(self, price: float, volume: float) -> None:
        self._cumulative_pv += price * volume
        self._cumulative_volume += volume
        if self._session_high is None or price > self._session_high:
            self._session_high = price
        if self._session_low is None or price < self._session_low:
            self._session_low = price

    def current_vwap(self) -> Optional[float]:
        if self._cumulative_volume == 0:
            return None
        return self._cumulative_pv / self._cumulative_volume

    def is_above_vwap(self, price: float) -> bool:
        vwap = self.current_vwap()
        return vwap is not None and price > vwap

    def reset(self) -> None:
        self._cumulative_pv = 0.0
        self._cumulative_volume = 0.0
        self._session_high = None
        self._session_low = None

    @property
    def session_high(self) -> Optional[float]:
        return self._session_high

    @property
    def session_low(self) -> Optional[float]:
        return self._session_low

    @property
    def total_volume(self) -> float:
        return self._cumulative_volume

# Test
vwap = StreamingVWAP()
trades = [
    (99.0, 2000),
    (100.0, 5000),
    (102.0, 3000),
    (104.0, 1000),
    (103.0, 4000),
]
for price, volume in trades:
    vwap.update(price, volume)

print(f"After {len(trades)} trades:")
print(f"VWAP: {vwap.current_vwap():.2f}")
print(f"Session high: {vwap.session_high}, low: {vwap.session_low}")
print(f"Total volume: {vwap.total_volume:.0f}")
print(f"Current price 103.0 above VWAP: {vwap.is_above_vwap(103.0)}")
Solution
from typing import Optional

class StreamingVWAP:
    def __init__(self):
        self._cumulative_pv = 0.0
        self._cumulative_volume = 0.0
        self._session_high: Optional[float] = None
        self._session_low: Optional[float] = None

    def update(self, price: float, volume: float) -> None:
        self._cumulative_pv += price * volume
        self._cumulative_volume += volume
        if self._session_high is None or price > self._session_high:
            self._session_high = price
        if self._session_low is None or price < self._session_low:
            self._session_low = price

    def current_vwap(self) -> Optional[float]:
        return self._cumulative_pv / self._cumulative_volume if self._cumulative_volume else None

    def is_above_vwap(self, price: float) -> bool:
        v = self.current_vwap()
        return v is not None and price > v

    def reset(self) -> None:
        self._cumulative_pv = self._cumulative_volume = 0.0
        self._session_high = self._session_low = None

    @property
    def session_high(self): return self._session_high
    @property
    def session_low(self): return self._session_low
    @property
    def total_volume(self): return self._cumulative_volume

vwap = StreamingVWAP()
for price, volume in [(99.0, 2000), (100.0, 5000), (102.0, 3000), (104.0, 1000), (103.0, 4000)]:
    vwap.update(price, volume)

print(f"After 5 trades:")
print(f"VWAP: {vwap.current_vwap():.2f}")
print(f"Session high: {vwap.session_high}, low: {vwap.session_low}")
print(f"Total volume: {vwap.total_volume:.0f}")
print(f"Current price 103.0 above VWAP: {vwap.is_above_vwap(103.0)}")

VWAP as benchmark: Institutional traders use VWAP as a benchmark — executing below VWAP on a buy order means you got a better-than-average price for the day. "VWAP trading" strategies aim to break up large orders and execute them proportionally to volume throughout the day, minimizing market impact.
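The volume-proportional slicing idea can be sketched as follows. This is a simplified illustration, not a production execution algorithm: `slice_order` is a hypothetical helper, and the volume profile is assumed to be known in advance, whereas real strategies have to forecast it.

```python
from typing import List

def slice_order(total_shares: int, volume_profile: List[float]) -> List[int]:
    """Split a parent order into child orders proportional to expected volume per bucket."""
    total_volume = sum(volume_profile)
    if total_shares <= 0 or total_volume <= 0:
        return [0] * len(volume_profile)
    exact = [total_shares * v / total_volume for v in volume_profile]
    sizes = [int(x) for x in exact]  # round every bucket down first
    leftover = total_shares - sum(sizes)
    # Hand the remaining shares to the buckets with the largest fractional parts.
    by_remainder = sorted(range(len(exact)), key=lambda i: exact[i] - sizes[i], reverse=True)
    for i in by_remainder[:leftover]:
        sizes[i] += 1
    return sizes

# Assumed intraday volume profile (same volumes as the test trades above).
profile = [2000, 5000, 3000, 1000, 4000]
child_orders = slice_order(10_000, profile)
print(child_orders, "total:", sum(child_orders))
```

Because each child order is the same share of its bucket's volume, the average fill price tends to track the session VWAP, and no single bucket absorbs a disproportionate part of the order.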

from typing import List, Tuple

class StreamingVWAP:
    """Volume-Weighted Average Price with streaming updates.
    
    VWAP = sum(price * volume) / sum(volume)
    
    - update(price, volume) adds a new trade tick
    - current_vwap() -> float, or None before any volume is recorded
    - is_above_vwap(price) -> bool (price above current VWAP?)
    - reset() restarts VWAP for a new session
    
    Also compute session high, low, and total volume.
    """
    pass
Expected Output
After 5 trades:
VWAP: 101.33
Session high: 104.0, low: 99.0
Total volume: 15000
Current price 103.0 above VWAP: True
Hints

Hint 1: VWAP tracks two running sums: cumulative_pv (price * volume) and cumulative_volume. VWAP = cumulative_pv / cumulative_volume.

Hint 2: This is O(1) per update — no need to recompute from scratch. Just add new (price * volume) to cumulative_pv and add volume to cumulative_volume.
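To see that the two running sums are enough, the sketch below updates them tick by tick and checks each result against a full recomputation. `batch_vwap` is a hypothetical reference helper used only for the comparison.

```python
from typing import List, Tuple

def batch_vwap(trades: List[Tuple[float, float]]) -> float:
    """Reference implementation: recompute VWAP from scratch, O(n) per call."""
    total_pv = sum(price * volume for price, volume in trades)
    total_volume = sum(volume for _, volume in trades)
    return total_pv / total_volume

trades = [(99.0, 2000), (100.0, 5000), (102.0, 3000), (104.0, 1000), (103.0, 4000)]

cum_pv = 0.0
cum_volume = 0.0
for n, (price, volume) in enumerate(trades, start=1):
    # O(1) streaming update: two additions and one division per tick.
    cum_pv += price * volume
    cum_volume += volume
    streaming = cum_pv / cum_volume
    assert abs(streaming - batch_vwap(trades[:n])) < 1e-9
    print(f"tick {n}: VWAP = {streaming:.2f}")
```

After the fifth tick the sums are 1,520,000 and 15,000, giving a VWAP of about 101.33.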

#11Portfolio Correlation MatrixHard
portfoliocorrelationriskdiversification

Build a correlation matrix calculator for a portfolio of assets, with helpers to identify over-correlated positions and measure diversification.

from typing import List, Dict, Tuple, Optional
import math

def mean(xs: List[float]) -> float:
    return sum(xs) / len(xs)

def stdev(xs: List[float]) -> float:
    m = mean(xs)
    return math.sqrt(sum((x - m) ** 2 for x in xs) / len(xs))

def pearson(xs: List[float], ys: List[float]) -> float:
    n = len(xs)
    mx, my = mean(xs), mean(ys)
    cov = sum((xs[i] - mx) * (ys[i] - my) for i in range(n)) / n
    sx, sy = stdev(xs), stdev(ys)
    if sx == 0 or sy == 0:
        return 0.0
    return cov / (sx * sy)

class CorrelationMatrix:
    def __init__(self):
        self._matrix: Dict[str, Dict[str, float]] = {}
        self._assets: List[str] = []

    def compute(self, returns_dict: Dict[str, List[float]]) -> None:
        self._assets = list(returns_dict.keys())
        for a1 in self._assets:
            self._matrix[a1] = {}
            for a2 in self._assets:
                self._matrix[a1][a2] = pearson(returns_dict[a1], returns_dict[a2])

    def get(self, a1: str, a2: str) -> float:
        return self._matrix[a1][a2]

    def most_correlated(self, threshold: float = 0.7) -> List[Tuple[str, str, float]]:
        pairs = []
        for i, a1 in enumerate(self._assets):
            for a2 in self._assets[i + 1:]:
                corr = self._matrix[a1][a2]
                if abs(corr) > threshold:
                    pairs.append((a1, a2, round(corr, 2)))
        return sorted(pairs, key=lambda x: abs(x[2]), reverse=True)

    def least_correlated(self) -> Optional[Tuple[str, str, float]]:
        best = None
        best_corr = float('inf')
        for i, a1 in enumerate(self._assets):
            for a2 in self._assets[i + 1:]:
                corr = abs(self._matrix[a1][a2])
                if corr < best_corr:
                    best_corr = corr
                    best = (a1, a2, round(self._matrix[a1][a2], 2))
        return best

    def diversification_score(self) -> float:
        off_diag = []
        for i, a1 in enumerate(self._assets):
            for a2 in self._assets[i + 1:]:
                off_diag.append(abs(self._matrix[a1][a2]))
        if not off_diag:
            return 1.0
        return round(1 - sum(off_diag) / len(off_diag), 2)

# Test data: daily returns for 3 assets
aapl_ret = [0.01, 0.02, -0.01, 0.015, 0.02, -0.005, 0.01, 0.03, -0.02, 0.01]
msft_ret = [0.009, 0.019, -0.011, 0.014, 0.018, -0.006, 0.011, 0.028, -0.019, 0.009]
gold_ret = [-0.005, -0.01, 0.008, -0.007, -0.008, 0.003, -0.005, -0.015, 0.01, -0.006]

cm = CorrelationMatrix()
cm.compute({"AAPL": aapl_ret, "MSFT": msft_ret, "GOLD": gold_ret})

print(f"Correlation matrix computed for 3 assets")
pairs = cm.most_correlated(threshold=0.7)
print(f"Most correlated (>0.7): {pairs}")
least = cm.least_correlated()
print(f"Least correlated: {least}")
print(f"Diversification score: {cm.diversification_score()}")
Solution
from typing import List, Dict, Tuple, Optional
import math

def mean(xs):
    return sum(xs) / len(xs)

def stdev(xs):
    m = mean(xs)
    return math.sqrt(sum((x - m)**2 for x in xs) / len(xs))

def pearson(xs, ys):
    n, mx, my = len(xs), mean(xs), mean(ys)
    cov = sum((xs[i] - mx) * (ys[i] - my) for i in range(n)) / n
    sx, sy = stdev(xs), stdev(ys)
    return cov / (sx * sy) if sx and sy else 0.0

class CorrelationMatrix:
    def __init__(self):
        self._matrix: Dict[str, Dict[str, float]] = {}
        self._assets: List[str] = []

    def compute(self, returns_dict: Dict[str, List[float]]) -> None:
        self._assets = list(returns_dict.keys())
        for a1 in self._assets:
            self._matrix[a1] = {
                a2: pearson(returns_dict[a1], returns_dict[a2])
                for a2 in self._assets
            }

    def most_correlated(self, threshold=0.7):
        return sorted(
            [(a1, a2, round(self._matrix[a1][a2], 2))
             for i, a1 in enumerate(self._assets)
             for a2 in self._assets[i+1:]
             if abs(self._matrix[a1][a2]) > threshold],
            key=lambda x: abs(x[2]), reverse=True
        )

    def least_correlated(self):
        pairs = [(a1, a2, self._matrix[a1][a2])
                 for i, a1 in enumerate(self._assets)
                 for a2 in self._assets[i+1:]]
        if not pairs:
            return None
        # Pick the pair on the unrounded values, then round for display.
        a1, a2, corr = min(pairs, key=lambda x: abs(x[2]))
        return (a1, a2, round(corr, 2))

    def diversification_score(self):
        off = [abs(self._matrix[a1][a2])
               for i, a1 in enumerate(self._assets)
               for a2 in self._assets[i+1:]]
        return round(1 - sum(off)/len(off), 2) if off else 1.0

aapl = [0.01, 0.02, -0.01, 0.015, 0.02, -0.005, 0.01, 0.03, -0.02, 0.01]
msft = [0.009, 0.019, -0.011, 0.014, 0.018, -0.006, 0.011, 0.028, -0.019, 0.009]
gold = [-0.005, -0.01, 0.008, -0.007, -0.008, 0.003, -0.005, -0.015, 0.01, -0.006]

cm = CorrelationMatrix()
cm.compute({"AAPL": aapl, "MSFT": msft, "GOLD": gold})
print(f"Correlation matrix computed for 3 assets")
print(f"Most correlated (>0.7): {cm.most_correlated(0.7)}")
print(f"Least correlated: {cm.least_correlated()}")
print(f"Diversification score: {cm.diversification_score()}")

Correlation in portfolio construction: Markowitz portfolio theory (1952) showed that combining assets with low pairwise correlations reduces portfolio volatility without reducing expected return. Modern portfolio risk systems (like those at hedge funds) compute full N x N correlation matrices and flag when positions become too correlated — a warning sign that the portfolio is less diversified than it appears.
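The effect of correlation on risk can be illustrated with the standard two-asset portfolio variance formula. The function name `two_asset_volatility` and the 20% volatilities are assumptions picked for the example.

```python
import math

def two_asset_volatility(w1: float, sigma1: float, sigma2: float, rho: float) -> float:
    """Volatility of a two-asset portfolio with weights w1 and 1 - w1 and correlation rho."""
    w2 = 1.0 - w1
    variance = ((w1 * sigma1) ** 2
                + (w2 * sigma2) ** 2
                + 2 * w1 * w2 * rho * sigma1 * sigma2)
    return math.sqrt(max(variance, 0.0))  # clamp tiny negative values from rounding

# Equal weights, both assets at 20% volatility; only the correlation changes.
for rho in (1.0, 0.5, 0.0, -0.5):
    vol = two_asset_volatility(0.5, 0.20, 0.20, rho)
    print(f"rho={rho:+.1f}: portfolio volatility = {vol:.1%}")
```

The weights stay fixed, so the portfolio's expected return is identical in every row; only the volatility falls as the correlation drops.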

from typing import List, Dict
import math

class CorrelationMatrix:
    """Compute and analyze pairwise correlations between asset return series.
    
    - compute(returns_dict) -> matrix as dict of dicts
    - most_correlated(threshold) -> pairs above threshold
    - least_correlated() -> the pair with lowest absolute correlation
    - diversification_score() -> 0-1 score (1 = fully uncorrelated)
    """
    pass
Expected Output
Correlation matrix computed for 3 assets
Most correlated (>0.7): [('AAPL', 'MSFT', 1.0), ('AAPL', 'GOLD', -0.99), ('MSFT', 'GOLD', -0.99)]
Least correlated: ('AAPL', 'GOLD', -0.99)
Diversification score: 0.01
Hints

Hint 1: Pearson correlation: r = cov(X,Y) / (std(X) * std(Y)). Compute covariance as mean((X - mean_X) * (Y - mean_Y)).

Hint 2: Diversification score: 1 - mean(abs(off-diagonal correlations)). A score of 1 means all pairs are uncorrelated; 0 means all pairs are perfectly correlated.
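Both hints can be checked on small inputs where the answer is known in advance. The series and the pairwise correlations below are made-up illustrative values, and `pearson` here is a standalone sketch of the formula in Hint 1.

```python
import math
from typing import Dict, List, Tuple

def pearson(xs: List[float], ys: List[float]) -> float:
    """r = cov(X, Y) / (std(X) * std(Y)), using population (divide-by-n) moments."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / n
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs) / n)
    sy = math.sqrt(sum((y - my) ** 2 for y in ys) / n)
    return cov / (sx * sy) if sx and sy else 0.0

up = [1.0, 2.0, 3.0, 4.0]
r_same = pearson(up, [2.0, 4.0, 6.0, 8.0])      # moves together: r close to +1
r_opposite = pearson(up, [8.0, 6.0, 4.0, 2.0])  # moves opposite: r close to -1
print(f"{r_same:.2f} {r_opposite:.2f}")

# Hint 2 applied to hypothetical pairwise correlations for assets A, B, C.
pairwise: Dict[Tuple[str, str], float] = {("A", "B"): 0.9, ("A", "C"): -0.1, ("B", "C"): 0.2}
score = 1 - sum(abs(c) for c in pairwise.values()) / len(pairwise)
print(f"Diversification score: {score:.2f}")
```

Because the score averages absolute correlations, a strongly negative pair lowers it just as much as a strongly positive one.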

© 2026 EngineersOfAI. All rights reserved.