Skip to main content

Cointegration and Granger Causality

Reading time: ~35 minutes Interview relevance: High for financial ML roles; medium-high for general DS/ML roles that work with multi-series temporal data Target roles: Financial ML Engineer, Quantitative Researcher, Data Scientist, ML Engineer (NLP/Economics)

The Real Interview Moment

You're interviewing at a hedge fund. The interviewer shows you two time series: the prices of Coca-Cola (KO) and Pepsi (PEP). "Both series have unit roots - they're non-stationary. Can you still model the relationship between them?"

The naive answer: "No - we need to difference both series first."

The correct answer: "Not necessarily. If KO and PEP are cointegrated - if they share a common stochastic trend - then there's a stationary linear combination of the two series. The spread KO - β·PEP might be stationary even though neither series alone is. We test this with the Johansen or Engle-Granger test. If cointegrated, we model with a Vector Error Correction Model (VECM), which captures both short-run dynamics and long-run equilibrium."

This is the foundation of pairs trading - one of the oldest and most robust statistical arbitrage strategies in quantitative finance. It also applies to macroeconomics (long-run money demand), ecology (predator-prey relationships), and any domain where related non-stationary series are analyzed.

The Problem: Spurious Regression

The danger with non-stationary time series is spurious regression: two completely unrelated series can appear highly correlated simply because both are trending.

import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

from statsmodels.tsa.stattools import adfuller, coint, grangercausalitytests
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.tsa.vector_ar.vecm import coint_johansen, VECM
import statsmodels.api as sm
from scipy import stats

def demonstrate_spurious_regression(n: int = 500, seed: int = 42) -> None:
"""
Two independent random walks appear spuriously correlated.

This is the Yule (1926) phenomenon - regressing two independent
I(1) processes gives R² ≈ 0.5 and significant t-statistics
despite the series being completely independent.
"""
np.random.seed(seed)

# Two completely independent random walks
rw1 = np.cumsum(np.random.normal(0, 1, n))
rw2 = np.cumsum(np.random.normal(0, 1, n))

# OLS regression of rw2 on rw1
X = sm.add_constant(rw1)
model = sm.OLS(rw2, X).fit()

print("Spurious Regression Demonstration")
print("=" * 50)
print(f"Two INDEPENDENT random walks (n={n})")
print(f"\nOLS regression y ~ x:")
print(f" R²: {model.rsquared:.4f}")
print(f" β₁: {model.params[1]:.4f}")
print(f" t-stat: {model.tvalues[1]:.4f}")
print(f" p-value: {model.pvalues[1]:.4f}")
print(f"\n Conclusion: p={model.pvalues[1]:.4f} {'→ SIGNIFICANT' if model.pvalues[1] < 0.05 else '→ not significant'}")
print(f" But the series are INDEPENDENT - this is SPURIOUS!")

# ADF tests to confirm non-stationarity
_, p1, *_ = adfuller(rw1)
_, p2, *_ = adfuller(rw2)
print(f"\nADF tests:")
print(f" rw1: ADF p = {p1:.4f} ({'non-stationary' if p1 > 0.05 else 'stationary'})")
print(f" rw2: ADF p = {p2:.4f} ({'non-stationary' if p2 > 0.05 else 'stationary'})")

demonstrate_spurious_regression()

Output:

Spurious Regression Demonstration
==================================================
Two INDEPENDENT random walks (n=500)

OLS regression y ~ x:
R²: 0.4923
β₁: 0.8312
t-stat: 21.8834
p-value: 0.0000

Conclusion: p=0.0000 → SIGNIFICANT
But the series are INDEPENDENT - this is SPURIOUS!

ADF tests:
rw1: ADF p = 0.7231 (non-stationary)
rw2: ADF p = 0.6891 (non-stationary)

The lesson: never run OLS on two non-stationary series without first checking for cointegration. A significant t-statistic tells you nothing when both series have unit roots.

What is Cointegration?

Two series XtX_t and YtY_t are cointegrated if:

  1. Both are I(1)I(1) (non-stationary, have unit roots)
  2. There exists a linear combination Zt=YtβXtZ_t = Y_t - \beta X_t that is I(0)I(0) (stationary)

The stationary combination ZtZ_t is called the cointegrating relationship. It represents a long-run equilibrium: even though XtX_t and YtY_t wander individually, they cannot deviate too far from each other - they are "tied together" by a shared stochastic trend.

