
Project 02 - Lazy Evaluation Pipeline

Estimated time: 5–8 hours core | Level: Intermediate → Advanced

Before reading the requirements, consider this question: what is the difference between [x * 2 for x in range(1_000_000)] and (x * 2 for x in range(1_000_000))? The list materialises one million integers in memory immediately. The generator produces them one at a time, on demand, consuming O(1) memory regardless of the input size. This project is about building an entire data processing system on that second property.
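A quick way to see the difference in CPython is sys.getsizeof. It measures only the container object itself, not the int objects the list references, so it actually understates how much memory the list uses.

```python
import sys

# The same one million doubled values, built two ways.
as_list = [x * 2 for x in range(1_000_000)]   # every value computed and stored now
as_gen = (x * 2 for x in range(1_000_000))    # nothing computed yet

# getsizeof reports the container only (for the list: its array of references).
list_bytes = sys.getsizeof(as_list)
gen_bytes = sys.getsizeof(as_gen)
print(f"list: {list_bytes:,} bytes, generator: {gen_bytes:,} bytes")

# The generator still produces the same values, one at a time, on demand.
print(next(as_gen), next(as_gen), next(as_gen))  # 0 2 4
```

The generator's size stays the same whatever the range length, because it stores only its current position, not the values.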

Learning Objectives

By the time this project is complete, you will have practiced:

  • Implementing generators that wrap other generators (generator chaining)
  • Building a fluent interface where each operation returns a new, modified object
  • Reasoning about which operations can be lazy and which require buffering
  • Handling multiple input formats (CSV, JSONL, arbitrary iterables) uniformly
  • Understanding the __iter__ and __next__ protocol
  • Documenting architectural tradeoffs explicitly - group_by cannot be lazy; you must explain why

System Overview

You are building a Pipeline class that wraps a data source and a sequence of transformations. No transformation executes until the pipeline is iterated. When iteration begins, data flows through the chain of generators one record at a time. The pipeline never holds more than a constant number of records in memory (except for group_by, which buffers by group key - documented as a known tradeoff).

The target API:

pipeline = (
    Pipeline.from_file("huge.csv")
    .map(lambda row: {**row, "age": int(row["age"])})  # CSV fields arrive as strings
    .filter(lambda row: row["age"] > 18)
    .map(lambda row: {**row, "age_group": age_group(row["age"])})
    .group_by("age_group")
    .aggregate(count=len, avg_age=lambda rows: sum(r["age"] for r in rows) / len(rows))
    .limit(10)
)

for result in pipeline:
    print(result)

Requirements

R1 - Pipeline class structure

  • Pipeline wraps a callable or iterable source and a list of transformation steps.
  • Each operation method (filter, map, group_by, aggregate, limit) returns a new Pipeline instance. The original Pipeline is not mutated.
  • Pipeline implements __iter__ - it is iterable. Each time __iter__ is called, it starts a fresh iteration from the source.
  • The source is re-opened on each iteration (not cached). This means for result in pipeline can be run multiple times and produces the same output each time (assuming the underlying file has not changed).
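The two R1 properties (operations return new objects, and every iteration re-opens the source) can be checked on a cut-down class. The sketch below is hypothetical: MiniPipeline is not part of the skeleton and supports only from_iterable and filter. It also stores steps in a tuple instead of the skeleton's list, so that in-place mutation is impossible. The skeleton's `self._steps + [step]` is equally safe, because + builds a new list.

```python
from typing import Callable, Iterable, Iterator


class MiniPipeline:
    """Hypothetical cut-down Pipeline: a source factory plus a tuple of steps."""

    def __init__(self, source: Callable[[], Iterator], steps: tuple = ()):
        self._source = source   # zero-argument callable returning a fresh iterator
        self._steps = steps     # a tuple, so it cannot be mutated in place

    @classmethod
    def from_iterable(cls, iterable: Iterable) -> "MiniPipeline":
        return cls(lambda: iter(iterable))

    def filter(self, predicate: Callable) -> "MiniPipeline":
        def step(upstream: Iterator) -> Iterator:
            for record in upstream:
                if predicate(record):
                    yield record
        # A new object is returned; self._steps is left untouched.
        return MiniPipeline(self._source, self._steps + (step,))

    def __iter__(self) -> Iterator:
        iterator = self._source()          # re-open the source on every iteration
        for step in self._steps:
            iterator = step(iterator)
        yield from iterator


base = MiniPipeline.from_iterable([1, 2, 3, 4])
evens = base.filter(lambda x: x % 2 == 0)

print(list(base))    # [1, 2, 3, 4]  (the original is unaffected)
print(list(evens))   # [2, 4]
print(list(evens))   # [2, 4]  (the second run re-opens the source)
```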

R2 - Pipeline.from_file(path, format="csv")

A class method that creates a Pipeline from a file. Supported formats:

  • "csv": reads the file as a CSV, yielding each row as a dict[str, str]. Uses csv.DictReader. The file is opened lazily - only when the pipeline is iterated, not when from_file is called.
  • "jsonl": reads the file line by line, parsing each line as JSON and yielding the resulting dict.

The file must not be read until iteration begins. Do not call open() at construction time.
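A minimal sketch of a lazy file source, assuming a hypothetical helper name (make_file_source) and one design choice not stated in R2: blank JSONL lines are skipped. Because source contains yield, calling it only creates a generator. open() runs on the first next(), so even a missing file goes unnoticed until iteration starts. The csv module documentation recommends opening reader files with newline="".

```python
import csv
import json
import os
import tempfile
from typing import Callable, Iterator


def make_file_source(path: str, format: str = "csv") -> Callable[[], Iterator[dict]]:
    """Hypothetical helper: return a zero-argument source for a CSV or JSONL file."""
    def source() -> Iterator[dict]:
        # Generator function: calling source() runs none of this body yet.
        if format == "csv":
            with open(path, newline="") as f:
                yield from csv.DictReader(f)    # file stays open while rows are consumed
        elif format == "jsonl":
            with open(path) as f:
                for line in f:
                    if line.strip():            # assumption: blank lines are skipped
                        yield json.loads(line)
        else:
            raise ValueError(f"unsupported format: {format!r}")
    return source


with tempfile.TemporaryDirectory() as tmp_dir:
    # A missing file is not an error until iteration begins.
    lazy = make_file_source(os.path.join(tmp_dir, "missing.csv"))()
    raised = False
    try:
        next(lazy)
    except FileNotFoundError:
        raised = True
    print("open() failed only at iteration time:", raised)

    path = os.path.join(tmp_dir, "data.jsonl")
    with open(path, "w") as f:
        f.write('{"a": 1}\n\n{"a": 2}\n')
    jsonl_records = list(make_file_source(path, "jsonl")())
    print(jsonl_records)   # [{'a': 1}, {'a': 2}]
```

A common pitfall is writing `with open(path) as f: return csv.DictReader(f)`. The with block closes the file as soon as the function returns, so the first attempt to read a row fails with "I/O operation on closed file".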

R3 - Pipeline.from_iterable(iterable)

A class method that creates a Pipeline from any iterable (list, generator, range, etc.). Because the iterable may be a one-shot generator, document that when the source is a generator, a second iteration of the pipeline yields nothing, because the first pass exhausted it (this is expected and acceptable).
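The difference comes from what iter() returns. For a list, iter() builds a new iterator each time. For a generator, iter() returns the generator object itself. The functions below are illustrative stand-ins for the lambda: iter(iterable) source that from_iterable stores.

```python
numbers = [1, 2, 3]
squares = (n * n for n in numbers)          # one-shot generator


def list_source():
    return iter(numbers)                    # a brand-new list iterator on each call


def gen_source():
    return iter(squares)                    # the very same generator object every time


print(list(list_source()), list(list_source()))   # [1, 2, 3] [1, 2, 3]
print(list(gen_source()), list(gen_source()))     # [1, 4, 9] []
```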

R4 - .filter(predicate)

Returns a new Pipeline that yields only records for which predicate(record) returns truthy. Fully lazy - uses a generator internally.

R5 - .map(transform)

Returns a new Pipeline that yields transform(record) for each record. Fully lazy - uses a generator internally.
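To see that filter and map are fully lazy, chain two step functions by hand and log every read. The names source, filter_step and map_step are hypothetical stand-ins with the same shape as the R4/R5 steps, and the even-number predicate is an arbitrary choice. Each record travels through the whole chain before the next one is read.

```python
log = []


def source():
    for n in range(1, 5):
        log.append(f"read {n}")
        yield n


def filter_step(upstream):          # same shape as the R4 step
    for record in upstream:
        if record % 2 == 0:
            yield record


def map_step(upstream):             # same shape as the R5 step
    for record in upstream:
        log.append(f"map {record}")
        yield record * 10


chained = map_step(filter_step(source()))   # only builds generator objects
print(log)            # []
print(next(chained))  # 20
print(log)            # ['read 1', 'read 2', 'map 2']
print(list(chained))  # [40]
print(log)            # ['read 1', 'read 2', 'map 2', 'read 3', 'read 4', 'map 4']
```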

R6 - .group_by(key)

Returns a new Pipeline that groups records by the value of record[key]. The output is a sequence of (group_key, [records]) tuples - one tuple per unique key value.

Important: group_by is not fully lazy. It must consume the entire upstream to build the groups before yielding any output. This is a documented architectural tradeoff - document it in a comment in your implementation and in the docstring.

The key parameter can be:

  • A string: groups by record[key]
  • A callable: groups by key(record)
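Both key forms can be reduced to a single callable before grouping, which is what the skeleton's `key_fn = key if callable(key) else lambda r: r[key]` line does. The helpers below (make_key_fn, group) are hypothetical. Because dicts keep insertion order (Python 3.7+), groups come out in the order their keys were first seen.

```python
records = [
    {"name": "Alice", "city": "London", "age": 23},
    {"name": "Dave", "city": "Berlin", "age": 23},
    {"name": "Carol", "city": "London", "age": 31},
]


def make_key_fn(key):
    # Strings become field lookups; callables are used as-is.
    return key if callable(key) else (lambda record: record[key])


def group(records, key):
    key_fn = make_key_fn(key)
    groups = {}
    for record in records:
        groups.setdefault(key_fn(record), []).append(record)
    return groups


by_field = group(records, "city")
by_callable = group(records, lambda r: r["city"])
print(list(by_field))                                   # ['London', 'Berlin']
print(by_field == by_callable)                          # True
print(list(group(records, lambda r: r["age"] >= 30)))   # [False, True]
```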

R7 - .aggregate(**aggregators)

Returns a new Pipeline that applies aggregation functions to each group produced by group_by. Each keyword argument to aggregate is a function applied to the list of records in a group. The output is a sequence of dicts: one per group, containing the group key and each aggregated value.

.aggregate(count=len, avg_age=lambda rows: sum(r["age"] for r in rows) / len(rows))
# yields: {"age_group": "18-24", "count": 4512, "avg_age": 21.3}

If called without a preceding group_by, raises PipelineConfigurationError.

R8 - .limit(n)

Returns a new Pipeline that yields at most n records. Fully lazy - stops pulling from upstream after n records.

R9 - .collect()

A terminal operation that materialises all results into a list. Equivalent to list(pipeline). Provided as a convenience method.

R10 - Error handling

Define a custom PipelineConfigurationError exception for invalid pipeline configurations (e.g., calling aggregate without group_by). Invalid configurations should be detected at iteration time, not at construction time, consistent with the lazy design.

Starter Code Skeleton

import csv
import json
from typing import Any, Callable, Iterable, Iterator, Optional


# ── Custom Exceptions ─────────────────────────────────────────────────────────

class PipelineConfigurationError(Exception):
    pass


# ── Pipeline ──────────────────────────────────────────────────────────────────

class Pipeline:
    def __init__(self, source: Callable[[], Iterator], steps: Optional[list[Callable]] = None):
        """
        source: a zero-argument callable that returns a fresh iterator each time.
        steps: list of generator-wrapping functions, applied left to right.
        """
        self._source = source
        self._steps = steps or []

    # ── Class method constructors ─────────────────────────────────────────────

    @classmethod
    def from_file(cls, path: str, format: str = "csv") -> "Pipeline":
        """
        Create a Pipeline from a file. File is opened lazily at iteration time.
        Supported formats: 'csv', 'jsonl'.
        """
        def source() -> Iterator[dict]:
            # TODO: if format == "csv", open path and use csv.DictReader
            # TODO: if format == "jsonl", open path, parse each line as json.loads
            # TODO: raise ValueError for unknown format
            # IMPORTANT: use 'with open(path)' inside this function, not outside,
            # and yield records from inside the with block so source is a
            # generator. Returning the reader instead would close the file
            # before any row is read. For CSV, open with newline="".
            pass
        return cls(source)

    @classmethod
    def from_iterable(cls, iterable: Iterable) -> "Pipeline":
        """
        Create a Pipeline from any iterable.
        Note: if the iterable is a one-shot generator, the pipeline can only be iterated once.
        """
        # TODO: return cls(lambda: iter(iterable))
        pass

    # ── Transformation methods ────────────────────────────────────────────────

    def filter(self, predicate: Callable[[Any], bool]) -> "Pipeline":
        """Yield only records for which predicate(record) is truthy. Fully lazy."""
        def step(upstream: Iterator) -> Iterator:
            # TODO: implement as a generator - yield record if predicate(record)
            pass
        return Pipeline(self._source, self._steps + [step])

    def map(self, transform: Callable[[Any], Any]) -> "Pipeline":
        """Yield transform(record) for each record. Fully lazy."""
        def step(upstream: Iterator) -> Iterator:
            # TODO: implement as a generator - yield transform(record) for each record
            pass
        return Pipeline(self._source, self._steps + [step])

    def group_by(self, key) -> "Pipeline":
        """
        Group records by key. key can be a string (field name) or callable.

        ARCHITECTURAL TRADEOFF: group_by is not lazy. It must consume the entire
        upstream iterator to build groups before yielding any output. Memory usage
        is O(n) in the number of input records. For streaming use cases where
        the input is sorted by the group key, consider using itertools.groupby
        instead (which IS lazy, but requires pre-sorted input).
        """
        key_fn = key if callable(key) else lambda r: r[key]

        def step(upstream: Iterator) -> Iterator:
            # TODO: consume entire upstream into a dict of {group_key: [records]}
            # TODO: yield (group_key, records) tuples
            pass
        return Pipeline(self._source, self._steps + [step])

    def aggregate(self, **aggregators: Callable) -> "Pipeline":
        """
        Apply aggregation functions to each (group_key, records) tuple from group_by.
        Each aggregator receives the list of records in the group.
        Must follow group_by - raises PipelineConfigurationError otherwise.
        """
        def step(upstream: Iterator) -> Iterator:
            for item in upstream:
                # TODO: check that item is a (key, list) tuple
                # TODO: if not, raise PipelineConfigurationError
                # TODO: build result dict with group_key field + aggregated values
                # TODO: yield result dict
                pass
        return Pipeline(self._source, self._steps + [step])

    def limit(self, n: int) -> "Pipeline":
        """Yield at most n records. Fully lazy."""
        def step(upstream: Iterator) -> Iterator:
            # TODO: yield from upstream, stopping after n records
            # (without requesting record n + 1 from upstream)
            pass
        return Pipeline(self._source, self._steps + [step])

    # ── Iteration protocol ────────────────────────────────────────────────────

    def __iter__(self) -> Iterator:
        """
        Execute the pipeline. Opens the source, threads it through each step,
        and yields results. Each call to __iter__ starts a fresh iteration.
        """
        # TODO: start with iterator = self._source()
        # TODO: for each step in self._steps, iterator = step(iterator)
        # TODO: yield from iterator
        pass

    # ── Terminal operations ───────────────────────────────────────────────────

    def collect(self) -> list:
        """Materialise all results into a list."""
        return list(self)

    def __repr__(self) -> str:
        return f"Pipeline(steps={len(self._steps)})"


# ── Helper functions ──────────────────────────────────────────────────────────

def age_group(age: int) -> str:
    """Categorise an integer age into a string bucket."""
    if age < 18:
        return "under-18"
    elif age < 25:
        return "18-24"
    elif age < 35:
        return "25-34"
    elif age < 50:
        return "35-49"
    else:
        return "50+"


# ── Demonstration ─────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # ── Demo 1: in-memory iterable ────────────────────────────────────────────
    records = [
        {"name": "Alice", "age": 23, "city": "London"},
        {"name": "Bob", "age": 17, "city": "Paris"},
        {"name": "Carol", "age": 31, "city": "London"},
        {"name": "Dave", "age": 23, "city": "Berlin"},
        {"name": "Eve", "age": 45, "city": "London"},
    ]

    print("=== filter + map ===")
    results = (
        Pipeline.from_iterable(records)
        .filter(lambda r: r["age"] >= 18)
        .map(lambda r: {**r, "age_group": age_group(r["age"])})
        .collect()
    )
    for r in results:
        print(r)

    print("\n=== group_by + aggregate ===")
    results = (
        Pipeline.from_iterable(records)
        .filter(lambda r: r["age"] >= 18)
        .group_by("city")
        .aggregate(
            count=len,
            avg_age=lambda rows: round(sum(r["age"] for r in rows) / len(rows), 1),
        )
        .collect()
    )
    for r in results:
        print(r)

    print("\n=== limit ===")
    results = (
        Pipeline.from_iterable(range(1000))
        .map(lambda x: x ** 2)
        .filter(lambda x: x % 2 == 0)
        .limit(5)
        .collect()
    )
    print(results)

    # ── Demo 2: CSV file ──────────────────────────────────────────────────────
    # Write a small CSV to a temp file and process it
    import os
    import tempfile

    csv_data = "name,age,city\nAlice,23,London\nBob,17,Paris\nCarol,31,London\n"
    with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
        f.write(csv_data)
        tmp_path = f.name

    print("\n=== from_file (CSV) ===")
    results = (
        Pipeline.from_file(tmp_path, format="csv")
        .map(lambda r: {**r, "age": int(r["age"])})
        .filter(lambda r: r["age"] >= 18)
        .collect()
    )
    for r in results:
        print(r)

    os.unlink(tmp_path)

Expected Output

=== filter + map ===
{'name': 'Alice', 'age': 23, 'city': 'London', 'age_group': '18-24'}
{'name': 'Carol', 'age': 31, 'city': 'London', 'age_group': '25-34'}
{'name': 'Dave', 'age': 23, 'city': 'Berlin', 'age_group': '18-24'}
{'name': 'Eve', 'age': 45, 'city': 'London', 'age_group': '35-49'}

=== group_by + aggregate ===
{'city': 'London', 'count': 3, 'avg_age': 33.0}
{'city': 'Berlin', 'count': 1, 'avg_age': 23.0}

=== limit ===
[0, 4, 16, 36, 64]

=== from_file (CSV) ===
{'name': 'Alice', 'age': 23, 'city': 'London'}
{'name': 'Carol', 'age': 31, 'city': 'London'}

Step-by-Step Hints

Hint 1 - Start with from_iterable and __iter__. These two methods are the core of the pipeline. Get them working together first: Pipeline.from_iterable([1, 2, 3]).collect() should return [1, 2, 3] before you add any transformation steps.

Hint 2 - _source is a callable, not an iterator. This is the key design decision. If _source were an iterator directly, the pipeline could only be iterated once (iterators are exhausted after one pass). By storing a zero-argument callable that returns a fresh iterator, the pipeline can be iterated multiple times:

# source is a function, not the file handle or list itself
def source():
    return iter(my_list)

# Each call to __iter__ calls source() to get a fresh iterator

Hint 3 - Steps are generator-wrapping functions. Each step is a function that takes an iterator and returns a generator. Chaining them in __iter__ is just function composition:

iterator = self._source()
for step in self._steps:
    iterator = step(iterator)  # each step wraps the previous iterator
yield from iterator

Hint 4 - filter and map as generators.

def filter(self, predicate):
    def step(upstream):
        for record in upstream:
            if predicate(record):
                yield record
    return Pipeline(self._source, self._steps + [step])

The yield keyword is what makes step a generator function. When step(upstream) is called, it returns a generator object - no code runs until the generator is iterated.

Hint 5 - group_by must consume the entire upstream.

def step(upstream):
    groups = {}
    for record in upstream:  # consumes entire upstream here
        k = key_fn(record)
        groups.setdefault(k, []).append(record)
    for group_key, records in groups.items():
        yield (group_key, records)  # yields tuples one at a time

step is still a generator function (it contains yield), so it satisfies the step interface. But as soon as the first group is requested, it consumes the entire upstream before yielding anything.
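The eagerness is easy to observe by counting how many records the source has produced. In this sketch, group_by_step is a hypothetical standalone version of the Hint 5 step with the key function passed in as a parameter. Creating the generator reads nothing, but fetching the first group reads everything.

```python
pulled = []


def source():
    for city in ["London", "Paris", "London", "Berlin"]:
        pulled.append(city)
        yield {"city": city}


def group_by_step(upstream, key_fn=lambda r: r["city"]):
    groups = {}
    for record in upstream:
        groups.setdefault(key_fn(record), []).append(record)
    for group_key, records in groups.items():
        yield (group_key, records)


grouped = group_by_step(source())
print(len(pulled))                       # 0: creating the generator reads nothing
first_key, first_records = next(grouped)
print(first_key, len(first_records))     # London 2
print(len(pulled))                       # 4: the first group needed the whole source
```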

Hint 6 - aggregate detects bad pipeline configuration.

def step(upstream):
    for item in upstream:
        if not (isinstance(item, tuple) and len(item) == 2 and isinstance(item[1], list)):
            raise PipelineConfigurationError(
                "aggregate() must follow group_by(). "
                f"Expected (key, list) tuple, got {type(item).__name__}."
            )
        group_key, records = item
        result = {"group_key_field": group_key}  # you will need to know the key name here
        for agg_name, agg_fn in aggregators.items():
            result[agg_name] = agg_fn(records)
        yield result

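There is a subtle design question: how does aggregate know what to name the group key field in the output dict? One approach: group_by wraps the tuple with metadata. Another: aggregate uses a fixed name like "key" or receives the field name explicitly. Choose and document your approach. Note that the expected output above names the field after the group_by key ('city'), so a fixed name like "key" would not reproduce it exactly.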
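Here is a sketch of the "wrap the tuple with metadata" option. It assumes group_by yields a small NamedTuple (Group, a hypothetical name) carrying the output field name, and it uses "key" as the field name for callable keys (also an assumption). This changes group_by's output from the 2-tuple described in R6, so document it if you adopt it. The last lines also show the R10 behaviour: building the aggregate generator raises nothing, and the error appears only when iteration starts.

```python
from typing import Any, Callable, Iterator, NamedTuple


class PipelineConfigurationError(Exception):
    pass


class Group(NamedTuple):
    """Hypothetical group record: carries the output field name with the data."""
    key_name: str
    key: Any
    records: list


def group_by_step(key):
    key_fn = key if callable(key) else (lambda r: r[key])
    key_name = key if isinstance(key, str) else "key"   # assumption for callable keys

    def step(upstream: Iterator) -> Iterator[Group]:
        groups: dict = {}
        for record in upstream:
            groups.setdefault(key_fn(record), []).append(record)
        for group_key, records in groups.items():
            yield Group(key_name, group_key, records)
    return step


def aggregate_step(**aggregators: Callable):
    def step(upstream: Iterator) -> Iterator[dict]:
        for item in upstream:
            if not isinstance(item, Group):
                raise PipelineConfigurationError(
                    f"aggregate() must follow group_by(); got {type(item).__name__}"
                )
            result = {item.key_name: item.key}
            for name, fn in aggregators.items():
                result[name] = fn(item.records)
            yield result
    return step


rows = [{"city": "London", "age": 23}, {"city": "Berlin", "age": 23},
        {"city": "London", "age": 31}]
agg = aggregate_step(count=len)
summary = list(agg(group_by_step("city")(iter(rows))))
print(summary)   # [{'city': 'London', 'count': 2}, {'city': 'Berlin', 'count': 1}]

bad = agg(iter(rows))   # constructing the generator raises nothing...
caught = False
try:
    next(bad)           # ...the misconfiguration surfaces at iteration time
except PipelineConfigurationError as exc:
    caught = True
    print("caught:", exc)
```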

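Hint 7 - limit using enumerate.

def step(upstream):
    if n <= 0:
        return  # limit(0) must not pull anything from upstream
    for i, record in enumerate(upstream, start=1):
        yield record
        if i >= n:  # checked after yielding, so record n + 1 is never requested
            break

Or use itertools.islice(upstream, n), which likewise stops without requesting an extra record from upstream.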
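To check that a limit step really stops pulling from upstream, count how many items an infinite source produced. A variant that tests `i >= n` at the top of the loop, before yielding, only reaches that test after it has already pulled record n + 1. The sketch below uses hypothetical names (counted_source, limit_step) and compares a yield-then-check loop with itertools.islice.

```python
import itertools


def counted_source(pulled):
    """Infinite source that records every item requested from it."""
    for i in itertools.count():
        pulled.append(i)
        yield i


def limit_step(upstream, n):
    # Yield first, then check, so the (n + 1)th record is never requested.
    if n <= 0:
        return
    for i, record in enumerate(upstream, start=1):
        yield record
        if i >= n:
            break


pulled_a, pulled_b = [], []
print(list(limit_step(counted_source(pulled_a), 3)), len(pulled_a))        # [0, 1, 2] 3
print(list(itertools.islice(counted_source(pulled_b), 3)), len(pulled_b))  # [0, 1, 2] 3
```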

Hint 8 - CSV fields are always strings. csv.DictReader returns every field as a string. If your pipeline filters on row["age"] > 18 without converting first, the comparison raises TypeError in Python 3, because a str cannot be ordered against an int. Comparing against the string "18" instead silently gives wrong results, because string comparison is lexicographic ("9" > "18" is True). The skeleton demo handles this with .map(lambda r: {**r, "age": int(r["age"])}) as the first step. Make sure your test cases account for this.
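Both failure modes can be reproduced with csv.DictReader over an in-memory string (io.StringIO stands in for a real file here).

```python
import csv
import io

rows = list(csv.DictReader(io.StringIO("name,age\nAlice,23\nTim,9\n")))
print(type(rows[0]["age"]).__name__)   # str

# Lexicographic comparison: True, even though 9 < 18 numerically.
print(rows[1]["age"] > "18")           # True

# Ordering a str against an int is not allowed in Python 3.
try:
    rows[0]["age"] > 18
except TypeError as exc:
    print("TypeError:", exc)

# Converting first gives the intended numeric comparison.
converted = [{**r, "age": int(r["age"])} for r in rows]
adults = [r["name"] for r in converted if r["age"] > 18]
print(adults)                          # ['Alice']
```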

Architectural Notes

Why lazy evaluation matters

Consider processing a 50GB server log file to find the 10 most common error messages. A naive approach loads the entire file into memory - impossible on a 16GB machine. A lazy pipeline approach:

results = (
    Pipeline.from_file("access.log", format="jsonl")
    .filter(lambda r: r["level"] == "ERROR")
    .map(lambda r: r["message"])
    .group_by(lambda msg: msg)
    .aggregate(count=len)
    # sort and limit would be extensions
)

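Only one log record exists in memory at any point during filter and map. group_by, however, buffers every ERROR message it sees in that message's group list, so its memory is O(number of ERROR records). That is far smaller than the whole file, but it is not bounded by the number of unique messages. Only a step that keeps a running count per key, rather than the records themselves, would bring this down to O(unique messages). Production log-processing pipelines follow the same pattern: cheap streaming filters in front of a grouping stage.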
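A streaming count keeps one integer per distinct key and discards each record once it has been counted. The helper below (count_by) is hypothetical and not part of the Pipeline API. It only works for aggregates that can be updated incrementally: a count here, or a running sum plus count for an average. An arbitrary aggregator that needs the full list of records still requires buffering.

```python
from collections import Counter
from collections.abc import Callable, Hashable, Iterable


def count_by(records: Iterable, key_fn: Callable[[object], Hashable]) -> Counter:
    """Hypothetical streaming count: memory is O(distinct keys), not O(records)."""
    counts = Counter()
    for record in records:           # records are consumed one at a time
        counts[key_fn(record)] += 1  # only the key and its running count are kept
    return counts


messages = (m for m in ["disk full", "timeout", "disk full", "timeout", "disk full"])
counts = count_by(messages, lambda msg: msg)
print(counts.most_common(1))   # [('disk full', 3)]
```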

The group_by tradeoff

group_by is the unavoidable break in laziness. Producing a complete group requires seeing all records with that key. Because the input is not assumed to be sorted, you cannot know a group is complete until the source is exhausted. The options are:

  1. Buffer all groups (what this project implements): O(n) memory, simple, correct.
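  2. Require sorted input (like itertools.groupby): only the current group is held at any time (O(1) if each group is consumed as a stream, O(size of the largest group) if it is materialised as a list), but the caller must sort first, which itself requires materialising the data unless it is already sorted at the source.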
  3. Streaming approximation: probabilistic methods (count-min sketch, HyperLogLog) that trade accuracy for memory, used in analytics databases.
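Option 2 can be seen with the standard library's itertools.groupby, which starts a new group whenever the key changes between consecutive items. On unsorted input, the same key therefore appears as several separate groups.

```python
from itertools import groupby
from operator import itemgetter

rows = [{"city": "London"}, {"city": "Paris"}, {"city": "London"}]
by_city = itemgetter("city")

# Unsorted input: a new group starts every time the key changes.
unsorted_keys = [k for k, _ in groupby(rows, key=by_city)]
print(unsorted_keys)   # ['London', 'Paris', 'London']

# Sorted input: exactly one group per key.
sorted_groups = [(k, len(list(g))) for k, g in groupby(sorted(rows, key=by_city), key=by_city)]
print(sorted_groups)   # [('London', 2), ('Paris', 1)]
```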

Real data pipelines (Apache Spark, Flink, Beam) handle this with distributed aggregation - each worker groups a partition, then a shuffle step merges groups across workers. The tradeoff never disappears; it is just distributed across machines.

FP Concepts Tested

Concept | Where it appears
--- | ---
Generators | Every step is a generator function
Generator chaining | __iter__ chains steps by wrapping each iterator
Lazy evaluation | No data is processed until iteration
Immutability | Each operation returns a new Pipeline; the original is unchanged
Higher-order functions | filter, map and aggregate take functions as arguments (group_by optionally does)
Closures | Each step function closes over its configuration (predicate, key_fn, n)
Iterator protocol | Pipeline.__iter__ makes Pipeline iterable; the generator it returns supplies __next__

Extension Challenges

Extension 1 - .sort_by(key, reverse=False) Implement a sort step. Like group_by, this cannot be lazy - document why. Use sorted(upstream, key=key_fn, reverse=reverse) internally, normalising a string key to a callable the same way group_by does. This requires materialising the entire upstream.

Extension 2 - .flat_map(transform) Implement flat_map where transform returns an iterable and the pipeline yields each element of each returned iterable (one level of flattening). Fully lazy.

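Extension 3 - .distinct(key=None) Yield each record only once, deduplicated by record[key] or by the full record if key is None. This requires a seen set in the closure. Because dict records are not hashable, deduplicating by the full record needs a hashable stand-in such as frozenset(record.items()), which assumes the values themselves are hashable. Document the memory tradeoff for high-cardinality fields.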

Extension 4 - .tee(n=2) Implement itertools.tee-style duplication: return n independent pipelines that each produce the full output of the current pipeline. This requires understanding the buffering that tee uses internally.
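The buffering behind itertools.tee can be observed by counting source reads. Each item is pulled from the source once. Whichever copy runs ahead leaves items in tee's internal buffer for the copies that lag behind. In the worst case, one copy is fully consumed before another starts, and the whole stream is held in memory. The itertools documentation notes that list() is faster in that situation.

```python
import itertools

pulled = []


def source():
    for i in range(5):
        pulled.append(i)
        yield i


a, b = itertools.tee(source(), 2)

print(list(a))        # [0, 1, 2, 3, 4]  a runs ahead...
print(len(pulled))    # 5  ...so every item was pulled once and buffered for b
print(list(b))        # [0, 1, 2, 3, 4]  b replays from tee's buffer
print(len(pulled))    # 5  the source was not re-run
```

Contrast this with simply iterating the same Pipeline twice, which re-opens and re-reads the source instead of buffering.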

Extension 5 - JSONL output Add a terminal method .to_file(path, format="jsonl") that iterates the pipeline and writes each record as a JSON line to a file. Pair with from_file for a full read-transform-write pipeline that handles files larger than RAM end-to-end.

Extension 6 - Progress reporting Add .with_progress(every=1000) that prints a progress message once every N records (N = every, 1000 by default) as the pipeline runs. This is a side-effecting step (it prints) - document why it violates pure functional principles and why it is still useful in practice.

© 2026 EngineersOfAI. All rights reserved.