Project 03 - Functional Data Processor
Estimated time: 5–7 hours core | Level: Intermediate → Advanced
Before reading the requirements, consider this question: what is the difference between a function that raises an exception when its input is invalid, and a function that returns a value representing failure? The first breaks the composition chain - you cannot pipe its output into the next function. The second keeps the chain intact. This project is built entirely around that distinction.
Learning Objectives
By the time this project is complete, you will have practiced:
- Implementing compose and pipe for right-to-left and left-to-right function composition
- Using functools.partial to create specialised versions of general functions
- Using functools.singledispatch for type-based dispatch without isinstance chains
- Implementing the Result monad (Ok/Err) as a pattern for error propagation without exceptions
- Writing genuinely pure functions (no side effects, same input always gives same output)
- Testing pure functions without mocks - the simplest test suite you will ever write
- Connecting these patterns to production use cases in ETL, FastAPI middleware, and data pipelines
System Overview
You are building a data transformation system where raw input flows through a composed pipeline of pure functions. The central function is pipe (and its mirror image, compose), which takes a sequence of functions and returns a new function that applies them in sequence: pipe runs left-to-right, compose runs right-to-left. Error handling is done via a Result type that carries either a success value (Ok) or an error description (Err). Functions that can fail return Result; the composition chain short-circuits on Err without raising exceptions.
The target API (pipe is used because the steps are listed in execution order):
process = pipe(
    parse_json,
    validate_schema(user_schema),
    transform(normalize_email, hash_password, add_timestamps),
    partial(enrich, lookup_table=country_codes),
    serialize("json"),
)
result = process(raw_input)
if isinstance(result, Ok):
    print(result.value)
else:
    print(f"Processing failed: {result.error}")
Requirements
R1 - Result type: Ok and Err
Implement two classes: Ok and Err. They represent the two possible outcomes of a computation that might fail.
- Ok(value): wraps a successful result. ok.value returns the wrapped value.
- Err(error): wraps a failure description (a string or exception). err.error returns the description.
- Both must implement __repr__: Ok(42) and Err('invalid email format').
- Both must implement __eq__ so tests can compare results by value: Ok(42) == Ok(42) is True.
- Ok.map(fn): applies fn to the value and returns Ok(fn(value)). If fn raises, returns Err(str(exception)).
- Err.map(fn): returns self unchanged - errors propagate through map automatically.
- Ok.and_then(fn): applies fn to the value where fn itself returns a Result. Returns whatever fn returns.
- Err.and_then(fn): returns self unchanged.
This is the minimal Result monad. map is for functions that return plain values. and_then (also called flat_map or bind) is for functions that return Result.
R2 - compose(*fns) and pipe(*fns)
- compose(*fns): returns a function that applies fns right-to-left. compose(f, g, h)(x) computes f(g(h(x))).
- pipe(*fns): returns a function that applies fns left-to-right. pipe(h, g, f)(x) computes f(g(h(x))). More readable for data pipelines where you think "first do h, then g, then f".
- Both must handle Result-aware composition: if a function in the chain returns an Err, subsequent functions are not called - the Err propagates to the end.
- Both must work with zero functions: compose()(x) returns x unchanged.
- Both must work with a single function: compose(f)(x) returns f(x).
Result-aware composition behaviour: if the intermediate value is an Ok, unwrap it before passing to the next function. If it is an Err, short-circuit.
R3 - parse_json(raw) and parse_csv_line(raw)
- parse_json(raw: str) -> Result: parses a JSON string. Returns Ok(dict) on success, Err("parse error: ...") on failure.
- parse_csv_line(raw: str, headers: list[str]) -> Result: splits a CSV line and maps fields to headers. Returns Ok(dict) on success, Err(...) if field count does not match header count.
- Both are pure functions - no side effects, no global state.
Use functools.singledispatch to implement a unified parse function that dispatches on input type:
- parse(str_input) → attempts JSON parse
- parse(bytes_input) → decodes bytes to str, then parses as JSON
- parse(dict_input) → returns Ok(dict_input) directly (already parsed)
R4 - Schema validation
- validate_schema(schema: dict) -> Callable: returns a function that validates a dict against a schema. The schema maps field names to types: {"email": str, "age": int, "name": str}.
- The returned function takes a dict and returns Ok(dict) if all required fields are present and have the correct types. Returns Err("validation error: ...") otherwise.
- This is a higher-order function - validate_schema is a function that returns a function, suitable for use in compose.
R5 - Transformation functions
All transformation functions must be pure - no mutation of the input, no side effects, no global reads.
- normalize_email(record: dict) -> dict: returns a new dict with record["email"] lowercased and stripped of whitespace. Does not modify the input dict - returns {**record, "email": cleaned}.
- hash_password(record: dict) -> dict: if record contains a "password" key, returns a new dict with the password replaced by its SHA-256 hex digest. Use hashlib.sha256. Does not modify the input.
- add_timestamps(record: dict) -> dict: returns a new dict with "created_at" set to a fixed sentinel string "1970-01-01T00:00:00Z". The timestamp is fixed (not datetime.now()) because a function that calls datetime.now() is not pure - it returns different values on different calls. Document this tradeoff in a comment.
- transform(*fns) -> Callable: returns a function that applies all fns in sequence to a dict. Each function takes and returns a dict. Equivalent to pipe(*fns) but specifically for dict-to-dict transformations.
R6 - Enrichment
- enrich(record: dict, lookup_table: dict) -> dict: looks up record.get("country_code") in lookup_table and adds a "country_name" field to the record. If the code is not found, sets "country_name" to "Unknown". Pure - returns a new dict.
- partial(enrich, lookup_table=country_codes) produces an enrichment function ready for use in a pipeline without the caller needing to pass lookup_table each time.
R7 - Serialization with singledispatch
- serialize(format: str) -> Callable: returns a function that serializes a dict to a string.
- Use functools.singledispatch to implement the underlying dispatch: serialize_impl(dict) → dispatches to JSON or CSV serializer based on the registered format argument (note: singledispatch dispatches on type, not on a string argument - think carefully about the design here and document your approach).
- Alternatively, use singledispatch on the record type, registering handlers for dict and for the Ok type so that serialization unwraps Ok transparently.
R8 - Full pipeline demonstration
Assemble a complete pipeline that:
- Parses raw JSON input
- Validates against a user schema
- Normalises email, hashes password, and adds timestamps
- Enriches with country codes
- Serialises to JSON
The pipeline handles both valid and invalid input. For invalid input, it returns an Err with a descriptive message - not an exception.
Starter Code Skeleton
import functools
import hashlib
import json
import operator
from typing import Any, Callable


# ── Result Type ───────────────────────────────────────────────────────────────

class Ok:
    """Represents a successful result wrapping a value."""

    def __init__(self, value: Any):
        self.value = value

    def map(self, fn: Callable) -> "Result":
        # TODO: apply fn to self.value
        # TODO: if fn raises, return Err(str(exception))
        # TODO: otherwise return Ok(fn(self.value))
        pass

    def and_then(self, fn: Callable) -> "Result":
        # TODO: apply fn to self.value - fn returns a Result
        # TODO: return whatever fn returns
        pass

    def __repr__(self) -> str:
        return f"Ok({self.value!r})"

    def __eq__(self, other) -> bool:
        return isinstance(other, Ok) and self.value == other.value


class Err:
    """Represents a failed result wrapping an error description."""

    def __init__(self, error: Any):
        self.error = error

    def map(self, fn: Callable) -> "Err":
        # TODO: return self - errors propagate unchanged through map
        pass

    def and_then(self, fn: Callable) -> "Err":
        # TODO: return self - errors propagate unchanged through and_then
        pass

    def __repr__(self) -> str:
        return f"Err({self.error!r})"

    def __eq__(self, other) -> bool:
        return isinstance(other, Err) and self.error == other.error


Result = Ok | Err
# ── compose and pipe ──────────────────────────────────────────────────────────

def compose(*fns: Callable) -> Callable:
    """
    Right-to-left function composition.
    compose(f, g, h)(x) == f(g(h(x)))
    Result-aware: if any step returns Err, subsequent steps are skipped.
    """
    def composed(value: Any) -> Any:
        # TODO: apply fns in reverse order
        # TODO: if current value is Err, short-circuit and return it
        # TODO: if current value is Ok, unwrap before passing to next fn
        pass
    return composed


def pipe(*fns: Callable) -> Callable:
    """
    Left-to-right function composition.
    pipe(h, g, f)(x) == f(g(h(x)))
    Result-aware: if any step returns Err, subsequent steps are skipped.
    """
    def piped(value: Any) -> Any:
        # TODO: apply fns in order
        # TODO: same Result-awareness as compose
        pass
    return piped
# ── Parsing ───────────────────────────────────────────────────────────────────

def parse_json(raw: str) -> Result:
    """Parse a JSON string. Returns Ok(dict) or Err(message)."""
    # TODO: json.loads(raw) - return Ok on success, Err on json.JSONDecodeError
    pass


def parse_csv_line(raw: str, headers: list[str]) -> Result:
    """Split a CSV line and map to headers. Returns Ok(dict) or Err(message)."""
    # TODO: split on comma, check field count matches len(headers)
    # TODO: return Ok(dict(zip(headers, fields))) or Err(message)
    pass


@functools.singledispatch
def parse(raw) -> Result:
    """
    Type-dispatched parser. Register handlers for str, bytes, and dict.
    Default (unregistered type) returns Err.
    """
    return Err(f"parse: unsupported input type {type(raw).__name__}")


@parse.register(str)
def _parse_str(raw: str) -> Result:
    # TODO: delegate to parse_json
    pass


@parse.register(bytes)
def _parse_bytes(raw: bytes) -> Result:
    # TODO: decode to str, then delegate to parse_json
    pass


@parse.register(dict)
def _parse_dict(raw: dict) -> Result:
    # TODO: return Ok(raw) - already parsed
    pass
# ── Schema Validation ─────────────────────────────────────────────────────────

def validate_schema(schema: dict[str, type]) -> Callable[[dict], Result]:
    """
    Returns a validation function for the given schema.
    Schema: {"field_name": expected_type, ...}
    All fields in schema are required.
    """
    def validate(record: dict) -> Result:
        # TODO: check all keys in schema exist in record
        # TODO: check isinstance(record[key], schema[key]) for each key
        # TODO: return Ok(record) or Err("validation error: ...")
        pass
    return validate
# ── Transformation Functions ──────────────────────────────────────────────────

def normalize_email(record: dict) -> dict:
    """Return new dict with email lowercased and stripped. Pure - does not mutate input."""
    # TODO: return {**record, "email": record["email"].strip().lower()}
    pass


def hash_password(record: dict) -> dict:
    """Return new dict with password replaced by SHA-256 hex digest. Pure."""
    # TODO: if "password" not in record, return record unchanged
    # TODO: hashed = hashlib.sha256(record["password"].encode()).hexdigest()
    # TODO: return {**record, "password": hashed}
    pass


def add_timestamps(record: dict) -> dict:
    """
    Return new dict with created_at set to a fixed sentinel timestamp.
    NOTE: This function uses a fixed timestamp instead of datetime.now() because
    datetime.now() would make the function impure - it returns different values
    on different calls for the same input. In production, the timestamp would
    typically be injected as a parameter or provided by a clock abstraction
    that can be controlled in tests.
    """
    # TODO: return {**record, "created_at": "1970-01-01T00:00:00Z"}
    pass


def transform(*fns: Callable[[dict], dict]) -> Callable[[dict], dict]:
    """
    Returns a function that applies all fns in sequence to a dict.
    Each fn must be a pure dict -> dict transformation.
    """
    # TODO: return pipe(*fns) applied to the input dict
    # Hint: this is just pipe(*fns) restricted to dict inputs
    pass


# ── Enrichment ────────────────────────────────────────────────────────────────

def enrich(record: dict, lookup_table: dict) -> dict:
    """
    Look up country_code in lookup_table, add country_name field.
    Pure - returns new dict, does not modify input.
    """
    # TODO: country_name = lookup_table.get(record.get("country_code"), "Unknown")
    # TODO: return {**record, "country_name": country_name}
    pass
# ── Serialization ─────────────────────────────────────────────────────────────

def serialize(format: str) -> Callable:
    """
    Returns a serializer function for the given format.
    Supported: "json", "csv" (keys sorted alphabetically, values joined by comma).
    Uses functools.singledispatch internally to handle Ok wrapping transparently.
    """
    @functools.singledispatch
    def _serialize(record) -> Result:
        return Err(f"serialize: unsupported type {type(record).__name__}")

    @_serialize.register(dict)
    def _serialize_dict(record: dict) -> Result:
        # TODO: if format == "json": return Ok(json.dumps(record, sort_keys=True))
        #       (sort_keys=True gives the alphabetical key order shown in Expected Output)
        # TODO: if format == "csv": return Ok(",".join(str(record[k]) for k in sorted(record)))
        # TODO: else Err(f"unknown format: {format}")
        pass

    @_serialize.register(Ok)
    def _serialize_ok(result: Ok) -> Result:
        # TODO: unwrap and delegate to _serialize_dict
        pass

    return _serialize
# ── Full Pipeline Demonstration ───────────────────────────────────────────────

if __name__ == "__main__":
    # ── Setup ─────────────────────────────────────────────────────────────────
    country_codes = {
        "GB": "United Kingdom",
        "US": "United States",
        "DE": "Germany",
        "FR": "France",
    }
    user_schema = {
        "email": str,
        "name": str,
        "age": int,
        "country_code": str,
    }

    # ── Build pipeline ────────────────────────────────────────────────────────
    process = pipe(
        parse_json,
        validate_schema(user_schema),
        transform(normalize_email, hash_password, add_timestamps),
        functools.partial(enrich, lookup_table=country_codes),
        serialize("json"),
    )

    # ── Valid input ───────────────────────────────────────────────────────────
    valid_raw = json.dumps({
        "name": "Alice",
        "age": 30,
        "country_code": "GB",
        "password": "hunter2",
    })
    print("=== valid input ===")
    result = process(valid_raw)
    print(result)

    # ── Invalid JSON ──────────────────────────────────────────────────────────
    print("\n=== invalid JSON ===")
    result = process("{not valid json}")
    print(result)

    # ── Schema violation ──────────────────────────────────────────────────────
    print("\n=== schema violation (age is string) ===")
    bad_schema_raw = json.dumps({
        "name": "Bob",
        "age": "thirty",  # wrong type
        "country_code": "US",
    })
    result = process(bad_schema_raw)
    print(result)

    # ── Unknown country code ──────────────────────────────────────────────────
    print("\n=== unknown country code ===")
    unknown_country_raw = json.dumps({
        "name": "Carol",
        "age": 25,
        "country_code": "XX",
    })
    result = process(unknown_country_raw)
    print(result)

    # ── singledispatch parse demo ─────────────────────────────────────────────
    print("\n=== singledispatch parse ===")
    print(parse('{"key": "value"}'))
    print(parse(b'{"key": "value"}'))
    print(parse({"key": "value"}))
    print(parse(12345))

    # ── compose vs pipe ───────────────────────────────────────────────────────
    print("\n=== compose vs pipe (same result, different readability) ===")
    add_one = lambda x: x + 1
    double = lambda x: x * 2
    square = lambda x: x ** 2
    piped = pipe(add_one, double, square)        # (x+1)*2, then squared
    composed = compose(square, double, add_one)  # same operations, right-to-left notation
    print(f"pipe(add_one, double, square)(3) = {piped(3)}")
    print(f"compose(square, double, add_one)(3) = {composed(3)}")
    assert piped(3) == composed(3)
    print("compose and pipe agree: True")
Expected Output
=== valid input ===
Ok('{"age": 30, "country_code": "GB", "country_name": "United Kingdom", "created_at": "1970-01-01T00:00:00Z", "email": "[email protected]", "name": "Alice", "password": "f52fbd32b2b3b86ff88ef6c490628285f482af15ddcb29541f94bcf526a3f6c7"}')
=== invalid JSON ===
Err('parse error: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)')
=== schema violation (age is string) ===
Err('validation error: field "age" expected int, got str')
=== unknown country code ===
Ok('{"age": 25, "country_code": "XX", "country_name": "Unknown", "created_at": "1970-01-01T00:00:00Z", "email": "[email protected]", "name": "Carol"}')
=== singledispatch parse ===
Ok({'key': 'value'})
Ok({'key': 'value'})
Ok({'key': 'value'})
Err('parse: unsupported input type int')
=== compose vs pipe (same result, different readability) ===
pipe(add_one, double, square)(3) = 64
compose(square, double, add_one)(3) = 64
compose and pipe agree: True
Note: the SHA-256 hash of "hunter2" is deterministic - it must match the expected output exactly.
The Result Monad - Concept Explanation
The Result type is a pattern from functional programming that solves a specific problem: how do you compose functions that can fail without scattering try/except blocks across every call site?
Without Result:
def process(raw):
    try:
        record = parse_json(raw)
    except json.JSONDecodeError as e:
        return None  # or raise, or return {}
    try:
        validated = validate_schema(user_schema)(record)
    except ValidationError as e:
        return None
    # ... and so on for every step
With Result:
def process(raw):
    return pipe(
        parse_json,               # returns Ok(dict) or Err(message)
        validate_schema(schema),  # receives dict, returns Ok(dict) or Err(message)
        normalize_email,          # receives dict, returns dict (no failure possible)
        serialize("json"),        # returns Ok(str) or Err(message)
    )(raw)
# If any step returns Err, the rest are skipped automatically
The composition chain handles error propagation. Individual functions do not need to know about each other's failure modes.
map vs and_then:
- Use map when wrapping a function that returns a plain value: Ok(42).map(lambda x: x * 2) → Ok(84)
- Use and_then when wrapping a function that returns a Result itself: Ok("raw").and_then(parse_json) → whatever parse_json returns
If you used map with a Result-returning function, you would get Ok(Ok(value)) - a nested Result. and_then flattens it.
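The nesting problem is easy to see with a small experiment. The following is a minimal sketch, assuming stripped-down versions of Ok, Err, and parse_json that follow the R1 and R3 descriptions; it is not the reference solution.

```python
import json


class Ok:
    def __init__(self, value):
        self.value = value

    def map(self, fn):
        # Wrap a plain return value; convert exceptions into Err.
        try:
            return Ok(fn(self.value))
        except Exception as exc:
            return Err(str(exc))

    def and_then(self, fn):
        # fn already returns a Result, so hand it back without re-wrapping.
        return fn(self.value)

    def __repr__(self):
        return f"Ok({self.value!r})"


class Err:
    def __init__(self, error):
        self.error = error

    def map(self, fn):
        return self

    def and_then(self, fn):
        return self

    def __repr__(self):
        return f"Err({self.error!r})"


def parse_json(raw):
    try:
        return Ok(json.loads(raw))
    except json.JSONDecodeError as exc:
        return Err(f"parse error: {exc}")


nested = Ok('{"a": 1}').map(parse_json)                 # Ok wrapping another Ok
flat = Ok('{"a": 1}').and_then(parse_json)              # a single Ok
skipped = Err("upstream failure").and_then(parse_json)  # parse_json never runs

print(nested)   # Ok(Ok({'a': 1}))
print(flat)     # Ok({'a': 1})
print(skipped)  # Err('upstream failure')
```

With the nested value, a caller would have to unwrap twice to reach the dict, which is exactly the bookkeeping and_then removes.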
Step-by-Step Hints
Hint 1 - Start with Ok and Err, then test them in isolation.
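Before touching compose or pipe, confirm that Ok(42).map(lambda x: x + 1) == Ok(43) and Err("oops").map(lambda x: x + 1) == Err("oops"). These two checks cover the success and propagation halves of the map contract; add a third where fn raises to cover the conversion to Err, then repeat the pattern for and_then.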
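As a rough illustration of how small such a test file can be, here is a sketch written in pytest style (plain functions with bare asserts). The Ok and Err classes are cut down to map and __eq__ only, and the helper boom is a made-up name for a function that always raises.

```python
class Ok:
    def __init__(self, value):
        self.value = value

    def map(self, fn):
        try:
            return Ok(fn(self.value))
        except Exception as exc:
            return Err(str(exc))

    def __eq__(self, other):
        return isinstance(other, Ok) and self.value == other.value


class Err:
    def __init__(self, error):
        self.error = error

    def map(self, fn):
        return self

    def __eq__(self, other):
        return isinstance(other, Err) and self.error == other.error


def boom(_value):
    # Hypothetical helper: always fails, to exercise the exception path.
    raise ValueError("bad value")


def test_ok_map_applies_fn():
    assert Ok(42).map(lambda x: x + 1) == Ok(43)


def test_err_map_is_a_no_op():
    assert Err("oops").map(lambda x: x + 1) == Err("oops")


def test_ok_map_converts_exception_to_err():
    assert Ok(1).map(boom) == Err("bad value")


def test_ok_and_err_never_compare_equal():
    assert Ok("oops") != Err("oops")
```

No fixtures, no patching, no setup: each test is one expression because the functions under test depend on nothing but their arguments.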
Hint 2 - compose applies functions in reverse order.
def compose(*fns):
    def composed(value):
        for fn in reversed(fns):  # right-to-left
            if isinstance(value, Err):
                return value  # short-circuit
            if isinstance(value, Ok):
                value = value.value  # unwrap Ok before passing
            value = fn(value)
        return value
    return composed
pipe is the same but without reversed.
Hint 3 - Pure functions never mutate their input.
The clearest sign of an impure function is an in-place assignment such as record["key"] = value. This modifies the dict that was passed in, which means the caller's dict has also changed. Always use {**record, "key": new_value} to create a new dict with the modification.
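The difference shows up as soon as the caller looks at its own dict again. A small sketch, where normalize_email_impure is a deliberately wrong variant added for contrast and the email address is made-up sample data:

```python
def normalize_email_impure(record):
    # Mutates the caller's dict in place: the change leaks out of the function.
    record["email"] = record["email"].strip().lower()
    return record


def normalize_email(record):
    # Builds a new dict; the input is left exactly as it was.
    return {**record, "email": record["email"].strip().lower()}


original = {"email": "  Alice@Example.COM "}

cleaned = normalize_email(original)
untouched = original["email"] == "  Alice@Example.COM "
print(untouched)         # True: the pure version did not touch the input
print(cleaned["email"])  # alice@example.com

leaky = normalize_email_impure(original)
print(leaky is original)   # True: same object handed back
print(original["email"])   # alice@example.com: the caller's data changed
```

Note that {**record, ...} is a shallow copy. It is enough for the flat records in this project, but nested lists or dicts inside the record would still be shared with the input.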
Hint 4 - functools.singledispatch dispatches on the first argument's type.
@functools.singledispatch
def parse(raw):
    return Err(f"unsupported type: {type(raw).__name__}")

@parse.register(str)
def _parse_str(raw):
    ...

@parse.register(bytes)
def _parse_bytes(raw):
    ...
Call parse("some string") - Python looks up the type str, finds _parse_str, and calls it. If no handler is registered for the type, the base function is called.
Hint 5 - functools.partial for enrich.
functools.partial(enrich, lookup_table=country_codes) returns a new function that behaves like enrich but with lookup_table already bound. The returned function only needs record as its argument, making it suitable for use in a pipe.
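A quick way to convince yourself of this is to build the partial and inspect it. The sketch below assumes an enrich implementation along the lines of the R6 description and a two-entry lookup table; enrich_country is just an illustrative variable name.

```python
import functools


def enrich(record, lookup_table):
    country_name = lookup_table.get(record.get("country_code"), "Unknown")
    return {**record, "country_name": country_name}


country_codes = {"GB": "United Kingdom", "DE": "Germany"}

# lookup_table is bound once; the result takes only the record.
enrich_country = functools.partial(enrich, lookup_table=country_codes)

print(enrich_country({"country_code": "DE"}))
print(enrich_country({"country_code": "XX"}))

# A partial object remembers what it wraps and which arguments it bound.
print(enrich_country.func is enrich)
print(enrich_country.keywords)
```

Binding lookup_table by keyword rather than by position matters here: the pipeline supplies the record as the first positional argument, so the bound argument must not occupy that slot.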
Hint 6 - validate_schema is a closure.
def validate_schema(schema):
    def validate(record):
        for field, expected_type in schema.items():
            if field not in record:
                return Err(f'validation error: required field "{field}" missing')
            if not isinstance(record[field], expected_type):
                return Err(f'validation error: field "{field}" expected {expected_type.__name__}, '
                           f'got {type(record[field]).__name__}')
        return Ok(record)
    return validate
schema lives in the closure. Every call to validate_schema(my_schema) creates a new validate function with a different schema baked in.
Hint 7 - compose and pipe with zero or one function.
compose()(x) # reversed([]) is [], loop does not run, returns x unchanged
compose(f)(x) # reversed([f]) is [f], loop runs once: value = f(x)
No special-casing needed - the loop handles both naturally.
FP Concepts Tested
| Concept | Where it appears |
|---|---|
| Function composition | compose and pipe |
| functools.partial | partial(enrich, lookup_table=country_codes) |
| functools.singledispatch | parse dispatches on input type; serialize dispatches on Ok/dict |
| Pure functions | All transformation functions: no mutation, no side effects |
| Closures | validate_schema, transform, serialize return closures |
| Higher-order functions | validate_schema, transform, serialize take and return functions |
| Result monad | Ok/Err with map and and_then |
| Error propagation without exceptions | Pipeline short-circuits on Err silently |
| operator module | Can replace simple lambdas: operator.itemgetter("email") instead of lambda r: r["email"] |
Engineering Notes - Where These Patterns Appear in Production
compose and pipe are the foundation of functional data pipeline libraries. Apache Beam's Python SDK expresses transforms as composable PTransform objects. Pandas' .pipe() method uses exactly this pattern - df.pipe(clean).pipe(validate).pipe(aggregate). The toolz and cytoolz libraries provide production-grade compose and pipe that handle edge cases you will discover in the extension challenges.
The Result monad appears in Rust's Result<T, E> type (the language requires you to handle both arms), Haskell's Either, and in Python via the returns library. FastAPI's request validation follows a conceptually similar pattern: a request is either valid (proceed to the handler) or invalid (return a 422 response without running the handler). Railway-oriented programming is the name for the design pattern where data flows down a "happy path" and errors flow down an "error track".
functools.singledispatch has been part of the Python standard library since version 3.4. It is an alternative to the isinstance chains found in many serializers and formatters (json.JSONEncoder, for example, checks types with isinstance and falls back to its default() hook for anything it does not recognise). In production, singledispatch replaces such chains, making it easy to add support for new types without modifying existing code (the open/closed principle).
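To make the open/closed point concrete, here is a small sketch. The function name to_jsonable and its handlers are invented for illustration; the only library feature used is functools.singledispatch and its register method.

```python
import functools
from datetime import date
from decimal import Decimal


@functools.singledispatch
def to_jsonable(value):
    # Fallback for types with no registered handler.
    raise TypeError(f"cannot serialize {type(value).__name__}")


# One handler can be registered for several types by stacking decorators.
@to_jsonable.register(str)
@to_jsonable.register(int)
@to_jsonable.register(float)
def _passthrough(value):
    return value


@to_jsonable.register(list)
def _from_list(value):
    return [to_jsonable(item) for item in value]


# Added later, possibly in a different module: no existing handler is edited.
@to_jsonable.register(date)
def _from_date(value):
    return value.isoformat()


@to_jsonable.register(Decimal)
def _from_decimal(value):
    return str(value)


print(to_jsonable([1, "a", date(2024, 1, 2), Decimal("1.50")]))
# [1, 'a', '2024-01-02', '1.50']
```

Dispatch follows the method resolution order, so a subclass with no handler of its own falls back to the handler of its nearest registered base class.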
Partial application appears throughout production Python. Flask's app.route(...) is a decorator factory: it binds the URL rule first and receives the view function later. Django's as_view() returns a view function with the view class and its initkwargs already bound. Both are built with closures rather than functools.partial, but the idea is the same. Any time a function has a parameter that is constant in a particular context - a database connection, a configuration object, a lookup table - partial is cleaner than a lambda wrapper.
Testability of pure functions is not an academic concern. Functions that call datetime.now(), random.random(), or open() cannot be tested deterministically without mocking or injecting the dependency. Every mock is technical debt - it couples the test to an implementation detail. Pure functions expose no implementation details to the outside. The test suite for this project requires zero mocks: same input, same output, every time.
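One common way to keep a timestamping function pure outside of an exercise is to pass the time in as an argument, as the add_timestamps docstring in the skeleton suggests. The sketch below is one possible shape for that; the extra now parameter and the add_timestamps_now wrapper are assumptions, not part of the project's required API.

```python
from datetime import datetime, timezone


def add_timestamps(record, now):
    # Pure: the timestamp is an explicit input, so same inputs give same output.
    return {**record, "created_at": now.strftime("%Y-%m-%dT%H:%M:%SZ")}


def add_timestamps_now(record):
    # Thin impure shell: the only place that reads the real clock.
    return add_timestamps(record, datetime.now(timezone.utc))


fixed = datetime(1970, 1, 1, tzinfo=timezone.utc)
stamped = add_timestamps({"name": "Alice"}, fixed)
print(stamped)
# {'name': 'Alice', 'created_at': '1970-01-01T00:00:00Z'}
```

Tests target add_timestamps with a fixed datetime and need no mock; only the one-line wrapper touches the clock, and it contains no logic worth testing.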
Extension Challenges
Extension 1 - Result.map_err(fn)
Add map_err(fn) to both Ok and Err. Ok.map_err returns self unchanged. Err.map_err(fn) returns Err(fn(self.error)). This is useful for transforming error messages at specific points in a pipeline - for example, adding context: result.map_err(lambda e: f"in user validation: {e}").
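A possible starting point, shown as a sketch with only the methods relevant to this extension (the real classes from R1 would keep map, and_then, and __eq__ as well):

```python
class Ok:
    def __init__(self, value):
        self.value = value

    def map_err(self, fn):
        # Nothing to transform on the success track.
        return self

    def __repr__(self):
        return f"Ok({self.value!r})"


class Err:
    def __init__(self, error):
        self.error = error

    def map_err(self, fn):
        # Build a new Err rather than mutating self.error.
        return Err(fn(self.error))

    def __repr__(self):
        return f"Err({self.error!r})"


def add_context(message):
    return f"in user validation: {message}"


print(Ok(42).map_err(add_context))                 # Ok(42)
print(Err("age must be int").map_err(add_context))
# Err('in user validation: age must be int')
```

Whether Err.map_err should catch exceptions raised by fn, the way Ok.map does, is a design decision the extension leaves open; this sketch lets them propagate.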
Extension 2 - pipe with async functions
Modify pipe to detect async functions using inspect.iscoroutinefunction and return an async composed function if any step is async. This mirrors what FastAPI's dependency injection system does when composing sync and async dependencies.
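One way to approach this, sketched under assumptions: decide once, when pipe is called, whether any step is a coroutine function, and return either a plain function or an async one. The helper names run_sync, run_async, fetch_user, and add_flag are illustrative, and Ok and Err are reduced to bare containers.

```python
import asyncio
import inspect


class Ok:
    def __init__(self, value):
        self.value = value


class Err:
    def __init__(self, error):
        self.error = error


def pipe(*fns):
    def run_sync(value):
        for fn in fns:
            if isinstance(value, Err):
                return value
            if isinstance(value, Ok):
                value = value.value
            value = fn(value)
        return value

    async def run_async(value):
        for fn in fns:
            if isinstance(value, Err):
                return value
            if isinstance(value, Ok):
                value = value.value
            value = fn(value)
            if inspect.isawaitable(value):
                # Async steps hand back a coroutine; await it to get the result.
                value = await value
        return value

    if any(inspect.iscoroutinefunction(fn) for fn in fns):
        return run_async
    return run_sync


async def fetch_user(user_id):
    await asyncio.sleep(0)  # stands in for real I/O
    return Ok({"id": user_id})


def add_flag(user):
    return {**user, "active": True}


lookup = pipe(fetch_user, add_flag)
print(asyncio.run(lookup(7)))  # {'id': 7, 'active': True}
```

A limitation of this sketch: detection relies on inspect.iscoroutinefunction, so an ordinary callable that merely returns an awaitable is not recognised as async when the pipeline is built.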
Extension 3 - memoize(fn) from scratch
Implement memoize as a decorator that caches the results of a pure function. Because the function is pure, caching is always safe - same input always gives same output. Use a dict keyed by (args, frozenset(kwargs.items())); a frozenset is already order-insensitive, so no sorting is needed, but every argument must be hashable. Compare to functools.lru_cache - what does lru_cache add that yours does not?
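A minimal sketch of such a decorator follows. The cache attribute on the wrapper and the calls list are additions for demonstration only; slow_square records its calls purely so the caching is observable, which a genuinely pure function would not do.

```python
import functools


def memoize(fn):
    cache = {}

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # frozenset makes the key independent of keyword-argument order.
        key = (args, frozenset(kwargs.items()))
        if key not in cache:
            cache[key] = fn(*args, **kwargs)
        return cache[key]

    wrapper.cache = cache  # exposed for inspection only
    return wrapper


calls = []


@memoize
def slow_square(x, *, offset=0):
    calls.append(x)  # side effect used only to count real invocations
    return x * x + offset


print(slow_square(4))            # 16, computed
print(slow_square(4))            # 16, served from the cache
print(slow_square(4, offset=1))  # 17, different key so computed again
print(calls)                     # [4, 4]
```

This version never evicts anything and fails with TypeError on unhashable arguments such as dicts, which are two of the points worth comparing against functools.lru_cache.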
Extension 4 - Applicative: Ok.apply(fn_result)
Implement apply where fn_result is an Ok wrapping a function. Ok(5).apply(Ok(lambda x: x * 2)) returns Ok(10). This is the applicative functor pattern - it allows applying a wrapped function to a wrapped value. Useful when both the function and the value might be the result of fallible computations.
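The following sketch shows one possible reading of this extension. The behaviour when either side is an Err is not specified above, so the choice made here (the value's own Err wins over the function's Err) is an assumption.

```python
class Ok:
    def __init__(self, value):
        self.value = value

    def apply(self, fn_result):
        # fn_result is a Result that, on success, wraps a one-argument function.
        if isinstance(fn_result, Err):
            return fn_result
        return Ok(fn_result.value(self.value))

    def __repr__(self):
        return f"Ok({self.value!r})"


class Err:
    def __init__(self, error):
        self.error = error

    def apply(self, fn_result):
        # Assumption: an Err value propagates regardless of fn_result.
        return self

    def __repr__(self):
        return f"Err({self.error!r})"


print(Ok(5).apply(Ok(lambda x: x * 2)))           # Ok(10)
print(Ok(5).apply(Err("no function available")))  # Err('no function available')
print(Err("no value").apply(Ok(lambda x: x * 2))) # Err('no value')
```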
Extension 5 - Type-safe schema using dataclasses
Replace the {"field": type} schema dict with a @dataclass definition. Use dataclasses.fields() and dataclasses.asdict() to implement validate_schema and serialize generically for any dataclass. The pipeline then becomes: parse JSON → validate into a typed dataclass → transform → serialize back.
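As a rough sketch of where this could lead, the validator below walks dataclasses.fields() instead of a schema dict. The User class and the validate_into name are hypothetical, and the isinstance check assumes every annotation is a plain class such as str or int (no typing generics, no string annotations).

```python
import dataclasses
from dataclasses import dataclass


class Ok:
    def __init__(self, value):
        self.value = value


class Err:
    def __init__(self, error):
        self.error = error


@dataclass(frozen=True)
class User:
    email: str
    name: str
    age: int


def validate_into(cls):
    """Return a function that turns a dict into Ok(cls instance) or Err(message)."""
    def validate(record):
        kwargs = {}
        for field in dataclasses.fields(cls):
            if field.name not in record:
                return Err(f'validation error: required field "{field.name}" missing')
            value = record[field.name]
            # field.type holds the annotation; assumed here to be a plain class.
            if not isinstance(value, field.type):
                return Err(
                    f'validation error: field "{field.name}" expected '
                    f"{field.type.__name__}, got {type(value).__name__}"
                )
            kwargs[field.name] = value
        return Ok(cls(**kwargs))
    return validate


validate_user = validate_into(User)
good = validate_user({"email": "a@example.com", "name": "A", "age": 3, "extra": 1})
bad = validate_user({"email": "a@example.com", "name": "A", "age": "three"})

print(dataclasses.asdict(good.value))  # unknown keys such as "extra" are dropped
print(bad.error)
```

Making the dataclass frozen keeps the typed record immutable, so downstream transformations would build new instances with dataclasses.replace instead of the {**record, ...} idiom.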