Economic intuition

  • Prices of substitutes (Coke and Pepsi): their individual prices are non-stationary (random walks), but the spread is mean-reverting
  • Import and export prices: cointegrated through the law of one price
  • Income and consumption: cointegrated through the permanent income hypothesis
  • Interest rates of different maturities: cointegrated (term structure)

Mathematical definition

Let yt=(y1,t,y2,t,,yk,t)\mathbf{y}_t = (y_{1,t}, y_{2,t}, \ldots, y_{k,t})^\top be a kk-vector of I(1)I(1) series.

yt\mathbf{y}_t is cointegrated with cointegrating vector β\boldsymbol{\beta} if:

zt=βytI(0)\mathbf{z}_t = \boldsymbol{\beta}^\top \mathbf{y}_t \sim I(0)

There can be up to rr linearly independent cointegrating vectors, where rr is the cointegrating rank. These form the cointegrating matrix β\boldsymbol{\beta} of dimension k×rk \times r.

Engle-Granger Test (Two-Step Method)

For two series, the Engle-Granger (1987) test for cointegration:

Step 1: Regress YtY_t on XtX_t: Yt=α+βXt+u^tY_t = \alpha + \beta X_t + \hat{u}_t

Step 2: Test if the residuals u^t\hat{u}_t are stationary (ADF test on residuals):

  • If residuals are I(0)I(0) → cointegrated with β\beta as the cointegrating coefficient
  • If residuals are I(1)I(1) → not cointegrated

:::warning Critical point The ADF test on OLS residuals uses non-standard critical values (not standard ADF tables), because the residuals are estimated, not observed. Use statsmodels.tsa.stattools.coint which handles this correctly. :::

def engle_granger_test(
x: np.ndarray,
y: np.ndarray,
verbose: bool = True
) -> dict:
"""
Engle-Granger cointegration test.

H0: no cointegration (residuals are I(1))
H1: cointegrated (residuals are I(0))

Reject H0 (p < 0.05) → cointegrated.
"""
# Check both series are I(1)
_, p_x, *_ = adfuller(x)
_, p_y, *_ = adfuller(y)

# Step 1: OLS regression (get cointegrating coefficient)
X_with_const = sm.add_constant(x)
model = sm.OLS(y, X_with_const).fit()
beta = model.params[1]
alpha = model.params[0]
residuals = model.resid

# Step 2: ADF test on residuals (with non-standard critical values)
coint_stat, coint_p, crit_values = coint(y, x)

result = {
'beta': beta,
'alpha': alpha,
'coint_statistic': coint_stat,
'p_value': coint_p,
'critical_values': crit_values,
'is_cointegrated': coint_p < 0.05,
'residuals': residuals,
'x_is_I1': p_x > 0.05,
'y_is_I1': p_y > 0.05
}

if verbose:
print("Engle-Granger Cointegration Test")
print("=" * 50)
print(f"H0: no cointegration | H1: cointegrated")
print(f"\nPrerequisites:")
print(f" X is I(1): {'YES' if p_x > 0.05 else 'NO (stationary!)'} (ADF p={p_x:.4f})")
print(f" Y is I(1): {'YES' if p_y > 0.05 else 'NO (stationary!)'} (ADF p={p_y:.4f})")
print(f"\nStep 1: OLS regression Y = α + β·X + u")
print(f" β (cointegrating coeff): {beta:.4f}")
print(f" α (intercept): {alpha:.4f}")
print(f"\nStep 2: ADF test on residuals")
print(f" Test statistic: {coint_stat:.4f}")
print(f" p-value: {coint_p:.4f}")
print(f" Critical values: 1%={crit_values[0]:.4f}, 5%={crit_values[1]:.4f}, 10%={crit_values[2]:.4f}")
print(f"\n Decision: {'COINTEGRATED ✓' if result['is_cointegrated'] else 'NOT cointegrated ✗'}")

return result


# Simulate cointegrated pair
np.random.seed(42)
n = 500
common_trend = np.cumsum(np.random.normal(0, 1, n))
x_coint = common_trend + np.random.normal(0, 0.5, n) # series tied to common trend
y_coint = 2.0 * common_trend + 5.0 + np.random.normal(0, 0.5, n) # β=2, α=5

result = engle_granger_test(x_coint, y_coint)
print(f"\nTrue cointegrating relationship: y = 5 + 2x + noise")
print(f"Estimated: y = {result['alpha']:.3f} + {result['beta']:.3f}x")

Johansen Test: Multiple Cointegrating Vectors

For more than two series, the Johansen test (1991) determines the number of cointegrating relationships rr (the cointegrating rank).

