Fourier Analysis for ML Engineers
Reading time: ~40 minutes Interview relevance: High - Fourier features appear in transformer positional encodings, audio ML, and IoT anomaly detection interviews Target roles: ML Engineer, Research Engineer, Audio/Speech ML, IoT/Sensor ML
The Real Interview Moment
You're interviewing for a speech recognition team. The interviewer shows you a waveform of a spoken word and asks: "How would you turn this raw audio into features for a neural network?"
"I'd extract mel-frequency cepstral coefficients," you say.
"Good. But why do MFCCs work? What's the mathematical operation at the core of that pipeline?"
The answer is the Fourier transform - specifically, the Short-Time Fourier Transform. But more than knowing the name, you need to understand why decomposing a signal into frequencies is a natural representation for audio (and many other time series), and why the FFT algorithm makes this computationally tractable at scale.
Fourier analysis is foundational to:
- Audio and speech feature extraction (STFT, mel spectrograms, MFCCs)
- Anomaly detection in IoT sensors (unexpected frequencies = faults)
- Transformer positional encodings (sinusoidal basis)
- Time-frequency tradeoffs in time series forecasting
- Accelerating convolutions in neural networks (conv via FFT)
The Core Idea: Signals as Sums of Sinusoids
Any periodic signal can be expressed as a sum of sine and cosine waves of different frequencies, amplitudes, and phases. This is the Fourier insight from 1807.
A continuous periodic signal with period :
where the Fourier coefficients are:
Using complex exponentials (), this condenses to:
This is elegant, but we work with discrete, finite samples in ML. Enter the Discrete Fourier Transform.
The Discrete Fourier Transform (DFT)
Given samples at equally spaced times, the DFT is:
The output is a complex number representing the frequency component at frequency cycles per sample (or in Hz, where is the sampling rate).
The inverse DFT recovers the original signal:
What each output frequency means
For samples taken at rate (samples/sec):
| DFT output index | Frequency | Meaning |
|---|---|---|
| 0 Hz | DC component (mean value of signal) | |
| Lowest non-zero frequency (period = samples) | ||
| (Nyquist) | Highest representable frequency | |
| Aliased | Mirror of (negative frequencies) |
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import fft, ifft, fftfreq, fftshift
# ─── Manual DFT (slow but transparent) ──────────────────────────────────────
def dft_manual(x: np.ndarray) -> np.ndarray:
"""O(N^2) DFT - illustrates the formula directly."""
N = len(x)
X = np.zeros(N, dtype=complex)
for k in range(N):
for n in range(N):
X[k] += x[n] * np.exp(-2j * np.pi * k * n / N)
return X
# Generate test signal: 50 Hz + 120 Hz + noise
fs = 1000 # sampling rate (Hz)
T = 1.0 # duration (seconds)
N = int(fs * T) # number of samples = 1000
t = np.linspace(0, T, N, endpoint=False)
signal = (
1.0 * np.sin(2 * np.pi * 50 * t) + # 50 Hz component, amplitude 1
0.5 * np.sin(2 * np.pi * 120 * t) + # 120 Hz component, amplitude 0.5
0.2 * np.random.normal(0, 1, N) # noise
)
# ─── FFT (fast) ──────────────────────────────────────────────────────────────
X = fft(signal)
freqs = fftfreq(N, d=1/fs) # frequency array in Hz
# Compare manual DFT vs FFT on small example
small_signal = signal[:64] # use first 64 samples for speed
X_manual = dft_manual(small_signal)
X_scipy = fft(small_signal)
print("Manual DFT vs FFT (first 5 complex values):")
for k in range(5):
print(f" k={k}: manual={X_manual[k]:.4f}, scipy={X_scipy[k]:.4f}")
The Fast Fourier Transform (FFT)
The naive DFT requires operations. For samples, that's operations - 1 trillion multiplications. The FFT reduces this to using the Cooley-Tukey algorithm.
The divide-and-conquer insight
For even , split the DFT into two half-sized DFTs:
Because is periodic (), this gives:
where is the twiddle factor.
Apply recursively until : operations total.
def fft_recursive(x: np.ndarray) -> np.ndarray:
"""
Cooley-Tukey FFT (radix-2 decimation-in-time).
Requires N to be a power of 2.
"""
N = len(x)
if N == 1:
return x.astype(complex)
if N % 2 != 0:
raise ValueError("N must be a power of 2")
# Recursively compute FFT of even and odd indexed elements
even = fft_recursive(x[::2]) # even-indexed: x[0], x[2], x[4], ...
odd = fft_recursive(x[1::2]) # odd-indexed: x[1], x[3], x[5], ...
# Twiddle factors: W_N^k = exp(-2pi*i*k/N)
k = np.arange(N // 2)
twiddle = np.exp(-2j * np.pi * k / N) * odd
return np.concatenate([even + twiddle, even - twiddle])
# Verify correctness
x_test = np.random.randn(256)
X_recursive = fft_recursive(x_test)
X_scipy = fft(x_test)
max_diff = np.max(np.abs(X_recursive - X_scipy))
print(f"Max difference (recursive FFT vs scipy): {max_diff:.2e}")
# Performance comparison
import time
sizes = [64, 256, 1024, 4096, 16384, 65536]
print(f"\n{'N':>8} | {'DFT O(N²) (ms)':>16} | {'FFT O(NlogN) (ms)':>18} | {'Speedup':>8}")
print("-" * 60)
for N_test in sizes:
x_bench = np.random.randn(N_test)
if N_test <= 1024: # DFT too slow for large N
t0 = time.perf_counter()
dft_manual(x_bench)
t_dft = (time.perf_counter() - t0) * 1000
else:
t_dft = float('inf')
t0 = time.perf_counter()
fft(x_bench)
t_fft = (time.perf_counter() - t0) * 1000
speedup = t_dft / t_fft if t_dft < float('inf') else float('inf')
t_dft_str = f"{t_dft:.2f}" if t_dft < float('inf') else "too slow"
speedup_str = f"{speedup:.0f}x" if speedup < float('inf') else ">>>"
print(f"{N_test:>8} | {t_dft_str:>16} | {t_fft:>18.3f} | {speedup_str:>8}")
Power Spectrum and Spectral Analysis
The power spectrum tells you how much power (energy) each frequency component carries:
where .
def compute_power_spectrum(
signal: np.ndarray,
fs: float = 1.0,
window: str = 'hann'
) -> tuple[np.ndarray, np.ndarray]:
"""
Compute one-sided power spectrum with windowing.
Windowing reduces spectral leakage - the artifact where energy
from one frequency bleeds into adjacent frequency bins when
the signal doesn't fit perfectly in the analysis window.
Common windows:
- Hann: good general purpose, -3dB bandwidth = 1.5 bins
- Hamming: slightly better spectral leakage than Hann
- Rectangular (no window): sharpest but most leakage
- Blackman: lowest sidelobes, widest main lobe
"""
N = len(signal)
# Apply window
if window == 'hann':
w = np.hanning(N)
elif window == 'hamming':
w = np.hamming(N)
elif window == 'blackman':
w = np.blackman(N)
else:
w = np.ones(N) # rectangular
windowed = signal * w
# FFT and power
X = fft(windowed)
power = (np.abs(X)**2) / (N * np.sum(w**2)) # normalized
# One-sided spectrum (positive frequencies only, double for conservation)
freqs = fftfreq(N, d=1/fs)
pos_mask = freqs >= 0
freqs_pos = freqs[pos_mask]
power_pos = 2 * power[pos_mask]
power_pos[0] /= 2 # DC component: don't double
if N % 2 == 0:
power_pos[-1] /= 2 # Nyquist: don't double
return freqs_pos, power_pos
freqs, power = compute_power_spectrum(signal, fs=fs, window='hann')
# Find peak frequencies
peak_idx = np.argsort(power)[-5:][::-1] # top 5 peaks
print("Top frequency components detected:")
for idx in peak_idx:
print(f" f = {freqs[idx]:.1f} Hz, power = {power[idx]:.4f}")
# Detect anomalous frequencies
def detect_spectral_anomalies(
signal: np.ndarray,
fs: float,
threshold_sigma: float = 3.0
) -> list[dict]:
"""
Detect anomalous frequency components: peaks > threshold_sigma
standard deviations above the median power.
Use case: IoT sensor anomaly detection - a motor fault introduces
harmonics at unexpected frequencies.
"""
freqs, power = compute_power_spectrum(signal, fs=fs)
median_power = np.median(power)
std_power = np.std(power)
threshold = median_power + threshold_sigma * std_power
anomalous = []
for i, (f, p) in enumerate(zip(freqs, power)):
if p > threshold and f > 0: # exclude DC
anomalous.append({
'frequency_hz': f,
'power': p,
'sigma_above_median': (p - median_power) / std_power
})
return sorted(anomalous, key=lambda x: x['power'], reverse=True)
anomalies = detect_spectral_anomalies(signal, fs=fs)
print(f"\nDetected {len(anomalies)} spectral anomalies:")
for a in anomalies[:3]:
print(f" f={a['frequency_hz']:.1f} Hz, {a['sigma_above_median']:.1f}σ above median")
Short-Time Fourier Transform (STFT)
A major limitation of the DFT: it's global - it tells you which frequencies exist in the signal, but not when they appear.
The STFT solves this by computing the DFT over short overlapping windows:
where is the analysis window, is the window center, and is the frequency.
The result is a 2D spectrogram: time × frequency.
from scipy.signal import stft, spectrogram
def compute_spectrogram(
signal: np.ndarray,
fs: float,
window_size: float = 0.025, # 25ms window (standard for speech)
hop_size: float = 0.010, # 10ms hop (60% overlap)
n_fft: int = None
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute STFT spectrogram.
Parameters:
window_size: window duration in seconds (longer = better freq resolution)
hop_size: hop duration in seconds (shorter = better time resolution)
Time-frequency tradeoff:
Longer window → better frequency resolution, worse time resolution
Shorter window → better time resolution, worse frequency resolution
This is the Heisenberg uncertainty principle for time-frequency analysis.
"""
nperseg = int(window_size * fs) # samples per window
noverlap = int((window_size - hop_size) * fs) # overlapping samples
if n_fft is None:
n_fft = nperseg
freqs, times, Sxx = spectrogram(
signal,
fs=fs,
nperseg=nperseg,
noverlap=noverlap,
nfft=n_fft,
window='hann',
scaling='density'
)
return freqs, times, Sxx
# Non-stationary signal: frequency changes over time
t_chirp = np.linspace(0, 1, fs, endpoint=False)
chirp_signal = np.sin(2 * np.pi * (50 + 100 * t_chirp) * t_chirp) # frequency sweeps 50→150 Hz
freqs_s, times_s, Sxx = compute_spectrogram(chirp_signal, fs=fs)
print(f"Spectrogram shape: {Sxx.shape} (freq_bins × time_frames)")
print(f"Frequency resolution: {freqs_s[1] - freqs_s[0]:.2f} Hz")
print(f"Time resolution: {times_s[1] - times_s[0]:.4f} sec")
Mel Spectrogram and MFCCs
The mel scale maps frequencies to how humans perceive pitch (roughly logarithmic). This is the input to most audio ML models.
def hz_to_mel(hz: float) -> float:
"""Convert Hz to mel scale (O'Shaughnessy formula)."""
return 2595 * np.log10(1 + hz / 700)
def mel_to_hz(mel: float) -> float:
"""Convert mel to Hz."""
return 700 * (10**(mel / 2595) - 1)
def mel_filterbank(
n_filters: int = 26,
fft_size: int = 512,
fs: float = 16000,
f_min: float = 0.0,
f_max: float = None
) -> np.ndarray:
"""
Compute mel filterbank matrix.
Maps linear FFT bins to mel-scale filter outputs.
Used in Whisper, wav2vec, and all audio transformers.
"""
if f_max is None:
f_max = fs / 2
# Evenly spaced mel points
mel_min = hz_to_mel(f_min)
mel_max = hz_to_mel(f_max)
mel_points = np.linspace(mel_min, mel_max, n_filters + 2)
hz_points = np.array([mel_to_hz(m) for m in mel_points])
# Convert Hz to FFT bin indices
bin_points = np.floor((fft_size + 1) * hz_points / fs).astype(int)
# Build triangular filters
filterbank = np.zeros((n_filters, fft_size // 2 + 1))
for m in range(1, n_filters + 1):
f_left = bin_points[m - 1]
f_center = bin_points[m]
f_right = bin_points[m + 1]
for k in range(f_left, f_center):
if f_center > f_left:
filterbank[m-1, k] = (k - f_left) / (f_center - f_left)
for k in range(f_center, f_right):
if f_right > f_center:
filterbank[m-1, k] = (f_right - k) / (f_right - f_center)
return filterbank
def compute_mfcc(
signal: np.ndarray,
fs: float = 16000,
n_mfcc: int = 13,
n_filters: int = 26,
frame_length: float = 0.025,
frame_step: float = 0.010
) -> np.ndarray:
"""
Compute MFCCs (Mel-Frequency Cepstral Coefficients).
Pipeline:
1. Frame signal into overlapping windows
2. Apply FFT to each frame → power spectrum
3. Apply mel filterbank → mel spectrum
4. Take log → log mel spectrum
5. Apply DCT → cepstral coefficients (MFCCs)
The DCT decorrelates the mel filter outputs (similar to PCA).
Result: compact, decorrelated features for speech/audio classification.
"""
# Step 1: Pre-emphasis (boost high frequencies)
pre_emphasis = 0.97
emphasized = np.append(signal[0], signal[1:] - pre_emphasis * signal[:-1])
# Step 2: Framing
frame_len = int(frame_length * fs)
frame_step_samples = int(frame_step * fs)
n_frames = 1 + (len(emphasized) - frame_len) // frame_step_samples
indices = (np.tile(np.arange(frame_len), (n_frames, 1)) +
np.tile(np.arange(n_frames) * frame_step_samples, (frame_len, 1)).T)
frames = emphasized[indices.astype(int)]
# Step 3: Windowing and FFT
frames *= np.hanning(frame_len)
NFFT = 512
mag_frames = np.absolute(np.fft.rfft(frames, NFFT))
pow_frames = (1.0 / NFFT) * (mag_frames ** 2)
# Step 4: Mel filterbank
fbank = mel_filterbank(n_filters, NFFT, fs)
filter_banks = pow_frames @ fbank.T
filter_banks = np.where(filter_banks == 0, np.finfo(float).eps, filter_banks)
filter_banks = 20 * np.log10(filter_banks) # dB scale
# Step 5: DCT (keep n_mfcc coefficients)
from scipy.fft import dct
mfccs = dct(filter_banks, type=2, axis=1, norm='ortho')[:, :n_mfcc]
return mfccs # shape: (n_frames, n_mfcc)
# Generate a simple speech-like signal
np.random.seed(42)
audio = np.random.randn(16000) # 1 second of "noise"
mfcc_features = compute_mfcc(audio, fs=16000, n_mfcc=13)
print(f"MFCC shape: {mfcc_features.shape}") # (n_frames, 13)
print(f"Time frames: {mfcc_features.shape[0]}, coefficients: {mfcc_features.shape[1]}")
Fourier Features in Neural Networks
Sinusoidal Positional Encodings
The transformer architecture uses Fourier-inspired positional encodings:
where is the position and is the dimension index. Each dimension uses a different frequency.
def sinusoidal_positional_encoding(
seq_len: int,
d_model: int,
base: float = 10000.0
) -> np.ndarray:
"""
Transformer sinusoidal positional encoding (Vaswani et al., 2017).
Design choices explained via Fourier:
1. Different frequencies for different dimensions → unique position fingerprint
2. sin/cos pair → model can learn relative positions via rotation
(PE(pos+k) is a linear function of PE(pos))
3. base=10000 → frequencies range from 1 (low dim) to 1/10000 (high dim)
- covers sequence lengths from 1 to ~10000 tokens
The key property: for any offset k, there exists a fixed linear transformation
M_k such that PE(pos+k) = M_k @ PE(pos). This lets the model attend
to relative positions.
"""
positions = np.arange(seq_len)[:, np.newaxis] # (seq_len, 1)
dims = np.arange(d_model)[np.newaxis, :] # (1, d_model)
# Dimension frequencies: 1/base^(2i/d_model)
freqs = 1.0 / (base ** (2 * (dims // 2) / d_model)) # (1, d_model)
angles = positions * freqs # (seq_len, d_model)
# Apply sin to even dims, cos to odd dims
pe = np.zeros((seq_len, d_model))
pe[:, 0::2] = np.sin(angles[:, 0::2]) # even: sin
pe[:, 1::2] = np.cos(angles[:, 1::2]) # odd: cos
return pe
pe = sinusoidal_positional_encoding(seq_len=100, d_model=64)
print(f"Positional encoding shape: {pe.shape}")
# Verify the relative position property
# For d=2 (one sin/cos pair): PE(pos+k) = R(k) @ PE(pos) where R is rotation matrix
d = 2
freq = 1.0 # for simplest case
def pe_2d(pos, freq=1.0):
return np.array([np.sin(freq * pos), np.cos(freq * pos)])
pos, k = 10, 5
pe_pos = pe_2d(pos)
pe_pos_k = pe_2d(pos + k)
# The rotation matrix
theta = freq * k
R = np.array([[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]])
pe_rotated = R @ pe_pos
print(f"\nRelative position property:")
print(f"PE(pos+k): {pe_pos_k}")
print(f"R(k) @ PE(pos): {pe_rotated}")
print(f"Max error: {np.max(np.abs(pe_pos_k - pe_rotated)):.2e}")
Fourier Features for Tabular and Time Series ML
def fourier_date_features(
timestamps: np.ndarray,
periods: list[float] = None
) -> np.ndarray:
"""
Encode cyclical time features using Fourier pairs.
Instead of: hour=23, hour=0 (treated as far apart by models)
Use: sin(2π*hour/24), cos(2π*hour/24) (wraps correctly)
This gives neural networks a continuous, periodic representation
of time-of-day, day-of-week, month-of-year, etc.
periods: list of cycle lengths in same units as timestamps
"""
if periods is None:
periods = [24, 24*7, 24*365] # daily, weekly, yearly (if timestamps are hours)
features = []
for period in periods:
angle = 2 * np.pi * (timestamps % period) / period
features.append(np.sin(angle))
features.append(np.cos(angle))
return np.stack(features, axis=-1)
# Example: hourly timestamps for one week
hours = np.arange(0, 24 * 7) # 168 hours
time_features = fourier_date_features(hours, periods=[24, 24*7])
print(f"Fourier time features shape: {time_features.shape}") # (168, 4)
print(f"Features: [sin_daily, cos_daily, sin_weekly, cos_weekly]")
# Verify periodicity: hour 0 ≈ hour 24 in sin/cos space
h0 = time_features[0]
h24 = time_features[24]
print(f"\nHour 0 daily features: sin={h0[0]:.4f}, cos={h0[1]:.4f}")
print(f"Hour 24 daily features: sin={h24[0]:.4f}, cos={h24[1]:.4f}")
print(f"(Same encoding → no discontinuity at midnight)")
Convolution via FFT
The convolution theorem states that convolution in the time domain equals pointwise multiplication in the frequency domain:
This allows convolution instead of for kernel size .
def fft_convolution(signal: np.ndarray, kernel: np.ndarray) -> np.ndarray:
"""
Compute convolution via FFT - O(N log N) instead of O(N*M).
Used in:
- Large 1D CNN layers (when kernel is large)
- Audio filtering in preprocessing pipelines
- Computing attention patterns via polynomial multiplication
"""
N = len(signal)
M = len(kernel)
output_size = N + M - 1
# Zero-pad both to output_size (avoids circular convolution artifacts)
padded_size = 2**int(np.ceil(np.log2(output_size))) # next power of 2
X = fft(signal, n=padded_size)
H = fft(kernel, n=padded_size)
Y = X * H # pointwise multiplication in frequency domain
y = np.real(ifft(Y))[:output_size] # inverse FFT, trim
return y
# Verify: FFT convolution == direct convolution
signal_test = np.random.randn(1024)
kernel_test = np.array([0.25, 0.5, 0.25]) # moving average filter
y_fft = fft_convolution(signal_test, kernel_test)
y_direct = np.convolve(signal_test, kernel_test)
print(f"FFT convolution == direct: max diff = {np.max(np.abs(y_fft - y_direct)):.2e}")
# Performance for large kernels
import time
large_signal = np.random.randn(10000)
large_kernel = np.random.randn(1000) # large kernel - FFT shines here
t0 = time.perf_counter()
for _ in range(10):
np.convolve(large_signal, large_kernel)
t_direct = (time.perf_counter() - t0) / 10 * 1000
t0 = time.perf_counter()
for _ in range(10):
fft_convolution(large_signal, large_kernel)
t_fft = (time.perf_counter() - t0) / 10 * 1000
print(f"\nLarge convolution (N=10000, M=1000):")
print(f" Direct: {t_direct:.2f} ms")
print(f" FFT: {t_fft:.2f} ms")
print(f" Speedup: {t_direct/t_fft:.1f}x")
Fourier Features for Time Series Forecasting
Modern forecasting models like N-BEATS and N-HiTS use explicit Fourier basis functions.
def fourier_basis_features(
time_index: np.ndarray,
period: float,
n_harmonics: int = 5
) -> np.ndarray:
"""
Generate Fourier basis functions for period T with n_harmonics.
Each harmonic k contributes: sin(2π*k*t/T), cos(2π*k*t/T)
Total features: 2 * n_harmonics
Use case: capture seasonal patterns in time series forecasting.
Linear regression on these features = fit seasonality.
Neural net on these features = flexible seasonal modeling.
"""
features = []
for k in range(1, n_harmonics + 1):
angle = 2 * np.pi * k * time_index / period
features.append(np.sin(angle))
features.append(np.cos(angle))
return np.stack(features, axis=-1)
# Generate monthly data with complex seasonality
np.random.seed(42)
n_months = 120
t = np.arange(n_months)
# True signal: fundamental + 2nd harmonic + trend + noise
annual_period = 12
y_true = (
3.0 * np.sin(2 * np.pi * t / annual_period) + # fundamental
1.5 * np.sin(2 * np.pi * 2 * t / annual_period) + # 2nd harmonic
0.1 * t + # trend
np.random.normal(0, 0.5, n_months) # noise
)
# Fit using Fourier features + linear regression
from sklearn.linear_model import LinearRegression
fourier_feats = fourier_basis_features(t, period=annual_period, n_harmonics=6)
trend_feat = t.reshape(-1, 1)
X_regression = np.hstack([fourier_feats, trend_feat, np.ones((n_months, 1))])
# Train on first 80%, predict on last 20%
split = int(0.8 * n_months)
model = LinearRegression().fit(X_regression[:split], y_true[:split])
y_pred = model.predict(X_regression[split:])
from sklearn.metrics import mean_squared_error
rmse = np.sqrt(mean_squared_error(y_true[split:], y_pred))
print(f"Fourier regression RMSE on test set: {rmse:.4f}")
print(f"Baseline (mean) RMSE: {np.std(y_true[split:]):.4f}")
print(f"R² on test: {model.score(X_regression[split:], y_true[split:]):.4f}")
Nyquist Theorem and Aliasing
A critical concept for ML engineers working with sensor data:
Nyquist-Shannon Sampling Theorem: To faithfully reconstruct a signal with maximum frequency , you must sample at rate .
If you sample too slowly (below the Nyquist rate), higher frequencies appear as lower frequencies - aliasing.
def demonstrate_aliasing() -> None:
"""
Show aliasing: 110 Hz signal sampled at 100 Hz appears as 10 Hz.
The "true" frequency f and aliased frequency f_alias are related by:
f_alias = |f - round(f/fs) * fs|
"""
fs_low = 100 # sampling rate (Hz)
f_signal = 110 # signal frequency (Hz) - above Nyquist!
nyquist = fs_low / 2 # = 50 Hz
t_cont = np.linspace(0, 0.5, 10000) # continuous time (high res)
t_sampled = np.arange(0, 0.5, 1/fs_low) # sampled at 100 Hz
# Original signal (110 Hz)
x_cont = np.sin(2 * np.pi * f_signal * t_cont)
x_sampled = np.sin(2 * np.pi * f_signal * t_sampled)
# Aliased frequency: 110 Hz sampled at 100 Hz → appears as 10 Hz
f_alias = abs(f_signal - round(f_signal / fs_low) * fs_low)
x_alias = np.sin(2 * np.pi * f_alias * t_cont)
print(f"Signal frequency: {f_signal} Hz")
print(f"Sampling rate: {fs_low} Hz")
print(f"Nyquist frequency: {nyquist} Hz")
print(f"Aliased frequency: {f_alias} Hz")
print(f"\n→ 110 Hz signal aliased to {f_alias} Hz!")
print(f"→ To capture 110 Hz, must sample at ≥ {2*f_signal} Hz")
demonstrate_aliasing()
:::warning Aliasing in ML data pipelines When working with sensor data, ensure your sampling rate is at least twice your maximum signal frequency. If you downsample a 1 kHz vibration sensor to 100 Hz, all frequencies above 50 Hz will alias. In motor fault detection (where fault harmonics often appear at 100–300 Hz), this destroys the signal. :::
Spectral Density Estimation
For stochastic processes, we estimate the power spectral density (PSD) - how power is distributed across frequencies.
from scipy.signal import welch, periodogram
def compare_psd_methods(signal: np.ndarray, fs: float) -> None:
"""
Compare periodogram vs Welch's method for PSD estimation.
Periodogram: one FFT over the full signal - high frequency resolution,
high variance (noisy estimates)
Welch's method: average periodograms of overlapping segments -
lower variance (smoother), lower frequency resolution
The standard choice for ML feature extraction.
"""
# Periodogram (single FFT)
f_per, P_per = periodogram(signal, fs=fs, window='hann')
# Welch's method (averaged segments)
f_welch, P_welch = welch(
signal,
fs=fs,
nperseg=256, # segment length
noverlap=128, # 50% overlap
window='hann',
average='mean'
)
print(f"Periodogram: {len(f_per)} frequency bins, variance = {np.var(P_per):.4f}")
print(f"Welch's method: {len(f_welch)} frequency bins, variance = {np.var(P_welch):.4f}")
print(f"Variance reduction: {np.var(P_per)/np.var(P_welch):.1f}x")
compare_psd_methods(signal, fs=fs)
Interview Questions
Q1: What is the FFT and why does it matter for ML? What's the computational complexity?
The Fast Fourier Transform (FFT) is an algorithm for computing the Discrete Fourier Transform. The naive DFT is - for audio samples, that's operations vs for FFT.
The FFT uses the divide-and-conquer Cooley-Tukey algorithm: split the DFT into two half-sized DFTs (even/odd indexed), apply recursively until , then combine using twiddle factors. This gives .
Why it matters for ML:
- Audio features: STFT for mel spectrograms (Whisper, wav2vec2, ASR) uses FFT as the core operation - without FFT, spectral feature extraction would be 100-1000x slower
- Convolution theorem: Large CNN kernels can be applied via FFT in rather than
- Positional encodings: Transformer PE is based on different frequency sinusoids
- Frequency-domain forecasting: Models like N-BEATS, FEDformer operate in frequency domain
- Anomaly detection: Spectral analysis finds unusual harmonics in industrial sensor data
Q2: What is spectral leakage and how do you mitigate it?
Spectral leakage occurs when a frequency component doesn't fit exactly within the analysis window - its energy "leaks" into neighboring frequency bins.
Root cause: The DFT implicitly assumes the signal repeats periodically. If the window doesn't contain an integer number of cycles, there's a discontinuity at the boundary. In the frequency domain, a discontinuity introduces high-frequency components (Gibbs phenomenon), spreading energy across all bins.
Example: A 50 Hz sine wave in a 1-second window (1000 samples, Hz) falls exactly on bin 50 - no leakage. But a 50.5 Hz sine doesn't fall on any bin and leaks.
Mitigation - windowing: Apply a window function that tapers the signal to zero at the edges before FFT:
- Hann window: - good tradeoff
- Hamming: Slightly better main lobe, slightly worse sidelobes
- Blackman: Best sidelobe suppression, widest main lobe
- Rectangular (no window): Sharpest peaks but worst leakage
Trade-off: windowing reduces leakage but widens the main lobe (reduces frequency resolution). For narrow-band signals that need precise frequency localization, Hann is standard.
Q3: Explain how sinusoidal positional encodings work in transformers and why they were designed using Fourier ideas.
The original transformer (Vaswani et al., 2017) uses:
The Fourier connection:
Each dimension pair corresponds to a specific frequency . Low dimensions have high frequency (changes rapidly with position); high dimensions have low frequency (changes slowly).
This gives each position a unique "fingerprint" - like a Fourier basis representation of position.
Why sin/cos pairs? For any offset , there exists a fixed linear transformation (rotation matrix) such that . This lets the model represent relative positions via a linear operation on the encodings.
Why these specific frequencies? The range of frequencies covers sequence lengths from 1 to ~10,000 tokens. With dimensions, you get different frequencies - enough coverage that no two positions within typical sequence lengths produce the same encoding.
Modern developments: RoPE (Rotary Position Embedding) makes this even more explicit - it rotates the query and key vectors by a frequency-dependent angle proportional to position, directly encoding relative positions in the attention computation.
Q4: What is the Nyquist theorem and how does it affect ML data pipelines?
The Nyquist-Shannon Sampling Theorem: A bandlimited signal with maximum frequency Hz can be perfectly reconstructed from samples taken at a rate Hz. The minimum rate is the Nyquist rate.
Aliasing: Sampling below the Nyquist rate causes high frequencies to be indistinguishable from lower frequencies. Specifically, a frequency sampled at rate appears as .
ML data pipeline implications:
-
IoT/sensor data: If you downsample 1 kHz vibration data to 100 Hz, all frequencies above 50 Hz alias. Motor bearing faults often produce harmonics at 80–200 Hz - they alias to incorrect frequencies or cancel out, making fault detection fail.
-
Anti-aliasing filter: Before downsampling, always apply a low-pass filter with cutoff at (anti-aliasing filter). scipy's
resample_polyanddecimatefunctions do this automatically;signal[::k](naive downsampling) does not. -
Audio ML: Speech is bandlimited to ~8 kHz (telephone quality) or ~22 kHz (studio). Whisper uses 16 kHz sampling → captures up to 8 kHz - adequate for speech intelligibility.
-
Feature engineering with Nyquist in mind: Don't compute FFT features expecting to find 200 Hz signal in 100 Hz sampled data. Know your data's sampling rate before interpreting spectral features.
Q5: How would you use Fourier analysis for anomaly detection in time series?
Fourier-based anomaly detection uses the insight that normal operation of a system has a characteristic frequency signature. Anomalies introduce unexpected frequencies or alter normal frequency magnitudes.
Approach 1: Spectral residual
- Compute FFT of the time series
- Estimate the "normal" spectrum (e.g., moving average of log-magnitude spectrum)
- Compute residual: actual spectrum minus smooth expected spectrum
- Inverse FFT of the residual → spectral residual signal
- High values in the residual indicate anomalies
Approach 2: Threshold on power spectrum
- Compute power spectrum with Welch's method (lower variance)
- Fit normal distribution to power at each frequency bin (from historical data)
- Flag bins where current power exceeds threshold
- Anomaly score = number of flagged bins or max z-score
Approach 3: Autoencoder in frequency domain
- Compute STFT → 2D spectrogram
- Train autoencoder on normal spectrograms
- Reconstruction error = anomaly score
Practical considerations:
- Use Welch's method (averaged periodogram) for stable power estimates
- Apply window functions to reduce spectral leakage before FFT
- For non-stationary signals, use STFT to get time-varying spectra
- Known fault frequencies (e.g., ball pass frequency for bearing faults) can be monitored directly
This approach is widely used in predictive maintenance (industrial motors, pumps, wind turbines) where bearing faults, unbalance, and misalignment each produce characteristic spectral signatures.
Key Takeaways
- The DFT decomposes a signal into frequency components; the FFT computes it in instead of
- The power spectrum shows which frequencies carry the most energy
- Spectral leakage is mitigated by windowing (Hann, Hamming, Blackman) before FFT
- STFT provides time-frequency analysis - essential for audio, speech, and non-stationary signals
- Fourier features (sin/cos of frequencies) provide continuous, periodic encodings of time - used in transformer positional encodings and seasonal feature engineering
- The convolution theorem enables convolution via FFT - relevant for large CNN kernels
- The Nyquist theorem: sample at or face aliasing; always use anti-aliasing filters before downsampling
Next: Lesson 04: ARIMA Models →
:::tip 🎮 Interactive Playground
Visualize this concept: Try the Fourier Transform demo on the EngineersOfAI Playground - no code required.
:::
