

Data Engineering with Python

Reading time: 40–45 min | Relevance: Very high for data engineers and ML engineers | Target roles: Data Engineer, Analytics Engineer, ML Engineer, AI Platform Engineer


The 500GB Problem

It is 9 AM on a Monday and the data engineering team at a mid-size fintech has a new requirement: process 500 GB of daily transaction CSV files, extract features for their fraud detection model, and write the results to partitioned Parquet by 11 AM. The previous approach - loading everything into a Postgres instance, running SQL, exporting - was taking 6 hours. They could not afford a Spark cluster for this single daily job. They needed a Python-native solution that fit on a single machine.

The senior data engineer on the team, Priya, started where most engineers do: pd.read_csv("transactions.csv"). Python immediately threw MemoryError. The file was 500 GB. Her machine had 32 GB of RAM. Pandas was trying to load the entire file into memory as a DataFrame, where strings are stored as Python objects (about 50–100 bytes per object overhead on top of the actual string data). A 500 GB CSV would expand to well over 1 TB in a naive pandas DataFrame.

Priya knew three things: first, pandas has chunksize for processing CSV files in pieces. Second, PyArrow can read Parquet files with zero-copy column access, so you only load the columns you need. Third, DuckDB can run SQL directly against Parquet files on disk without loading them into memory at all. By the end of the day, she had a pipeline that processed the 500 GB in 47 minutes using 8 GB of peak RAM on a single $50/month VM.

This lesson is about the Python data engineering toolkit that made that possible. Not pandas the tutorial tool, but pandas the production tool - with all the memory gotchas, dtype tricks, and escape hatches you need when data stops fitting in RAM. And the newer tools (DuckDB, Polars, PyArrow) that are replacing pandas for many ETL workloads entirely.


Why This Exists

Python became the language of data engineering almost by accident. It was already the language of scientific computing (NumPy, SciPy) and machine learning (scikit-learn, TensorFlow). When data engineers needed to write pipeline glue code, they reached for the same language they already knew. Pandas, built on NumPy by Wes McKinney at AQR Capital in 2008, gave Python a table-like data structure that made data manipulation approachable.

But pandas was designed for the data that existed in 2008: tens of millions of rows that fit comfortably in RAM on a typical server. Its columns are NumPy arrays under the hood, but it eagerly materializes every intermediate result in memory, it stores strings as Python objects (not compact byte arrays), and it is single-threaded for most operations.

By 2018, "big data" had trickled down from FAANG to startups. Data engineers at mid-size companies were regularly dealing with datasets that did not fit in RAM on a single machine but were too small to justify a Spark cluster. The gap between "pandas works" and "you need Spark" was enormous.

Three tools filled this gap:

  1. PyArrow: A Python interface to Apache Arrow, which defines a columnar in-memory format for tabular data. Arrow is the interchange format - the common language between pandas, Spark, DuckDB, Parquet, and Polars. Zero-copy reads mean you can work with data in Parquet files without fully materializing it in Python.

  2. DuckDB: An embeddable, in-process analytical SQL engine. It can read Parquet, CSV, and JSON files directly with SQL queries, push predicates and column projections down into the file reader, and use vectorized execution that is typically 10–30x faster than pandas for analytical queries.

  3. Polars: A DataFrame library written in Rust, with a Python API. It uses the Arrow columnar format natively, has a lazy execution engine that plans queries before running them, and executes operations in parallel across CPU cores by default. For large datasets, Polars is typically 10–100x faster than pandas.

Understanding when to use each tool - and when to combine them - is the core skill of modern Python data engineering.


Historical Context

pandas was created by Wes McKinney in 2008 while he was at AQR Capital Management, a hedge fund. He needed a tool for time-series financial data analysis. He open-sourced it in 2009, published the book "Python for Data Analysis" in 2012, and pandas became the foundational data tool for the Python ecosystem. By 2019 it had 5+ million users. Wes later went on to co-create Apache Arrow and co-found Ursa Computing.

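Apache Arrow was announced in 2016 as an Apache Software Foundation project, with Wes McKinney among its co-creators; that same year McKinney and Hadley Wickham (of R's tidyverse fame) released Feather, an Arrow-based file format for exchanging data frames between Python and R. The key insight was that different analytical tools (pandas, Spark, R, Julia) all had their own in-memory formats, and converting between them was expensive. Arrow defined a common columnar format that could be shared without copying - the "zero-copy" format. The project is now used by pandas 2.0, DuckDB, Polars, and many other tools, either as their memory format or as their interchange layer.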

DuckDB was created at CWI (Centrum Wiskunde & Informatica) in Amsterdam by Mark Raasveldt and Hannes Mühleisen, published in 2019. The designers drew on decades of database research to build an analytical engine optimized for modern hardware - vectorized execution, SIMD instructions, column-oriented storage. DuckDB runs entirely in-process (like SQLite but for analytics) with no server setup required. It has grown explosively: from niche research project to production data engineering tool used at companies like DoorDash, Stripe, and Microsoft.

Polars was created by Ritchie Vink in 2020. He was frustrated with pandas' performance and memory usage and wrote a new DataFrame library from scratch in Rust. Polars uses Arrow as its memory format, has a lazy API (similar to Spark's DataFrame API), and executes in parallel by default. It has become the fastest-growing DataFrame library in the Python ecosystem.


Core Concepts

Python's Role in Data Engineering

Python is the orchestration and transformation layer in modern data stacks. Specifically, Python handles:

  • Extract: reading from APIs, databases, files (CSV, JSON, Parquet, Avro)
  • Transform: cleaning, filtering, joining, aggregating, feature engineering
  • Load: writing to databases, data warehouses, object storage
  • Orchestration: defining pipeline dependencies (Airflow, Prefect, Dagster DAGs are all Python)
  • Testing: validating data quality, schema conformance, statistical properties

What Python does not handle well in its native forms: distributed processing of petabyte-scale data (that is Spark/Flink), sub-millisecond latency serving (that is C++/Go), or GPU-accelerated computation (that is CUDA directly, though cuDF brings GPU acceleration to pandas-like code).

pandas: The Indispensable Gotcha Machine

Every data engineer uses pandas daily. Most data engineers have been bitten by the same set of pandas gotchas. Understanding the memory model is essential.

The object dtype trap: When pandas reads a CSV column that contains strings, it stores each string as a Python object in an object-dtype column. This means every string is a full Python str object on the heap - 50 bytes minimum plus the actual string bytes. For a column of 50 million short strings, this is 2.5 GB+ just in Python object overhead.

import pandas as pd
import numpy as np

# Typical pandas memory usage for strings vs category
data = {"country": ["US", "GB", "DE", "JP", "BR"] * 10_000_000} # 50M rows

df_object = pd.DataFrame(data)
print(f"object dtype: {df_object.memory_usage(deep=True)['country'] / 1e9:.2f} GB")

# Convert to category - stores unique values once, uses integer codes
df_object["country"] = df_object["country"].astype("category")
print(f"category dtype: {df_object.memory_usage(deep=True)['country'] / 1e6:.1f} MB")

# object: ~3.0 GB
# category: ~51 MB - about 60x smaller

The category dtype works by storing each unique value once and using integer codes to reference them. For a column with $n$ distinct values across $N$ rows, memory goes from $O(N \cdot \text{avg\_string\_length})$ to $O(n \cdot \text{avg\_string\_length} + N \cdot \lceil \log_2 n \rceil / 8)$. In practice pandas rounds the code width up to a whole integer type (int8, int16, int32, or int64).

For a country column with 200 distinct country codes across 50 million rows, the codes need int16 (int8 covers only about 127 categories), so category uses about 100 MB instead of roughly 3 GB.
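The arithmetic behind that estimate can be sketched in a few lines. This is a rough back-of-the-envelope model, not pandas' exact accounting: the helper names are hypothetical, the 8-byte pointer assumes a 64-bit build, and the code-width thresholds are an approximation of how pandas picks the integer type for category codes.

```python
import sys


def code_width_bytes(n_unique: int) -> int:
    """Approximate width of the integer codes for n_unique categories (assumed thresholds)."""
    for width, limit in ((1, 2**7 - 1), (2, 2**15 - 1), (4, 2**31 - 1)):
        if n_unique < limit:
            return width
    return 8


def estimate_bytes(n_rows: int, n_unique: int, sample: str) -> tuple[int, int]:
    """Return (object_bytes, category_bytes) for a column of strings shaped like `sample`."""
    per_string = sys.getsizeof(sample)  # size of one full Python str object
    pointer = 8                         # object columns hold one pointer per row
    object_bytes = n_rows * (per_string + pointer)
    category_bytes = (
        n_unique * (per_string + pointer)        # each distinct value stored once
        + n_rows * code_width_bytes(n_unique)    # one small integer code per row
    )
    return object_bytes, category_bytes


obj, cat = estimate_bytes(n_rows=50_000_000, n_unique=200, sample="US")
print(f"object:   {obj / 1e9:.2f} GB")
print(f"category: {cat / 1e6:.1f} MB")
```

The dominant term for category is the per-row code, which is why cardinality matters far more than string length once the conversion is done.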

Chunked processing for files larger than RAM:

import pandas as pd


def process_large_csv(filepath: str, output_path: str) -> None:
    """Process a CSV larger than RAM using chunked reads."""
    chunk_results = []

    # Specify dtypes upfront - avoids pandas guessing wrong types
    # and prevents memory waste from object columns
    dtype_map = {
        "user_id": "int64",
        "transaction_amount": "float32",  # float32 instead of float64 saves 50% memory
        "country_code": "category",
        "transaction_type": "category",
        "merchant_id": "int32",
        "is_fraud": "bool",
    }

    chunk_size = 500_000  # 500K rows per chunk, tune based on available RAM

    for chunk_num, chunk in enumerate(
        pd.read_csv(
            filepath,
            chunksize=chunk_size,
            dtype=dtype_map,
            parse_dates=["transaction_timestamp"],
            # low_memory=False parses each chunk in one pass, so any column
            # not listed in dtype_map gets a single consistent inferred type
            low_memory=False,
        )
    ):
        # Transform this chunk (.copy() avoids SettingWithCopyWarning below)
        chunk = chunk[chunk["transaction_amount"] > 0].copy()  # drop zero and negative amounts
        chunk["hour_of_day"] = chunk["transaction_timestamp"].dt.hour
        chunk["day_of_week"] = chunk["transaction_timestamp"].dt.dayofweek

        # Aggregate per chunk; observed=True skips category combinations
        # that do not occur in this chunk
        agg = chunk.groupby(
            ["country_code", "transaction_type"], observed=True
        ).agg(
            total_amount=("transaction_amount", "sum"),
            count=("transaction_amount", "count"),
            fraud_count=("is_fraud", "sum"),
        ).reset_index()

        chunk_results.append(agg)

        if chunk_num % 10 == 0:
            print(f"Processed chunk {chunk_num}, up to {(chunk_num + 1) * chunk_size:,} rows")

    # Combine chunk aggregates and re-aggregate (valid because sums and counts add up)
    combined = pd.concat(chunk_results, ignore_index=True)
    final = (
        combined.groupby(["country_code", "transaction_type"], observed=True)
        .sum()
        .reset_index()
    )
    final["fraud_rate"] = final["fraud_count"] / final["count"]

    # Write to Parquet (much smaller than CSV, schema preserved)
    final.to_parquet(output_path, index=False)
    print(f"Wrote {len(final):,} rows to {output_path}")
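One caveat with the combine-and-re-aggregate step: it only works for aggregates that decompose across chunks, such as sums and counts. Averages do not, so carry the sum and the count and divide once at the end. A minimal sketch with made-up numbers (the values are illustrative, not from the transaction data):

```python
# Two chunks of unequal size (illustrative values)
chunks = [[10.0, 20.0, 30.0, 40.0], [100.0]]

# Wrong: averaging the per-chunk means weights both chunks equally
chunk_means = [sum(c) / len(c) for c in chunks]
mean_of_means = sum(chunk_means) / len(chunk_means)

# Right: carry (sum, count) per chunk and divide once at the end
partials = [(sum(c), len(c)) for c in chunks]
total = sum(s for s, _ in partials)
count = sum(n for _, n in partials)
true_mean = total / count

print(mean_of_means)  # 62.5
print(true_mean)      # 40.0
```

The same reasoning applies to fraud_rate in the function above, which is why it is computed from the summed fraud_count and count rather than per chunk.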

Vectorization vs apply: df.apply(func, axis=1) iterates row by row in Python - it is essentially a Python for loop. For any operation that can be expressed as vectorized NumPy operations, apply is typically 10–100x slower, and sometimes far more.

import pandas as pd
import numpy as np
import time

df = pd.DataFrame({
"amount": np.random.uniform(0, 1000, 1_000_000),
"fee_rate": np.random.uniform(0.001, 0.05, 1_000_000),
})

# BAD: Python-loop apply
start = time.perf_counter()
df["fee_slow"] = df.apply(lambda row: row["amount"] * row["fee_rate"], axis=1)
slow_time = time.perf_counter() - start
print(f"apply: {slow_time:.2f}s") # ~3.5s for 1M rows

# GOOD: vectorized operation
start = time.perf_counter()
df["fee_fast"] = df["amount"] * df["fee_rate"]
fast_time = time.perf_counter() - start
print(f"vectorized: {fast_time:.4f}s") # ~0.005s for 1M rows
print(f"Speedup: {slow_time / fast_time:.0f}x") # ~700x faster

The rule: if you find yourself writing df.apply(lambda row: ..., axis=1), stop and think about how to express it as column operations. In the vast majority of cases, there is a vectorized equivalent.
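Conditional logic is the case where people most often fall back to apply. As a sketch of the alternative, np.select evaluates each condition on whole columns and takes the first match per row. The fee tiers and column values below are hypothetical, chosen only to illustrate the pattern.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "amount": [5.0, 250.0, 1200.0, 80.0],
    "country_code": ["US", "GB", "US", "DE"],
})


# Row-wise version (slow): one Python call per row
def fee_rate_rowwise(row) -> float:
    if row["amount"] >= 1000:
        return 0.01
    if row["country_code"] == "US":
        return 0.02
    return 0.03


slow = df.apply(fee_rate_rowwise, axis=1)

# Vectorized version: conditions are evaluated on whole columns,
# and the first matching condition wins for each row
conditions = [df["amount"] >= 1000, df["country_code"] == "US"]
choices = [0.01, 0.02]
df["fee_rate"] = np.select(conditions, choices, default=0.03)

print(df)
```

The order of the conditions list mirrors the order of the if statements, which keeps the two versions easy to compare during a migration.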

PyArrow: The Columnar Foundation

PyArrow is the Python binding for Apache Arrow, the columnar in-memory format. It is the bridge between everything: pandas can use Arrow-backed dtypes (pandas 2.0+), Parquet files map cleanly onto Arrow schemas and are read and written through PyArrow, DuckDB uses Arrow for data exchange, and Spark uses Arrow to move data between the JVM and Python (for example in toPandas() and pandas UDFs).

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
import pandas as pd

# Read Parquet with column projection - only loads needed columns
# No full file scan, no unnecessary memory usage
table = pq.read_table(
"transactions.parquet",
columns=["user_id", "transaction_amount", "country_code", "is_fraud"],
# Push filters down into the Parquet reader - skips entire row groups
filters=[
("transaction_amount", ">", 0),
("country_code", "in", ["US", "GB", "DE"])
]
)

print(f"Loaded {table.num_rows:,} rows, {table.num_columns} columns")
print(f"Schema: {table.schema}")

# Arrow compute operations - fast, no Python loop
total_by_country = (
table
.group_by("country_code")
.aggregate([
("transaction_amount", "sum"),
("is_fraud", "sum"),
("user_id", "count_distinct")
])
)

# Convert to pandas only when needed (e.g., for matplotlib, scikit-learn)
df = total_by_country.to_pandas()

# Row-group processing - streaming large files one row group at a time
pf = pq.ParquetFile("large_transactions.parquet")
for rg_num in range(pf.num_row_groups):
    batch = pf.read_row_group(
        rg_num,
        columns=["user_id", "transaction_amount"],
    )
    # batch is an Arrow Table - process without loading full file
    amounts = batch.column("transaction_amount")
    total = pc.sum(amounts).as_py()
    print(f"Row group {rg_num}: sum={total:,.2f}")

# Schema enforcement before reading any data
expected_schema = pa.schema([
    ("user_id", pa.int64()),
    ("transaction_amount", pa.float64()),
    ("country_code", pa.string()),
    ("is_fraud", pa.bool_()),
])

# read_schema() only reads the file footer, so this check is cheap
actual_schema = pq.read_schema("transactions.parquet")
mismatches = [
    f"{field.name}: expected {field.type}"
    for field in expected_schema
    if field.name not in actual_schema.names
    or actual_schema.field(field.name).type != field.type
]
if mismatches:
    print(f"Schema mismatch: {mismatches}")
else:
    table_validated = pq.read_table("transactions.parquet")

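Zero-copy from Parquet: when you call pq.read_table(), Arrow decodes the Parquet file into Arrow's in-memory format. A plain .to_pandas() then converts to NumPy-backed columns, which copies most data (strings in particular become Python objects); only some cases, such as single-chunk numeric columns without nulls, can be shared without a copy. With pandas 2.0+ you can instead keep the columns Arrow-backed by passing types_mapper=pd.ArrowDtype, so the DataFrame wraps the Arrow memory rather than duplicating it. For read-heavy workloads this avoids a major memory spike.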

DuckDB: In-Process Analytical SQL

DuckDB is the most underrated tool in the Python data engineering toolkit. It is an embedded SQL engine - no server to spin up, no JDBC drivers, no connection pooling. Just import duckdb and start running analytical SQL against files on disk.

import duckdb
import time

# DuckDB can query Parquet files directly
# No loading into memory first
conn = duckdb.connect()

# Read Parquet, filter, aggregate - all pushed down to the file reader
start = time.perf_counter()
result = conn.execute("""
SELECT
country_code,
transaction_type,
SUM(transaction_amount) AS total_amount,
COUNT(*) AS num_transactions,
SUM(is_fraud::INTEGER) AS fraud_count,
AVG(transaction_amount) AS avg_amount
FROM read_parquet('transactions/*.parquet', hive_partitioning = true)
WHERE transaction_amount > 0
AND transaction_timestamp >= '2024-01-01'
GROUP BY country_code, transaction_type
ORDER BY total_amount DESC
""").df() # .df() returns a pandas DataFrame
elapsed = time.perf_counter() - start
print(f"DuckDB query: {elapsed:.2f}s, {len(result):,} rows")

# Read CSV directly - no pandas read_csv needed
conn.execute("""
    CREATE TABLE transactions AS
    SELECT * FROM read_csv_auto('transactions.csv',
        types={'user_id': 'BIGINT', 'transaction_amount': 'DOUBLE'},
        header=true
    )
""")

# COPY to partitioned Parquet - one DuckDB statement
# Casting to DATE keeps partition directory names free of a time component
conn.execute("""
    COPY (
        SELECT
            user_id,
            transaction_amount,
            country_code,
            CAST(transaction_timestamp AS DATE) AS date_partition,
            EXTRACT(hour FROM transaction_timestamp) AS hour_of_day,
            is_fraud
        FROM transactions
        WHERE transaction_amount > 0
    )
    TO 'output/'
    (FORMAT PARQUET, PARTITION_BY (date_partition, country_code),
     COMPRESSION SNAPPY, OVERWRITE_OR_IGNORE TRUE)
""")
print("Written partitioned Parquet output")

# Query JSON directly
conn.execute("""
SELECT
json_extract_string(payload, '$.user_id') AS user_id,
json_extract_string(payload, '$.event_type') AS event_type,
COUNT(*) as cnt
FROM read_json_auto('events/*.json')
GROUP BY 1, 2
""").df()

# DuckDB integrates with PyArrow
import pyarrow.parquet as pq
arrow_table = pq.read_table("transactions.parquet")
# Register Arrow table - zero copy, DuckDB reads Arrow memory directly
conn.register("arrow_transactions", arrow_table)
result = conn.execute("SELECT COUNT(*) FROM arrow_transactions").fetchone()
print(f"Row count: {result[0]:,}")

When to use DuckDB vs pandas:

  • DuckDB: aggregations, joins, window functions, filtering, group-by - any SQL-expressible transformation on large datasets
  • pandas: complex row-wise transformations, integration with scikit-learn/matplotlib, when you need the rich pandas ecosystem

For most ETL pipelines, the pattern is: DuckDB for the heavy lifting, pandas for the final output or complex transformations.

Polars: Lazy DataFrames at Rust Speed

Polars is a Rust-native DataFrame library with a Python API. Its two key advantages over pandas:

  1. Lazy evaluation: you build a query plan, Polars optimizes it (predicate pushdown, projection pushdown, constant folding), then executes it. Similar to Spark's DataFrame API.
  2. Parallel execution: Polars uses all CPU cores by default for most operations. Pandas is single-threaded.
import polars as pl
import time

# Eager API (like pandas - immediate execution)
df = pl.read_parquet("transactions.parquet")

# Lazy API - recommended for large datasets
query = (
pl.scan_parquet("transactions/*.parquet") # scan = lazy, no I/O yet
.filter(pl.col("transaction_amount") > 0)
.filter(pl.col("transaction_timestamp") >= pl.lit("2024-01-01").str.to_date())
.with_columns([
(pl.col("transaction_amount") * 0.02).alias("fee_amount"),
pl.col("transaction_timestamp").dt.hour().alias("hour_of_day"),
pl.col("transaction_timestamp").dt.weekday().alias("day_of_week"),
])
.group_by(["country_code", "transaction_type", "hour_of_day"])
.agg([
pl.col("transaction_amount").sum().alias("total_amount"),
pl.col("transaction_amount").count().alias("num_transactions"),
pl.col("is_fraud").sum().alias("fraud_count"),
pl.col("user_id").n_unique().alias("unique_users"),
])
.sort("total_amount", descending=True)
)

# .explain() shows the optimized query plan without executing
print(query.explain())

# .collect() executes - Polars applies predicate/projection pushdown
start = time.perf_counter()
result = query.collect() # returns a Polars DataFrame
elapsed = time.perf_counter() - start
print(f"Polars lazy query: {elapsed:.2f}s, {result.shape[0]:,} rows")

# Polars expression API - more explicit than pandas method chaining
result = result.with_columns([
(pl.col("fraud_count") / pl.col("num_transactions")).alias("fraud_rate"),
pl.col("total_amount").rank(descending=True).over("country_code").alias("rank_in_country"),
])

# Convert to pandas when needed for downstream ML tools
df_pandas = result.to_pandas()

# Polars streaming mode - process datasets larger than RAM in chunks.
# Older releases use collect(streaming=True); recent releases deprecate it
# in favor of collect(engine="streaming"), so check your installed version.
streaming_result = (
    pl.scan_csv("huge_file.csv")
    .filter(pl.col("amount") > 100)
    .group_by("category")
    .agg(pl.col("amount").sum())
    .collect(streaming=True)  # processes without loading full file
)

Polars vs pandas performance: For aggregations and group-by on datasets with millions of rows, Polars is typically 10–50x faster than pandas. The gap widens with dataset size and operation complexity. For small DataFrames (under 100K rows), the difference is negligible. The break-even point where switching to Polars pays off is around 1–5 million rows for typical data engineering operations.


Full ETL Pipeline: DuckDB + Partitioned Parquet

This is the complete pipeline that solved Priya's 500GB CSV problem. It reads the source data (Parquet here; CSV works the same way via read_csv_auto), transforms it, and writes partitioned Parquet - using DuckDB throughout.

import duckdb
import time
from pathlib import Path


def run_fraud_feature_pipeline(
    input_dir: str,
    output_dir: str,
    start_date: str,
    end_date: str,
) -> dict:
    """
    Full ETL pipeline using DuckDB:
    1. Read partitioned Parquet source (or CSV with read_csv_auto)
    2. Compute fraud-detection features
    3. Write daily-partitioned Parquet output

    Returns: pipeline run metrics
    """
    start_time = time.perf_counter()
    input_dir = input_dir.rstrip("/")
    output_dir = output_dir.rstrip("/")

    # Only local paths need a directory; object-store URLs (s3://...) do not
    if "://" not in output_dir:
        Path(output_dir).mkdir(parents=True, exist_ok=True)

    # DuckDB in-process - no server, no setup
    # Use in-memory mode for pure transformation pipelines
    conn = duckdb.connect(":memory:")

    # Enable progress bar for long-running queries
    conn.execute("PRAGMA enable_progress_bar;")

    # Set memory limit - prevents OOM on shared machines
    available_gb = 8
    conn.execute(f"SET memory_limit = '{available_gb}GB';")
    conn.execute("SET threads = 8;")  # use 8 CPU cores

    # Step 1: Register source data
    # DuckDB reads Parquet natively with predicate pushdown
    # (dates are interpolated into the SQL, so pass trusted values only)
    conn.execute(f"""
        CREATE VIEW raw_transactions AS
        SELECT * FROM read_parquet(
            '{input_dir}/**/*.parquet',
            hive_partitioning = TRUE
        )
        WHERE transaction_date BETWEEN '{start_date}' AND '{end_date}'
    """)

    row_count = conn.execute("SELECT COUNT(*) FROM raw_transactions").fetchone()[0]
    print(f"Source rows: {row_count:,}")

    # Step 2: Data quality checks before transformation
    null_counts = conn.execute("""
        SELECT
            COUNT(*) FILTER (WHERE user_id IS NULL) AS null_user_ids,
            COUNT(*) FILTER (WHERE amount IS NULL) AS null_amounts,
            COUNT(*) FILTER (WHERE amount < 0) AS negative_amounts,
            COUNT(*) FILTER (WHERE merchant_id IS NULL) AS null_merchants
        FROM raw_transactions
    """).fetchdf()

    print("Data quality check:")
    print(null_counts.to_string(index=False))

    # Step 3: Feature engineering
    # Time-based features use RANGE frames over the timestamp;
    # a ROWS frame would count transactions, not hours.
    conn.execute("""
        CREATE VIEW features AS
        SELECT
            t.user_id,
            t.merchant_id,
            t.amount,
            t.transaction_timestamp,
            DATE(t.transaction_timestamp) AS transaction_date,
            EXTRACT(hour FROM t.transaction_timestamp) AS hour_of_day,
            EXTRACT(dow FROM t.transaction_timestamp) AS day_of_week,
            CASE
                WHEN EXTRACT(dow FROM t.transaction_timestamp) IN (0, 6) THEN 1
                ELSE 0
            END AS is_weekend,
            t.country_code,
            t.transaction_type,
            t.is_fraud,

            -- User aggregates over the trailing 24 hours
            COUNT(*) OVER (
                PARTITION BY t.user_id
                ORDER BY t.transaction_timestamp
                RANGE BETWEEN INTERVAL 24 HOURS PRECEDING AND CURRENT ROW
            ) AS user_txn_count_24h,

            SUM(t.amount) OVER (
                PARTITION BY t.user_id
                ORDER BY t.transaction_timestamp
                RANGE BETWEEN INTERVAL 24 HOURS PRECEDING AND CURRENT ROW
            ) AS user_amount_sum_24h,

            -- Average over the user's last 7 transactions (row-based on purpose)
            AVG(t.amount) OVER (
                PARTITION BY t.user_id
                ORDER BY t.transaction_timestamp
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) AS user_avg_amount_7txn,

            -- Merchant aggregates over the trailing hour
            COUNT(*) OVER (
                PARTITION BY t.merchant_id
                ORDER BY t.transaction_timestamp
                RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW
            ) AS merchant_txn_count_1h,

            -- Amount relative to the user's earlier transactions
            t.amount / NULLIF(
                AVG(t.amount) OVER (
                    PARTITION BY t.user_id
                    ORDER BY t.transaction_timestamp
                    ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
                ), 0
            ) AS amount_vs_user_avg

        FROM raw_transactions t
        WHERE t.amount > 0
    """)

    # Step 4: Write partitioned Parquet
    conn.execute(f"""
        COPY (
            SELECT * FROM features
            ORDER BY transaction_date, user_id
        )
        TO '{output_dir}'
        (FORMAT PARQUET,
         PARTITION_BY (transaction_date, country_code),
         COMPRESSION SNAPPY,
         ROW_GROUP_SIZE 100000,
         OVERWRITE_OR_IGNORE TRUE)
    """)

    elapsed = time.perf_counter() - start_time

    # Step 5: Verify output
    output_stats = conn.execute(f"""
        SELECT
            COUNT(*) AS output_rows,
            COUNT(DISTINCT transaction_date) AS partitions,
            SUM(is_fraud::INTEGER) AS fraud_labels
        FROM read_parquet('{output_dir}/**/*.parquet', hive_partitioning = TRUE)
    """).fetchdf()

    conn.close()

    return {
        "input_rows": row_count,
        "output_rows": int(output_stats["output_rows"][0]),
        "output_partitions": int(output_stats["partitions"][0]),
        "fraud_labels": int(output_stats["fraud_labels"][0]),
        "elapsed_seconds": elapsed,
        "rows_per_second": row_count / elapsed,
    }


if __name__ == "__main__":
    # s3:// paths need DuckDB's httpfs extension and configured credentials
    metrics = run_fraud_feature_pipeline(
        input_dir="s3://my-data-lake/transactions/raw/",
        output_dir="s3://my-data-lake/transactions/features/",
        start_date="2024-01-01",
        end_date="2024-01-31",
    )
    print("\nPipeline complete:")
    print(f"  Input rows: {metrics['input_rows']:,}")
    print(f"  Output rows: {metrics['output_rows']:,}")
    print(f"  Partitions: {metrics['output_partitions']}")
    print(f"  Elapsed: {metrics['elapsed_seconds']:.1f}s")
    print(f"  Throughput: {metrics['rows_per_second']:,.0f} rows/s")

Code: pandas Memory Optimization

import pandas as pd
import numpy as np

def optimize_dataframe_memory(df: pd.DataFrame) -> pd.DataFrame:
    """
    Reduce DataFrame memory usage using dtype optimization.
    Typical reduction: 50-80% smaller. Modifies df in place and returns it.
    """
    start_mem = df.memory_usage(deep=True).sum() / 1024**3

    for col in df.columns:
        col_type = df[col].dtype

        if col_type == "object":
            # Check cardinality - if < 50% unique values, use category
            n_unique = df[col].nunique()
            n_total = len(df)
            if n_total > 0 and n_unique / n_total < 0.5:
                df[col] = df[col].astype("category")
                print(f"  {col}: object -> category ({n_unique} unique values)")
            # else: keep as object (high-cardinality strings like IDs)

        elif col_type == "float64":
            # Downcast float64 to float32 if the value range fits.
            # float32 keeps only about 7 significant digits, so skip this
            # for columns where exact values matter (e.g. monetary sums).
            col_min = df[col].min()
            col_max = df[col].max()
            if (col_min > np.finfo(np.float32).min and
                    col_max < np.finfo(np.float32).max):
                df[col] = df[col].astype("float32")
                print(f"  {col}: float64 -> float32")

        elif col_type == "int64":
            # Downcast int64 to smallest fitting integer type
            col_min = df[col].min()
            col_max = df[col].max()

            if col_min >= 0:  # unsigned integers
                if col_max <= 255:
                    df[col] = df[col].astype("uint8")
                elif col_max <= 65535:
                    df[col] = df[col].astype("uint16")
                elif col_max <= 4294967295:
                    df[col] = df[col].astype("uint32")
            else:  # signed integers
                if col_min >= -128 and col_max <= 127:
                    df[col] = df[col].astype("int8")
                elif col_min >= -32768 and col_max <= 32767:
                    df[col] = df[col].astype("int16")
                elif col_min >= -2147483648 and col_max <= 2147483647:
                    df[col] = df[col].astype("int32")

    end_mem = df.memory_usage(deep=True).sum() / 1024**3
    reduction = (start_mem - end_mem) / start_mem * 100 if start_mem else 0.0
    print(f"\nMemory: {start_mem:.2f} GB -> {end_mem:.2f} GB ({reduction:.1f}% reduction)")
    return df


# Example: loading a 10GB CSV with dtype optimization
dtype_overrides = {
"user_id": "int32", # 4 bytes vs 8 bytes for int64
"merchant_id": "int32",
"amount": "float32", # 4 bytes vs 8 bytes for float64
"country_code": "category", # ~50 bytes/row -> ~1 byte/row
"transaction_type": "category",
"hour_of_day": "uint8", # 0-23, fits in 1 byte
"day_of_week": "uint8", # 0-6, fits in 1 byte
"is_fraud": "bool", # 1 byte vs 8 bytes for int64
}

df = pd.read_csv(
"transactions.csv",
dtype=dtype_overrides,
# Avoid parsing all dates upfront - do it after filtering
parse_dates=False,
)
print(f"RAM after optimized load: {df.memory_usage(deep=True).sum() / 1e9:.2f} GB")
# Typically 2-3GB for a dataset that naive loading would use 10GB+

Code: Pipeline Testing with pandera

import pandas as pd
import pandera as pa
from pandera import Column, DataFrameSchema, Check

# Define schema for the raw transaction input
raw_transaction_schema = DataFrameSchema(
columns={
"user_id": Column(
pa.Int64,
checks=[
Check.greater_than(0, error="user_id must be positive"),
Check(lambda x: x < 2**31, error="user_id too large for int32"),
],
nullable=False,
),
"transaction_amount": Column(
pa.Float64,
checks=[
Check.greater_than(0, error="amounts must be positive"),
Check.less_than(1_000_000, error="suspicious: amount over $1M"),
],
nullable=False,
),
"country_code": Column(
pa.String,
checks=[
Check(lambda x: x.str.len() == 2, error="country_code must be 2 chars"),
Check(lambda x: x.str.isupper(), error="country_code must be uppercase"),
],
nullable=False,
),
"transaction_type": Column(
pa.String,
checks=[
Check.isin(
["purchase", "refund", "transfer", "withdrawal"],
error="invalid transaction_type"
),
],
nullable=False,
),
"is_fraud": Column(pa.Bool, nullable=False),
# pandas nullable integer dtype: NumPy int64 cannot hold missing merchant IDs
"merchant_id": Column(pd.Int64Dtype(), nullable=True),
"transaction_timestamp": Column(pa.DateTime, nullable=False),
},
index=pa.Index(pa.Int64),
strict=False, # allow extra columns not in schema
coerce=True, # attempt type coercion before validation
)

# Define schema for the features output
features_schema = DataFrameSchema(
columns={
"user_id": Column(pa.Int64, nullable=False),
"transaction_amount": Column(pa.Float64, Check.greater_than(0)),
"hour_of_day": Column(
pa.Int64,
checks=Check.in_range(0, 23),
nullable=False
),
"day_of_week": Column(
pa.Int64,
checks=Check.in_range(0, 6),
nullable=False
),
"user_txn_count_24h": Column(pa.Int64, Check.greater_than_or_equal_to(1)),
"user_amount_sum_24h": Column(pa.Float64, Check.greater_than(0)),
"fraud_rate_7d": Column(
pa.Float64,
checks=[
Check.greater_than_or_equal_to(0),
Check.less_than_or_equal_to(1),
],
nullable=True # null for users with no 7d history
),
},
# Statistical checks at the DataFrame level
checks=[
# Overall fraud rate should be between 0.5% and 5%
Check(
lambda df: 0.005 <= df["is_fraud"].mean() <= 0.05,
error="Fraud rate outside expected range [0.5%, 5%]"
),
]
)


# Using pandera with pytest
import pytest


def test_raw_transaction_schema():
    """Test that raw transaction data meets schema requirements."""
    valid_data = pd.DataFrame({
        "user_id": [1001, 1002, 1003],
        "transaction_amount": [45.99, 1200.00, 7.50],
        "country_code": ["US", "GB", "DE"],
        "transaction_type": ["purchase", "transfer", "purchase"],
        "is_fraud": [False, False, True],
        "merchant_id": [500, None, 501],
        "transaction_timestamp": pd.to_datetime([
            "2024-01-15 10:30:00",
            "2024-01-15 11:00:00",
            "2024-01-15 11:05:00",
        ]),
    })

    # Should pass without raising
    validated = raw_transaction_schema.validate(valid_data)
    assert len(validated) == 3


def test_invalid_country_code_rejected():
    """Lowercase country codes should be rejected."""
    bad_data = pd.DataFrame({
        "user_id": [1001],
        "transaction_amount": [45.99],
        "country_code": ["us"],  # lowercase - invalid
        "transaction_type": ["purchase"],
        "is_fraud": [False],
        "merchant_id": [500],
        "transaction_timestamp": pd.to_datetime(["2024-01-15 10:30:00"]),
    })

    with pytest.raises(pa.errors.SchemaError) as exc_info:
        raw_transaction_schema.validate(bad_data)

    assert "country_code must be uppercase" in str(exc_info.value)


def test_negative_amounts_rejected():
    """Negative transaction amounts should fail validation."""
    bad_data = pd.DataFrame({
        "user_id": [1001],
        "transaction_amount": [-50.00],  # negative - invalid
        "country_code": ["US"],
        "transaction_type": ["refund"],
        "is_fraud": [False],
        "merchant_id": [None],
        "transaction_timestamp": pd.to_datetime(["2024-01-15 10:30:00"]),
    })

    with pytest.raises(pa.errors.SchemaError):
        raw_transaction_schema.validate(bad_data)


YouTube Resources

| Title | Channel | Why Watch |
| --- | --- | --- |
| DuckDB - The Indispensable Tool for Data Engineers | Data with Marc | Practical DuckDB from zero to production, covers Parquet and S3 |
| Polars vs Pandas - Which One Should You Use? | Rob Mulla | Side-by-side performance comparison with real datasets |
| PyArrow Tutorial for Data Engineers | Coding is Fun | PyArrow's columnar format, zero-copy, and Parquet integration |
| pandas Memory Optimization Tricks | Python Engineer | Dtypes, categories, chunking - practical memory reduction |
| Data Pipeline Testing with pandera | DataTalksClub | Schema validation and statistical checks in production pipelines |

Production Engineering Notes

When to Reach for Each Tool

The decision tree is not about preference - it is about the nature of the transformation and the size of the data:

  • Under 1M rows: pandas. Simple, fast enough, everyone knows it.
  • 1M–100M rows, SQL-expressible transform: DuckDB. Dramatically faster than pandas for aggregations, joins, and window functions. Reads Parquet natively.
  • 1M–100M rows, complex row-wise transform: Polars lazy API. Parallel execution, lazy optimization.
  • 100M+ rows, single machine: DuckDB with streaming, or Polars streaming mode.
  • Terabytes across multiple machines: Spark or Dask. Do not fight single-machine tools for distributed workloads.
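The list above can be written down as a tiny helper, which is handy as a shared reference in code review. The function name and signature are hypothetical, and the thresholds are just the rules of thumb from this lesson, not hard limits.

```python
def choose_tool(n_rows: int, sql_expressible: bool, fits_one_machine: bool = True) -> str:
    """Encode the decision list above; thresholds are rules of thumb, not hard limits."""
    if not fits_one_machine:
        return "Spark or Dask"
    if n_rows < 1_000_000:
        return "pandas"
    if n_rows <= 100_000_000:
        return "DuckDB" if sql_expressible else "Polars (lazy)"
    return "DuckDB or Polars (streaming)"


print(choose_tool(50_000, sql_expressible=True))
print(choose_tool(20_000_000, sql_expressible=False))
print(choose_tool(2_000_000_000, sql_expressible=True))
```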

PyArrow as the Lingua Franca

Every modern Python data tool speaks Arrow. pandas 2.0 can use Arrow-backed dtypes (for example pd.read_csv(..., dtype_backend="pyarrow")), which store string columns as compact byte arrays instead of Python objects. Polars is built on the Arrow memory model. DuckDB exchanges data with Python via Arrow. Parquet is a separate on-disk format, but it maps directly onto Arrow schemas and PyArrow is the standard way to read and write it. If you learn one low-level format deeply, make it Arrow.

DuckDB's Memory Budget

DuckDB respects SET memory_limit = '8GB' - it will spill to disk rather than OOM. For pipelines running on shared infrastructure, always set an explicit memory limit. The default is 80% of available RAM, which can starve other processes.

pandas apply() Performance Cliff

The performance difference between apply() and vectorized operations is not linear - it is an orders-of-magnitude cliff. apply() with axis=1 calls the Python function once per row, with all the overhead of Python function call dispatch, building a Series object for each row, and Python object creation. For 10M rows, this is 10M Python function calls. A vectorized NumPy operation on the same column runs in compiled C code on the entire array at once. If you are calling apply() on more than 100K rows, there is almost certainly a better way.


Common Mistakes

:::danger Loading entire files into pandas before filtering

# WRONG: loads entire 100GB file into memory
df = pd.read_csv("huge.csv")
df_filtered = df[df["country_code"] == "US"]

# RIGHT: use DuckDB to filter at the source
import duckdb
df_filtered = duckdb.query("""
    SELECT * FROM read_csv_auto('huge.csv')
    WHERE country_code = 'US'
""").df()
# DuckDB still scans the CSV, but it streams the file and keeps only the matching rows in memory

Never read a large file into memory just to filter it. Push filters to the source using DuckDB, Parquet row group filtering, or pd.read_csv with chunksize + early filtering. :::
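If DuckDB is not an option, the chunksize route mentioned above looks roughly like this. The CSV is a tiny in-memory stand-in and the chunk size of 2 is only for the demo; a real pipeline would read from disk with a much larger chunk size.

```python
import io

import pandas as pd

# Tiny in-memory stand-in for a CSV that is too large to load at once
csv_data = io.StringIO(
    "user_id,amount,country_code\n"
    "1,10.0,US\n"
    "2,25.5,DE\n"
    "3,7.25,US\n"
    "4,99.0,IN\n"
    "5,3.5,US\n"
)

kept = []
# chunksize=2 is for illustration; something like 1_000_000 is more typical
for chunk in pd.read_csv(csv_data, chunksize=2):
    # Filter each chunk immediately so only matching rows stay in memory
    kept.append(chunk[chunk["country_code"] == "US"])

df_filtered = pd.concat(kept, ignore_index=True)
print(df_filtered)
```

Peak memory is bounded by one chunk plus the rows kept so far, which is why the filter has to run inside the loop rather than after concatenation.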

:::danger Using pandas string columns at scale without category conversion A DataFrame with 50 million rows and 5 string columns stored as object dtype will use 10–15 GB of RAM for the string columns alone, due to Python object overhead. For any string column with fewer than 50% unique values (status codes, country codes, categories, types), convert to category dtype immediately after loading. For low-cardinality columns, the memory reduction is typically 50–100x. :::
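A short sketch of that conversion, using a synthetic DataFrame with made-up values. The 50% uniqueness threshold is the rule of thumb from the note above, and the columns are forced to object dtype so the "before" measurement reflects the problem case.

```python
import pandas as pd

n = 100_000
df = pd.DataFrame({
    "country_code": pd.Series(["US", "DE", "IN", "BR"] * (n // 4), dtype=object),
    "transaction_type": pd.Series(["purchase", "refund"] * (n // 2), dtype=object),
})

before = df.memory_usage(deep=True).sum()

for col in ["country_code", "transaction_type"]:
    # Convert only columns where repeated values dominate
    if df[col].nunique() / len(df) < 0.5:
        df[col] = df[col].astype("category")

after = df.memory_usage(deep=True).sum()
print(f"before: {before:,} bytes, after: {after:,} bytes")
```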

:::warning Chaining pandas operations without thinking about copies

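# Without Copy-on-Write, this allocates TWO new DataFrames (the filter, then reset_index)
df_processed = df[df["amount"] > 0].reset_index(drop=True)

# This replaces df with the filtered result - use when you don't need the original.
# query() still builds the filtered frame internally; only reset_index skips a copy.
df.query("amount > 0", inplace=True)
df.reset_index(drop=True, inplace=True)

# Or use method chaining carefully
# Copy-on-Write (CoW) is opt-in in pandas 2.x (pd.options.mode.copy_on_write = True)
# and the default from pandas 3.0; it reduces unnecessary copies

Copy-on-Write semantics (opt-in in pandas 2.x, the default from pandas 3.0) reduce implicit copies. But chaining still creates intermediate DataFrames. For large datasets, release intermediates you no longer need (inplace=True frees the original but rarely avoids the internal copy) or switch to DuckDB or the Polars lazy API, which do not materialize every intermediate step. :::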

:::warning Polars lazy evaluation: forgetting .collect()

import polars as pl

# This builds a query plan but does NOT execute - result is a LazyFrame
result = pl.scan_parquet("data.parquet").filter(pl.col("amount") > 0)
print(result) # prints query plan, not data!

# You must call .collect() to actually execute
result = pl.scan_parquet("data.parquet").filter(pl.col("amount") > 0).collect()
print(result) # now prints the actual DataFrame

The lazy API is Polars' superpower but also its most common footgun. If you do not see data when you expect to, check that you called .collect(). :::

:::warning pandera validation on production DataFrames is not free Schema validation with pandera evaluates every check against the full column (and row by row for element-wise checks), so its cost grows with DataFrame size. For DataFrames with 10M+ rows, validation can add 30–60 seconds to your pipeline. Profile validation separately and consider:

  1. Validating a sample (e.g., 1% of rows) for statistical checks
  2. Running full validation only in staging/CI, not production
  3. Using Arrow schema enforcement at read time for type checks, and pandera only for business logic checks :::

Interview Q&A

Q1: You need to process 500GB of CSV files daily on a single machine with 32GB RAM. Walk through your complete approach.

The key insight is that you never load 500GB into memory. The pipeline breaks into three stages:

Stage 1 - Understand the data (no processing yet): Sample 0.1% of each file to understand schema, cardinalities, and distributions. This lets you set appropriate dtypes before loading anything.

Stage 2 - Use DuckDB for the heavy lifting: DuckDB can read CSV files directly with SQL, applying predicate and projection pushdown. It will use your 32GB RAM as a buffer and spill to disk for operations that exceed memory. For most ETL transformations (filters, joins, aggregations, window functions), DuckDB is the right tool. Set SET memory_limit = '24GB' to leave 8GB for the OS and other processes.

import duckdb
conn = duckdb.connect()
conn.execute("SET memory_limit = '24GB'; SET threads = 8;")
conn.execute("""
    COPY (
        SELECT user_id, amount, country_code, ...features...
        FROM read_csv_auto('data/*.csv')
        WHERE amount > 0
    )
    TO 'output/' (FORMAT PARQUET, PARTITION_BY (date), COMPRESSION SNAPPY)
""")

Stage 3 - Validate and monitor: Use pandera to validate a sample of the output. Write pipeline metrics (row counts, null rates, fraud rates) to a monitoring database.

The result: 500GB of CSV → DuckDB SQL transformation → partitioned Parquet output, typically in 45–90 minutes on an 8-core machine with 32GB RAM.

Q2: What is the difference between pandas apply(axis=1) and a vectorized operation? Give a concrete example where you replaced apply with something faster.

apply(axis=1) is a Python for loop hidden behind a DataFrame method. For each row, Python calls your lambda function, passes a Series object (the row), and waits for the result. The overhead is: a Python function call, construction of a Series object for the row, and label-based lookups on that Series. For 10 million rows, this is 10 million Python function calls.

Vectorized operations pass the entire column (a NumPy array or Arrow array) to a compiled C/Fortran/Rust function that processes all elements in a tight loop with SIMD instructions. No Python interpreter involvement per element.

Concrete example: Computing a time-weighted transaction score.

# BEFORE: apply (takes ~8 seconds for 5M rows)
df["score"] = df.apply(
    lambda row: row["amount"] * 0.3 + row["frequency"] * 0.5 + row["recency"] * 0.2,
    axis=1
)

# AFTER: vectorized (takes ~0.01 seconds for 5M rows)
df["score"] = df["amount"] * 0.3 + df["frequency"] * 0.5 + df["recency"] * 0.2

The speedup is 800x here. The general rule: if you can express the operation as arithmetic or comparison on columns, do it that way. Use np.where for conditional logic, pd.cut/pd.qcut for binning, and string methods for text operations - all vectorized.
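The np.where and pd.cut replacements mentioned above might look like this. The amounts, the 1000 threshold, and the band edges are illustrative values, not figures from a real pipeline.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"amount": [5.0, 250.0, 1200.0, 40.0, 9000.0]})

# Conditional logic: np.where instead of apply(lambda row: 1 if ... else 0, axis=1)
df["is_large"] = np.where(df["amount"] > 1000, 1, 0)

# Binning: pd.cut instead of a per-row if/elif chain
# Intervals are right-closed by default: (0, 100], (100, 1000], (1000, inf]
df["amount_band"] = pd.cut(
    df["amount"],
    bins=[0, 100, 1000, np.inf],
    labels=["small", "medium", "large"],
)

print(df)
```

Both calls operate on the whole column at once, so the cost per row stays in compiled code rather than in the Python interpreter.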

Q3: Explain DuckDB's predicate pushdown and why it matters for Parquet files.

Parquet files are organized into row groups (typically 100K–1M rows each). Each row group stores column statistics: min, max, and null count. When DuckDB (or any Parquet reader) sees a filter like WHERE country_code = 'US', it checks the row group statistics: if the min and max values for country_code in a row group exclude 'US', the entire row group is skipped without reading its data.

This is predicate pushdown: the filter predicate is pushed down into the file reader before any data is loaded into memory.

For a Parquet file with 1 billion rows in 1000 row groups, if only 100 row groups contain US transactions, DuckDB reads 10% of the data. The savings multiply: less I/O, less memory, less CPU for deserialization.

Projection pushdown works similarly: SELECT user_id, amount FROM ... tells the Parquet reader to load only those two columns. Parquet is columnar - each column is stored contiguously on disk, so reading 2 of 20 columns reads roughly 10% of the data.

The practical implication: column selection and row filters are nearly free in DuckDB. Do not be afraid to write wide SELECT queries with many filters - they are efficient. What is expensive is reading more columns or rows than you need.
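To make the statistics check concrete, here is a toy model of row-group skipping in plain Python. It is not how DuckDB or Parquet is implemented: the `RowGroup` class and `scan_equals` function are hypothetical, and real readers take min/max from the file footer instead of computing them.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    """Toy stand-in for a Parquet row group holding one string column."""
    values: list[str]
    min_value: str = ""
    max_value: str = ""

    def __post_init__(self) -> None:
        # In real Parquet these statistics are written once, at file-write time
        self.min_value = min(self.values)
        self.max_value = max(self.values)


def scan_equals(groups: list[RowGroup], target: str) -> tuple[list[str], int]:
    """Return matching values and how many row groups had to be read."""
    matches: list[str] = []
    groups_read = 0
    for group in groups:
        # Statistics check: target outside [min, max] means no row can match
        if target < group.min_value or target > group.max_value:
            continue
        groups_read += 1
        matches.extend(v for v in group.values if v == target)
    return matches, groups_read


groups = [
    RowGroup(["BR", "CA", "DE"]),
    RowGroup(["FR", "IN", "JP"]),
    RowGroup(["UK", "US", "US"]),
]
matches, groups_read = scan_equals(groups, "US")
print(matches, groups_read)
```

Skipping works well here because the values are sorted across row groups. If every group contained a mix of countries, each min/max range would include 'US' and nothing could be skipped, which is why sorting or partitioning by common filter columns pays off.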

Q4: What is Polars lazy evaluation and how does it improve performance over eager execution?

In eager execution (pandas, or pl.read_parquet()), every operation executes immediately and materializes a full DataFrame. A five-step chain creates five intermediate DataFrames in memory.

In lazy execution (pl.scan_parquet()), each operation adds a node to a query plan. No data is read or processed until .collect() is called. When you call .collect(), Polars' query optimizer applies several transformations:

  1. Predicate pushdown: .filter() operations are moved as early as possible in the plan - ideally into the file reader, so only matching rows are loaded.
  2. Projection pushdown: unused columns are never loaded.
  3. Constant folding: expressions that can be computed at plan time are precomputed.
  4. Parallelism: independent branches of the query are executed on separate CPU cores.

The result: for a scan → filter → group_by → sort query, Polars may read only 20% of the file (predicate pushdown), use only the needed columns (projection pushdown), and run the aggregation in parallel across 8 cores. The equivalent pandas pipeline would read the entire file, store it in memory, then apply each operation sequentially.

Q5: How do you test a data pipeline's correctness? What is pandera and how does it fit in?

Data pipeline testing has three levels:

Unit tests: test individual transformation functions in isolation, with small synthetic DataFrames. These are fast and catch logic bugs.

Schema validation tests: verify that input and output DataFrames have the correct types, value ranges, null constraints, and uniqueness requirements. This is where pandera shines. You define a DataFrameSchema once and call .validate() in your pipeline or tests. Pandera raises a SchemaError with a clear message (which column, which rows, which check) when validation fails.

Statistical tests: verify that the statistical properties of the output match expectations - fraud rate in [0.5%, 5%], amount distribution has no sudden shift, null rates below threshold. These catch data drift, upstream schema changes, and silent data corruption.
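A statistical check of this kind can be a plain function that returns a list of failures. In this sketch the fraud-rate bounds come from the example above, while the function name, the 1% null-rate threshold, and the sample data are assumptions made for illustration.

```python
import pandas as pd


def statistical_checks(
    df: pd.DataFrame,
    fraud_rate_bounds: tuple[float, float] = (0.005, 0.05),
    max_null_rate: float = 0.01,  # hypothetical threshold
) -> list[str]:
    """Return human-readable failures; an empty list means every check passed."""
    failures = []

    # Fraud rate must stay inside the expected band
    fraud_rate = df["is_fraud"].mean()
    low, high = fraud_rate_bounds
    if not low <= fraud_rate <= high:
        failures.append(f"fraud rate {fraud_rate:.2%} outside [{low:.1%}, {high:.1%}]")

    # Any column whose null rate exceeds the threshold is reported
    null_rates = df.isna().mean()
    for col, rate in null_rates[null_rates > max_null_rate].items():
        failures.append(f"null rate for {col} is {rate:.2%}")

    return failures


df = pd.DataFrame({
    "amount": [10.0] * 98 + [None, None],
    "is_fraud": [True, True] + [False] * 98,
})
failures = statistical_checks(df)
print(failures)
```

Returning a list rather than raising on the first failure lets the pipeline log every violated expectation in one run.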

In a production pipeline, the pattern is:

  1. raw_schema.validate(df) on ingested data - catch bad upstream data early
  2. Transform
  3. output_schema.validate(result) on the output - catch transformation bugs
  4. Write statistical metrics to a monitoring system

Pandera integrates with pytest naturally - schemas raise exceptions that pytest catches, giving you line-level error messages in CI failures.

Q6: When would you use Polars instead of DuckDB? They both seem to do the same thing.

They overlap significantly, but the differences matter:

Use DuckDB when:

  • Your transformation is expressible in SQL (aggregations, joins, window functions, pivots)
  • You need to read from many file formats (CSV, JSON, Parquet, Avro) and write to many formats
  • You want to query remote data (DuckDB has httpfs for S3, GCS)
  • You need to integrate with existing SQL-based tooling

Use Polars when:

  • You need complex row-wise transformations that are awkward in SQL (custom Python functions, ML feature engineering with conditionals)
  • You need the DataFrame API for integration with pandas-based downstream tools
  • You want fine-grained control over column operations via the Expression API
  • You are coming from a pandas background and want a faster DataFrame library (the API is similar in spirit, though not a drop-in replacement)

In practice, many engineers use both: DuckDB for the SQL-expressible heavy lifting (reads files, aggregates, joins, writes Parquet), Polars for complex transformations where the Expression API is more natural. DuckDB can exchange data with Polars via Arrow with zero copy.


Summary

Python data engineering has evolved from pandas-for-everything to a rich ecosystem of specialized tools. The key mental model: choose your tool based on the data size and the nature of the transformation. Pandas for small datasets and ML integration. DuckDB for SQL-expressible transformations on medium-to-large datasets. Polars for large datasets where you need the DataFrame API and maximum single-machine performance. PyArrow as the columnar glue between all of them.

Memory discipline matters as much as tool choice. Explicitly specifying dtypes when reading, converting string columns to category, using chunked reads for files larger than RAM, and pushing filters to the source - these practices can reduce memory usage by 5–10x and eliminate the most common class of production data pipeline failures: out-of-memory errors.

Test your pipelines with pandera. Untested pipelines fail silently on bad data, and silent failures in ML feature pipelines are the worst kind - they produce models that appear to train correctly but make predictions on corrupted features.

© 2026 EngineersOfAI. All rights reserved.