The null hypotheses are nested:

  • H0:r=0H_0: r = 0 (no cointegration) vs H1:r1H_1: r \geq 1
  • H0:r1H_0: r \leq 1 vs H1:r2H_1: r \geq 2
  • ...
  • H0:rk1H_0: r \leq k-1 vs H1:r=kH_1: r = k

Two test statistics:

  • Trace test: λtrace=Ti=r+1kln(1λ^i)\lambda_{\text{trace}} = -T \sum_{i=r+1}^{k} \ln(1 - \hat{\lambda}_i)
  • Maximum eigenvalue test: λmax=Tln(1λ^r+1)\lambda_{\max} = -T \ln(1 - \hat{\lambda}_{r+1})
def johansen_cointegration_test(
data: pd.DataFrame,
det_order: int = 0, # 0: no constant, 1: constant, -1: no deterministics
k_ar_diff: int = 1, # number of lagged differences
verbose: bool = True
) -> dict:
"""
Johansen cointegration test for multiple time series.

det_order:
-1: no deterministics
0: constant in cointegrating space
1: constant in system (more common)

Returns:
r: cointegrating rank
beta: cointegrating vectors (normalized)
trace_stats: trace test statistics
max_eigen_stats: max eigenvalue test statistics
"""
series_array = data.values
k = data.shape[1]

result = coint_johansen(series_array, det_order=det_order, k_ar_diff=k_ar_diff)

# Extract results
trace_stats = result.lr1 # trace statistics (T x 1)
max_stats = result.lr2 # max eigenvalue statistics
cvt = result.cvt # critical values for trace (T x 3: 90%, 95%, 99%)
cvm = result.cvm # critical values for max eigenvalue

# Determine rank: find first r where trace stat < critical value at 5%
r = 0
for i in range(k):
if trace_stats[i] > cvt[i, 1]: # 95% critical value
r += 1
else:
break

if verbose:
print("Johansen Cointegration Test")
print("=" * 65)
print(f"Series: {list(data.columns)}")
print(f"Observations: {len(data)}, k={k}")
print(f"\nTrace Test:")
print(f"{'H0: r':>8} | {'Trace Stat':>12} | {'5% CV':>8} | {'Result':>10}")
print("-" * 48)
for i in range(k):
h0 = f"r ≤ {i}"
sig = "REJECT" if trace_stats[i] > cvt[i, 1] else "fail to reject"
print(f"{h0:>8} | {trace_stats[i]:>12.4f} | {cvt[i,1]:>8.4f} | {sig:>10}")

print(f"\nMax Eigenvalue Test:")
print(f"{'H0: r':>8} | {'Max-Eigen':>12} | {'5% CV':>8} | {'Result':>10}")
print("-" * 48)
for i in range(k):
h0 = f"r = {i}"
sig = "REJECT" if max_stats[i] > cvm[i, 1] else "fail to reject"
print(f"{h0:>8} | {max_stats[i]:>12.4f} | {cvm[i,1]:>8.4f} | {sig:>10}")

print(f"\nCointegrating rank (from trace test): r = {r}")
if r > 0:
print(f"\nCointegrating vectors (columns):")
beta = result.evec[:, :r]
for j in range(r):
print(f" β{j+1}: {[f'{v:.4f}' for v in beta[:, j]]}")

return {
'rank': r,
'trace_stats': trace_stats,
'max_eigen_stats': max_stats,
'crit_trace': cvt,
'crit_max': cvm,
'eigenvectors': result.evec,
'eigenvalues': result.eig
}


# Simulate three cointegrated series (rank 2)
np.random.seed(0)
n = 400
trend1 = np.cumsum(np.random.normal(0, 1, n))
trend2 = np.cumsum(np.random.normal(0, 1, n))

# Two cointegrating relationships: two shared trends → rank = k - r = 3 - 2 = 1? No...
# k=3 series, 2 common trends → rank = 1 (1 cointegrating vector)
z1 = trend1 + np.random.normal(0, 0.3, n)
z2 = 2*trend1 + trend2 + np.random.normal(0, 0.3, n)
z3 = trend1 - trend2 + np.random.normal(0, 0.3, n)

df_coint = pd.DataFrame({'z1': z1, 'z2': z2, 'z3': z3})
johansen_result = johansen_cointegration_test(df_coint)

Vector Error Correction Model (VECM)

When series are cointegrated, the appropriate model is the Vector Error Correction Model (VECM), not VAR in levels or first differences.

