
Project 03 - Functional Data Processor

Estimated time: 5–7 hours core | Level: Intermediate → Advanced

Before reading the requirements, consider this question: what is the difference between a function that raises an exception when its input is invalid, and a function that returns a value representing failure? The first breaks the composition chain - you cannot pipe its output into the next function. The second keeps the chain intact. This project is built entirely around that distinction.

Learning Objectives

By the time this project is complete, you will have practiced:

  • Implementing compose and pipe for right-to-left and left-to-right function composition
  • Using functools.partial to create specialised versions of general functions
  • Using functools.singledispatch for type-based dispatch without isinstance chains
  • Implementing the Result monad (Ok / Err) as a pattern for error propagation without exceptions
  • Writing genuinely pure functions (no side effects, same input always gives same output)
  • Testing pure functions without mocks - the simplest test suite you will ever write
  • Connecting these patterns to production use cases in ETL, FastAPI middleware, and data pipelines

System Overview

You are building a data transformation system where raw input flows through a composed pipeline of pure functions. The central function is compose (or pipe), which takes a sequence of functions and returns a new function that applies them one after another: compose runs them right-to-left, pipe runs them left-to-right. Error handling is done via a Result type that carries either a success value (Ok) or an error description (Err). Functions that can fail return Result; the composition chain short-circuits on Err without raising exceptions.

The target API:

# pipe, not compose: the steps are listed in execution order (parse first, serialize last).
process = pipe(
    parse_json,
    validate_schema(user_schema),
    transform(normalize_email, hash_password, add_timestamps),
    partial(enrich, lookup_table=country_codes),
    serialize("json"),
)

result = process(raw_input)

if isinstance(result, Ok):
    print(result.value)
else:
    print(f"Processing failed: {result.error}")

Requirements

R1 - Result type: Ok and Err

Implement two classes: Ok and Err. They represent the two possible outcomes of a computation that might fail.

  • Ok(value): wraps a successful result. ok.value returns the wrapped value.
  • Err(error): wraps a failure description (a string or exception). err.error returns the description.
  • Both must implement __repr__: Ok(42) and Err('invalid email format').
  • Both must implement __eq__ so tests can compare results by value: Ok(42) == Ok(42) is True.
  • Ok.map(fn): applies fn to the value and returns Ok(fn(value)). If fn raises, returns Err(str(exception)).
  • Err.map(fn): returns self unchanged - errors propagate through map automatically.
  • Ok.and_then(fn): applies fn to the value where fn itself returns a Result. Returns whatever fn returns.
  • Err.and_then(fn): returns self unchanged.

This is the minimal Result monad. map is for functions that return plain values. and_then (also called flat_map or bind) is for functions that return Result.

R2 - compose(*fns) and pipe(*fns)

  • compose(*fns): returns a function that applies fns right-to-left. compose(f, g, h)(x) computes f(g(h(x))).
  • pipe(*fns): returns a function that applies fns left-to-right. pipe(h, g, f)(x) computes f(g(h(x))). More readable for data pipelines where you think "first do h, then g, then f".
  • Both must handle Result-aware composition: if a function in the chain returns an Err, subsequent functions are not called - the Err propagates to the end.
  • Both must work with zero functions: compose()(x) returns x unchanged.
  • Both must work with a single function: compose(f)(x) returns f(x).

Result-aware composition behaviour: if the intermediate value is an Ok, unwrap it before passing to the next function. If it is an Err, short-circuit.

R3 - parse_json(raw) and parse_csv_line(raw)

  • parse_json(raw: str) -> Result: parses a JSON string. Returns Ok(dict) on success, Err("parse error: ...") on failure.
  • parse_csv_line(raw: str, headers: list[str]) -> Result: splits a CSV line and maps fields to headers. Returns Ok(dict) on success, Err(...) if field count does not match header count.
  • Both are pure functions - no side effects, no global state.

Use functools.singledispatch to implement a unified parse function that dispatches on input type:

  • parse(str_input) → attempts JSON parse
  • parse(bytes_input) → decodes bytes to str, then parses as JSON
  • parse(dict_input) → returns Ok(dict_input) directly (already parsed)

R4 - Schema validation

  • validate_schema(schema: dict) -> Callable: returns a function that validates a dict against a schema. The schema maps field names to types: {"email": str, "age": int, "name": str}.
  • The returned function takes a dict and returns Ok(dict) if all required fields are present and have the correct types. Returns Err("validation error: ...") otherwise.
  • This is a higher-order function - validate_schema is a function that returns a function, suitable for use in compose.

R5 - Transformation functions

All transformation functions must be pure - no mutation of the input, no side effects, no global reads.

  • normalize_email(record: dict) -> dict: returns a new dict with record["email"] lowercased and stripped of whitespace. Does not modify the input dict - returns {**record, "email": cleaned}.
  • hash_password(record: dict) -> dict: if record contains a "password" key, returns a new dict with the password replaced by its SHA-256 hex digest. Use hashlib.sha256. Does not modify the input.
  • add_timestamps(record: dict) -> dict: returns a new dict with "created_at" set to a fixed sentinel string "1970-01-01T00:00:00Z". The timestamp is fixed (not datetime.now()) because a function that calls datetime.now() is not pure - it returns different values on different calls. Document this tradeoff in a comment.
  • transform(*fns) -> Callable: returns a function that applies all fns in sequence to a dict. Each function takes and returns a dict. Equivalent to pipe(*fns) but specifically for dict-to-dict transformations.
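The purity tradeoff described for add_timestamps can be illustrated by passing the timestamp in as an argument. This is only a sketch: add_timestamps_at and its now parameter are hypothetical names that are not part of the project requirements.

```python
import functools
from datetime import datetime, timezone


def add_timestamps_at(record: dict, now: str) -> dict:
    """Pure variant: the timestamp is an explicit input, not read from a clock."""
    return {**record, "created_at": now}


# In tests, bind a fixed value so the output depends only on the input record.
add_fixed = functools.partial(add_timestamps_at, now="1970-01-01T00:00:00Z")

# At the edge of a real program, read the clock once and bind the result.
# Only this expression is impure; add_timestamps_at itself stays pure.
add_current = functools.partial(
    add_timestamps_at,
    now=datetime.now(timezone.utc).isoformat(),
)

original = {"name": "Alice"}
stamped = add_fixed(original)
```

The design choice is to push the impure clock read to the outermost layer, so every function inside the pipeline can still be tested with plain equality assertions.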

R6 - Enrichment

  • enrich(record: dict, lookup_table: dict) -> dict: looks up record.get("country_code") in lookup_table and adds a "country_name" field to the record. If the code is not found, sets "country_name" to "Unknown". Pure - returns a new dict.
  • partial(enrich, lookup_table=country_codes) produces an enrichment function ready for use in a pipeline without the caller needing to pass lookup_table each time.

R7 - Serialization with singledispatch

  • serialize(format: str) -> Callable: returns a function that serializes a dict to a string.
  • Use functools.singledispatch to implement the underlying dispatch:
    • serialize_impl(dict) → dispatches to JSON or CSV serializer based on the registered format argument (note: singledispatch dispatches on type, not on a string argument - think carefully about the design here and document your approach).
    • Alternatively, use singledispatch on the record type, registering handlers for dict and for the Ok type so that serialization unwraps Ok transparently.
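One possible reading of the second design is sketched below: the format string is captured by the closure, while singledispatch only chooses a handler from the record's type. The minimal Ok and Err classes are stand-ins so the block runs on its own, and the parameter name fmt is an assumption (the skeleton calls it format).

```python
import functools
import json


class Ok:
    def __init__(self, value):
        self.value = value


class Err:
    def __init__(self, error):
        self.error = error


def serialize(fmt: str):
    """Return a serializer; fmt is captured by the closure, not dispatched on."""

    @functools.singledispatch
    def _serialize(record):
        # Fallback for any type without a registered handler.
        return Err(f"serialize: unsupported type {type(record).__name__}")

    @_serialize.register(dict)
    def _(record):
        if fmt == "json":
            return Ok(json.dumps(record, sort_keys=True))
        if fmt == "csv":
            return Ok(",".join(str(record[k]) for k in sorted(record)))
        return Err(f"unknown format: {fmt}")

    @_serialize.register(Ok)
    def _(result):
        # Unwrap, then dispatch again on the type of the inner value.
        return _serialize(result.value)

    return _serialize


to_json = serialize("json")
to_csv = serialize("csv")
```

Calling to_json({"b": 1, "a": 2}) yields an Ok wrapping a JSON string with sorted keys, and to_csv accepts either a dict or an Ok wrapping one.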

R8 - Full pipeline demonstration

Assemble a complete pipeline that:

  1. Parses raw JSON input
  2. Validates against a user schema
  3. Normalises email and hashes password
  4. Enriches with country codes
  5. Serialises to JSON

The pipeline handles both valid and invalid input. For invalid input, it returns an Err with a descriptive message - not an exception.

Starter Code Skeleton

import functools
import hashlib
import json
import operator
from typing import Any, Callable


# ── Result Type ───────────────────────────────────────────────────────────────

class Ok:
    """Represents a successful result wrapping a value."""

    def __init__(self, value: Any):
        self.value = value

    def map(self, fn: Callable) -> "Result":
        # TODO: apply fn to self.value
        # TODO: if fn raises, return Err(str(exception))
        # TODO: otherwise return Ok(fn(self.value))
        pass

    def and_then(self, fn: Callable) -> "Result":
        # TODO: apply fn to self.value - fn returns a Result
        # TODO: return whatever fn returns
        pass

    def __repr__(self) -> str:
        return f"Ok({self.value!r})"

    def __eq__(self, other) -> bool:
        return isinstance(other, Ok) and self.value == other.value


class Err:
    """Represents a failed result wrapping an error description."""

    def __init__(self, error: Any):
        self.error = error

    def map(self, fn: Callable) -> "Err":
        # TODO: return self - errors propagate unchanged through map
        pass

    def and_then(self, fn: Callable) -> "Err":
        # TODO: return self - errors propagate unchanged through and_then
        pass

    def __repr__(self) -> str:
        return f"Err({self.error!r})"

    def __eq__(self, other) -> bool:
        return isinstance(other, Err) and self.error == other.error


Result = Ok | Err


# ── compose and pipe ──────────────────────────────────────────────────────────

def compose(*fns: Callable) -> Callable:
    """
    Right-to-left function composition.
    compose(f, g, h)(x) == f(g(h(x)))
    Result-aware: if any step returns Err, subsequent steps are skipped.
    """
    def composed(value: Any) -> Any:
        # TODO: apply fns in reverse order
        # TODO: if current value is Err, short-circuit and return it
        # TODO: if current value is Ok, unwrap before passing to next fn
        pass
    return composed


def pipe(*fns: Callable) -> Callable:
    """
    Left-to-right function composition.
    pipe(h, g, f)(x) == f(g(h(x)))
    Result-aware: if any step returns Err, subsequent steps are skipped.
    """
    def piped(value: Any) -> Any:
        # TODO: apply fns in order
        # TODO: same Result-awareness as compose
        pass
    return piped


# ── Parsing ───────────────────────────────────────────────────────────────────

def parse_json(raw: str) -> Result:
    """Parse a JSON string. Returns Ok(dict) or Err(message)."""
    # TODO: json.loads(raw) - return Ok on success, Err on json.JSONDecodeError
    pass


def parse_csv_line(raw: str, headers: list[str]) -> Result:
    """Split a CSV line and map to headers. Returns Ok(dict) or Err(message)."""
    # TODO: split on comma, check field count matches len(headers)
    # TODO: return Ok(dict(zip(headers, fields))) or Err(message)
    pass


@functools.singledispatch
def parse(raw) -> Result:
    """
    Type-dispatched parser. Register handlers for str, bytes, and dict.
    Default (unregistered type) returns Err.
    """
    return Err(f"parse: unsupported input type {type(raw).__name__}")


@parse.register(str)
def _parse_str(raw: str) -> Result:
    # TODO: delegate to parse_json
    pass


@parse.register(bytes)
def _parse_bytes(raw: bytes) -> Result:
    # TODO: decode to str, then delegate to parse_json
    pass


@parse.register(dict)
def _parse_dict(raw: dict) -> Result:
    # TODO: return Ok(raw) - already parsed
    pass


# ── Schema Validation ─────────────────────────────────────────────────────────

def validate_schema(schema: dict[str, type]) -> Callable[[dict], Result]:
    """
    Returns a validation function for the given schema.
    Schema: {"field_name": expected_type, ...}
    All fields in schema are required.
    """
    def validate(record: dict) -> Result:
        # TODO: check all keys in schema exist in record
        # TODO: check isinstance(record[key], schema[key]) for each key
        # TODO: return Ok(record) or Err("validation error: ...")
        pass
    return validate


# ── Transformation Functions ──────────────────────────────────────────────────

def normalize_email(record: dict) -> dict:
    """Return new dict with email lowercased and stripped. Pure - does not mutate input."""
    # TODO: return {**record, "email": record["email"].strip().lower()}
    pass


def hash_password(record: dict) -> dict:
    """Return new dict with password replaced by SHA-256 hex digest. Pure."""
    # TODO: if "password" not in record, return record unchanged
    # TODO: hashed = hashlib.sha256(record["password"].encode()).hexdigest()
    # TODO: return {**record, "password": hashed}
    pass


def add_timestamps(record: dict) -> dict:
    """
    Return new dict with created_at set to a fixed sentinel timestamp.

    NOTE: This function uses a fixed timestamp instead of datetime.now() because
    datetime.now() would make the function impure - it returns different values
    on different calls for the same input. In production, the timestamp would
    typically be injected as a parameter or provided by a clock abstraction
    that can be controlled in tests.
    """
    # TODO: return {**record, "created_at": "1970-01-01T00:00:00Z"}
    pass


def transform(*fns: Callable[[dict], dict]) -> Callable[[dict], dict]:
    """
    Returns a function that applies all fns in sequence to a dict.
    Each fn must be a pure dict -> dict transformation.
    """
    # TODO: return pipe(*fns) applied to the input dict
    # Hint: this is just pipe(*fns) restricted to dict inputs
    pass


# ── Enrichment ────────────────────────────────────────────────────────────────

def enrich(record: dict, lookup_table: dict) -> dict:
    """
    Look up country_code in lookup_table, add country_name field.
    Pure - returns new dict, does not modify input.
    """
    # TODO: country_name = lookup_table.get(record.get("country_code"), "Unknown")
    # TODO: return {**record, "country_name": country_name}
    pass


# ── Serialization ─────────────────────────────────────────────────────────────

def serialize(format: str) -> Callable:
    """
    Returns a serializer function for the given format.
    Supported: "json" (keys sorted alphabetically, to match the expected output),
    "csv" (keys sorted alphabetically, values joined by comma).
    Uses functools.singledispatch internally to handle Ok wrapping transparently.
    """
    @functools.singledispatch
    def _serialize(record) -> Result:
        return Err(f"serialize: unsupported type {type(record).__name__}")

    @_serialize.register(dict)
    def _serialize_dict(record: dict) -> Result:
        # TODO: if format == "json": return Ok(json.dumps(record, sort_keys=True))
        # TODO: if format == "csv": return Ok(",".join(str(record[k]) for k in sorted(record)))
        # TODO: else Err(f"unknown format: {format}")
        pass

    @_serialize.register(Ok)
    def _serialize_ok(result: Ok) -> Result:
        # TODO: unwrap and delegate to _serialize_dict
        pass

    return _serialize


# ── Full Pipeline Demonstration ───────────────────────────────────────────────

if __name__ == "__main__":
    # ── Setup ─────────────────────────────────────────────────────────────────

    country_codes = {
        "GB": "United Kingdom",
        "US": "United States",
        "DE": "Germany",
        "FR": "France",
    }

    user_schema = {
        "email": str,
        "name": str,
        "age": int,
        "country_code": str,
    }

    # ── Build pipeline ────────────────────────────────────────────────────────

    process = pipe(
        parse_json,
        validate_schema(user_schema),
        transform(normalize_email, hash_password, add_timestamps),
        functools.partial(enrich, lookup_table=country_codes),
        serialize("json"),
    )

    # ── Valid input ───────────────────────────────────────────────────────────
    valid_raw = json.dumps({
        "email": " [email protected] ",
        "name": "Alice",
        "age": 30,
        "country_code": "GB",
        "password": "hunter2",
    })

    print("=== valid input ===")
    result = process(valid_raw)
    print(result)

    # ── Invalid JSON ──────────────────────────────────────────────────────────
    print("\n=== invalid JSON ===")
    result = process("{not valid json}")
    print(result)

    # ── Schema violation ──────────────────────────────────────────────────────
    print("\n=== schema violation (age is string) ===")
    bad_schema_raw = json.dumps({
        "email": "[email protected]",
        "name": "Bob",
        "age": "thirty",  # wrong type
        "country_code": "US",
    })
    result = process(bad_schema_raw)
    print(result)

    # ── Unknown country code ──────────────────────────────────────────────────
    print("\n=== unknown country code ===")
    unknown_country_raw = json.dumps({
        "email": "[email protected]",
        "name": "Carol",
        "age": 25,
        "country_code": "XX",
    })
    result = process(unknown_country_raw)
    print(result)

    # ── singledispatch parse demo ─────────────────────────────────────────────
    print("\n=== singledispatch parse ===")
    print(parse('{"key": "value"}'))
    print(parse(b'{"key": "value"}'))
    print(parse({"key": "value"}))
    print(parse(12345))

    # ── compose vs pipe ───────────────────────────────────────────────────────
    print("\n=== compose vs pipe (same result, different readability) ===")
    add_one = lambda x: x + 1
    double = lambda x: x * 2
    square = lambda x: x ** 2

    piped = pipe(add_one, double, square)        # (x+1)*2, then squared
    composed = compose(square, double, add_one)  # same operations, right-to-left notation
    print(f"pipe(add_one, double, square)(3) = {piped(3)}")
    print(f"compose(square, double, add_one)(3) = {composed(3)}")
    assert piped(3) == composed(3)
    print("compose and pipe agree: True")

Expected Output

=== valid input ===
Ok('{"age": 30, "country_code": "GB", "country_name": "United Kingdom", "created_at": "1970-01-01T00:00:00Z", "email": "[email protected]", "name": "Alice", "password": "f52fbd32b2b3b86ff88ef6c490628285f482af15ddcb29541f94bcf526a3f6c7"}')

=== invalid JSON ===
Err('parse error: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)')

=== schema violation (age is string) ===
Err('validation error: field "age" expected int, got str')

=== unknown country code ===
Ok('{"age": 25, "country_code": "XX", "country_name": "Unknown", "created_at": "1970-01-01T00:00:00Z", "email": "[email protected]", "name": "Carol"}')

=== singledispatch parse ===
Ok({'key': 'value'})
Ok({'key': 'value'})
Ok({'key': 'value'})
Err('parse: unsupported input type int')

=== compose vs pipe (same result, different readability) ===
pipe(add_one, double, square)(3) = 64
compose(square, double, add_one)(3) = 64
compose and pipe agree: True

Note: the SHA-256 hash of "hunter2" is deterministic - it must match the expected output exactly.

The Result Monad - Concept Explanation

The Result type is a pattern from functional programming that solves a specific problem: how do you compose functions that can fail without scattering try/except blocks across every call site?

Without Result:

def process(raw):
    try:
        record = parse_json(raw)
    except json.JSONDecodeError as e:
        return None  # or raise, or return {}

    try:
        validated = validate_schema(user_schema)(record)
    except ValidationError as e:
        return None

    # ... and so on for every step

With Result:

def process(raw):
    return pipe(
        parse_json,               # returns Ok(dict) or Err(message)
        validate_schema(schema),  # receives dict, returns Ok(dict) or Err(message)
        normalize_email,          # receives dict, returns dict (no failure possible)
        serialize("json"),        # returns Ok(str) or Err(message)
    )(raw)
# If any step returns Err, the rest are skipped automatically

The composition chain handles error propagation. Individual functions do not need to know about each other's failure modes.

map vs and_then:

  • Use map when wrapping a function that returns a plain value: Ok(42).map(lambda x: x * 2) → Ok(84)
  • Use and_then when wrapping a function that returns a Result itself: Ok("raw").and_then(parse_json) → whatever parse_json returns

If you used map with a Result-returning function, you would get Ok(Ok(value)) - a nested Result. and_then flattens it.
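The difference is easiest to see side by side. The sketch below is a minimal Ok/Err pair following the R1 contract; parse_int is a hypothetical Result-returning helper introduced only for this illustration.

```python
class Ok:
    def __init__(self, value):
        self.value = value

    def map(self, fn):
        try:
            return Ok(fn(self.value))
        except Exception as exc:
            return Err(str(exc))

    def and_then(self, fn):
        return fn(self.value)

    def __eq__(self, other):
        return isinstance(other, Ok) and self.value == other.value

    def __repr__(self):
        return f"Ok({self.value!r})"


class Err:
    def __init__(self, error):
        self.error = error

    def map(self, fn):
        return self

    def and_then(self, fn):
        return self

    def __eq__(self, other):
        return isinstance(other, Err) and self.error == other.error

    def __repr__(self):
        return f"Err({self.error!r})"


def parse_int(text):
    """Hypothetical helper: returns a Result instead of raising."""
    try:
        return Ok(int(text))
    except ValueError:
        return Err(f"not an integer: {text!r}")


doubled = Ok(42).map(lambda x: x * 2)        # plain function: use map
nested = Ok("7").map(parse_int)              # Result-returning fn via map: nested
flat = Ok("7").and_then(parse_int)           # and_then returns parse_int's Result as is
failed = Ok("x").and_then(parse_int)         # the Err comes straight from parse_int
skipped = Err("boom").map(lambda x: x * 2)   # Err passes through untouched
```

Here nested compares equal to Ok(Ok(7)) while flat compares equal to Ok(7), which is the flattening the paragraph above describes.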

Step-by-Step Hints

Hint 1 - Start with Ok and Err, then test them in isolation. Before touching compose or pipe, confirm that Ok(42).map(lambda x: x + 1) == Ok(43) and Err("oops").map(lambda x: x + 1) == Err("oops"). Add a third test checking that Ok.map returns an Err when fn raises, and you have covered the entire map contract.

Hint 2 - compose applies functions in reverse order.

def compose(*fns):
    def composed(value):
        for fn in reversed(fns):  # right-to-left
            if isinstance(value, Err):
                return value  # short-circuit
            if isinstance(value, Ok):
                value = value.value  # unwrap Ok before passing
            value = fn(value)
        return value
    return composed

pipe is the same but without reversed.
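A sketch of that pipe variant follows, with a call log that makes the short-circuit visible. The bare Ok/Err classes are stand-ins, and parse_digits and double are hypothetical steps used only for this demonstration.

```python
class Ok:
    def __init__(self, value):
        self.value = value


class Err:
    def __init__(self, error):
        self.error = error


def pipe(*fns):
    def piped(value):
        for fn in fns:  # left-to-right: no reversed()
            if isinstance(value, Err):
                return value  # short-circuit
            if isinstance(value, Ok):
                value = value.value  # unwrap Ok before passing
            value = fn(value)
        return value
    return piped


calls = []  # records which steps actually ran


def parse_digits(text):
    calls.append("parse_digits")
    return Ok(int(text)) if text.isdigit() else Err(f"not all digits: {text!r}")


def double(n):
    calls.append("double")
    return n * 2


run = pipe(parse_digits, double)
good = run("21")   # both steps run
bad = run("abc")   # parse_digits returns Err, so double is never called
```

After both calls, the log contains parse_digits, double, parse_digits: the second run stopped after the first step. Note that good is a plain 42 because the last step returned a plain value; the pipeline only ends in Ok when its final function returns one.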

Hint 3 - Pure functions never mutate their input. The clearest sign of an impure function is assignment to record["key"] = value. This modifies the dict that was passed in, which means the caller's dict has also changed. Always use {**record, "key": new_value} to create a new dict with the modification.
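To make the contrast concrete, here is a small sketch with an impure and a pure version of the same transformation. normalize_email_impure is a deliberately wrong, hypothetical counter-example, and the email address is made up.

```python
def normalize_email_impure(record: dict) -> dict:
    """Anti-example: writes into the caller's dict."""
    record["email"] = record["email"].strip().lower()
    return record


def normalize_email(record: dict) -> dict:
    """Pure version: builds a new dict and leaves the input alone."""
    return {**record, "email": record["email"].strip().lower()}


first = {"email": "  Alice@Example.COM "}
cleaned = normalize_email(first)          # first is untouched

second = {"email": "  Alice@Example.COM "}
mutated = normalize_email_impure(second)  # second has been changed in place
```

With the pure version, first still holds the original string after the call; with the impure one, second and mutated are the same object and the caller's data has silently changed.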

Hint 4 - functools.singledispatch dispatches on the first argument's type.

@functools.singledispatch
def parse(raw):
    return Err(f"unsupported type: {type(raw).__name__}")

@parse.register(str)
def _parse_str(raw):
    ...

@parse.register(bytes)
def _parse_bytes(raw):
    ...

Call parse("some string") - Python looks up the type str, finds _parse_str, and calls it. If no handler is registered for the type, the base function is called.

Hint 5 - functools.partial for enrich. functools.partial(enrich, lookup_table=country_codes) returns a new function that behaves like enrich but with lookup_table already bound. The returned function only needs record as its argument, making it suitable for use in a pipe.
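A short sketch of that binding, using the enrich logic from the skeleton's TODO comments. The name enrich_country is hypothetical, and the lookup table is trimmed to two entries.

```python
import functools


def enrich(record: dict, lookup_table: dict) -> dict:
    country_name = lookup_table.get(record.get("country_code"), "Unknown")
    return {**record, "country_name": country_name}


country_codes = {"GB": "United Kingdom", "US": "United States"}

# lookup_table is bound once; the result takes only the record.
enrich_country = functools.partial(enrich, lookup_table=country_codes)

known = enrich_country({"country_code": "GB"})
unknown = enrich_country({"country_code": "XX"})
```

The partial object keeps the original function and the bound keyword arguments on its func and keywords attributes, which can be handy when debugging a pipeline.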

Hint 6 - validate_schema is a closure.

def validate_schema(schema):
    def validate(record):
        for field, expected_type in schema.items():
            if field not in record:
                return Err(f'validation error: required field "{field}" missing')
            if not isinstance(record[field], expected_type):
                return Err(f'validation error: field "{field}" expected {expected_type.__name__}, '
                           f'got {type(record[field]).__name__}')
        return Ok(record)
    return validate

schema lives in the closure. Every call to validate_schema(my_schema) creates a new validate function with a different schema baked in.

Hint 7 - compose and pipe with zero or one function.

compose()(x)   # fns is empty, so the loop body never runs and x is returned unchanged
compose(f)(x)  # reversed((f,)) yields only f, so the loop runs once: value = f(x)

No special-casing needed - the loop handles both naturally.

FP Concepts Tested

| Concept | Where it appears |
|---|---|
| Function composition | compose and pipe |
| functools.partial | partial(enrich, lookup_table=country_codes) |
| functools.singledispatch | parse dispatches on input type; serialize dispatches on Ok/dict |
| Pure functions | All transformation functions: no mutation, no side effects |
| Closures | validate_schema, transform, serialize return closures |
| Higher-order functions | validate_schema, transform, serialize take and return functions |
| Result monad | Ok/Err with map and and_then |
| Error propagation without exceptions | Pipeline short-circuits on Err silently |
| operator module | Can replace simple lambdas: operator.itemgetter("email") instead of lambda r: r["email"] |
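As a brief illustration of the operator module entry, the sketch below compares a lambda with its operator equivalent. The record contents are made up.

```python
import operator

record = {"email": "alice@example.com", "age": 30}

get_email_lambda = lambda r: r["email"]
get_email = operator.itemgetter("email")          # same behaviour, no lambda

# itemgetter with several keys returns a tuple of the values.
get_email_and_age = operator.itemgetter("email", "age")

# methodcaller builds a function that calls a named method on its argument.
to_lower = operator.methodcaller("lower")

email = get_email(record)
pair = get_email_and_age(record)
lowered = to_lower("ABC")
```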

Engineering Notes - Where These Patterns Appear in Production

compose and pipe are the foundation of functional data pipeline libraries. Apache Beam's Python SDK expresses transforms as composable PTransform objects. Pandas' .pipe() method uses exactly this pattern - df.pipe(clean).pipe(validate).pipe(aggregate). The toolz and cytoolz libraries provide production-grade compose and pipe that handle edge cases you will discover in the extension challenges.

The Result monad appears in Rust's Result<T, E> type (the language requires you to handle both arms), Haskell's Either, and in Python via the returns library. FastAPI's request validation uses a conceptually similar pattern: a request is either valid (proceed to the handler) or invalid (return a 422 response without running the handler). Railway-oriented programming is the name for the design pattern where data flows down a "happy path" and errors flow down an "error track".

functools.singledispatch ships in the standard library and is used there as well: pkgutil registers importer-specific handlers through it. json.JSONEncoder, by contrast, relies on isinstance checks plus an overridable default() method, which is the kind of chain singledispatch is designed to replace. In production, singledispatch replaces isinstance chains in serializers and formatters, making it easy to add support for new types without modifying existing code (the open/closed principle).

functools.partial appears throughout production Python. Flask's app.route and Django's as_view() apply the same idea through closures rather than functools.partial itself: each binds configuration up front and returns a callable that takes only the remaining arguments. Any time a function has a parameter that is constant in a particular context - a database connection, a configuration object, a lookup table - partial is cleaner than a lambda wrapper.

The link between pure functions and testability is not an academic concern. Functions that call datetime.now(), random.random(), or open() are hard to test without mocking or injecting the dependency. Every mock is technical debt - it couples the test to an implementation detail. Pure functions have no implementation details visible to the outside. The test suite for this project requires zero mocks: same input, same output, every time.

Extension Challenges

Extension 1 - Result.map_err(fn). Add map_err(fn) to both Ok and Err. Ok.map_err returns self unchanged. Err.map_err(fn) returns Err(fn(self.error)). This is useful for transforming error messages at specific points in a pipeline - for example, adding context: result.map_err(lambda e: f"in user validation: {e}").
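One way the map_err contract could look is sketched here on bare Ok/Err classes that omit map and and_then for brevity. Treat it as a starting point under those assumptions, not a reference solution.

```python
class Ok:
    def __init__(self, value):
        self.value = value

    def map_err(self, fn):
        return self  # nothing to transform on the success track


class Err:
    def __init__(self, error):
        self.error = error

    def map_err(self, fn):
        return Err(fn(self.error))  # new Err; the original is left untouched


def add_context(message):
    return f"in user validation: {message}"


success = Ok(42)
failure = Err("missing email")

same = success.map_err(add_context)
wrapped = failure.map_err(add_context)
```

map_err mirrors map: map touches only the Ok side, map_err touches only the Err side.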

Extension 2 - pipe with async functions. Modify pipe to detect async functions using inspect.iscoroutinefunction and return an async composed function if any step is async. FastAPI's dependency injection system faces a similar problem when it mixes sync and async dependencies.
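A possible shape for this extension is sketched below. It assumes that awaiting each awaitable step result is enough, and uses bare Ok/Err stand-ins; parse_number and fetch_double are hypothetical steps invented for the demonstration.

```python
import asyncio
import inspect


class Ok:
    def __init__(self, value):
        self.value = value


class Err:
    def __init__(self, error):
        self.error = error


def pipe(*fns):
    """Left-to-right composition; async if any step is a coroutine function."""

    def unwrap(value):
        return value.value if isinstance(value, Ok) else value

    if any(inspect.iscoroutinefunction(fn) for fn in fns):
        async def piped_async(value):
            for fn in fns:
                if isinstance(value, Err):
                    return value  # short-circuit, same as the sync version
                value = fn(unwrap(value))
                if inspect.isawaitable(value):
                    value = await value  # only async steps need awaiting
            return value
        return piped_async

    def piped(value):
        for fn in fns:
            if isinstance(value, Err):
                return value
            value = fn(unwrap(value))
        return value
    return piped


def parse_number(text):
    return Ok(int(text)) if text.isdigit() else Err(f"not a number: {text!r}")


async def fetch_double(n):
    await asyncio.sleep(0)  # stands in for real I/O
    return Ok(n * 2)


sync_only = pipe(parse_number, lambda n: n + 1)   # stays a normal function
mixed = pipe(parse_number, fetch_double)          # becomes a coroutine function

sync_result = sync_only("20")
async_ok = asyncio.run(mixed("21"))
async_err = asyncio.run(mixed("abc"))
```

The caller has to know which flavour it received: the sync pipeline is called directly, while the mixed one must be awaited or driven with asyncio.run.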

Extension 3 - memoize(fn) from scratch. Implement memoize as a decorator that caches the results of a pure function. Because the function is pure, caching is always safe - same input always gives same output. Use a dict keyed by (args, frozenset(kwargs.items())); sorting the items first is unnecessary because a frozenset is unordered. Compare to functools.lru_cache - what does lru_cache add that yours does not?
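A minimal sketch of such a decorator follows. It keys the cache on (args, frozenset(kwargs.items())), assumes every argument is hashable, and uses a hypothetical slow_square function plus a calls list to show that the body runs only once per distinct input.

```python
import functools


def memoize(fn):
    cache = {}

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # frozenset makes the keyword part hashable and order-independent.
        key = (args, frozenset(kwargs.items()))
        if key not in cache:
            cache[key] = fn(*args, **kwargs)
        return cache[key]

    return wrapper


calls = []  # records every real invocation of the wrapped function


@memoize
def slow_square(n):
    calls.append(n)
    return n * n


first = slow_square(12)
second = slow_square(12)   # served from the cache; the body does not run again
third = slow_square(n=12)  # different key (keyword call), so the body runs again
```

The third call shows one limitation of this simple key: positional and keyword spellings of the same argument are cached separately.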

Extension 4 - Applicative: Ok.apply(fn_result). Implement apply where fn_result is an Ok wrapping a function. Ok(5).apply(Ok(lambda x: x * 2)) returns Ok(10). This is the applicative functor pattern - it allows applying a wrapped function to a wrapped value. Useful when both the function and the value might be the result of fallible computations.
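The sketch below shows one way apply might behave. The challenge only specifies the Ok/Ok case; returning the first Err encountered in the other cases is an assumption made here, not something the text prescribes.

```python
class Ok:
    def __init__(self, value):
        self.value = value

    def apply(self, fn_result):
        if isinstance(fn_result, Err):
            return fn_result  # assumption: a failed function wins over a good value
        return Ok(fn_result.value(self.value))


class Err:
    def __init__(self, error):
        self.error = error

    def apply(self, fn_result):
        return self  # assumption: a failed value is propagated unchanged


applied = Ok(5).apply(Ok(lambda x: x * 2))
bad_function = Ok(5).apply(Err("no function available"))
bad_value = Err("no value available").apply(Ok(lambda x: x * 2))
```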

Extension 5 - Type-safe schema using dataclasses. Replace the {"field": type} schema dict with a @dataclass definition. Use dataclasses.fields() and dataclasses.asdict() to implement validate_schema and serialize generically for any dataclass. The pipeline then becomes: parse JSON → validate into a typed dataclass → transform → serialize back.

© 2026 EngineersOfAI. All rights reserved.