Cointegration and Granger Causality
Reading time: ~35 minutes Interview relevance: High for financial ML roles; medium-high for general DS/ML roles that work with multi-series temporal data Target roles: Financial ML Engineer, Quantitative Researcher, Data Scientist, ML Engineer (NLP/Economics)
The Real Interview Moment
You're interviewing at a hedge fund. The interviewer shows you two time series: the prices of Coca-Cola (KO) and Pepsi (PEP). "Both series have unit roots - they're non-stationary. Can you still model the relationship between them?"
The naive answer: "No - we need to difference both series first."
The correct answer: "Not necessarily. If KO and PEP are cointegrated - if they share a common stochastic trend - then there's a stationary linear combination of the two series. The spread KO - β·PEP might be stationary even though neither series alone is. We test this with the Johansen or Engle-Granger test. If cointegrated, we model with a Vector Error Correction Model (VECM), which captures both short-run dynamics and long-run equilibrium."
This is the foundation of pairs trading - one of the oldest and most robust statistical arbitrage strategies in quantitative finance. It also applies to macroeconomics (long-run money demand), ecology (predator-prey relationships), and any domain where related non-stationary series are analyzed.
The Problem: Spurious Regression
The danger with non-stationary time series is spurious regression: two completely unrelated series can appear highly correlated simply because both are trending.
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
from statsmodels.tsa.stattools import adfuller, coint, grangercausalitytests
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.vector_ar.vecm import coint_johansen, VECM
import statsmodels.api as sm
from scipy import stats
def demonstrate_spurious_regression(n: int = 500, seed: int = 42) -> None:
"""
Two independent random walks appear spuriously correlated.
This is the Yule (1926) phenomenon - regressing two independent
I(1) processes gives R² ≈ 0.5 and significant t-statistics
despite the series being completely independent.
"""
np.random.seed(seed)
# Two completely independent random walks
rw1 = np.cumsum(np.random.normal(0, 1, n))
rw2 = np.cumsum(np.random.normal(0, 1, n))
# OLS regression of rw2 on rw1
X = sm.add_constant(rw1)
model = sm.OLS(rw2, X).fit()
print("Spurious Regression Demonstration")
print("=" * 50)
print(f"Two INDEPENDENT random walks (n={n})")
print(f"\nOLS regression y ~ x:")
print(f" R²: {model.rsquared:.4f}")
print(f" β₁: {model.params[1]:.4f}")
print(f" t-stat: {model.tvalues[1]:.4f}")
print(f" p-value: {model.pvalues[1]:.4f}")
print(f"\n Conclusion: p={model.pvalues[1]:.4f} {'→ SIGNIFICANT' if model.pvalues[1] < 0.05 else '→ not significant'}")
print(f" But the series are INDEPENDENT - this is SPURIOUS!")
# ADF tests to confirm non-stationarity
_, p1, *_ = adfuller(rw1)
_, p2, *_ = adfuller(rw2)
print(f"\nADF tests:")
print(f" rw1: ADF p = {p1:.4f} ({'non-stationary' if p1 > 0.05 else 'stationary'})")
print(f" rw2: ADF p = {p2:.4f} ({'non-stationary' if p2 > 0.05 else 'stationary'})")
demonstrate_spurious_regression()
Output:
Spurious Regression Demonstration
==================================================
Two INDEPENDENT random walks (n=500)
OLS regression y ~ x:
R²: 0.4923
β₁: 0.8312
t-stat: 21.8834
p-value: 0.0000
Conclusion: p=0.0000 → SIGNIFICANT
But the series are INDEPENDENT - this is SPURIOUS!
ADF tests:
rw1: ADF p = 0.7231 (non-stationary)
rw2: ADF p = 0.6891 (non-stationary)
The lesson: never run OLS on two non-stationary series without first checking for cointegration. A significant t-statistic tells you nothing when both series have unit roots.
What is Cointegration?
Two series and are cointegrated if:
- Both are (non-stationary, have unit roots)
- There exists a linear combination that is (stationary)
The stationary combination is called the cointegrating relationship. It represents a long-run equilibrium: even though and wander individually, they cannot deviate too far from each other - they are "tied together" by a shared stochastic trend.
Economic intuition
- Prices of substitutes (Coke and Pepsi): their individual prices are non-stationary (random walks), but the spread is mean-reverting
- Import and export prices: cointegrated through the law of one price
- Income and consumption: cointegrated through the permanent income hypothesis
- Interest rates of different maturities: cointegrated (term structure)
Mathematical definition
Let be a -vector of series.
is cointegrated with cointegrating vector if:
There can be up to linearly independent cointegrating vectors, where is the cointegrating rank. These form the cointegrating matrix of dimension .
Engle-Granger Test (Two-Step Method)
For two series, the Engle-Granger (1987) test for cointegration:
Step 1: Regress on :
Step 2: Test if the residuals are stationary (ADF test on residuals):
- If residuals are → cointegrated with as the cointegrating coefficient
- If residuals are → not cointegrated
:::warning Critical point
The ADF test on OLS residuals uses non-standard critical values (not standard ADF tables), because the residuals are estimated, not observed. Use statsmodels.tsa.stattools.coint which handles this correctly.
:::
def engle_granger_test(
x: np.ndarray,
y: np.ndarray,
verbose: bool = True
) -> dict:
"""
Engle-Granger cointegration test.
H0: no cointegration (residuals are I(1))
H1: cointegrated (residuals are I(0))
Reject H0 (p < 0.05) → cointegrated.
"""
# Check both series are I(1)
_, p_x, *_ = adfuller(x)
_, p_y, *_ = adfuller(y)
# Step 1: OLS regression (get cointegrating coefficient)
X_with_const = sm.add_constant(x)
model = sm.OLS(y, X_with_const).fit()
beta = model.params[1]
alpha = model.params[0]
residuals = model.resid
# Step 2: ADF test on residuals (with non-standard critical values)
coint_stat, coint_p, crit_values = coint(y, x)
result = {
'beta': beta,
'alpha': alpha,
'coint_statistic': coint_stat,
'p_value': coint_p,
'critical_values': crit_values,
'is_cointegrated': coint_p < 0.05,
'residuals': residuals,
'x_is_I1': p_x > 0.05,
'y_is_I1': p_y > 0.05
}
if verbose:
print("Engle-Granger Cointegration Test")
print("=" * 50)
print(f"H0: no cointegration | H1: cointegrated")
print(f"\nPrerequisites:")
print(f" X is I(1): {'YES' if p_x > 0.05 else 'NO (stationary!)'} (ADF p={p_x:.4f})")
print(f" Y is I(1): {'YES' if p_y > 0.05 else 'NO (stationary!)'} (ADF p={p_y:.4f})")
print(f"\nStep 1: OLS regression Y = α + β·X + u")
print(f" β (cointegrating coeff): {beta:.4f}")
print(f" α (intercept): {alpha:.4f}")
print(f"\nStep 2: ADF test on residuals")
print(f" Test statistic: {coint_stat:.4f}")
print(f" p-value: {coint_p:.4f}")
print(f" Critical values: 1%={crit_values[0]:.4f}, 5%={crit_values[1]:.4f}, 10%={crit_values[2]:.4f}")
print(f"\n Decision: {'COINTEGRATED ✓' if result['is_cointegrated'] else 'NOT cointegrated ✗'}")
return result
# Simulate cointegrated pair
np.random.seed(42)
n = 500
common_trend = np.cumsum(np.random.normal(0, 1, n))
x_coint = common_trend + np.random.normal(0, 0.5, n) # series tied to common trend
y_coint = 2.0 * common_trend + 5.0 + np.random.normal(0, 0.5, n) # β=2, α=5
result = engle_granger_test(x_coint, y_coint)
print(f"\nTrue cointegrating relationship: y = 5 + 2x + noise")
print(f"Estimated: y = {result['alpha']:.3f} + {result['beta']:.3f}x")
Johansen Test: Multiple Cointegrating Vectors
For more than two series, the Johansen test (1991) determines the number of cointegrating relationships (the cointegrating rank).
The null hypotheses are nested:
- (no cointegration) vs
- vs
- ...
- vs
Two test statistics:
- Trace test:
- Maximum eigenvalue test:
def johansen_cointegration_test(
data: pd.DataFrame,
det_order: int = 0, # 0: no constant, 1: constant, -1: no deterministics
k_ar_diff: int = 1, # number of lagged differences
verbose: bool = True
) -> dict:
"""
Johansen cointegration test for multiple time series.
det_order:
-1: no deterministics
0: constant in cointegrating space
1: constant in system (more common)
Returns:
r: cointegrating rank
beta: cointegrating vectors (normalized)
trace_stats: trace test statistics
max_eigen_stats: max eigenvalue test statistics
"""
series_array = data.values
k = data.shape[1]
result = coint_johansen(series_array, det_order=det_order, k_ar_diff=k_ar_diff)
# Extract results
trace_stats = result.lr1 # trace statistics (T x 1)
max_stats = result.lr2 # max eigenvalue statistics
cvt = result.cvt # critical values for trace (T x 3: 90%, 95%, 99%)
cvm = result.cvm # critical values for max eigenvalue
# Determine rank: find first r where trace stat < critical value at 5%
r = 0
for i in range(k):
if trace_stats[i] > cvt[i, 1]: # 95% critical value
r += 1
else:
break
if verbose:
print("Johansen Cointegration Test")
print("=" * 65)
print(f"Series: {list(data.columns)}")
print(f"Observations: {len(data)}, k={k}")
print(f"\nTrace Test:")
print(f"{'H0: r':>8} | {'Trace Stat':>12} | {'5% CV':>8} | {'Result':>10}")
print("-" * 48)
for i in range(k):
h0 = f"r ≤ {i}"
sig = "REJECT" if trace_stats[i] > cvt[i, 1] else "fail to reject"
print(f"{h0:>8} | {trace_stats[i]:>12.4f} | {cvt[i,1]:>8.4f} | {sig:>10}")
print(f"\nMax Eigenvalue Test:")
print(f"{'H0: r':>8} | {'Max-Eigen':>12} | {'5% CV':>8} | {'Result':>10}")
print("-" * 48)
for i in range(k):
h0 = f"r = {i}"
sig = "REJECT" if max_stats[i] > cvm[i, 1] else "fail to reject"
print(f"{h0:>8} | {max_stats[i]:>12.4f} | {cvm[i,1]:>8.4f} | {sig:>10}")
print(f"\nCointegrating rank (from trace test): r = {r}")
if r > 0:
print(f"\nCointegrating vectors (columns):")
beta = result.evec[:, :r]
for j in range(r):
print(f" β{j+1}: {[f'{v:.4f}' for v in beta[:, j]]}")
return {
'rank': r,
'trace_stats': trace_stats,
'max_eigen_stats': max_stats,
'crit_trace': cvt,
'crit_max': cvm,
'eigenvectors': result.evec,
'eigenvalues': result.eig
}
# Simulate three cointegrated series (rank 2)
np.random.seed(0)
n = 400
trend1 = np.cumsum(np.random.normal(0, 1, n))
trend2 = np.cumsum(np.random.normal(0, 1, n))
# Two cointegrating relationships: two shared trends → rank = k - r = 3 - 2 = 1? No...
# k=3 series, 2 common trends → rank = 1 (1 cointegrating vector)
z1 = trend1 + np.random.normal(0, 0.3, n)
z2 = 2*trend1 + trend2 + np.random.normal(0, 0.3, n)
z3 = trend1 - trend2 + np.random.normal(0, 0.3, n)
df_coint = pd.DataFrame({'z1': z1, 'z2': z2, 'z3': z3})
johansen_result = johansen_cointegration_test(df_coint)
Vector Error Correction Model (VECM)
When series are cointegrated, the appropriate model is the Vector Error Correction Model (VECM), not VAR in levels or first differences.
The VECM captures two types of dynamics:
- Short-run: First-differenced terms (how the system moves period to period)
- Long-run: Error correction term (how deviations from equilibrium are corrected)
- : deviation from long-run equilibrium (the cointegrating relation)
- : speed of adjustment matrix - how fast each series corrects deviations
- : short-run dynamics (lagged differences)
Economic meaning of : If and the series is above equilibrium (), then is pulled downward - restoring equilibrium.
def fit_vecm(
data: pd.DataFrame,
coint_rank: int,
k_ar_diff: int = 1,
verbose: bool = True
) -> dict:
"""
Fit Vector Error Correction Model.
Steps:
1. Determine cointegrating rank r (from Johansen test)
2. Fit VECM with rank r
3. Extract: cointegrating vectors β, adjustment speeds α,
short-run dynamics Γ, impulse responses
"""
model = VECM(
data,
coint_rank=coint_rank,
k_ar_diff=k_ar_diff,
deterministic='ci' # constant in cointegrating relation
)
result = model.fit()
if verbose:
print("VECM Estimation Results")
print("=" * 50)
print(f"Cointegrating rank: {coint_rank}")
print(f"Log-likelihood: {result.llf:.4f}")
print(f"\nCointegrating vector β (normalized):")
print(result.beta)
print(f"\nAdjustment coefficients α:")
print(result.alpha)
# Forecast
forecast = result.predict(steps=20)
return {
'result': result,
'beta': result.beta, # cointegrating vectors
'alpha': result.alpha, # adjustment speeds
'forecast': forecast
}
# Fit VECM to cointegrated pair
df_pair = pd.DataFrame({'x': x_coint, 'y': y_coint})
vecm_result = fit_vecm(df_pair, coint_rank=1)
Pairs Trading: Applied Cointegration
Pairs trading is the canonical ML application of cointegration in finance.
class PairsTradingStrategy:
"""
Statistical arbitrage via cointegration.
Steps:
1. Find cointegrated pair (e.g., KO and PEP)
2. Estimate the spread: spread_t = Y_t - β·X_t
3. Standardize: z_score_t = (spread_t - μ) / σ
4. Trade: go long when z_score < -2 (spread cheap), short when z_score > 2
Theory: if cointegrated, spread is stationary → mean-reverting → predictable
"""
def __init__(
self,
entry_threshold: float = 2.0,
exit_threshold: float = 0.5,
lookback: int = 60 # rolling window for z-score
):
self.entry_threshold = entry_threshold
self.exit_threshold = exit_threshold
self.lookback = lookback
self.beta = None
self.alpha = None
def fit(self, x_train: np.ndarray, y_train: np.ndarray) -> None:
"""Estimate cointegrating relationship on training data."""
# Test for cointegration
coint_result = engle_granger_test(x_train, y_train, verbose=False)
if not coint_result['is_cointegrated']:
print("WARNING: Pair is not cointegrated on training data!")
print("Pairs trading on non-cointegrated series is spurious.")
self.beta = coint_result['beta']
self.alpha = coint_result['alpha']
print(f"Estimated hedge ratio β = {self.beta:.4f}")
print(f"Spread: Y - {self.beta:.4f}·X")
def compute_spread(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
"""Compute spread = Y - β·X."""
return y - self.beta * x
def compute_zscore(self, spread: np.ndarray) -> np.ndarray:
"""Rolling z-score of the spread."""
z = np.full_like(spread, np.nan)
for t in range(self.lookback, len(spread)):
window = spread[t - self.lookback:t]
mu = np.mean(window)
sigma = np.std(window)
if sigma > 1e-8:
z[t] = (spread[t] - mu) / sigma
return z
def generate_signals(self, x: np.ndarray, y: np.ndarray) -> pd.DataFrame:
"""
Generate trading signals from z-score.
Signal:
+1 = long spread (long Y, short X): spread is cheap → expect it to rise
-1 = short spread (short Y, long X): spread is expensive → expect it to fall
0 = flat (no position)
"""
spread = self.compute_spread(x, y)
z = self.compute_zscore(spread)
signal = np.zeros(len(z))
position = 0
for t in range(len(z)):
if np.isnan(z[t]):
continue
if position == 0: # no position
if z[t] < -self.entry_threshold:
position = 1 # enter long spread
elif z[t] > self.entry_threshold:
position = -1 # enter short spread
elif position == 1: # long spread
if z[t] > -self.exit_threshold:
position = 0 # exit
elif position == -1: # short spread
if z[t] < self.exit_threshold:
position = 0 # exit
signal[t] = position
return pd.DataFrame({
'x': x, 'y': y, 'spread': spread,
'z_score': z, 'signal': signal
})
def backtest(self, signals_df: pd.DataFrame) -> dict:
"""
Simple P&L calculation.
P&L: when signal = +1, profit from spread increase
"""
spread = signals_df['spread'].values
signal = signals_df['signal'].values
# Daily P&L = signal_{t-1} * Δspread_t
d_spread = np.diff(spread)
pnl = signal[:-1] * d_spread
# Remove nan entries
valid = ~np.isnan(pnl)
pnl_clean = pnl[valid]
total_return = np.sum(pnl_clean)
sharpe = np.mean(pnl_clean) / (np.std(pnl_clean) + 1e-8) * np.sqrt(252)
n_trades = np.sum(np.abs(np.diff(signal[:-1][valid])) > 0) // 2
return {
'total_return': total_return,
'sharpe_ratio': sharpe,
'n_trades': n_trades,
'win_rate': np.mean(pnl_clean > 0)
}
# Run pairs trading on simulated data
np.random.seed(42)
n_total = 1000
common = np.cumsum(np.random.normal(0, 1, n_total))
x_price = 50 + common + np.random.normal(0, 0.5, n_total)
y_price = 100 + 2.0 * common + np.random.normal(0, 0.5, n_total) # true β = 2
# Train/test split (always temporal for time series)
train_end = 600
x_train, x_test = x_price[:train_end], x_price[train_end:]
y_train, y_test = y_price[:train_end], y_price[train_end:]
strategy = PairsTradingStrategy(entry_threshold=2.0, exit_threshold=0.5, lookback=60)
strategy.fit(x_train, y_train)
# Generate signals on test set
signals = strategy.generate_signals(x_test, y_test)
backtest = strategy.backtest(signals)
print(f"\nPairs Trading Backtest (out-of-sample):")
print(f" Total P&L: {backtest['total_return']:.4f}")
print(f" Sharpe ratio: {backtest['sharpe_ratio']:.4f}")
print(f" Number of trades: {backtest['n_trades']}")
print(f" Win rate: {backtest['win_rate']:.2%}")
Granger Causality
Granger causality (Granger, 1969) asks a practical question: does knowing the history of series improve predictions of series ?
Formally, Granger-causes if:
i.e., including past values reduces prediction error for beyond what 's own history provides.
Important caveat: Granger causality is predictive causality, not true causality. If Granger-causes , it means contains useful predictive information for - this could be because X causes Y, or because both are caused by a common latent factor that affects X first.
Test procedure (VAR-based)
Fit two VAR models:
- Restricted: (using only 's past)
- Unrestricted: (using 's past too)
Test using F-test.
def granger_causality_test(
x: np.ndarray,
y: np.ndarray,
max_lag: int = 5,
alpha: float = 0.05,
verbose: bool = True
) -> dict:
"""
Granger causality test: does X help predict Y?
H0: X does NOT Granger-cause Y
H1: X Granger-causes Y
Reject H0 (p < alpha) → X provides predictive value for Y.
IMPORTANT: Both series must be stationary. Difference if needed.
"""
data = np.column_stack([y, x]) # statsmodels expects [target, cause] order
gc_results = grangercausalitytests(data, maxlag=max_lag, verbose=False)
results = {}
best_lag = None
best_p = 1.0
if verbose:
print("Granger Causality Test: Does X help predict Y?")
print("H0: X does NOT Granger-cause Y")
print("=" * 55)
print(f"{'Lag':>5} | {'F-stat':>10} | {'p-value':>10} | {'Decision':>15}")
print("-" * 55)
for lag in range(1, max_lag + 1):
test_result = gc_results[lag][0]
f_stat = test_result['ssr_ftest'][0]
p_val = test_result['ssr_ftest'][1]
results[lag] = {'f_stat': f_stat, 'p_value': p_val}
if p_val < best_p:
best_p = p_val
best_lag = lag
if verbose:
decision = "REJECT H0" if p_val < alpha else "fail to reject"
print(f"{lag:>5} | {f_stat:>10.4f} | {p_val:>10.4f} | {decision:>15}")
granger_causes = best_p < alpha
if verbose:
print(f"\nConclusion: X {'DOES' if granger_causes else 'does NOT'} Granger-cause Y "
f"(best p={best_p:.4f} at lag {best_lag})")
return {
'granger_causes': granger_causes,
'best_p': best_p,
'best_lag': best_lag,
'results_by_lag': results
}
# Generate Granger causal relationship: X causes Y with 2-period lag
np.random.seed(42)
n = 500
x_gc = np.random.normal(0, 1, n)
y_gc = np.zeros(n)
for t in range(2, n):
y_gc[t] = 0.7 * y_gc[t-1] + 0.5 * x_gc[t-2] + np.random.normal(0, 0.5)
print("Test 1: X causes Y (with lag 2)")
gc1 = granger_causality_test(x_gc, y_gc, max_lag=5)
print("\nTest 2: Y causes X (should NOT be significant - reverse direction)")
gc2 = granger_causality_test(y_gc, x_gc, max_lag=5, verbose=True)
Granger Causality for Feature Selection
A powerful ML application: use Granger causality to select relevant lag features from a large set of time series.
def granger_feature_selection(
target: pd.Series,
candidates: pd.DataFrame,
max_lag: int = 5,
alpha: float = 0.05,
verbose: bool = True
) -> pd.DataFrame:
"""
Use Granger causality to identify which candidate time series
are predictively useful for the target variable.
Steps:
1. Check stationarity of target and candidates
2. For each candidate X: test if X Granger-causes target Y
3. Return candidates ranked by best p-value
Use case: financial forecasting - which macro variables predict returns?
Web analytics - which page views predict conversions?
"""
from statsmodels.tsa.stattools import adfuller
# Ensure target is stationary
_, target_p, *_ = adfuller(target.values)
y = target.values if target_p < 0.05 else np.diff(target.values)
feature_results = []
for col in candidates.columns:
x_series = candidates[col].values
_, x_p, *_ = adfuller(x_series)
x = x_series if x_p < 0.05 else np.diff(x_series)
# Align lengths
min_len = min(len(y), len(x))
y_aligned = y[-min_len:]
x_aligned = x[-min_len:]
try:
data = np.column_stack([y_aligned, x_aligned])
gc = grangercausalitytests(data, maxlag=max_lag, verbose=False)
# Find best p-value across lags
p_values = [gc[lag][0]['ssr_ftest'][1] for lag in range(1, max_lag+1)]
best_p = min(p_values)
best_lag = p_values.index(best_p) + 1
feature_results.append({
'feature': col,
'granger_p': best_p,
'best_lag': best_lag,
'significant': best_p < alpha,
'x_stationary': x_p < 0.05
})
except Exception:
pass
df_results = pd.DataFrame(feature_results).sort_values('granger_p')
if verbose:
print(f"Granger Causality Feature Selection (target: {target.name})")
print(f"{'Feature':<25} | {'p-value':>10} | {'Best lag':>9} | {'Selected':>10}")
print("-" * 62)
for _, row in df_results.iterrows():
marker = "YES ✓" if row['significant'] else "no"
print(f"{row['feature']:<25} | {row['granger_p']:>10.4f} | {row['best_lag']:>9} | {marker:>10}")
selected = df_results[df_results['significant']]['feature'].tolist()
print(f"\nSelected features ({len(selected)}): {selected}")
return df_results
# Simulate: target depends on features 1 and 3, not 2 or 4
np.random.seed(0)
n = 400
f1 = np.random.normal(0, 1, n) # causal at lag 1
f2 = np.random.normal(0, 1, n) # not causal
f3 = np.random.normal(0, 1, n) # causal at lag 2
f4 = np.random.normal(0, 1, n) # not causal
target_data = np.zeros(n)
for t in range(2, n):
target_data[t] = (0.6 * target_data[t-1] +
0.5 * f1[t-1] +
0.4 * f3[t-2] +
np.random.normal(0, 0.3))
target_series = pd.Series(target_data, name='target')
candidates_df = pd.DataFrame({'feature_1': f1, 'feature_2': f2,
'feature_3': f3, 'feature_4': f4})
selection = granger_feature_selection(target_series, candidates_df, max_lag=5)
Interview Questions
Q1: What is the difference between correlation, Granger causality, and true causality?
Correlation measures the contemporaneous linear association between and at the same time . It says nothing about direction or timing.
Granger causality tests whether past values of improve predictions of beyond what 's own past provides. If add predictive power for (in a VAR model), then Granger-causes .
True causality (Pearl's do-calculus, SCM) answers "what happens to if I intervene and set ?" - a fundamentally different question.
Key differences:
- Granger causality is about prediction in observational data - might Granger-cause simply because both are driven by a common hidden factor , with affecting before (confounding)
- True causality requires ruling out confounders - typically through controlled experiments or causal graph assumptions (DAGs)
- Example: shoe sales might Granger-cause ice cream sales (both driven by summer), but shoes don't cause ice cream purchases
When Granger causality is useful:
- Feature selection for forecasting models: "does macro variable X help predict asset returns Y?"
- Understanding temporal precedence in observational data
- Exploratory hypothesis generation before causal analysis
Limitation: Never interpret Granger causality as policy-relevant causation without additional causal assumptions.
Q2: What is cointegration and why does it matter for time series analysis?
Cointegration occurs when two or more (non-stationary) series have a stationary linear combination. They share a common stochastic trend, so while individually they wander, they cannot drift apart indefinitely - they maintain a long-run equilibrium relationship.
Why it matters:
-
Avoids spurious regression: If two series are cointegrated, OLS gives a consistent estimator of the true relationship (the "super-consistency" property). If not cointegrated, OLS is spurious.
-
Long-run modeling: Cointegration captures permanent relationships. Even when both series wander, the cointegrating combination is mean-reverting - it has a well-defined equilibrium.
-
VECM vs VAR: For cointegrated series, the appropriate model is VECM - it includes an error correction term that pulls the system back to equilibrium. A plain VAR in first differences discards this long-run information. A VAR in levels is spurious (non-stationary residuals).
-
Applications: Pairs trading (finance), term structure modeling, purchasing power parity (macroeconomics), electricity price co-movement.
Detection: Engle-Granger test (two series) or Johansen test (multiple series).
Q3: Why can't you just difference both series and run OLS when they're non-stationary?
You can difference both to get stationarity, but you lose important information - specifically, the long-run relationship encoded in the levels.
Consider prices and rents in a housing market - both non-stationary. If they're cointegrated (price-to-rent ratio is stationary), the deviations from the long-run equilibrium drive future price and rent adjustments.
If you run OLS on first differences:
- You capture short-run dynamics (how changes in rent correlate with changes in price)
- But you lose the error correction: the fact that when is too high relative to , future tends to be negative
The VECM (correct model):
The term is the error correction - how much of the disequilibrium is corrected each period.
Practical consequence: If you fit a VAR in first differences to cointegrated series, your model will misspecify the dynamics. It will forecast correctly in the short run but lose track of the long-run equilibrium, producing forecasts that diverge over the horizon. VECM forecasts for cointegrated series outperform VAR-in-differences at long horizons.
Q4: Explain pairs trading. What's the mathematical justification and what are the risks?
Pairs trading exploits the mean-reversion of a cointegrated spread.
Mathematical justification:
- If and are cointegrated with coefficient , then (stationary)
- Stationarity implies mean-reversion: large deviations from tend to revert
- Strategy: when is 2σ below its mean, go long (long , short units of ); when 2σ above, go short . Exit when returns to near its mean.
- Expected P&L: positive because you're selling the spread when expensive and buying when cheap, exploiting mean-reversion.
Risks:
- Cointegration breakdown: The most dangerous risk. Historical cointegration doesn't guarantee future cointegration. If the economic relationship changes (Coca-Cola launches a new business line), the pairs relationship can break permanently.
- Extended divergence before convergence: The spread might take months to revert. You must have sufficient capital to maintain the position through large adverse moves.
- Transaction costs: Frequent trading erodes P&L. The strategy needs enough edge to survive costs.
- Estimation error: is estimated, not known. A wrong means the spread isn't stationary.
- Model risk: Using Engle-Granger without accounting for structural breaks, regime changes.
Best practices:
- Use rolling windows to detect cointegration breakdown
- Size positions inversely to spread uncertainty
- Set stop-losses for extreme spread divergence
- Diversify across many pairs to reduce idiosyncratic risk
Q5: How would you use Granger causality for feature engineering in a financial forecasting system?
Pipeline for Granger-based feature selection:
-
Collect candidate series: macroeconomic indicators (GDP growth, CPI, PMI), market variables (VIX, yield curve, credit spreads), alternative data (satellite imagery, sentiment).
-
Ensure stationarity: ADF test on all series. Difference non-stationary series (compute returns for prices, changes for levels). Granger causality tests require stationarity.
-
Test pairwise Granger causality: For each candidate , test whether it Granger-causes the target at multiple lags. Use the F-test from a VAR.
-
Multiple testing correction: With 50 candidates, expect ~2.5 false positives at 5% significance. Apply Bonferroni correction (use threshold) or Benjamini-Hochberg FDR control.
-
Select lag structure: For each significant predictor, use the optimal lag (lowest p-value across lags). These lag features become predictors:
feature_X_lag_kfor each selected and lag . -
Validate: Granger test is in-sample. Validate out-of-sample with walk-forward evaluation: does including X actually reduce test set MSE?
Caution: Granger causality can be spurious due to:
- Omitted variable bias (common driver)
- Look-ahead bias if not careful with lag alignment
- Non-stationarity if preprocessing is wrong
The result of Granger selection is a set of lag features to include in your ML model - it doesn't tell you the model form (linear vs non-linear), it just tells you which temporal dependencies exist.
Key Takeaways
- Spurious regression: Two independent non-stationary series appear correlated - OLS on series is invalid without cointegration
- Cointegration: Two series share a common stochastic trend; their linear combination is stationary (mean-reverting)
- Tests: Engle-Granger (two series); Johansen test (multiple series, determines rank)
- VECM: The correct model for cointegrated series - captures both short-run dynamics and long-run error correction
- Pairs trading: Exploit mean-reversion of the cointegrating spread - buy when spread is cheap, sell when expensive
- Granger causality: Tests whether past improves prediction of - predictive causality, not true causality; useful for feature selection
- Always difference before Granger testing; always test for cointegration before deciding between VAR-in-differences and VECM
Next: Lesson 07: Wavelets and Multiscale Analysis →
:::tip 🎮 Interactive Playground
Visualize this concept: Try the Cointegration demo on the EngineersOfAI Playground - no code required.
:::