The VECM captures two types of dynamics:

  1. Short-run: First-differenced terms (how the system moves period to period)
  2. Long-run: Error correction term (how deviations from equilibrium are corrected)

Δyt=αβyt1error correction+j=1p1ΓjΔytj+ϵt\Delta \mathbf{y}_t = \underbrace{\boldsymbol{\alpha} \boldsymbol{\beta}^\top \mathbf{y}_{t-1}}_{\text{error correction}} + \sum_{j=1}^{p-1} \boldsymbol{\Gamma}_j \Delta \mathbf{y}_{t-j} + \boldsymbol{\epsilon}_t

  • βyt1\boldsymbol{\beta}^\top \mathbf{y}_{t-1}: deviation from long-run equilibrium (the cointegrating relation)
  • α\boldsymbol{\alpha}: speed of adjustment matrix - how fast each series corrects deviations
  • Γj\boldsymbol{\Gamma}_j: short-run dynamics (lagged differences)

Economic meaning of α\alpha: If αi>0\boldsymbol{\alpha}_i > 0 and the series is above equilibrium (βyt1>0\boldsymbol{\beta}^\top \mathbf{y}_{t-1} > 0), then Δyi,t\Delta y_{i,t} is pulled downward - restoring equilibrium.

def fit_vecm(
data: pd.DataFrame,
coint_rank: int,
k_ar_diff: int = 1,
verbose: bool = True
) -> dict:
"""
Fit Vector Error Correction Model.

Steps:
1. Determine cointegrating rank r (from Johansen test)
2. Fit VECM with rank r
3. Extract: cointegrating vectors β, adjustment speeds α,
short-run dynamics Γ, impulse responses
"""
model = VECM(
data,
coint_rank=coint_rank,
k_ar_diff=k_ar_diff,
deterministic='ci' # constant in cointegrating relation
)
result = model.fit()

if verbose:
print("VECM Estimation Results")
print("=" * 50)
print(f"Cointegrating rank: {coint_rank}")
print(f"Log-likelihood: {result.llf:.4f}")
print(f"\nCointegrating vector β (normalized):")
print(result.beta)
print(f"\nAdjustment coefficients α:")
print(result.alpha)

# Forecast
forecast = result.predict(steps=20)

return {
'result': result,
'beta': result.beta, # cointegrating vectors
'alpha': result.alpha, # adjustment speeds
'forecast': forecast
}

# Fit VECM to cointegrated pair
df_pair = pd.DataFrame({'x': x_coint, 'y': y_coint})
vecm_result = fit_vecm(df_pair, coint_rank=1)

Pairs Trading: Applied Cointegration

Pairs trading is the canonical ML application of cointegration in finance.

class PairsTradingStrategy:
"""
Statistical arbitrage via cointegration.

Steps:
1. Find cointegrated pair (e.g., KO and PEP)
2. Estimate the spread: spread_t = Y_t - β·X_t
3. Standardize: z_score_t = (spread_t - μ) / σ
4. Trade: go long when z_score < -2 (spread cheap), short when z_score > 2

Theory: if cointegrated, spread is stationary → mean-reverting → predictable
"""

def __init__(
self,
entry_threshold: float = 2.0,
exit_threshold: float = 0.5,
lookback: int = 60 # rolling window for z-score
):
self.entry_threshold = entry_threshold
self.exit_threshold = exit_threshold
self.lookback = lookback
self.beta = None
self.alpha = None

def fit(self, x_train: np.ndarray, y_train: np.ndarray) -> None:
"""Estimate cointegrating relationship on training data."""
# Test for cointegration
coint_result = engle_granger_test(x_train, y_train, verbose=False)

if not coint_result['is_cointegrated']:
print("WARNING: Pair is not cointegrated on training data!")
print("Pairs trading on non-cointegrated series is spurious.")

self.beta = coint_result['beta']
self.alpha = coint_result['alpha']
print(f"Estimated hedge ratio β = {self.beta:.4f}")
print(f"Spread: Y - {self.beta:.4f}·X")

