Build a Scalable Data Pipeline in Python: Practice Problems & Exercises
Easy
Build a three-stage generator pipeline: read_numbers -> filter_even -> double. Demonstrate that items flow lazily.
Solution
from typing import Iterator
def read_numbers(data: list) -> Iterator[int]:
    # Source stage: yields items one at a time
    for item in data:
        yield item
def filter_even(source: Iterator[int]) -> Iterator[int]:
    # Pulls from the previous stage and passes on only even numbers
    for n in source:
        if n % 2 == 0:
            yield n
def double(source: Iterator[int]) -> Iterator[int]:
    for n in source:
        yield n * 2
# Build and run the pipeline: nothing executes until list() starts pulling items
raw_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
pipeline = double(filter_even(read_numbers(raw_data)))
result = list(pipeline)
print(f"Processed: {result}")
print("Memory: only one item in memory at a time")
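Why generators matter at scale: A list-based pipeline materializes the full intermediate result at every stage. A generator pipeline passes one item at a time through all stages, so the stages themselves use constant memory whether the input has 10 records or 10 billion. That guarantee holds only if the source is also streamed (for example, read line by line from a file) and the final consumer does not collect everything, as list(pipeline) does here. Python's itertools works this way, Spark evaluates chains of narrow RDD transformations such as map and filter record by record within a stage, and Apache Beam runners typically fuse consecutive steps in a similar manner.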
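The exercise asks you to demonstrate laziness, but the solution's printed "Memory" line only asserts it. A minimal sketch of one way to observe it: the same three stages with a hypothetical `log` list added to record when each stage actually runs. Building the chain runs nothing, and a single `next()` call pulls only as far as the first even number.

```python
from typing import Iterator, List

log: List[str] = []  # hypothetical trace of the order in which stages run

def read_numbers(data: list) -> Iterator[int]:
    for item in data:
        log.append(f"read {item}")
        yield item

def filter_even(source: Iterator[int]) -> Iterator[int]:
    for n in source:
        if n % 2 == 0:
            log.append(f"keep {n}")
            yield n

def double(source: Iterator[int]) -> Iterator[int]:
    for n in source:
        log.append(f"emit {n * 2}")
        yield n * 2

pipeline = double(filter_even(read_numbers([1, 2, 3, 4])))
print(log)           # [] : building the chain runs no stage code
first = next(pipeline)
print(first, log)    # 4 ['read 1', 'read 2', 'keep 2', 'emit 4']
```

Items 3 and 4 are never read, because nothing downstream has asked for them yet.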
Expected Output
Processed: [4, 8, 12, 16, 20]
Memory: only one item in memory at a time
Hints
Hint 1: Chain generators using `yield from` or by passing one generator as input to another. Each stage is a generator function that yields transformed items.
Hint 2: The key property: no item leaves stage N until stage N+1 requests it. This is lazy evaluation: each stage holds at most the item it is working on, so memory use stays O(1) regardless of input size.
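Hint 1 mentions passing one generator into another. Rather than nesting calls such as double(filter_even(read_numbers(data))), the stages can be listed in reading order with a small helper. `compose` below is a hypothetical helper, not a standard-library function; the sketch assumes every stage takes an iterable and returns an iterator.

```python
from functools import reduce
from typing import Callable, Iterable, Iterator

Stage = Callable[[Iterable[int]], Iterator[int]]

def compose(*stages: Stage) -> Stage:
    """Hypothetical helper: compose(a, b)(src) behaves like b(a(src))."""
    def run(source: Iterable[int]) -> Iterator[int]:
        # Feed each stage's generator into the next; nothing executes until iteration
        return reduce(lambda upstream, stage: stage(upstream), stages, source)
    return run

def read_numbers(data: Iterable[int]) -> Iterator[int]:
    yield from data  # delegate to the underlying iterable

def filter_even(source: Iterable[int]) -> Iterator[int]:
    return (n for n in source if n % 2 == 0)

def double(source: Iterable[int]) -> Iterator[int]:
    return (n * 2 for n in source)

pipeline = compose(read_numbers, filter_even, double)
print(list(pipeline(range(1, 11))))  # [4, 8, 12, 16, 20]
```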
Implement batch(source, size) that groups a stream of items into fixed-size batches, yielding each batch as a list.
Solution
from typing import Iterator, List, TypeVar
T = TypeVar('T')
def batch(source: Iterator[T], size: int) -> Iterator[List[T]]:
    if size < 1:
        raise ValueError("size must be >= 1")
    buf: List[T] = []
    for item in source:
        buf.append(item)
        if len(buf) == size:
            yield buf
            buf = []  # start a fresh list; the yielded one now belongs to the caller
    if buf:
        yield buf  # flush the partial last batch
items = iter(range(1, 11))
for i, b in enumerate(batch(items, 3), 1):
    suffix = " (partial last batch)" if len(b) < 3 else ""
    print(f"Batch {i}: {b}{suffix}")
Batching economics: Inserting 1000 rows into PostgreSQL as one batch is typically far faster than issuing 1000 individual INSERT statements, often by an order of magnitude or more. Most of the cost is per-statement overhead (network round-trips, parsing, and, under autocommit, one transaction commit per row), not writing the data itself. Production ETL pipelines almost always batch writes; the open question is the batch size, commonly somewhere between 100 and 10,000 rows depending on row size and network latency.
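A sketch of a batched sink, using Python's built-in `sqlite3` module with an in-memory database as a stand-in for PostgreSQL (an assumption made so the example runs anywhere; with no network involved, the speedup is much smaller than against a remote server). Each batch is written with one `executemany()` call inside one transaction. On Python 3.12+, `itertools.batched` (which yields tuples) can replace the `batch` helper here.

```python
import sqlite3
from itertools import islice
from typing import Iterable, Iterator, List

def batch(source: Iterable, size: int) -> Iterator[List]:
    it = iter(source)
    while chunk := list(islice(it, size)):  # an empty chunk means the stream is exhausted
        yield chunk

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, value TEXT)")
rows = ((i, f"event-{i}") for i in range(2500))  # streamed, never fully in memory
commits = 0
for chunk in batch(rows, 1000):
    with conn:  # one transaction per batch: commits on success, rolls back on error
        conn.executemany("INSERT INTO events VALUES (?, ?)", chunk)
    commits += 1
print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0], commits)  # 2500 3
```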
Expected Output
Batch 1: [1, 2, 3]
Batch 2: [4, 5, 6]
Batch 3: [7, 8, 9]
Batch 4: [10] (partial last batch)
Hints
Hint 1: Use a list as a buffer. Append items until `len(buffer) == batch_size`, then yield the buffer and reset it. After the loop, yield any remaining items.
Hint 2: This is the "micro-batching" pattern that Spark Streaming is built on. Sending records to a database one at a time is slow; batching amortizes the per-record overhead.
Medium
Build a fluent Pipeline class with chainable .map(), .filter(), .batch(), .take(), and .collect() methods.
Solution
from typing import Callable
import itertools
class Pipeline:
    def __init__(self, source):
        self._source = source  # an iterator; each stage wraps it in a new generator
    @classmethod
    def from_iterable(cls, data) -> 'Pipeline':
        return cls(iter(data))
    def map(self, fn: Callable) -> 'Pipeline':
        return Pipeline(fn(x) for x in self._source)
    def filter(self, fn: Callable) -> 'Pipeline':
        return Pipeline(x for x in self._source if fn(x))
    def batch(self, size: int) -> 'Pipeline':
        def _batch(src, n):
            buf = []
            for item in src:
                buf.append(item)
                if len(buf) == n:
                    yield buf
                    buf = []
            if buf:
                yield buf
        return Pipeline(_batch(self._source, size))
    def collect(self) -> list:
        return list(self._source)  # terminal step: consumes the whole stream
    def take(self, n: int) -> list:
        return list(itertools.islice(self._source, n))  # stops pulling after n items
data = range(1, 15)
result = (
    Pipeline.from_iterable(data)
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * 2)
    .batch(2)
    .collect()
)
print(f"Batched: {result}")
first3 = (
    Pipeline.from_iterable(range(1, 100))
    .filter(lambda x: x % 2 == 0)
    .take(3)
)
print(f"First 3 evens: {first3}")
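Real-world fluent pipelines: PySpark and LINQ expose the same method-chaining style, and Apache Beam's Python SDK expresses it with the | and >> operators: pcollection | 'Map' >> beam.Map(fn) | 'Filter' >> beam.Filter(pred) | 'Write' >> beam.io.WriteToText(...). Beam also separates building from running: applying transforms only builds the pipeline graph, and nothing executes until you call pipeline.run() (or a `with beam.Pipeline() as p:` block exits). The Pipeline class here is lazy in a similar way, but it wraps a single iterator, so each instance can be consumed only once.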
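The `Pipeline` class above starts pulling items as soon as a terminal method runs, and it cannot be reused afterwards. A sketch of the Beam-style alternative, where the chain only records steps and a separate `run()` executes them, might look like this. `LazyPlan` is a hypothetical name, and this is not how Beam or Spark are implemented internally.

```python
from typing import Any, Callable, Iterable, Iterator, List, Optional

Step = Callable[[Iterator[Any]], Iterator[Any]]

class LazyPlan:
    """Hypothetical deferred pipeline: records steps now, runs them on run()."""

    def __init__(self, steps: Optional[List[Step]] = None):
        self._steps: List[Step] = steps or []

    def _then(self, step: Step) -> "LazyPlan":
        return LazyPlan(self._steps + [step])  # new plan; this one is left unchanged

    def map(self, fn: Callable[[Any], Any]) -> "LazyPlan":
        return self._then(lambda src: (fn(x) for x in src))

    def filter(self, pred: Callable[[Any], bool]) -> "LazyPlan":
        return self._then(lambda src: (x for x in src if pred(x)))

    def run(self, data: Iterable[Any]) -> List[Any]:
        stream: Iterator[Any] = iter(data)
        for step in self._steps:  # wire fresh generators together only now
            stream = step(stream)
        return list(stream)

plan = LazyPlan().filter(lambda x: x % 2 == 0).map(lambda x: x * 2)
print(plan.run(range(1, 15)))  # [4, 8, 12, 16, 20, 24, 28]
print(plan.run([10, 11]))      # [20]
```

Because each method returns a new plan and `run()` builds fresh generators on every call, the same plan can be applied to several inputs.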
from typing import Iterator, Callable, TypeVar, List
T = TypeVar('T')
class Pipeline:
    """Fluent composable data pipeline.
    Usage:
        result = (
            Pipeline.from_iterable(data)
            .map(transform_fn)
            .filter(predicate_fn)
            .batch(size)
            .collect()
        )
    """
    def __init__(self, source):
        self._source = source
    @classmethod
    def from_iterable(cls, data) -> 'Pipeline':
        pass
    def map(self, fn: Callable) -> 'Pipeline':
        pass
    def filter(self, fn: Callable) -> 'Pipeline':
        pass
    def batch(self, size: int) -> 'Pipeline':
        pass
    def collect(self) -> list:
        pass
    def take(self, n: int) -> list:
        pass
Expected Output
Batched: [[4, 8], [12, 16], [20, 24], [28]]
First 3 evens: [2, 4, 6]
Hints
Hint 1: Each method should wrap `self._source` in a new generator and return a new `Pipeline(new_generator)`. This is the builder/fluent pattern — each method returns `self` or a new instance.
Hint 2: For `batch()`, reuse the batch generator from the previous problem. For `take()`, use `itertools.islice` or a manual counter.
Build a safe_process() pipeline stage that catches errors, retries failed items, and routes persistent failures to a dead letter queue.
Solution
from collections import deque
from typing import Callable, Any, List, Tuple
from dataclasses import dataclass, field
@dataclass
class ProcessingResult:
    successes: List[Any] = field(default_factory=list)
    failures: List[Tuple[Any, str]] = field(default_factory=list)  # the dead letter queue
def safe_process(source, transform: Callable, max_retries: int = 2) -> ProcessingResult:
    result = ProcessingResult()
    # Pair each item with the number of retries it has used; deque gives O(1) popleft
    pending = deque((item, 0) for item in source)
    while pending:
        item, attempts = pending.popleft()
        try:
            result.successes.append(transform(item))
        except Exception as e:
            if attempts < max_retries:
                pending.append((item, attempts + 1))  # retry after the rest of the queue
            else:
                result.failures.append((item, f"{type(e).__name__}: {e}"))
    return result
def transform_evens(x):
    if x % 2 != 0:
        raise ValueError("odd number")
    return x * 2
result = safe_process([2, 3, 4, 6, 7, 8, 10], transform_evens, max_retries=1)
print(f"Successes: {result.successes}")
print(f"Failures (DLQ): {result.failures}")
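Dead letter queues in production: Amazon SQS dead-letter queues and RabbitMQ dead letter exchanges implement this pattern at the infrastructure level, and Kafka-based systems typically route failures to a dead letter topic (Kafka Connect, for example, can be configured to do this for sink connectors). Failed messages accumulate in the DLQ for manual inspection and replay. The retry count is usually tracked alongside the message; SQS, for instance, counts receives and moves a message to the DLQ once it exceeds the redrive policy's maxReceiveCount. Production systems also add exponential backoff between retries so that a burst of retries does not hammer a flaky downstream service (the thundering-herd problem).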
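A sketch of retry with exponential backoff and "full jitter": a random wait between zero and a cap that doubles on each attempt, limited by `max_delay`. The function name and parameters are hypothetical. The `sleep` parameter is injectable so the example can record delays instead of actually waiting. In `safe_process`, a helper like this could wrap each call as `call_with_backoff(lambda: transform(item))`, with items that still fail going to the DLQ.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")

def call_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: retry fn() with exponentially growing, jittered waits."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: the caller can dead-letter the item
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, cap))  # jitter spreads retries out over time
    raise RuntimeError("unreachable")  # the loop always returns or re-raises

# Usage with a hypothetical flaky call that fails twice, then succeeds
calls = {"n": 0}

def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

delays: List[float] = []
result = call_with_backoff(flaky, sleep=delays.append)  # record delays instead of sleeping
print(result, len(delays))  # ok 2
```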
from typing import Iterator, Callable, TypeVar, List, Tuple, Any
from dataclasses import dataclass, field
T = TypeVar('T')
@dataclass
class ProcessingResult:
    successes: List[Any] = field(default_factory=list)
    failures: List[Tuple[Any, str]] = field(default_factory=list)
def safe_process(
    source: Iterator[T],
    transform: Callable[[T], Any],
    max_retries: int = 2,
) -> ProcessingResult:
    """Process items with retry logic and dead letter queue.
    Retry each failed item up to max_retries times.
    If still failing, add to failures list as (item, 'ExceptionType: message').
    Successes go to result.successes.
    """
    pass
Expected Output
Successes: [4, 8, 12, 16, 20]
Failures (DLQ): [(3, 'ValueError: odd number'), (7, 'ValueError: odd number')]
Hints
Hint 1: Wrap the transform call in a try/except. On failure, increment the item's attempt count. If it has used up max_retries retries, append (item, f"{type(e).__name__}: {e}") to failures; otherwise queue it for another attempt.
Hint 2: Retrying in a pipeline context: keep a per-item retry counter (use a dict). On each failure, increment the counter and re-add the item to a retry queue.
Implement a sliding window aggregator that applies an aggregation function (e.g., mean, max) over a sliding window of configurable size and step.
Solution
from typing import Iterator, Callable
from collections import deque
class SlidingWindowAggregator:
    def __init__(self, window_size: int, step: int = 1):
        self.window_size = window_size
        self.step = step
    def process(self, source: Iterator, aggregate_fn: Callable) -> Iterator:
        window = deque(maxlen=self.window_size)  # appending to a full deque drops the oldest item
        count = 0
        for item in source:
            window.append(item)
            count += 1
            # Emit the first full window, then one window every `step` items after it
            if len(window) == self.window_size and (count - self.window_size) % self.step == 0:
                yield aggregate_fn(list(window))
def mean(xs):
    return sum(xs) / len(xs)
data = list(range(1, 11))
sliding = SlidingWindowAggregator(window_size=3, step=1)
print(f"Window averages (size=3, step=1): {list(sliding.process(iter(data), mean))}")
tumbling = SlidingWindowAggregator(window_size=3, step=3)
print(f"Tumbling windows (size=3, step=3): {list(tumbling.process(iter(data), mean))}")
Window types in streaming: Apache Flink and Kafka Streams distinguish tumbling windows (fixed size, no overlap), sliding windows (fixed size, overlapping; Kafka Streams calls these hopping windows), and session windows (closed after a gap in activity). This single class captures the first two through its step parameter. Time-based windowing compares event timestamps instead of counting items. Count-based windowing (as here) is simpler, but a window of N items corresponds to a fixed time span only when events arrive at a uniform rate.
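A sketch of time-based tumbling windows, assuming each event is a `(timestamp_seconds, value)` pair. Each event is assigned to the window whose start is its timestamp rounded down to a multiple of `window_seconds`. For simplicity this version reads the whole input before emitting anything; a streaming engine also needs a rule for deciding when a window is complete (Flink uses watermarks for this).

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple

def tumbling_time_windows(
    events: Iterable[Tuple[float, float]],  # (timestamp_seconds, value), an assumed shape
    window_seconds: float,
    aggregate_fn: Callable[[List[float]], float],
) -> Dict[float, float]:
    """Group events into fixed, non-overlapping time windows keyed by window start."""
    buckets: Dict[float, List[float]] = defaultdict(list)
    for ts, value in events:
        start = (ts // window_seconds) * window_seconds  # align to the window boundary
        buckets[start].append(value)
    # Windows with no events simply do not appear in the result
    return {start: aggregate_fn(vals) for start, vals in sorted(buckets.items())}

events = [(0.5, 10), (1.2, 20), (4.9, 30), (5.1, 40), (12.0, 50)]
print(tumbling_time_windows(events, 5.0, max))  # {0.0: 30, 5.0: 40, 10.0: 50}
```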
from typing import Iterator, List, Dict, Any, Callable
from collections import deque
class SlidingWindowAggregator:
    """Sliding window aggregation over a stream.
    window_size: number of items in the window
    step: how many items to advance the window each time
    For each complete window, calls aggregate_fn(window) and yields the result.
    """
    def __init__(self, window_size: int, step: int = 1):
        pass
    def process(self, source: Iterator, aggregate_fn: Callable) -> Iterator:
        pass
Expected Output
Window averages (size=3, step=1): [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Tumbling windows (size=3, step=3): [2.0, 5.0, 8.0]
Hints
Hint 1: Use a `collections.deque(maxlen=window_size)` as the sliding window buffer; appending to a full deque drops the oldest item automatically. On each item, append it and increment a counter. When len(deque) == window_size and (count - window_size) % step == 0, yield aggregate_fn(list(deque)).
Hint 2: A tumbling window is just a sliding window where step == window_size. A sliding window is step == 1. The step parameter controls the overlap.
Implement a parallel map stage using ThreadPoolExecutor that supports both ordered and unordered result collection with error handling.
Solution
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Any
import time
def parallel_map(source, transform: Callable, workers: int = 4, ordered: bool = True) -> List[Any]:
    items = list(source)
    def safe_transform(item):
        # A failing item becomes None plus a warning instead of aborting the whole map
        try:
            return transform(item)
        except Exception as e:
            print(f"Warning: transform failed for {item}: {e}")
            return None
    with ThreadPoolExecutor(max_workers=workers) as executor:
        if ordered:
            # executor.map yields results in input order
            return list(executor.map(safe_transform, items))
        # as_completed yields each future as soon as it finishes, i.e. in completion order
        futures = [executor.submit(safe_transform, item) for item in items]
        return [future.result() for future in as_completed(futures)]
def slow_double(x):
    time.sleep(0.01)
    return x * 2
def failing_double(x):
    if x == 3:
        raise ValueError("cannot double 3")
    return x * 2
data = [1, 2, 3, 4, 5]
print(f"Ordered results: {parallel_map(iter(data), slow_double, ordered=True)}")
unordered = parallel_map(iter(data), slow_double, ordered=False)
print(f"Unordered results (may vary): {len(unordered)} items, sum={sum(unordered)}")
print(f"With one error: {parallel_map(iter(data), failing_double, ordered=True)}")
Thread pool vs process pool: For I/O-bound work (HTTP requests, database queries, file reads), ThreadPoolExecutor is the right choice because CPython threads release the GIL while they are blocked on I/O. For CPU-bound work (numerical computation, image processing), use ProcessPoolExecutor, whose worker processes each have their own interpreter and GIL. Many pipeline stages are I/O-bound (reading from sources, writing to sinks), which is why threads are often the right tool.
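A quick way to see why threads suit I/O-bound stages: in the sketch below, `time.sleep` stands in for a network call (an assumption; real blocking I/O behaves similarly because CPython releases the GIL while waiting). Eight 0.1-second waits overlap across eight threads, so the batch should finish in roughly the time of one wait rather than eight.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(x: int) -> int:
    time.sleep(0.1)  # stands in for a network or disk wait; sleeping releases the GIL
    return x

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(fake_io, range(8)))
elapsed = time.perf_counter() - start
# The waits overlap, so this finishes well under the ~0.8 s a plain loop would need
print(results, f"{elapsed:.2f}s")
```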
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Iterator, Callable, List, Any, TypeVar
T = TypeVar('T')
def parallel_map(
    source: Iterator[T],
    transform: Callable[[T], Any],
    workers: int = 4,
    ordered: bool = True,
) -> List[Any]:
    """Apply transform to each item using a thread pool.
    If ordered=True, return results in input order.
    If ordered=False, return results as they complete (faster for variable-time tasks).
    Handle exceptions: failed items should produce None + print a warning.
    """
    pass
Expected Output
Ordered results: [2, 4, 6, 8, 10]
Unordered results (may vary): 5 items, sum=30
Warning: transform failed for 3: cannot double 3
With one error: [2, None, 6, 8, 10]
Hints
Hint 1: For ordered=True, use executor.map(transform, items) which preserves order. For ordered=False, use submit() + as_completed().
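Hint 2: To handle exceptions, wrap transform in a small helper function with a try/except (a lambda cannot contain try/except) and pass that helper to executor.map() or submit(). Alternatively, in unordered mode, call future.result() inside a try/except.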
Hard
Build a backpressure-aware async pipeline using bounded queues to prevent fast producers from overwhelming slow consumers.
Solution
import asyncio
from typing import Callable
async def bounded_pipeline(source, transform, sink, buffer_size=5, workers=3):
    queue = asyncio.Queue(maxsize=buffer_size)  # the bound is what creates backpressure
    stats = {"processed": 0, "errors": 0}
    _SENTINEL = object()  # unique end-of-stream marker that no real item can equal
    async def producer():
        async for item in source:
            await queue.put(item)  # suspends while the queue is full
        for _ in range(workers):
            await queue.put(_SENTINEL)  # one shutdown signal per worker
    async def worker():
        while True:
            item = await queue.get()
            if item is _SENTINEL:
                queue.task_done()
                break
            try:
                result = await transform(item)
                await sink(result)
                stats["processed"] += 1
            except Exception:
                stats["errors"] += 1
            finally:
                queue.task_done()
    await asyncio.gather(producer(), *[worker() for _ in range(workers)])
    return stats
collected = []
async def async_source():
    for i in range(20):
        yield i
        await asyncio.sleep(0.001)
async def slow_transform(x):
    await asyncio.sleep(0.005)
    return x * 2
async def sink(item):
    collected.append(item)
async def main():
    stats = await bounded_pipeline(async_source(), slow_transform, sink, buffer_size=5, workers=4)
    print(f"Processed: {stats['processed']} items")
    print(f"Errors: {stats['errors']}")
asyncio.run(main())
Backpressure explained: Without a bounded queue, a fast producer can enqueue millions of items and exhaust memory. asyncio.Queue(maxsize=N) prevents this: await queue.put() suspends the producer coroutine while the queue is full, giving workers time to catch up. Kafka's Java producer client applies a similar idea: when its local send buffer (buffer.memory) fills because records cannot be delivered to the broker fast enough, send() blocks for up to max.block.ms, propagating pressure upstream instead of buffering without limit.
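The solution above relies on backpressure but does not measure it. One hypothetical way to count "backpressure events" is to check `queue.full()` just before each `put()`. In this sketch the producer has no delay and the consumer sleeps 10 ms per item, so the bounded queue fills almost immediately and the producer is repeatedly forced to wait.

```python
import asyncio

async def main() -> int:
    queue: asyncio.Queue = asyncio.Queue(maxsize=3)
    blocked = 0  # hypothetical metric: times the producer found the queue full

    async def producer() -> None:
        nonlocal blocked
        for i in range(10):
            if queue.full():
                blocked += 1
            await queue.put(i)  # suspends here until the consumer frees a slot
        await queue.put(None)   # end-of-stream marker; safe because real items are ints

    async def consumer() -> None:
        while (item := await queue.get()) is not None:
            await asyncio.sleep(0.01)  # slow downstream work on `item`

    await asyncio.gather(producer(), consumer())
    return blocked

blocked = asyncio.run(main())
print(f"Backpressure events: {blocked}")
```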
import asyncio
from typing import AsyncIterator, Callable, Any
async def bounded_pipeline(
    source: AsyncIterator,
    transform: Callable,
    sink: Callable,
    buffer_size: int = 10,
    workers: int = 3,
) -> dict:
    """Async pipeline with bounded buffer for backpressure.
    - Source produces items into a bounded queue (blocks when full)
    - Worker coroutines pull from queue, apply transform, push to sink
    - Returns stats: {'processed': N, 'errors': M}
    The bounded queue size IS the backpressure mechanism.
    When queue is full, the source is forced to wait.
    """
    pass
Expected Output
Processed: 20 items
Errors: 0
Hints
Hint 1: Use `asyncio.Queue(maxsize=buffer_size)`. Producer does `await queue.put(item)` which blocks automatically when queue is full. Workers do `await queue.get()`.
Hint 2: Send one sentinel per worker to signal shutdown: after the producer finishes, put N sentinels (one per worker), and each worker exits when it receives one. Use a unique object such as `object()` as the sentinel; None works only if None can never be a real item.
Build a checkpointed pipeline that saves progress and can resume from where it left off after a crash.
Solution
import json
import os
from typing import Callable, Any
class CheckpointedPipeline:
    def __init__(self, checkpoint_file: str):
        self.checkpoint_file = checkpoint_file
    def _load_checkpoint(self) -> int:
        # -1 means "nothing processed yet"
        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file) as f:
                return json.load(f).get("last_processed", -1)
        return -1
    def _save_checkpoint(self, index: int) -> None:
        # Write to a temp file, then atomically swap it into place
        tmp = self.checkpoint_file + ".tmp"
        with open(tmp, "w") as f:
            json.dump({"last_processed": index}, f)
        os.replace(tmp, self.checkpoint_file)
    def run(self, source: list, transform: Callable, sink: Callable) -> dict:
        last_processed = self._load_checkpoint()
        stats = {"processed": 0, "skipped": 0, "errors": 0}
        for i, item in enumerate(source):
            if i <= last_processed:
                stats["skipped"] += 1
                continue
            try:
                result = transform(item)
                sink(result)
                self._save_checkpoint(i)  # checkpoint only after the sink succeeded
                stats["processed"] += 1
            except Exception:
                stats["errors"] += 1
                raise  # stop here; the next run resumes after the last checkpoint
        return stats
    def reset(self) -> None:
        if os.path.exists(self.checkpoint_file):
            os.remove(self.checkpoint_file)
results = []
pipeline = CheckpointedPipeline("/tmp/test_checkpoint.json")
pipeline.reset()
data = list(range(10))
# Simulate a crash: the first run only sees 5 items, the second run gets the full input
s1 = pipeline.run(data[:5], lambda x: x * 2, results.append)
print(f"First run: processed {s1['processed']} items before crash")
s2 = pipeline.run(data, lambda x: x * 2, results.append)
print(f"Resumed from checkpoint: processed {s2['processed']} more items")
print(f"Total processed: {len(results)}")
pipeline.reset()
At-least-once vs exactly-once: This implementation provides at-least-once processing: if the process crashes after sink() but before _save_checkpoint(), that item is processed again on resume. Exactly-once output requires either that the sink write and the checkpoint take effect atomically (across separate systems, via a two-phase commit) or that the sink be idempotent so replays are harmless. Kafka achieves this with transactional producers; Flink combines distributed snapshots (a variant of the Chandy-Lamport algorithm) with two-phase-commit sinks.
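When the checkpoint and the output live in the same transactional store, no two-phase commit is needed, because a single transaction can cover both. A minimal sketch, using an in-memory SQLite database as a stand-in sink (an assumption; `run_transactional` and the table names are hypothetical). Because each output row and the checkpoint update commit together, a crash between them cannot leave an item written but not checkpointed, so a rerun does not duplicate output.

```python
import sqlite3
from typing import Callable, List

def run_transactional(conn: sqlite3.Connection, source: List[int],
                      transform: Callable[[int], int]) -> int:
    """Hypothetical runner: output row and checkpoint commit in one transaction."""
    conn.execute("CREATE TABLE IF NOT EXISTS output (idx INTEGER PRIMARY KEY, value INTEGER)")
    conn.execute("CREATE TABLE IF NOT EXISTS checkpoint "
                 "(id INTEGER PRIMARY KEY CHECK (id = 0), last INTEGER)")
    conn.execute("INSERT OR IGNORE INTO checkpoint VALUES (0, -1)")
    conn.commit()
    last = conn.execute("SELECT last FROM checkpoint").fetchone()[0]
    processed = 0
    for i, item in enumerate(source):
        if i <= last:
            continue  # already committed by an earlier run
        with conn:  # both statements commit together, or both roll back
            conn.execute("INSERT INTO output VALUES (?, ?)", (i, transform(item)))
            conn.execute("UPDATE checkpoint SET last = ? WHERE id = 0", (i,))
        processed += 1
    return processed

conn = sqlite3.connect(":memory:")
first = run_transactional(conn, list(range(5)), lambda x: x * 2)    # "crashes" after 5 items
second = run_transactional(conn, list(range(10)), lambda x: x * 2)  # resumes at index 5
rows = conn.execute("SELECT COUNT(*) FROM output").fetchone()[0]
print(first, second, rows)  # 5 5 10
```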
import json
import os
from typing import Iterator, Callable, Any
class CheckpointedPipeline:
    """Pipeline that saves progress and resumes after failure.
    - Processes items from source one at a time
    - After each successful item, saves checkpoint (last processed index)
    - On restart, skips already-processed items and resumes
    - Checkpoint stored as JSON in checkpoint_file
    """
    def __init__(self, checkpoint_file: str):
        pass
    def run(
        self,
        source: list,
        transform: Callable[[Any], Any],
        sink: Callable[[Any], None],
    ) -> dict:
        pass
    def reset(self) -> None:
        pass
Expected Output
First run: processed 5 items before crash
Resumed from checkpoint: processed 5 more items
Total processed: 10
Hints
Hint 1: Load the checkpoint at start. If checkpoint exists, skip the first N items (where N = checkpoint index + 1). After each successful item, write the current index to the checkpoint file.
Hint 2: Use atomic writes for checkpoints: write to a temp file, then `os.replace()` to atomically swap it into place. This prevents corrupt checkpoint files if the process crashes mid-write.
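Hint 2's temp-file-plus-`os.replace()` approach protects against a crash in the middle of a write. Surviving a power loss also requires forcing the data to disk. A sketch of a more durable variant (the function name is hypothetical): it calls `os.fsync` on the temp file before the rename and, on POSIX systems, on the containing directory after it.

```python
import json
import os
import tempfile

def write_checkpoint_durably(path: str, state: dict) -> None:
    """Hypothetical helper: atomic replace plus fsync of the file and its directory."""
    directory = os.path.dirname(os.path.abspath(path))
    # Temp file in the same directory: os.replace is only atomic within one filesystem
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())  # push file contents to disk before the rename
        os.replace(tmp, path)
    except BaseException:
        if os.path.exists(tmp):
            os.remove(tmp)
        raise
    if hasattr(os, "O_DIRECTORY"):  # POSIX: persist the rename itself
        dir_fd = os.open(directory, os.O_RDONLY | os.O_DIRECTORY)
        try:
            os.fsync(dir_fd)
        finally:
            os.close(dir_fd)

path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
write_checkpoint_durably(path, {"last_processed": 41})
with open(path) as f:
    print(json.load(f))  # {'last_processed': 41}
```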
Build a schema-validated record transformer that coerces types, applies custom validators, and cleanly separates valid from invalid records.
Solution
from typing import Dict, Any, List, Callable, Tuple, Optional
from dataclasses import dataclass
@dataclass
class FieldSchema:
    name: str
    type: type
    required: bool = True
    default: Any = None
    validator: Optional[Callable] = None  # returns an error message, or None if valid
class RecordTransformer:
    def __init__(self):
        self._schema: List[FieldSchema] = []
    def define_schema(self, fields: List[FieldSchema]) -> 'RecordTransformer':
        self._schema = fields
        return self
    def transform(self, record: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, str]]:
        result, errors = {}, {}
        for fs in self._schema:
            value = record.get(fs.name)
            # 1. Presence check
            if value is None:
                if fs.required:
                    errors[fs.name] = "required field missing"
                else:
                    result[fs.name] = fs.default
                continue
            # 2. Type coercion, e.g. int("30") -> 30
            try:
                coerced = fs.type(value)
            except (ValueError, TypeError) as e:
                errors[fs.name] = f"type coercion failed: {e}"
                continue
            # 3. Custom validation
            if fs.validator:
                err = fs.validator(coerced)
                if err:
                    errors[fs.name] = err
                    continue
            result[fs.name] = coerced
        return result, errors
    def process_batch(
        self, records: List[Dict[str, Any]]
    ) -> Tuple[List[Dict[str, Any]], List[Tuple[Dict[str, Any], Dict[str, str]]]]:
        valid, invalid = [], []
        for record in records:
            transformed, errors = self.transform(record)
            if errors:
                invalid.append((record, errors))  # keep the raw record for quarantine
            else:
                valid.append(transformed)
        return valid, invalid
transformer = RecordTransformer()
transformer.define_schema([
    FieldSchema("name", str),
    FieldSchema("age", int, validator=lambda x: "must be positive" if x <= 0 else None),
    FieldSchema("score", float, required=False, default=0.0,
                validator=lambda x: "must be between 0 and 100" if not (0 <= x <= 100) else None),
])
rec, errs = transformer.transform({"name": "Alice", "age": "30", "score": "95.5"})
print(f"Valid record: {rec}")
_, errs = transformer.transform({"name": "Bob", "age": "-5", "score": "150"})
print(f"Invalid: {errs}")
batch = [
    {"name": "Alice", "age": "25", "score": "80"},
    {"name": "Bob", "age": "-1"},
    {"name": "Carol", "age": "30"},
    {"name": "Dave", "age": "40", "score": "99.9"},
    {"name": "", "age": "20", "score": "200"},
]
valid, invalid = transformer.process_batch(batch)
print(f"Batch: {len(valid)} valid, {len(invalid)} invalid")
Schema enforcement in data engineering: This pattern is central to data quality in ETL. Great Expectations, Pandera, and Pydantic all build on the same idea: define a schema and validate data against it, so the pipeline can route bad records to a quarantine table instead of silently accepting them. Silent corruption is far worse than noisy rejection, because bad data that sneaks through corrupts downstream analytics.
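A sketch of routing rejected records to quarantine, using JSON Lines written to a text stream as a stand-in for a quarantine table (an assumption; `quarantine` is a hypothetical helper). It takes the `(record, errors)` pairs that `process_batch()` returns as its invalid list, so each rejected row keeps both the raw input and the reason it failed.

```python
import json
from io import StringIO
from typing import Any, Dict, List, TextIO, Tuple

def quarantine(invalid: List[Tuple[Dict[str, Any], Dict[str, str]]], out: TextIO) -> int:
    """Hypothetical helper: write each rejected record plus its errors as one JSON line."""
    for record, errors in invalid:
        out.write(json.dumps({"record": record, "errors": errors}, sort_keys=True) + "\n")
    return len(invalid)

# Same shape as process_batch()'s invalid list: (original_record, {field: error})
invalid = [({"name": "Bob", "age": "-1"}, {"age": "must be positive"})]
buf = StringIO()
count = quarantine(invalid, buf)
print(count, buf.getvalue().strip())
# 1 {"errors": {"age": "must be positive"}, "record": {"age": "-1", "name": "Bob"}}
```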
from typing import Dict, Any, List, Callable, Tuple, Optional
from dataclasses import dataclass
@dataclass
class FieldSchema:
    name: str
    type: type
    required: bool = True
    default: Any = None
    validator: Optional[Callable] = None
class RecordTransformer:
    """Schema-validated record transformer.
    - define_schema(fields) sets the expected schema
    - transform(record) validates + coerces + applies custom validators
    - Returns (transformed_record, errors) tuple
    - process_batch(records) transforms all and separates valid from invalid
    """
    pass
Expected Output
Valid record: {'name': 'Alice', 'age': 30, 'score': 95.5}
Invalid: {'age': 'must be positive', 'score': 'must be between 0 and 100'}
Batch: 3 valid, 2 invalid
Hints
Hint 1: For each field: check that it is present (the required check), attempt type coercion (int("30") returns 30), then run the custom validator if one is provided.
Hint 2: Return both the transformed record AND the errors dict. Callers can then decide whether to accept partial records or reject the whole record if any field fails.
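One caveat on the coercion step in Hint 1: coercing by calling the type, as in `fs.type(value)`, works for `int`, `float`, and `str`, but `bool("false")` is `True` because any non-empty string is truthy. The sketch below special-cases booleans. The `coerce` helper and the set of accepted spellings are assumptions for illustration.

```python
from typing import Any

# Accepted spellings are an assumption; adjust to your data sources.
_TRUE = {"true", "1", "yes", "y", "t"}
_FALSE = {"false", "0", "no", "n", "f"}

def coerce(value: Any, target: type) -> Any:
    """Convert `value` to `target`, raising ValueError on failure."""
    if target is bool:
        if isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in _TRUE:
            return True
        if text in _FALSE:
            return False
        raise ValueError(f"cannot interpret {value!r} as bool")
    return target(value)  # int("30") -> 30, float("95.5") -> 95.5

print(bool("false"), coerce("false", bool), coerce("30", int))  # True False 30
```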
Build an instrumented pipeline that automatically collects per-stage latency, throughput, and error rate metrics.
import time
from typing import Callable
from dataclasses import dataclass
from threading import Lock
@dataclass
class StageMetrics:
    name: str
    items_in: int = 0
    items_out: int = 0
    errors: int = 0
    total_latency_ms: float = 0.0
    min_latency_ms: float = float('inf')
    max_latency_ms: float = 0.0
    @property
    def avg_latency_ms(self) -> float:
        return self.total_latency_ms / self.items_in if self.items_in else 0.0
    @property
    def throughput_pct(self) -> float:
        return (self.items_out / self.items_in * 100) if self.items_in else 0.0
    def __str__(self):
        return (f"Stage: {self.name} | in={self.items_in}, out={self.items_out}, "
                f"errors={self.errors}, avg_latency={self.avg_latency_ms:.2f}ms")
class InstrumentedPipeline:
    def __init__(self):
        self._stages = {}
        self._lock = Lock()  # guards metric updates if stages run in several threads
    def stage(self, name: str):
        """Decorator that wraps a per-item stage function with metrics collection."""
        metrics = StageMetrics(name=name)
        self._stages[name] = metrics
        def decorator(fn: Callable) -> Callable:
            def wrapper(source):
                for item in source:
                    start = time.perf_counter()
                    with self._lock:
                        metrics.items_in += 1
                    try:
                        result = fn(item)
                    except Exception:
                        # Count the failure, keep its latency, and drop the item.
                        latency = (time.perf_counter() - start) * 1000
                        with self._lock:
                            metrics.errors += 1
                            metrics.total_latency_ms += latency
                        continue
                    latency = (time.perf_counter() - start) * 1000
                    with self._lock:
                        metrics.total_latency_ms += latency
                        metrics.min_latency_ms = min(metrics.min_latency_ms, latency)
                        metrics.max_latency_ms = max(metrics.max_latency_ms, latency)
                        if result is not None:  # None means "filtered out"
                            metrics.items_out += 1
                    # Yield outside the try block so only errors raised by fn are counted.
                    if result is not None:
                        yield result
            return wrapper
        return decorator
    def report(self):
        for metrics in self._stages.values():
            print(metrics)
# Test
pl = InstrumentedPipeline()
@pl.stage("ingest")
def ingest(item):
    return item
@pl.stage("transform")
def transform(item):
    if item % 5 == 0:
        raise ValueError("divisible by 5")
    return item * 2
@pl.stage("sink")
def sink(item):
    return item
data = range(1, 11)
result = list(sink(transform(ingest(iter(data)))))
pl.report()
Solution
import time
from dataclasses import dataclass
from threading import Lock
from typing import Callable
@dataclass
class StageMetrics:
    name: str
    items_in: int = 0
    items_out: int = 0
    errors: int = 0
    total_latency_ms: float = 0.0
    min_latency_ms: float = float('inf')
    max_latency_ms: float = 0.0
    @property
    def avg_latency_ms(self):
        return self.total_latency_ms / self.items_in if self.items_in else 0.0
    @property
    def throughput_pct(self):
        return (self.items_out / self.items_in * 100) if self.items_in else 0.0
    def __str__(self):
        return (f"Stage: {self.name} | in={self.items_in}, out={self.items_out}, "
                f"errors={self.errors}, avg_latency={self.avg_latency_ms:.2f}ms")
class InstrumentedPipeline:
    def __init__(self):
        self._stages = {}
        self._lock = Lock()
    def stage(self, name: str):
        metrics = StageMetrics(name=name)
        self._stages[name] = metrics
        def decorator(fn: Callable) -> Callable:
            def wrapper(source):
                for item in source:
                    start = time.perf_counter()
                    with self._lock:
                        metrics.items_in += 1
                    try:
                        result = fn(item)
                    except Exception:
                        latency = (time.perf_counter() - start) * 1000
                        with self._lock:
                            metrics.errors += 1
                            metrics.total_latency_ms += latency
                        continue  # failed items are dropped from the stream
                    latency = (time.perf_counter() - start) * 1000
                    with self._lock:
                        metrics.total_latency_ms += latency
                        metrics.min_latency_ms = min(metrics.min_latency_ms, latency)
                        metrics.max_latency_ms = max(metrics.max_latency_ms, latency)
                        if result is not None:  # only emitted items count as output
                            metrics.items_out += 1
                    if result is not None:
                        yield result
            return wrapper
        return decorator
    def report(self):
        for m in self._stages.values():
            print(m)
pl = InstrumentedPipeline()
@pl.stage("ingest")
def ingest(item): return item
@pl.stage("transform")
def transform(item):
    if item % 5 == 0:
        raise ValueError("divisible by 5")
    return item * 2
@pl.stage("sink")
def sink(item): return item
list(sink(transform(ingest(iter(range(1, 11))))))
pl.report()
Observability in pipelines: Production orchestrators such as Airflow, Prefect, and Dagster expose comparable task-level metrics. In a Prometheus/Grafana setup, the counters (items_in, items_out, errors) become Prometheus counters labeled by stage, and latency is usually exported as a histogram. Alerts fire when errors / items_in exceeds a threshold or when avg_latency_ms exceeds an SLA. The decorator pattern keeps instrumentation separate from business logic.
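The two alert conditions above can be sketched as a plain function over per-stage counters. The thresholds (`max_error_rate`, `latency_sla_ms`), the `StageSnapshot` type, and the `check_alerts` name are hypothetical. In a real Prometheus deployment these checks would be alerting rules evaluated against the exported metrics, not Python code.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class StageSnapshot:
    name: str
    items_in: int
    errors: int
    total_latency_ms: float

def check_alerts(stages: List[StageSnapshot], max_error_rate: float = 0.1,
                 latency_sla_ms: float = 50.0) -> List[str]:
    """Return one message per violated condition (error rate, then latency)."""
    alerts = []
    for s in stages:
        if s.items_in == 0:
            continue  # no traffic yet: nothing to judge
        error_rate = s.errors / s.items_in
        avg_latency = s.total_latency_ms / s.items_in
        if error_rate > max_error_rate:
            alerts.append(f"{s.name}: error rate {error_rate:.0%} > {max_error_rate:.0%}")
        if avg_latency > latency_sla_ms:
            alerts.append(f"{s.name}: avg latency {avg_latency:.1f}ms > SLA {latency_sla_ms}ms")
    return alerts

stages = [
    StageSnapshot("ingest", items_in=10, errors=0, total_latency_ms=5.0),
    StageSnapshot("transform", items_in=10, errors=2, total_latency_ms=800.0),
]
for msg in check_alerts(stages):
    print(msg)
```

Skipping stages with no traffic avoids a division by zero and prevents an idle stage from paging anyone.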
import time
from typing import Callable, Iterator, Any
from dataclasses import dataclass, field
from threading import Lock
@dataclass
class StageMetrics:
    name: str
    items_in: int = 0
    items_out: int = 0
    errors: int = 0
    total_latency_ms: float = 0.0
    min_latency_ms: float = float('inf')
    max_latency_ms: float = 0.0
    @property
    def avg_latency_ms(self) -> float:
        pass
    @property
    def throughput_pct(self) -> float:
        pass
class InstrumentedPipeline:
    """Pipeline with per-stage metrics collection.
    Wrap each stage to measure latency, throughput, and error rate.
    """
    pass
Expected Output
Stage: ingest | in=10, out=10, errors=0, avg_latency=X.XXms
Stage: transform | in=10, out=8, errors=2, avg_latency=X.XXms
Stage: sink | in=8, out=8, errors=0, avg_latency=X.XXms
Hints
Hint 1: Wrap each stage function with a decorator that records start/end time, increments items_in/items_out/errors, and updates latency stats.
Hint 2: The wrapper is itself a generator: it pulls each item from its source, calls the original stage function on it, updates the metrics, and yields the result. A context manager is a tidy way to do the timing.
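A minimal sketch of the context-manager approach to timing from Hint 2. `timed` is a hypothetical helper built on `contextlib.contextmanager`. It appends the elapsed milliseconds to a list even when the timed block raises, so failed items are timed too.

```python
import time
from contextlib import contextmanager
from typing import Iterator, List

@contextmanager
def timed(samples: List[float]) -> Iterator[None]:
    """Append the elapsed wall-clock time of the `with` body, in ms, to `samples`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs on success and on exceptions alike.
        samples.append((time.perf_counter() - start) * 1000)

latencies: List[float] = []
with timed(latencies):
    sum(range(10_000))
try:
    with timed(latencies):
        raise ValueError("stage failed")
except ValueError:
    pass
print(len(latencies))  # 2
```

Inside the stage wrapper, `with timed(samples): result = fn(item)` would replace the paired `perf_counter()` calls in both the success and error paths.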
Build an async fan-in merger that combines multiple async data sources into a single stream, supporting both arrival-order and round-robin modes.
import asyncio
from typing import AsyncIterator, List, Tuple, Any
async def merge_arrival(sources: List[AsyncIterator]) -> AsyncIterator[Tuple[int, Any]]:
    queue: asyncio.Queue = asyncio.Queue()
    async def producer(idx: int, source: AsyncIterator):
        try:
            async for item in source:
                await queue.put((idx, item))
        finally:
            await queue.put(None)  # sentinel: this source is finished
    tasks = [asyncio.create_task(producer(i, src)) for i, src in enumerate(sources)]
    finished = 0
    try:
        while finished < len(sources):
            item = await queue.get()
            if item is None:
                finished += 1
            else:
                yield item
        for t in tasks:
            await t  # surface any exception a producer raised
    finally:
        for t in tasks:
            t.cancel()  # no-op for finished tasks; stops producers if the consumer exits early
async def merge_round_robin(sources: List[AsyncIterator]) -> AsyncIterator[Tuple[int, Any]]:
    iters = list(enumerate(sources))
    while iters:
        still_active = []
        for idx, source in iters:
            try:
                item = await source.__anext__()
            except StopAsyncIteration:
                continue  # this source is exhausted; drop it
            yield (idx, item)
            still_active.append((idx, source))
        iters = still_active
async def make_source(items, delay=0.01):
    for item in items:
        await asyncio.sleep(delay)
        yield item
async def main():
    src1 = make_source([1, 2, 3], delay=0.01)
    src2 = make_source([10, 20, 30], delay=0.015)
    src3 = make_source([100, 200], delay=0.005)
    arrival = []
    async for src_idx, item in merge_arrival([src1, src2, src3]):
        arrival.append((src_idx, item))
    print(f"Arrival order: {arrival}")
    src1 = make_source([1, 2, 3], delay=0.0)
    src2 = make_source([10, 20, 30], delay=0.0)
    src3 = make_source([100, 200], delay=0.0)
    rr = []
    async for src_idx, item in merge_round_robin([src1, src2, src3]):
        rr.append((src_idx, item))
    print(f"Round-robin order: {rr}")
asyncio.run(main())
Solution
import asyncio
from typing import AsyncIterator, List, Tuple, Any
async def merge_arrival(sources: List[AsyncIterator]) -> AsyncIterator[Tuple[int, Any]]:
    queue: asyncio.Queue = asyncio.Queue()
    async def producer(idx: int, source: AsyncIterator):
        try:
            async for item in source:
                await queue.put((idx, item))
        finally:
            await queue.put(None)  # sentinel: this source is finished
    tasks = [asyncio.create_task(producer(i, src)) for i, src in enumerate(sources)]
    finished = 0
    try:
        while finished < len(sources):
            item = await queue.get()
            if item is None:
                finished += 1
            else:
                yield item
        for t in tasks:
            await t  # surface any exception a producer raised
    finally:
        for t in tasks:
            t.cancel()  # no-op for finished tasks; stops producers if the consumer exits early
async def merge_round_robin(sources: List[AsyncIterator]) -> AsyncIterator[Tuple[int, Any]]:
    iters = list(enumerate(sources))
    while iters:
        still_active = []
        for idx, source in iters:
            try:
                item = await source.__anext__()
            except StopAsyncIteration:
                continue  # this source is exhausted; drop it
            yield (idx, item)
            still_active.append((idx, source))
        iters = still_active
async def make_source(items, delay=0.0):
    for item in items:
        await asyncio.sleep(delay)
        yield item
async def main():
    src1, src2, src3 = make_source([1,2,3], 0.01), make_source([10,20,30], 0.015), make_source([100,200], 0.005)
    arrival = [(i, v) async for i, v in merge_arrival([src1, src2, src3])]
    print(f"Arrival order: {arrival}")
    src1, src2, src3 = make_source([1,2,3]), make_source([10,20,30]), make_source([100,200])
    rr = [(i, v) async for i, v in merge_round_robin([src1, src2, src3])]
    print(f"Round-robin order: {rr}")
asyncio.run(main())
Fan-in in streaming systems: Kafka Streams' KStream#merge() and Flink's DataStream.union() both fan several streams into one, with no ordering guarantee between the inputs, which in effect is arrival order. Round-robin is fairer when sources have different volumes, because a high-volume source cannot crowd the others out of the output. The cost is latency: round-robin awaits each source in turn, so one slow source stalls the whole stream, while arrival order never waits on a slow source. Choose round-robin when you need fairness and arrival order when you need low latency.
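The trade-off shows up clearly with one slow and one fast source. This sketch re-implements compact versions of both merges from the solution so that it runs on its own. The delays are arbitrary, and the orderings in the comments assume that scheduling jitter stays well below the slow source's 20 ms sleep. Arrival order emits the fast source's items immediately, while round-robin makes each fast item wait its turn behind the slow source.

```python
import asyncio
from typing import Any, AsyncIterator, List, Tuple

async def source(items: List[Any], delay: float) -> AsyncIterator[Any]:
    for item in items:
        await asyncio.sleep(delay)
        yield item

async def merge_arrival(sources) -> AsyncIterator[Tuple[int, Any]]:
    queue: asyncio.Queue = asyncio.Queue()
    async def producer(idx, src):
        try:
            async for item in src:
                await queue.put((idx, item))
        finally:
            await queue.put(None)  # sentinel: this source is done
    tasks = [asyncio.create_task(producer(i, s)) for i, s in enumerate(sources)]
    finished = 0
    while finished < len(sources):
        got = await queue.get()
        if got is None:
            finished += 1
        else:
            yield got
    await asyncio.gather(*tasks)

async def merge_round_robin(sources) -> AsyncIterator[Tuple[int, Any]]:
    active = list(enumerate(sources))
    while active:
        still = []
        for idx, src in active:
            try:
                item = await src.__anext__()
            except StopAsyncIteration:
                continue
            yield (idx, item)
            still.append((idx, src))
        active = still

async def demo():
    # Source 0 is slow (20 ms per item); source 1 is fast (no delay).
    slow, fast = ["a", "b"], ["x", "y", "z"]
    arrival = [i async for i, _ in merge_arrival([source(slow, 0.02), source(fast, 0)])]
    rr = [i async for i, _ in merge_round_robin([source(slow, 0.02), source(fast, 0)])]
    return arrival, rr

arrival, rr = asyncio.run(demo())
print(arrival)  # [1, 1, 1, 0, 0]: fast items are not held back by the slow source
print(rr)       # [0, 1, 0, 1, 1]: every fast item waits for the slow source's turn
```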
import asyncio
from typing import AsyncIterator, List, Any, Optional
async def merge_sources(
    sources: List[AsyncIterator],
    order: str = 'arrival',
) -> AsyncIterator:
    """Merge multiple async sources into a single stream.
    order='arrival': yield items as they arrive from any source (interleaved)
    order='round_robin': take one item from each source in turn
    Each item is tagged with its source index: (source_idx, item)
    Stops when all sources are exhausted.
    """
    pass
Expected Output
Arrival order: items tagged with source index
Round-robin order: alternates between sources evenly
Hints
Hint 1: For arrival order, use an asyncio.Queue. Give each source a producer task that puts (idx, item) into the shared queue and then a sentinel (None) when its source is exhausted. The consumer counts sentinels and stops after it has seen one per source.
Hint 2: For round-robin, loop over the list of still-active sources, await one item from each with __anext__(), and drop a source once it raises StopAsyncIteration. Repeat until the list is empty.