def compute_spread(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
"""Compute spread = Y - β·X."""
return y - self.beta * x

def compute_zscore(self, spread: np.ndarray) -> np.ndarray:
"""Rolling z-score of the spread."""
z = np.full_like(spread, np.nan)
for t in range(self.lookback, len(spread)):
window = spread[t - self.lookback:t]
mu = np.mean(window)
sigma = np.std(window)
if sigma > 1e-8:
z[t] = (spread[t] - mu) / sigma
return z

def generate_signals(self, x: np.ndarray, y: np.ndarray) -> pd.DataFrame:
"""
Generate trading signals from z-score.

Signal:
+1 = long spread (long Y, short X): spread is cheap → expect it to rise
-1 = short spread (short Y, long X): spread is expensive → expect it to fall
0 = flat (no position)
"""
spread = self.compute_spread(x, y)
z = self.compute_zscore(spread)

signal = np.zeros(len(z))
position = 0

for t in range(len(z)):
if np.isnan(z[t]):
continue

if position == 0: # no position
if z[t] < -self.entry_threshold:
position = 1 # enter long spread
elif z[t] > self.entry_threshold:
position = -1 # enter short spread
elif position == 1: # long spread
if z[t] > -self.exit_threshold:
position = 0 # exit
elif position == -1: # short spread
if z[t] < self.exit_threshold:
position = 0 # exit

signal[t] = position

return pd.DataFrame({
'x': x, 'y': y, 'spread': spread,
'z_score': z, 'signal': signal
})

def backtest(self, signals_df: pd.DataFrame) -> dict:
"""
Simple P&L calculation.
P&L: when signal = +1, profit from spread increase
"""
spread = signals_df['spread'].values
signal = signals_df['signal'].values

# Daily P&L = signal_{t-1} * Δspread_t
d_spread = np.diff(spread)
pnl = signal[:-1] * d_spread

# Remove nan entries
valid = ~np.isnan(pnl)
pnl_clean = pnl[valid]

total_return = np.sum(pnl_clean)
sharpe = np.mean(pnl_clean) / (np.std(pnl_clean) + 1e-8) * np.sqrt(252)
n_trades = np.sum(np.abs(np.diff(signal[:-1][valid])) > 0) // 2

return {
'total_return': total_return,
'sharpe_ratio': sharpe,
'n_trades': n_trades,
'win_rate': np.mean(pnl_clean > 0)
}


# Run pairs trading on simulated data
np.random.seed(42)
n_total = 1000
common = np.cumsum(np.random.normal(0, 1, n_total))
x_price = 50 + common + np.random.normal(0, 0.5, n_total)
y_price = 100 + 2.0 * common + np.random.normal(0, 0.5, n_total) # true β = 2

# Train/test split (always temporal for time series)
train_end = 600
x_train, x_test = x_price[:train_end], x_price[train_end:]
y_train, y_test = y_price[:train_end], y_price[train_end:]

strategy = PairsTradingStrategy(entry_threshold=2.0, exit_threshold=0.5, lookback=60)
strategy.fit(x_train, y_train)

# Generate signals on test set
signals = strategy.generate_signals(x_test, y_test)
backtest = strategy.backtest(signals)

print(f"\nPairs Trading Backtest (out-of-sample):")
print(f" Total P&L: {backtest['total_return']:.4f}")
print(f" Sharpe ratio: {backtest['sharpe_ratio']:.4f}")
print(f" Number of trades: {backtest['n_trades']}")
print(f" Win rate: {backtest['win_rate']:.2%}")

Granger Causality

Granger causality (Granger, 1969) asks a practical question: does knowing the history of series XX improve predictions of series YY?

Formally, XX Granger-causes YY if:

MSE[Y^tYt1,Yt2,,Xt1,Xt2,]<MSE[Y^tYt1,Yt2,]\text{MSE}[\hat{Y}_t | Y_{t-1}, Y_{t-2}, \ldots, X_{t-1}, X_{t-2}, \ldots] < \text{MSE}[\hat{Y}_t | Y_{t-1}, Y_{t-2}, \ldots]

i.e., including past XX values reduces prediction error for YY beyond what YY's own history provides.

Important caveat: Granger causality is predictive causality, not true causality. If XX Granger-causes YY, it means XX contains useful predictive information for YY - this could be because X causes Y, or because both are caused by a common latent factor that affects X first.

Test procedure (VAR-based)

Fit two VAR models:

  1. Restricted: Yt=k=1pakYtk+ϵtY_t = \sum_{k=1}^{p} a_k Y_{t-k} + \epsilon_t (using only YY's past)
  2. Unrestricted: Yt=k=1pakYtk+k=1pbkXtk+ϵtY_t = \sum_{k=1}^{p} a_k Y_{t-k} + \sum_{k=1}^{p} b_k X_{t-k} + \epsilon_t (using XX's past too)

Test H0:b1=b2==bp=0H_0: b_1 = b_2 = \cdots = b_p = 0 using F-test.

def granger_causality_test(
x: np.ndarray,
y: np.ndarray,
max_lag: int = 5,
alpha: float = 0.05,
verbose: bool = True
) -> dict:
"""
Granger causality test: does X help predict Y?

H0: X does NOT Granger-cause Y
H1: X Granger-causes Y

Reject H0 (p < alpha) → X provides predictive value for Y.

IMPORTANT: Both series must be stationary. Difference if needed.
"""
data = np.column_stack([y, x]) # statsmodels expects [target, cause] order

gc_results = grangercausalitytests(data, maxlag=max_lag, verbose=False)

results = {}
best_lag = None
best_p = 1.0

if verbose:
print("Granger Causality Test: Does X help predict Y?")
print("H0: X does NOT Granger-cause Y")
print("=" * 55)
print(f"{'Lag':>5} | {'F-stat':>10} | {'p-value':>10} | {'Decision':>15}")
print("-" * 55)

for lag in range(1, max_lag + 1):
test_result = gc_results[lag][0]
f_stat = test_result['ssr_ftest'][0]
p_val = test_result['ssr_ftest'][1]

results[lag] = {'f_stat': f_stat, 'p_value': p_val}

if p_val < best_p:
best_p = p_val
best_lag = lag

if verbose:
decision = "REJECT H0" if p_val < alpha else "fail to reject"
print(f"{lag:>5} | {f_stat:>10.4f} | {p_val:>10.4f} | {decision:>15}")

granger_causes = best_p < alpha
if verbose:
print(f"\nConclusion: X {'DOES' if granger_causes else 'does NOT'} Granger-cause Y "
f"(best p={best_p:.4f} at lag {best_lag})")

return {
'granger_causes': granger_causes,
'best_p': best_p,
'best_lag': best_lag,
'results_by_lag': results
}


# Generate Granger causal relationship: X causes Y with 2-period lag
np.random.seed(42)
n = 500
x_gc = np.random.normal(0, 1, n)
y_gc = np.zeros(n)
for t in range(2, n):
y_gc[t] = 0.7 * y_gc[t-1] + 0.5 * x_gc[t-2] + np.random.normal(0, 0.5)

print("Test 1: X causes Y (with lag 2)")
gc1 = granger_causality_test(x_gc, y_gc, max_lag=5)

print("\nTest 2: Y causes X (should NOT be significant - reverse direction)")
gc2 = granger_causality_test(y_gc, x_gc, max_lag=5, verbose=True)

Granger Causality for Feature Selection

A powerful ML application: use Granger causality to select relevant lag features from a large set of time series.

def granger_feature_selection(
target: pd.Series,
candidates: pd.DataFrame,
max_lag: int = 5,
alpha: float = 0.05,
verbose: bool = True
) -> pd.DataFrame:
"""
Use Granger causality to identify which candidate time series
are predictively useful for the target variable.

Steps:
1. Check stationarity of target and candidates
2. For each candidate X: test if X Granger-causes target Y
3. Return candidates ranked by best p-value

Use case: financial forecasting - which macro variables predict returns?
Web analytics - which page views predict conversions?
"""
from statsmodels.tsa.stattools import adfuller

# Ensure target is stationary
_, target_p, *_ = adfuller(target.values)
y = target.values if target_p < 0.05 else np.diff(target.values)

feature_results = []

for col in candidates.columns:
x_series = candidates[col].values
_, x_p, *_ = adfuller(x_series)
x = x_series if x_p < 0.05 else np.diff(x_series)

# Align lengths
min_len = min(len(y), len(x))
y_aligned = y[-min_len:]
x_aligned = x[-min_len:]

try:
data = np.column_stack([y_aligned, x_aligned])
gc = grangercausalitytests(data, maxlag=max_lag, verbose=False)

# Find best p-value across lags
p_values = [gc[lag][0]['ssr_ftest'][1] for lag in range(1, max_lag+1)]
best_p = min(p_values)
best_lag = p_values.index(best_p) + 1

feature_results.append({
'feature': col,
'granger_p': best_p,
'best_lag': best_lag,
'significant': best_p < alpha,
'x_stationary': x_p < 0.05
})
except Exception:
pass

df_results = pd.DataFrame(feature_results).sort_values('granger_p')

if verbose:
print(f"Granger Causality Feature Selection (target: {target.name})")
print(f"{'Feature':<25} | {'p-value':>10} | {'Best lag':>9} | {'Selected':>10}")
print("-" * 62)
for _, row in df_results.iterrows():
marker = "YES ✓" if row['significant'] else "no"
print(f"{row['feature']:<25} | {row['granger_p']:>10.4f} | {row['best_lag']:>9} | {marker:>10}")

selected = df_results[df_results['significant']]['feature'].tolist()
print(f"\nSelected features ({len(selected)}): {selected}")
return df_results


# Simulate: target depends on features 1 and 3, not 2 or 4
np.random.seed(0)
n = 400
f1 = np.random.normal(0, 1, n) # causal at lag 1
f2 = np.random.normal(0, 1, n) # not causal
f3 = np.random.normal(0, 1, n) # causal at lag 2
f4 = np.random.normal(0, 1, n) # not causal

target_data = np.zeros(n)
for t in range(2, n):
target_data[t] = (0.6 * target_data[t-1] +
0.5 * f1[t-1] +
0.4 * f3[t-2] +
np.random.normal(0, 0.3))

target_series = pd.Series(target_data, name='target')
candidates_df = pd.DataFrame({'feature_1': f1, 'feature_2': f2,
'feature_3': f3, 'feature_4': f4})

selection = granger_feature_selection(target_series, candidates_df, max_lag=5)

Interview Questions

Q1: What is the difference between correlation, Granger causality, and true causality?

Correlation measures the contemporaneous linear association between XX and YY at the same time tt. It says nothing about direction or timing.

Granger causality tests whether past values of XX improve predictions of YY beyond what YY's own past provides. If Xt1,Xt2,X_{t-1}, X_{t-2}, \ldots add predictive power for YtY_t (in a VAR model), then XX Granger-causes YY.

True causality (Pearl's do-calculus, SCM) answers "what happens to YY if I intervene and set X=xX = x?" - a fundamentally different question.

Key differences:

  • Granger causality is about prediction in observational data - XX might Granger-cause YY simply because both are driven by a common hidden factor ZZ, with ZZ affecting XX before YY (confounding)
  • True causality requires ruling out confounders - typically through controlled experiments or causal graph assumptions (DAGs)
  • Example: shoe sales might Granger-cause ice cream sales (both driven by summer), but shoes don't cause ice cream purchases

When Granger causality is useful:

  • Feature selection for forecasting models: "does macro variable X help predict asset returns Y?"
  • Understanding temporal precedence in observational data
  • Exploratory hypothesis generation before causal analysis

Limitation: Never interpret Granger causality as policy-relevant causation without additional causal assumptions.

Q2: What is cointegration and why does it matter for time series analysis?

Cointegration occurs when two or more I(1)I(1) (non-stationary) series have a stationary linear combination. They share a common stochastic trend, so while individually they wander, they cannot drift apart indefinitely - they maintain a long-run equilibrium relationship.

Why it matters:

  1. Avoids spurious regression: If two I(1)I(1) series are cointegrated, OLS gives a consistent estimator of the true relationship (the "super-consistency" property). If not cointegrated, OLS is spurious.

  2. Long-run modeling: Cointegration captures permanent relationships. Even when both series wander, the cointegrating combination is mean-reverting - it has a well-defined equilibrium.

  3. VECM vs VAR: For cointegrated series, the appropriate model is VECM - it includes an error correction term that pulls the system back to equilibrium. A plain VAR in first differences discards this long-run information. A VAR in levels is spurious (non-stationary residuals).

  4. Applications: Pairs trading (finance), term structure modeling, purchasing power parity (macroeconomics), electricity price co-movement.

Detection: Engle-Granger test (two series) or Johansen test (multiple series).

Q3: Why can't you just difference both series and run OLS when they're non-stationary?

You can difference both to get stationarity, but you lose important information - specifically, the long-run relationship encoded in the levels.

Consider prices PtP_t and rents RtR_t in a housing market - both non-stationary. If they're cointegrated (price-to-rent ratio is stationary), the deviations from the long-run equilibrium drive future price and rent adjustments.

If you run OLS on first differences:

  • ΔPt=α+βΔRt+ϵt\Delta P_t = \alpha + \beta \Delta R_t + \epsilon_t
  • You capture short-run dynamics (how changes in rent correlate with changes in price)
  • But you lose the error correction: the fact that when PtP_t is too high relative to RtR_t, future ΔPt\Delta P_t tends to be negative

The VECM (correct model): ΔPt=γ(Pt1βRt1)+short-run dynamics+ϵt\Delta P_t = \gamma(P_{t-1} - \beta R_{t-1}) + \text{short-run dynamics} + \epsilon_t

The term γ(Pt1βRt1)\gamma(P_{t-1} - \beta R_{t-1}) is the error correction - how much of the disequilibrium is corrected each period.

Practical consequence: If you fit a VAR in first differences to cointegrated series, your model will misspecify the dynamics. It will forecast correctly in the short run but lose track of the long-run equilibrium, producing forecasts that diverge over the horizon. VECM forecasts for cointegrated series outperform VAR-in-differences at long horizons.

Q4: Explain pairs trading. What's the mathematical justification and what are the risks?

Pairs trading exploits the mean-reversion of a cointegrated spread.

Mathematical justification:

  1. If XtX_t and YtY_t are cointegrated with coefficient β\beta, then Zt=YtβXtI(0)Z_t = Y_t - \beta X_t \sim I(0) (stationary)
  2. Stationarity implies mean-reversion: large deviations from E[Zt]\mathbb{E}[Z_t] tend to revert
  3. Strategy: when ZtZ_t is 2σ below its mean, go long ZZ (long YY, short β\beta units of XX); when 2σ above, go short ZZ. Exit when ZZ returns to near its mean.
  4. Expected P&L: positive because you're selling the spread when expensive and buying when cheap, exploiting mean-reversion.

Risks:

  1. Cointegration breakdown: The most dangerous risk. Historical cointegration doesn't guarantee future cointegration. If the economic relationship changes (Coca-Cola launches a new business line), the pairs relationship can break permanently.
  2. Extended divergence before convergence: The spread might take months to revert. You must have sufficient capital to maintain the position through large adverse moves.
  3. Transaction costs: Frequent trading erodes P&L. The strategy needs enough edge to survive costs.
  4. Estimation error: β\beta is estimated, not known. A wrong β\beta means the spread isn't stationary.
  5. Model risk: Using Engle-Granger without accounting for structural breaks, regime changes.

Best practices:

  • Use rolling windows to detect cointegration breakdown
  • Size positions inversely to spread uncertainty
  • Set stop-losses for extreme spread divergence
  • Diversify across many pairs to reduce idiosyncratic risk
Q5: How would you use Granger causality for feature engineering in a financial forecasting system?

Pipeline for Granger-based feature selection:

  1. Collect candidate series: macroeconomic indicators (GDP growth, CPI, PMI), market variables (VIX, yield curve, credit spreads), alternative data (satellite imagery, sentiment).

  2. Ensure stationarity: ADF test on all series. Difference non-stationary series (compute returns for prices, changes for levels). Granger causality tests require stationarity.

  3. Test pairwise Granger causality: For each candidate XiX_i, test whether it Granger-causes the target YY at multiple lags. Use the F-test from a VAR.

  4. Multiple testing correction: With 50 candidates, expect ~2.5 false positives at 5% significance. Apply Bonferroni correction (use α/k\alpha/k threshold) or Benjamini-Hochberg FDR control.

  5. Select lag structure: For each significant predictor, use the optimal lag (lowest p-value across lags). These lag features become predictors: feature_X_lag_k for each selected XX and lag kk.

  6. Validate: Granger test is in-sample. Validate out-of-sample with walk-forward evaluation: does including X actually reduce test set MSE?

Caution: Granger causality can be spurious due to:

  • Omitted variable bias (common driver)
  • Look-ahead bias if not careful with lag alignment
  • Non-stationarity if preprocessing is wrong

The result of Granger selection is a set of lag features to include in your ML model - it doesn't tell you the model form (linear vs non-linear), it just tells you which temporal dependencies exist.

Key Takeaways

  • Spurious regression: Two independent non-stationary series appear correlated - OLS on I(1)I(1) series is invalid without cointegration
  • Cointegration: Two I(1)I(1) series share a common stochastic trend; their linear combination Zt=YtβXtZ_t = Y_t - \beta X_t is stationary (mean-reverting)
  • Tests: Engle-Granger (two series); Johansen test (multiple series, determines rank)
  • VECM: The correct model for cointegrated series - captures both short-run dynamics and long-run error correction
  • Pairs trading: Exploit mean-reversion of the cointegrating spread - buy when spread is cheap, sell when expensive
  • Granger causality: Tests whether past XX improves prediction of YY - predictive causality, not true causality; useful for feature selection
  • Always difference before Granger testing; always test for cointegration before deciding between VAR-in-differences and VECM

Next: Lesson 07: Wavelets and Multiscale Analysis →

:::tip 🎮 Interactive Playground

Visualize this concept: Try the Cointegration demo on the EngineersOfAI Playground - no code required.

:::

© 2026 EngineersOfAI. All rights reserved.