
Serialization Concepts - pickle, dataclasses, and Format Tradeoffs

Reading time: ~20 minutes | Level: Foundation → Engineering

Here is a vulnerability that has appeared in real production systems - including ML platforms:

import pickle

# This looks harmless. It is not.
with open("model_weights.pkl", "rb") as f:
    model = pickle.load(f)  # Loading from an untrusted source

If model_weights.pkl was crafted by an attacker, this line executes arbitrary code on your machine. Not just reads data - executes code. A pickle file can contain arbitrary Python instructions, and pickle.load() runs them without question.

This is not a theoretical risk. It is a documented attack vector against ML pipelines that download "pretrained models" from the internet.

Serialization is the process of converting in-memory objects to bytes and back. Choosing the wrong format - or using the right format incorrectly - has consequences ranging from data corruption to remote code execution.

What You Will Learn

  • What serialization is: converting objects to bytes and restoring them
  • pickle: what it can serialize, protocol versions, and its critical security risks
  • Safe uses of pickle and how ML practitioners use it correctly
  • Format comparison: pickle vs json vs msgpack vs protobuf
  • dataclasses: @dataclass, asdict(), astuple(), fields() for clean serializable objects
  • Converting dataclasses to/from JSON for API use
  • Pydantic models: validation and serialization together - a common choice for production APIs
  • struct: binary packing/unpacking for network protocols and file formats
  • shelve: persistent dict backed by pickle
  • Schema evolution: handling backward compatibility when your data format changes

Prerequisites

  • Understanding of Python classes and instances
  • Familiarity with json.dumps() and json.loads() (lesson 08)
  • Understanding of context managers (lesson 03)
  • Basic familiarity with file I/O (lessons 01 and 02)

Mental Model: Serialization as the Object Lifecycle

Serialization formats differ in:

  • Which types they can represent
  • Speed (encode + decode)
  • Size (bytes on wire or disk)
  • Cross-language compatibility
  • Security guarantees
  • Schema support (validation, evolution)
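
To make the first point concrete, here is a minimal sketch comparing how json and pickle round-trip the same value. The `sample` dict is made up for illustration.

```python
import json
import pickle

# Hypothetical sample value mixing JSON-native and Python-only types
sample = {"point": (1, 2), "ids": [3, 4]}

via_json = json.loads(json.dumps(sample))
via_pickle = pickle.loads(pickle.dumps(sample))

print(type(via_json["point"]).__name__)    # list  (JSON has no tuple type)
print(type(via_pickle["point"]).__name__)  # tuple (pickle preserves it)

# JSON object keys are always strings, so integer keys change type as well
print(json.loads(json.dumps({1: "a"})))    # {'1': 'a'}
```

The data survives both round trips, but only pickle restores the exact Python types.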

Part 1 - pickle: The Python-Native Serializer

What pickle Can Serialize

pickle can serialize most built-in types and instances of importable classes:

import pickle
import datetime
from collections import defaultdict

# All of these work with pickle - several fail with json
objects = [
    {"key": "value"},                  # dict
    [1, 2, 3],                         # list
    (4, 5, 6),                         # tuple (preserved as tuple!)
    {1, 2, 3},                         # set
    datetime.datetime.now(),           # datetime (no custom encoder needed)
    defaultdict(list, {"a": [1, 2]}),  # defaultdict
    range(1_000_000),                  # range object (not expanded)
]

for obj in objects:
    data = pickle.dumps(obj)
    restored = pickle.loads(data)
    print(f"{type(obj).__name__:15} -> {len(data):5} bytes -> {restored!r:.40}")

# Functions and classes are pickled by reference (module + qualified name),
# so a lambda, which has no importable name, cannot be pickled.
try:
    pickle.dumps(lambda x: x * 2)
except (pickle.PicklingError, AttributeError) as exc:
    print(f"{'lambda':15} -> {type(exc).__name__}")

The Core Four Functions

import pickle

# In-memory: bytes ↔ object
data = {"model": "RandomForest", "n_estimators": 100, "accuracy": 0.94}

pickled = pickle.dumps(data) # object → bytes
print(type(pickled)) # <class 'bytes'>
print(len(pickled)) # a few dozen bytes; the exact size depends on the protocol

restored = pickle.loads(pickled) # bytes → object
print(restored) # {'model': 'RandomForest', ...}

# File-based: file ↔ object
with open("model_meta.pkl", "wb") as f:  # "wb" - binary write!
    pickle.dump(data, f)

with open("model_meta.pkl", "rb") as f:  # "rb" - binary read!
    restored_from_file = pickle.load(f)

print(restored_from_file["accuracy"]) # 0.94

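:::warning Always use binary mode with pickle
pickle files are binary. Always open them with "wb" for writing and "rb" for reading. Writing through a text-mode file ("w") raises a TypeError, and reading through one ("r") fails as well, typically with a TypeError or a UnicodeDecodeError.
:::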

Protocol Versions

pickle has multiple protocol versions that trade compatibility for efficiency:

import pickle

data = {"key": "value", "numbers": list(range(1000))}

for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
    pickled = pickle.dumps(data, protocol=protocol)
    print(f"Protocol {protocol}: {len(pickled):5} bytes")

# Exact byte counts depend on the data and the Python version.
# Protocol 0 (ASCII text) is the largest; the binary protocols are markedly smaller.
# Protocol 0: ASCII, human-readable, slowest
# Protocol 1: old binary format, Python 2 compatible
# Protocol 2: efficient pickling of new-style classes, Python 2.3+
# Protocol 3: explicit bytes support, Python 3.0+ (not readable by Python 2)
# Protocol 4: very large objects and framing, Python 3.4+
# Protocol 5: out-of-band buffers, Python 3.8+

| Protocol | Use when |
| --- | --- |
| pickle.HIGHEST_PROTOCOL | New data read only by the same or a newer Python version - usually the most compact and the fastest |
| pickle.DEFAULT_PROTOCOL | The interpreter's default: protocol 4 on Python 3.8-3.13, protocol 5 from Python 3.14 |
| protocol=2 | The data must also be readable by Python 2 |
| protocol=0 | Human-readable ASCII (debugging) |

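Prefer pickle.HIGHEST_PROTOCOL for new systems whose readers all run the same or a newer Python version - it is usually the most compact and the fastest. Older interpreters cannot read protocols newer than the ones they know:

import pickle

# A good default for new code that controls both writer and reader
with open("data.pkl", "wb") as f:
    pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)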

Part 2 - pickle Security: The Critical Warning

:::danger Never unpickle untrusted data
Calling pickle.load() on a malicious file is equivalent to running arbitrary Python code. An attacker can craft a pickle file that, when loaded, executes any code they choose - spawning a shell, exfiltrating files, installing malware. This is not theoretical. It is documented in Python's own documentation: "Warning: The pickle module is not secure. Only unpickle data you trust."
:::

How the Attack Works

# This is what a malicious pickle looks like conceptually
# DO NOT use this code - it is shown purely for understanding

import pickle
import os

class MaliciousPayload:
    def __reduce__(self):
        # __reduce__ is called during pickling
        # The return value is a callable + args used to restore the object
        # An attacker returns a shell command instead
        return (os.system, ("echo 'Attacker was here' > /tmp/pwned",))

# When pickled and sent to a victim:
payload = pickle.dumps(MaliciousPayload())

# When the victim calls pickle.loads(payload):
# → os.system("echo 'Attacker was here' > /tmp/pwned") executes
# Replace with any command: rm -rf, curl | bash, etc.
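
One mitigation described in the Python documentation is to subclass pickle.Unpickler and override find_class() so that only an explicit allow-list of globals can be resolved. The sketch below follows that idea; the `SAFE_BUILTINS` set, the `restricted_loads` helper, and the `Probe` class are names chosen here for illustration. It reduces the attack surface but is not a sandbox, so the rule about untrusted data still stands.

```python
import builtins
import io
import os
import pickle

# Hypothetical allow-list: the only globals a pickle may reference
SAFE_BUILTINS = {"range", "complex", "set", "frozenset", "slice"}


class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Called whenever the pickle stream asks for a global (class or function)
        if module == "builtins" and name in SAFE_BUILTINS:
            return getattr(builtins, name)
        raise pickle.UnpicklingError(f"global '{module}.{name}' is forbidden")


def restricted_loads(data: bytes):
    """Like pickle.loads(), but refuses globals outside the allow-list."""
    return RestrictedUnpickler(io.BytesIO(data)).load()


class Probe:
    """Stand-in for a hostile object; it references a harmless function."""

    def __reduce__(self):
        return (os.getcwd, ())


# Plain containers and allow-listed builtins load normally
print(restricted_loads(pickle.dumps({"a": [1, 2, 3], "r": range(3)})))

# Anything that needs another global is rejected before it can be called
try:
    restricted_loads(pickle.dumps(Probe()))
except pickle.UnpicklingError as exc:
    print("blocked:", exc)
```

The design choice is an allow-list rather than a deny-list: anything not named explicitly is refused.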

Safe Uses of pickle

# SAFE: pickling your own objects within your own system
import pickle
from sklearn.ensemble import RandomForestClassifier # type: ignore
import numpy as np

# Train a model
X = np.random.rand(100, 4)
y = np.random.randint(0, 2, 100)
model = RandomForestClassifier(n_estimators=10)
model.fit(X, y)

# Save for later use within the SAME trusted system
with open("model.pkl", "wb") as f:
    pickle.dump(model, f, protocol=pickle.HIGHEST_PROTOCOL)

# Load from your own file - safe because YOU wrote it
with open("model.pkl", "rb") as f:
    loaded_model = pickle.load(f)

predictions = loaded_model.predict(X[:5])
print(predictions)

# NEVER SAFE:
# - Downloading a .pkl from the internet and loading it
# - Accepting pickle data from an HTTP request
# - Loading .pkl files from untrusted users
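
If you need to examine a pickle of unknown origin, the standard pickletools module can walk its opcodes without executing them. The sketch below lists the globals a pickle refers to; `referenced_globals` is a hypothetical helper, and its handling of STACK_GLOBAL is a heuristic that assumes the module and name strings appear immediately before the opcode. A pickle that passes such an inspection is not thereby safe to load.

```python
import datetime
import pickle
import pickletools


def referenced_globals(data: bytes) -> set:
    """Collect 'module name' globals named in a pickle, without loading it."""
    found = set()
    recent_strings = []  # string arguments seen so far (needed for STACK_GLOBAL)
    for opcode, arg, _pos in pickletools.genops(data):
        if opcode.name == "GLOBAL":
            # Protocols 0-3: the argument is already "module name"
            found.add(arg)
        elif opcode.name == "STACK_GLOBAL":
            # Protocol 4+: module and name were pushed as two strings just before
            if len(recent_strings) >= 2:
                found.add(f"{recent_strings[-2]} {recent_strings[-1]}")
        elif isinstance(arg, str):
            recent_strings.append(arg)
    return found


print(referenced_globals(pickle.dumps({"a": 1})))                      # set()
print(referenced_globals(pickle.dumps(datetime.date(2024, 1, 15))))    # {'datetime date'}
```

For a full opcode listing, pickletools.dis(data) prints a disassembly, again without running anything.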

:::tip ML model saving alternatives
For ML models, consider these alternatives:

  • scikit-learn models: joblib.dump() / joblib.load() (more efficient for large NumPy arrays, but built on pickle and carrying the same risk)
  • PyTorch models: torch.save(model.state_dict(), path), then torch.load(path, weights_only=True), which restricts unpickling to tensors and plain containers
  • TensorFlow/Keras: model.save() in the native Keras or SavedModel format (not pickle)
  • ONNX: language-neutral, protobuf-based format; loading it does not execute arbitrary Python code
:::

Part 3 - Format Comparison: Choosing the Right Serializer

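| Format | Speed | Size | Cross-lang | Safety | Schema | Human-readable |
| --- | --- | --- | --- | --- | --- | --- |
| pickle | Fast | Medium | Python only | UNSAFE* | None | No (binary) |
| json | Medium | Large | Universal | Safe | None | Yes |
| msgpack | Fast | Small | Universal | Safe | None | No (binary) |
| protobuf | Fast | Tiny | Universal | Safe | Yes | No (binary) |
| CSV | Slow | Large | Universal | Safe | None | Yes |

* Unsafe = arbitrary code execution if data is from an untrusted source.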

pickle - When to Use

  • Saving scikit-learn models within a trusted ML pipeline
  • Python-to-Python inter-process communication via multiprocessing.Queue
  • Caching expensive computations (e.g., joblib.Memory)
  • shelve module (covered below)

json - When to Use

  • REST APIs and any cross-language data exchange
  • Config files that humans may edit
  • Structured logging (JSONL)
  • Any data that must survive a Python version upgrade

msgpack - When to Use

# pip install msgpack
import msgpack

data = {"event": "click", "x": 100, "y": 200, "values": list(range(1000))}

# msgpack: binary, fast, small, cross-language
packed = msgpack.packb(data)
unpacked = msgpack.unpackb(packed, raw=False)

import json
json_bytes = json.dumps(data).encode()

print(f"msgpack: {len(packed):6} bytes")
print(f"json: {len(json_bytes):6} bytes")
# Approximate output with default settings:
# msgpack:   2646 bytes
# json:   4940 bytes (≈87% larger)

msgpack is ideal for message queues (Kafka, RabbitMQ), high-frequency telemetry, and anywhere JSON is too verbose.

protobuf - When to Use

Protocol Buffers require defining a schema (.proto file) upfront. They typically produce very compact output and are fast, but require code generation:

# Example proto definition:
# message User {
# int32 id = 1;
# string name = 2;
# repeated float scores = 3;
# }

# After code generation:
from user_pb2 import User # type: ignore

user = User(id=42, name="Alice", scores=[95.0, 87.0])
serialized = user.SerializeToString() # Very compact bytes
restored = User.FromString(serialized)
print(restored.name) # Alice

Use protobuf for: gRPC services, internal microservice communication where schema enforcement matters, and data that must be tiny (IoT, mobile).

Part 4 - dataclasses: Clean Serializable Objects

The dataclasses module (Python 3.7+) provides a structured way to define data-holding classes with automatic __init__, __repr__, and __eq__:

from dataclasses import dataclass, field, asdict, astuple, fields
from datetime import datetime, timezone
from typing import Optional

@dataclass
class ModelMetadata:
    model_id: str
    name: str
    version: int
    accuracy: float
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    tags: list[str] = field(default_factory=list)
    parent_id: Optional[str] = None

# Instantiation - __init__ is auto-generated
meta = ModelMetadata(
    model_id="m-001",
    name="SentimentClassifier",
    version=3,
    accuracy=0.924,
    tags=["nlp", "production"],
)

print(meta)
# ModelMetadata(model_id='m-001', name='SentimentClassifier', version=3,
# accuracy=0.924, created_at=datetime.datetime(...), tags=['nlp', 'production'],
# parent_id=None)

# __eq__ is auto-generated and compares every field, in order
meta2 = ModelMetadata(model_id="m-001", name="SentimentClassifier", version=3, accuracy=0.924)
print(meta == meta2)  # False: tags differ, and created_at is a different timestamp
# Declare a field with field(compare=False) to leave it out of __eq__
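
To see how the generated __eq__ treats individual fields, here is a minimal sketch. The `Snapshot` class is hypothetical; the point is that every field takes part in the comparison unless it is declared with compare=False.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class Snapshot:  # hypothetical class, for illustration only
    name: str
    version: int
    # compare=False leaves this field out of the generated __eq__
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc), compare=False
    )


a = Snapshot("clf", 1)
b = Snapshot("clf", 1)  # created a moment later
c = Snapshot("clf", 2)

print(a == b)  # True: created_at is ignored
print(a == c)  # False: version differs
```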

asdict() - Convert to Dictionary

from dataclasses import asdict
import json

meta_dict = asdict(meta)
print(meta_dict)
# {'model_id': 'm-001', 'name': 'SentimentClassifier', 'version': 3,
# 'accuracy': 0.924, 'created_at': datetime.datetime(...), 'tags': ['nlp', 'production'],
# 'parent_id': None}

# Note: asdict() recursively converts nested dataclasses too!
@dataclass
class TrainingConfig:
    learning_rate: float
    batch_size: int

@dataclass
class ExperimentResult:
    config: TrainingConfig  # Nested dataclass
    final_accuracy: float

result = ExperimentResult(
    config=TrainingConfig(learning_rate=0.001, batch_size=32),
    final_accuracy=0.94,
)

print(asdict(result))
# {'config': {'learning_rate': 0.001, 'batch_size': 32}, 'final_accuracy': 0.94}
# Nested dataclass becomes a nested dict - perfect for JSON

astuple() - Convert to Tuple

from dataclasses import astuple, dataclass

@dataclass
class Point3D:
    x: float
    y: float
    z: float

p = Point3D(1.0, 2.5, -0.3)
coords = astuple(p)
print(coords)        # (1.0, 2.5, -0.3)
print(type(coords))  # <class 'tuple'>

# Useful for writing rows to CSV
import csv
with open("points.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["x", "y", "z"])
    writer.writerow(astuple(p))

fields() - Introspect the Schema

from dataclasses import fields

for f in fields(ModelMetadata):
    print(f"{f.name:15} type={f.type.__name__ if hasattr(f.type, '__name__') else f.type}")
# model_id        type=str
# name            type=str
# version         type=int
# accuracy        type=float
# created_at      type=datetime
# tags            type=list   (__name__ drops the [str] parameter)
# parent_id       type=...    (how Optional[str] is rendered varies by Python version)

Dataclass + JSON Serialization

import json
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class Event:
    event_id: str
    event_type: str
    occurred_at: datetime
    payload: dict

    def to_json(self) -> str:
        d = asdict(self)
        d["occurred_at"] = self.occurred_at.isoformat()  # datetime not JSON-native
        return json.dumps(d)

    @classmethod
    def from_json(cls, json_str: str) -> "Event":
        d = json.loads(json_str)
        d["occurred_at"] = datetime.fromisoformat(d["occurred_at"])
        return cls(**d)


# Usage
event = Event(
event_id="evt-123",
event_type="model_deployed",
occurred_at=datetime(2024, 1, 15, 14, 30),
payload={"model_id": "m-001", "environment": "production"},
)

json_str = event.to_json()
print(json_str)
# {"event_id": "evt-123", "event_type": "model_deployed", "occurred_at": "2024-01-15T14:30:00",
# "payload": {"model_id": "m-001", "environment": "production"}}

restored = Event.from_json(json_str)
print(restored.occurred_at) # 2024-01-15 14:30:00
print(type(restored.occurred_at)) # <class 'datetime.datetime'>
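
When several classes need the same treatment, the per-class to_json() method can be replaced by a single fallback passed to json.dumps() through its `default` parameter. This is a sketch under that assumption; `to_jsonable` and `Reading` are illustrative names.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from datetime import date, datetime


def to_jsonable(obj):
    """Fallback for json.dumps(): called only for objects json cannot encode."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


@dataclass
class Reading:  # hypothetical example class
    sensor: str
    taken_at: datetime
    value: float


text = json.dumps(Reading("s1", datetime(2024, 1, 15, 14, 30), 23.5), default=to_jsonable)
print(text)
# {"sensor": "s1", "taken_at": "2024-01-15T14:30:00", "value": 23.5}
```

The fallback handles encoding only. Decoding still needs explicit conversion, as from_json() does above, because JSON carries no type information.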

Part 5 - Pydantic: Validation + Serialization Together

Pydantic is widely used for API request/response models. It combines type validation with serialization in a single class:

# pip install pydantic  (the examples use the Pydantic v2 API)
from pydantic import BaseModel, Field, ValidationError, field_validator
from datetime import datetime
from decimal import Decimal

class UserCreate(BaseModel):
    """Request model for creating a new user. Validated on instantiation."""
    username: str = Field(min_length=3, max_length=50, pattern=r"^[a-zA-Z0-9_]+$")
    email: str = Field(pattern=r"^[^@]+@[^@]+\.[^@]+$")
    age: int = Field(ge=13, le=120)  # ge=greater-than-or-equal, le=less-than-or-equal
    balance: Decimal = Field(default=Decimal("0.00"), ge=0)

    @field_validator("username")
    @classmethod
    def username_not_reserved(cls, v):
        reserved = {"admin", "root", "system"}
        if v.lower() in reserved:
            raise ValueError(f"Username '{v}' is reserved")
        return v.lower()  # Normalize to lowercase


class UserResponse(BaseModel):
    """Response model - what the API returns."""
    user_id: int
    username: str
    email: str
    created_at: datetime
    balance: Decimal  # Pydantic v2 serializes Decimal as a string in JSON output


# Validation on creation
try:
    user = UserCreate(username="Alice", email="alice@example.com", age=30)
    print(user)
    # username='alice' email='alice@example.com' age=30 balance=Decimal('0.00')
except ValidationError as e:
    print(e)

# Invalid data raises ValidationError with detailed messages
try:
    bad = UserCreate(username="ab", email="not-an-email", age=200)
except ValidationError as e:
    print(e)
    # 3 validation errors for UserCreate
    # username: String should have at least 3 characters
    # email: String should match pattern...
    # age: Input should be less than or equal to 120

Pydantic Serialization

from pydantic import BaseModel
from datetime import datetime
from decimal import Decimal

class OrderModel(BaseModel):
order_id: str
amount: Decimal
created_at: datetime
items: list[str]

order = OrderModel(
order_id="ord-456",
amount=Decimal("149.99"),
created_at=datetime(2024, 1, 15, 14, 30),
items=["Widget A", "Widget B"],
)

# Serialize to dict
d = order.model_dump()
print(d)
# {'order_id': 'ord-456', 'amount': Decimal('149.99'),
# 'created_at': datetime.datetime(2024, 1, 15, 14, 30), 'items': ['Widget A', 'Widget B']}

# Serialize to JSON string (handles datetime and Decimal natively)
json_str = order.model_dump_json()
print(json_str)
# {"order_id":"ord-456","amount":"149.99","created_at":"2024-01-15T14:30:00","items":["Widget A","Widget B"]}

# Deserialize from dict
order2 = OrderModel.model_validate(d)

# Deserialize from JSON string
order3 = OrderModel.model_validate_json(json_str)

print(order3.amount) # 149.99
print(type(order3.amount)) # <class 'decimal.Decimal'>
print(type(order3.created_at)) # <class 'datetime.datetime'>

| Use dataclasses | Use Pydantic |
| --- | --- |
| Simple data containers, no external input | API request/response models (FastAPI, Flask) |
| Internal application objects (ML features, results) | Config files loaded from YAML/JSON/env vars |
| Zero runtime dependencies required | Data from external sources (user input, APIs) |
| Python 3.7+ stdlib only | Field-level validation (min/max, regex, custom rules) |

Part 6 - struct: Binary Packing for Protocols and File Formats

The struct module packs Python values into binary byte sequences using C-style format strings. This is essential for reading binary file formats, implementing network protocols, and interfacing with C libraries:

import struct

# Format string characters:
# > = big-endian byte order (network byte order)
# < = little-endian (x86 native)
# i = signed 32-bit integer (4 bytes)
# I = unsigned 32-bit integer (4 bytes)
# f = 32-bit float (4 bytes)
# d = 64-bit double (8 bytes)
# h = signed 16-bit short (2 bytes)
# B = unsigned 8-bit byte (1 byte)
# s = bytes (use "4s" for 4-byte string)

# Pack: Python values → bytes
packet = struct.pack(">IhfB", 1234, -7, 3.14, 255)
print(packet) # b'\x00\x00\x04\xd2\xff\xf9@H\xf5\xc3\xff'
print(len(packet)) # 11 bytes (4 + 2 + 4 + 1)

# Unpack: bytes → Python values
values = struct.unpack(">IhfB", packet)
print(values) # (1234, -7, 3.140000104904175, 255)
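
The byte-order prefix also controls padding. A minimal sketch of the difference follows: with ">" or "<", struct uses standard sizes and no alignment, whereas native mode ("@", the default when no prefix is given) may insert padding, so its size depends on the platform.

```python
import struct

fmt = "IhfB"

# Standard sizes, no padding: 4 + 2 + 4 + 1
print(struct.calcsize(">" + fmt))  # 11
print(struct.calcsize("<" + fmt))  # 11

# Native mode may add alignment padding; the result is platform-dependent
print(struct.calcsize("@" + fmt))

# Same value, different byte order on the wire
print(struct.pack(">I", 1234))  # b'\x00\x00\x04\xd2'
print(struct.pack("<I", 1234))  # b'\xd2\x04\x00\x00'
```

For file formats and network protocols, an explicit prefix keeps the layout identical across machines.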

Real-World: Reading a Binary File Header

Many binary file formats have fixed-size headers. struct lets you read them precisely:

import struct
from pathlib import Path

# Simulate a binary sensor log file format:
# Header: magic (4 bytes) + version (2 bytes) + record_count (4 bytes)
# Records: timestamp (8 bytes double) + sensor_id (4 bytes) + value (4 bytes float)

HEADER_FORMAT = ">4sHI" # big-endian: 4-char string, unsigned short, unsigned int
HEADER_SIZE = struct.calcsize(HEADER_FORMAT) # 10 bytes

RECORD_FORMAT = ">dIf" # big-endian: double, unsigned int, float
RECORD_SIZE = struct.calcsize(RECORD_FORMAT) # 16 bytes

def write_sensor_log(path: str, records: list[tuple]) -> None:
    """Write sensor data to a compact binary file."""
    with open(path, "wb") as f:
        # Write header
        header = struct.pack(HEADER_FORMAT, b"SENS", 1, len(records))
        f.write(header)

        # Write records
        for timestamp, sensor_id, value in records:
            f.write(struct.pack(RECORD_FORMAT, timestamp, sensor_id, value))

def read_sensor_log(path: str) -> list[tuple]:
    """Read binary sensor log, returning list of (timestamp, sensor_id, value)."""
    records = []
    with open(path, "rb") as f:
        # Read and validate header
        header_data = f.read(HEADER_SIZE)
        magic, version, record_count = struct.unpack(HEADER_FORMAT, header_data)

        if magic != b"SENS":
            raise ValueError(f"Invalid file format: expected b'SENS', got {magic!r}")

        print(f"Format version: {version}, Record count: {record_count}")

        # Read records
        for _ in range(record_count):
            record_data = f.read(RECORD_SIZE)
            if len(record_data) < RECORD_SIZE:
                break  # truncated file: stop at the last complete record
            records.append(struct.unpack(RECORD_FORMAT, record_data))

    return records


import time

sample_data = [
(time.time(), 1001, 23.5),
(time.time() + 1, 1002, 24.1),
(time.time() + 2, 1001, 23.8),
]

write_sensor_log("/tmp/sensors.bin", sample_data)

file_size = Path("/tmp/sensors.bin").stat().st_size
print(f"Binary file size: {file_size} bytes")
# Binary file size: 58 bytes (10 header + 3 * 16 records)

restored = read_sensor_log("/tmp/sensors.bin")
# Format version: 1, Record count: 3
print(restored[0][2]) # 23.5

:::tip struct vs alternatives
struct is for raw binary protocols where byte layout is fixed by external specification (network protocols, file formats like PNG/BMP/WAV, C library interfaces). For Python-to-Python binary data, prefer pickle (within trusted systems) or msgpack (cross-language). For human-readable config, use JSON.
:::

Part 7 - shelve: Persistent Dictionary

shelve provides a dictionary-like interface backed by pickle. It persists to disk between program runs:

import shelve

# Depending on the dbm backend, shelve may create one file or several
# (for example .db, or .dat/.bak/.dir) behind the scenes
with shelve.open("/tmp/app_cache") as db:
    # Write - works much like a dict (keys must be strings)
    db["user:42"] = {"name": "Alice", "score": 95, "tier": "premium"}
    db["user:43"] = {"name": "Bob", "score": 72, "tier": "standard"}
    db["config"] = {"max_users": 1000, "feature_flags": ["new_ui", "beta_api"]}

# Data persists between program runs
with shelve.open("/tmp/app_cache") as db:
    user = db["user:42"]
    print(user)
    # {'name': 'Alice', 'score': 95, 'tier': 'premium'}

    # Can store any picklable object - not just JSON-compatible types
    from datetime import datetime
    db["last_run"] = datetime.now()

    print(list(db.keys()))
    # ['user:42', 'user:43', 'config', 'last_run']  (key order is not guaranteed)

:::warning shelve inherits all pickle security risks
Since shelve uses pickle under the hood, the same security warning applies: only open shelve databases you created yourself. A maliciously crafted shelve database can execute arbitrary code when opened.
:::
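
One behavior worth knowing before relying on shelve: each read unpickles a fresh copy of the stored value, so mutating a nested object in place is not saved unless the value is assigned back (or the shelf is opened with writeback=True). The sketch below uses a temporary directory so it does not touch real data; the key and values are made up.

```python
import os
import shelve
import tempfile

# Throwaway location for the demonstration
path = os.path.join(tempfile.mkdtemp(), "demo_shelf")

with shelve.open(path) as db:
    db["user"] = {"name": "Alice", "tags": []}

    db["user"]["tags"].append("beta")  # mutates a temporary copy; not saved
    print(db["user"]["tags"])          # []

    user = db["user"]                  # read ...
    user["tags"].append("beta")        # ... modify ...
    db["user"] = user                  # ... and assign back to persist

with shelve.open(path) as db:
    print(db["user"]["tags"])          # ['beta']
```

Opening with shelve.open(path, writeback=True) makes in-place mutation work, at the cost of caching every accessed entry in memory until the shelf is synced or closed.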

When to Use shelve

Good use cases for shelve:

  • Simple caching between program runs (e.g., rate limiter state)
  • Small key-value stores that don't justify a full database
  • Development-time persistence (quickly saving intermediate results)
  • CLI tools that need to remember settings between invocations

Not appropriate for:

  • Multi-process or multi-threaded access (no locking)
  • Cross-language data sharing
  • Large datasets (no indexing, poor performance at scale)
  • Any data from external untrusted sources

Part 8 - Schema Evolution: Handling Data Format Changes

Real applications evolve. When you change a data class or serialization format, you must handle old data that was serialized with the previous format.

The Problem

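from dataclasses import dataclass

# Version 1 of your model (deployed in January)
@dataclass
class UserProfileV1:
    user_id: int
    name: str
    email: str

# Saved to disk as pickle / JSON / database

# Version 2 (deployed in March) - added fields
@dataclass
class UserProfileV2:
    user_id: int
    name: str
    email: str
    created_at: str  # NEW FIELD - doesn't exist in V1 data!
    role: str        # NEW FIELD - doesn't exist in V1 data!

When V1 data meets the V2 class, the failure depends on the format. Rebuilding from a V1 dict (for example, parsed JSON) with UserProfileV2(**data) raises a TypeError because the required created_at and role arguments are missing. Unpickling is subtler: pickle restores the saved attributes without calling __init__, so an old pickle of a class that has since gained fields loads without error, and the missing attributes surface later as an AttributeError.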
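
The two failure modes can be reproduced in a few lines. This sketch uses a hypothetical `ProfileV2` class and imitates what default unpickling does (create the instance without calling __init__, then restore the saved attributes) instead of going through a real pickle file.

```python
from dataclasses import dataclass


@dataclass
class ProfileV2:  # hypothetical stand-in for the newer schema
    user_id: int
    name: str
    email: str
    created_at: str
    role: str


v1_state = {"user_id": 42, "name": "Alice", "email": "alice@example.com"}

# 1) Rebuilding through __init__ (the JSON path) fails immediately
try:
    ProfileV2(**v1_state)
except TypeError as exc:
    print("TypeError:", exc)

# 2) Roughly what default unpickling does: no __init__ call,
#    just the saved attributes copied onto a new instance
stale = ProfileV2.__new__(ProfileV2)
stale.__dict__.update(v1_state)

print(stale.name)              # Alice
print(hasattr(stale, "role"))  # False; reading stale.role raises AttributeError
```

The second case is the more dangerous one, because nothing fails at load time.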

Strategy 1: JSON with Default Values

JSON handles this case well: the loader can fill in defaults for fields that older records lack, which keeps new code backward-compatible with old data:

import json

def load_user_profile(json_str: str) -> dict:
    """Load user profile, handling missing fields from older formats."""
    data = json.loads(json_str)

    # Apply defaults for fields added in later versions
    data.setdefault("created_at", "2020-01-01T00:00:00")  # V2 addition
    data.setdefault("role", "standard")                   # V2 addition
    data.setdefault("preferences", {})                    # V3 addition

    return data

# V1 JSON (old data on disk)
v1_json = '{"user_id": 42, "name": "Alice", "email": "alice@example.com"}'

# Loads fine - missing fields get defaults
profile = load_user_profile(v1_json)
print(profile)
# {'user_id': 42, 'name': 'Alice', 'email': 'alice@example.com',
# 'created_at': '2020-01-01T00:00:00', 'role': 'standard', 'preferences': {}}
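
The same idea carries over to dataclasses: give every later-added field a default, and drop keys the current class does not know about so that records written by newer code still load. The sketch below assumes a hypothetical `UserProfile` class and `profile_from_json` helper.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class UserProfile:  # hypothetical current schema
    user_id: int
    name: str
    email: str
    created_at: str = "2020-01-01T00:00:00"  # added later, so it needs a default
    role: str = "standard"                   # added later, so it needs a default


def profile_from_json(json_str: str) -> UserProfile:
    data = json.loads(json_str)
    known = {f.name for f in fields(UserProfile)}
    # Ignore keys this version does not know (e.g. written by newer code)
    return UserProfile(**{k: v for k, v in data.items() if k in known})


old = profile_from_json('{"user_id": 42, "name": "Alice", "email": "alice@example.com"}')
new = profile_from_json(
    '{"user_id": 7, "name": "Bob", "email": "bob@example.com", "role": "admin", "theme": "dark"}'
)
print(old.role)  # standard
print(new.role)  # admin
```

Silently dropping unknown keys is a design choice; a stricter loader might log them instead.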

Strategy 2: Versioned pickle with __getstate__/__setstate__

For pickle-based schema evolution, implement __getstate__/__setstate__ (or __reduce__ when you need full control over reconstruction):

import pickle

class UserProfileVersioned:
    """A pickle-serializable class that handles schema evolution."""

    CURRENT_VERSION = 2

    def __init__(self, user_id, name, email, created_at=None, role="standard"):
        self.user_id = user_id
        self.name = name
        self.email = email
        self.created_at = created_at or "2020-01-01T00:00:00"
        self.role = role
        self._version = self.CURRENT_VERSION

    def __getstate__(self):
        """What gets pickled - include version for future migration."""
        # Copy whatever attributes exist, so objects that predate newer
        # fields can still be pickled
        state = dict(self.__dict__)
        state["_version"] = self._version
        return state

    def __setstate__(self, state):
        """Called when unpickling - migrate from old versions."""
        version = state.get("_version", 1)

        # Apply migrations forward
        if version < 2:
            # V1 data: add fields introduced in V2
            state["created_at"] = "2020-01-01T00:00:00"
            state["role"] = "standard"

        # Set all attributes
        self.__dict__.update(state)
        self._version = self.CURRENT_VERSION  # Upgrade to current version

# Simulate loading old V1 pickle data (before these fields existed)
v1_obj = UserProfileVersioned.__new__(UserProfileVersioned)
v1_obj.__dict__ = {"_version": 1, "user_id": 42, "name": "Alice", "email": "alice@example.com"}
v1_pickle = pickle.dumps(v1_obj)

# Load with __setstate__ migration
restored = pickle.loads(v1_pickle)
print(restored.role)        # standard (migrated from V1)
print(restored.created_at)  # 2020-01-01T00:00:00 (migrated from V1)

Strategy 3: Add a Version Field to All Records

The most robust approach - always include a version number:

import json

def serialize_record(data: dict, version: int = 1) -> str:
    """Always include a version field in serialized records."""
    return json.dumps({"_schema_version": version, **data}, separators=(',', ':'))

def deserialize_record(json_str: str) -> dict:
    """Deserialize and migrate records based on their schema version."""
    data = json.loads(json_str)
    version = data.pop("_schema_version", 1)

    if version == 1:
        # Migrate V1 to current
        data.setdefault("created_at", "2020-01-01T00:00:00")
        data.setdefault("role", "standard")

    # Record which schema version the data was read from
    data["_migrated_from"] = version
    return data
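
As the number of versions grows, a single if-block becomes hard to maintain. One common arrangement, sketched here with hypothetical names (`MIGRATIONS`, `migrate`, and a V3 that adds `preferences`), is a chain of small functions that each upgrade a record by exactly one version.

```python
CURRENT_VERSION = 3


def v1_to_v2(rec: dict) -> dict:
    rec.setdefault("created_at", "2020-01-01T00:00:00")
    rec.setdefault("role", "standard")
    return rec


def v2_to_v3(rec: dict) -> dict:
    rec.setdefault("preferences", {})
    return rec


# Maps version N to the function that upgrades N -> N + 1
MIGRATIONS = {1: v1_to_v2, 2: v2_to_v3}


def migrate(record: dict) -> dict:
    rec = dict(record)  # shallow copy, so the caller's dict is not modified
    version = rec.pop("_schema_version", 1)
    if version > CURRENT_VERSION:
        raise ValueError(f"Record version {version} is newer than this code supports")
    while version < CURRENT_VERSION:
        rec = MIGRATIONS[version](rec)
        version += 1
    rec["_schema_version"] = CURRENT_VERSION
    return rec


print(migrate({"user_id": 42, "name": "Alice"}))
# {'user_id': 42, 'name': 'Alice', 'created_at': '2020-01-01T00:00:00',
#  'role': 'standard', 'preferences': {}, '_schema_version': 3}
```

Each step stays small and testable, and a record of any age takes the same path to the current shape.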

Part 9 - Real-World: ML Model Metadata Serialization

Here is a complete pattern for saving and loading ML experiment results, combining dataclasses, JSON, and version management:

import json
from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

@dataclass
class TrainingRun:
    """Complete record of a single model training run."""

    # Identifiers
    run_id: str
    experiment_name: str

    # Hyperparameters
    learning_rate: float
    batch_size: int
    max_epochs: int
    optimizer: str = "adam"

    # Results (filled in after training)
    final_train_loss: Optional[float] = None
    final_val_loss: Optional[float] = None
    best_val_accuracy: Optional[float] = None
    best_epoch: Optional[int] = None

    # Metadata (timestamps are ISO 8601 strings in UTC, e.g. "...+00:00";
    # datetime.utcnow() is deprecated since Python 3.12)
    started_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    finished_at: Optional[str] = None
    duration_seconds: Optional[float] = None
    notes: str = ""

    # Schema version, so later loaders can recognize and migrate older records
    _schema_version: int = field(default=2, repr=False)

    def complete(self, train_loss: float, val_loss: float, accuracy: float, epoch: int):
        """Mark run as complete with final metrics."""
        now = datetime.now(timezone.utc)
        started = datetime.fromisoformat(self.started_at)
        self.final_train_loss = round(train_loss, 6)
        self.final_val_loss = round(val_loss, 6)
        self.best_val_accuracy = round(accuracy, 4)
        self.best_epoch = epoch
        self.finished_at = now.isoformat()
        self.duration_seconds = (now - started).total_seconds()

    def save(self, runs_dir: str | Path) -> Path:
        """Save run record to JSON file."""
        runs_dir = Path(runs_dir)
        runs_dir.mkdir(parents=True, exist_ok=True)

        path = runs_dir / f"{self.run_id}.json"
        data = asdict(self)

        with path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

        return path

    @classmethod
    def load(cls, path: str | Path) -> "TrainingRun":
        """Load a training run from a JSON file."""
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Drop the stored version; the loaded object takes the current default
        data.pop("_schema_version", None)

        return cls(**data)


# Usage
run = TrainingRun(
run_id="run-20240115-001",
experiment_name="sentiment_v3",
learning_rate=0.001,
batch_size=32,
max_epochs=100,
)

# Simulate training completing
run.complete(train_loss=0.042, val_loss=0.058, accuracy=0.924, epoch=73)

# Save to disk
path = run.save("/tmp/ml_runs")
print(f"Saved to: {path}")
print(f"Accuracy: {run.best_val_accuracy}")
print(f"Duration: {run.duration_seconds:.2f}s")

# Load back
loaded = TrainingRun.load(path)
print(loaded.experiment_name) # sentiment_v3
print(loaded.best_val_accuracy) # 0.924

Interview Questions

Q1: What is serialization and why is it needed?

Answer: Serialization is the process of converting an in-memory Python object into a format (bytes or text) that can be stored on disk, sent over a network, or shared with another process. When a program exits, all in-memory state is lost - serialization is how you persist it. Deserialization is the reverse: converting bytes or text back into live Python objects. You need serialization whenever data must outlive a single process execution or cross a process boundary.

Q2: Why is unpickling untrusted data dangerous?

Answer: pickle's __reduce__ protocol lets a pickle stream name any importable callable together with its arguments, and the unpickler invokes that callable during deserialization. When you call pickle.load(), Python does this unconditionally, with no sandboxing. An attacker who can craft a pickle file can therefore make Python execute any code: spawn a shell, read/write files, connect to remote servers, or install malware. Python's own documentation states this warning explicitly. The rule is absolute: only unpickle data you yourself pickled from trusted code.

Q3: What does asdict() do with nested dataclasses?

Answer: asdict() recursively converts nested dataclasses to dictionaries. If a dataclass has a field that is itself a dataclass, asdict() converts that inner dataclass to a dict as well (and recurses into lists, tuples, and dicts), producing a nested plain-Python structure. That structure can be passed straight to json.dumps() as long as every leaf value is JSON-native; values such as datetime or Decimal still need converting. This makes asdict() the standard bridge between dataclasses and JSON.

Q4: What is the advantage of Pydantic over plain dataclasses for API models?

Answer: Pydantic provides validation at instantiation time - if you pass an invalid value (wrong type, out-of-range number, invalid email format), Pydantic raises a ValidationError with detailed field-level messages immediately, before the object is used. Plain dataclasses do not validate - you can assign any value to any field. For API request/response models, Pydantic also handles serialization natively via model_dump() and model_dump_json(), including handling of datetime, Decimal, and custom types. FastAPI is built on Pydantic and uses models for request validation and response serialization automatically.

Q5: When would you use struct instead of json or pickle?

Answer: Use struct when you need to read or write binary data whose byte-level layout is fixed by an external specification - network protocols (TCP/IP headers, custom binary protocols), binary file formats (PNG, WAV, ELF binaries), or C struct interop. struct gives you precise control over byte order (big-endian vs little-endian), data types, and packing. json and pickle are higher-level formats that decide their own encoding; struct works at the byte level and is the right tool when the layout is dictated from outside your program.

Q6: What is schema evolution and what are the main strategies for handling it?

Answer: Schema evolution is the challenge of reading old serialized data after you have changed the data format. Strategies include:

  1. JSON with defaults: Use dict.setdefault() or .get(key, default) when deserializing - missing fields from older records get default values, which keeps new code backward-compatible with old JSON.

  2. Version numbers: Always include a _schema_version or _v field in serialized records. The deserializer checks this and applies migration logic for old versions.

  3. __getstate__/__setstate__: For pickle-based objects, implement these methods to control what gets pickled and apply migrations on deserialization.

  4. Protobuf field numbers: Protocol Buffers handle evolution naturally - adding new fields with new numbers is backward-compatible, and old data simply has those fields as default values when read by newer code.

The fundamental principle: newer code must be able to read older data. Plan for evolution from the start by including version fields and using optional fields with defaults.

Practice Challenges

Beginner: Compare Serialization Formats

Serialize the same data structure using json, pickle, and (if installed) msgpack. Print the size in bytes and verify you can restore the original data from each.

Solution
import json
import pickle
from datetime import datetime

data = {
    "model": "GradientBoosting",
    "params": {
        "n_estimators": 500,
        "learning_rate": 0.05,
        "max_depth": 6,
    },
    "metrics": {
        "accuracy": 0.934,
        "f1_score": 0.921,
        "roc_auc": 0.971,
    },
    "created_at": datetime(2024, 1, 15, 14, 30),  # Not JSON-native!
    "tags": ["production", "v2", "gbm"],
}

print("=== Serialization Format Comparison ===\n")

# ── JSON ─────────────────────────────────────────────────────────────────────
# datetime is not JSON-serializable; convert first
json_safe = {**data, "created_at": data["created_at"].isoformat()}
json_bytes = json.dumps(json_safe).encode("utf-8")

restored_json = json.loads(json_bytes.decode("utf-8"))
# Note: created_at comes back as a string, must convert manually
restored_json["created_at"] = datetime.fromisoformat(restored_json["created_at"])

print(f"JSON: {len(json_bytes):5} bytes")
assert restored_json["metrics"]["accuracy"] == data["metrics"]["accuracy"]
assert restored_json["created_at"] == data["created_at"]
print(" Round-trip: OK")

# ── pickle ────────────────────────────────────────────────────────────────────
pickle_bytes = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
restored_pickle = pickle.loads(pickle_bytes)  # Safe here: we created these bytes ourselves

print(f"pickle: {len(pickle_bytes):5} bytes")
assert restored_pickle["created_at"] == data["created_at"]  # datetime preserved natively!
assert restored_pickle["params"] == data["params"]
print(" Round-trip: OK (datetime preserved as datetime)")

# ── msgpack ───────────────────────────────────────────────────────────────────
try:
    import msgpack

    # msgpack doesn't handle datetime by default; convert
    msgpack_safe = {**data, "created_at": data["created_at"].isoformat()}
    msgpack_bytes = msgpack.packb(msgpack_safe, use_bin_type=True)
    restored_msgpack = msgpack.unpackb(msgpack_bytes, raw=False)

    print(f"msgpack: {len(msgpack_bytes):5} bytes")
    assert restored_msgpack["metrics"]["accuracy"] == data["metrics"]["accuracy"]
    print(" Round-trip: OK")
except ImportError:
    print("msgpack: not installed (pip install msgpack)")

print("\nSize summary:")
print(" JSON baseline")
print(f" pickle {len(pickle_bytes) / len(json_bytes) * 100:.0f}% of JSON size")
# Relative sizes depend on the data and the pickle protocol; measure rather than assume.

Intermediate: Versioned Serializer

Build a VersionedSerializer that:

  • Saves objects to JSON with a _schema_version field
  • Loads objects and applies migrations when the version is old
  • Demonstrates loading V1 data correctly into a newer schema (the solution migrates V1 → V2 → V3)
Solution
import json
from pathlib import Path

CURRENT_VERSION = 3

def migrate_v1_to_v2(data: dict) -> dict:
    """V1 → V2: Added 'role' field and renamed 'score' to 'reputation_score'."""
    data["role"] = "member"  # Default for old users
    if "score" in data:
        data["reputation_score"] = data.pop("score")
    return data

def migrate_v2_to_v3(data: dict) -> dict:
    """V2 → V3: Added 'preferences' dict and 'is_active' boolean."""
    data["preferences"] = {"theme": "light", "notifications": True}
    data["is_active"] = True
    return data

# Maps a schema version to the function that upgrades it by one version.
MIGRATIONS = {
    1: migrate_v1_to_v2,
    2: migrate_v2_to_v3,
}

def apply_migrations(data: dict, from_version: int) -> dict:
    """Apply all migrations from from_version to CURRENT_VERSION."""
    version = from_version
    while version < CURRENT_VERSION:
        if version in MIGRATIONS:
            data = MIGRATIONS[version](data)
        version += 1
    return data

def save(path: str | Path, data: dict) -> None:
    """Save data dict with schema version."""
    record = {"_schema_version": CURRENT_VERSION, **data}
    with open(path, "w", encoding="utf-8") as f:
        json.dump(record, f, indent=2)

def load(path: str | Path) -> dict:
    """Load and migrate data to current schema version."""
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)

    # Records written before versioning existed are treated as V1.
    version = raw.pop("_schema_version", 1)

    if version < CURRENT_VERSION:
        print(f" Migrating from V{version} to V{CURRENT_VERSION}...")
        raw = apply_migrations(raw, version)

    return raw


# Demo
print("=== Versioned Serializer Demo ===\n")

# Create a V1 file (simulate old data on disk)
v1_data = {"user_id": 42, "name": "Alice", "email": "alice@example.com", "score": 850}
v1_path = "/tmp/user_v1.json"
with open(v1_path, "w", encoding="utf-8") as f:
    json.dump({"_schema_version": 1, **v1_data}, f, indent=2)

print("V1 file contents:")
with open(v1_path, "r", encoding="utf-8") as f:
    print(f.read())

# Load V1 data - migrations apply automatically
print("\nLoading V1 data (auto-migrating to V3):")
migrated = load(v1_path)
print(json.dumps(migrated, indent=2))
# V1 has: user_id, name, email, score
# After V1→V2: role="member", score renamed to reputation_score
# After V2→V3: preferences dict added, is_active=True

# Verify migrations applied correctly
assert migrated["role"] == "member"
assert "reputation_score" in migrated
assert "score" not in migrated
assert migrated["is_active"] is True
assert "preferences" in migrated

# Save in current format
save("/tmp/user_v3.json", migrated)
print("\nLoading V3 file (no migration needed):")
v3 = load("/tmp/user_v3.json")
print(f" role: {v3['role']}, is_active: {v3['is_active']}")
print("All assertions passed.")

Advanced: ML Experiment Tracker with Pickle + JSON Hybrid

Build an experiment tracker that stores:

  • Model objects using pickle (since scikit-learn models are not JSON-serializable)
  • Metadata and metrics using JSON (for human readability and cross-tool access)

The tracker should support: saving, loading, searching by metric, and listing all experiments.

Solution
import json
import pickle
import uuid
from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Iterator


def utc_now_iso() -> str:
    """Current UTC time as an ISO 8601 string ending in 'Z'.

    Uses a timezone-aware datetime; datetime.utcnow() is deprecated since Python 3.12.
    """
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


@dataclass
class ExperimentRecord:
    """JSON-serializable metadata for one experiment."""
    run_id: str
    model_class: str
    hyperparams: dict
    metrics: dict = field(default_factory=dict)
    started_at: str = field(default_factory=utc_now_iso)
    finished_at: Optional[str] = None
    status: str = "running"  # running | completed | failed
    notes: str = ""


class ExperimentTracker:
    """
    Hybrid tracker: JSON for metadata (human-readable),
    pickle for model objects (Python-native).
    """

    def __init__(self, base_dir: str | Path):
        self.base_dir = Path(base_dir)
        self.meta_dir = self.base_dir / "metadata"
        self.models_dir = self.base_dir / "models"
        self.meta_dir.mkdir(parents=True, exist_ok=True)
        self.models_dir.mkdir(parents=True, exist_ok=True)

    def start_run(self, model_class: str, hyperparams: dict, notes: str = "") -> str:
        """Register a new experiment run. Returns run_id."""
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        run_id = f"run-{timestamp}-{uuid.uuid4().hex[:6]}"

        record = ExperimentRecord(
            run_id=run_id,
            model_class=model_class,
            hyperparams=hyperparams,
            notes=notes,
        )
        self._save_metadata(record)
        return run_id

    def log_metrics(self, run_id: str, metrics: dict) -> None:
        """Update metrics for a running experiment."""
        record = self._load_metadata(run_id)
        record.metrics.update({k: round(float(v), 6) for k, v in metrics.items()})
        self._save_metadata(record)

    def save_model(self, run_id: str, model) -> None:
        """Save trained model as pickle. Mark run as completed."""
        model_path = self.models_dir / f"{run_id}.pkl"

        with model_path.open("wb") as f:
            pickle.dump(model, f, protocol=pickle.HIGHEST_PROTOCOL)

        record = self._load_metadata(run_id)
        record.status = "completed"
        record.finished_at = utc_now_iso()
        self._save_metadata(record)

        print(f"Model saved: {model_path} ({model_path.stat().st_size} bytes)")

    def load_model(self, run_id: str):
        """Load a trained model from pickle. ONLY use with your own runs."""
        model_path = self.models_dir / f"{run_id}.pkl"

        if not model_path.exists():
            raise FileNotFoundError(f"No model found for run {run_id}")

        with model_path.open("rb") as f:
            # Trusted only because this tracker wrote the file itself
            return pickle.load(f)

    def find_best_run(self, metric: str, higher_is_better: bool = True) -> Optional[ExperimentRecord]:
        """Find the run with the best value for a given metric."""
        best = None
        best_value = None

        for record in self._all_records():
            if metric not in record.metrics:
                continue
            value = record.metrics[metric]
            if best_value is None:
                best, best_value = record, value
            elif higher_is_better and value > best_value:
                best, best_value = record, value
            elif not higher_is_better and value < best_value:
                best, best_value = record, value

        return best

    def list_runs(self, status: Optional[str] = None) -> list[ExperimentRecord]:
        """List all experiment records, optionally filtered by status."""
        records = list(self._all_records())
        if status:
            records = [r for r in records if r.status == status]
        return sorted(records, key=lambda r: r.started_at, reverse=True)

    def _save_metadata(self, record: ExperimentRecord) -> None:
        path = self.meta_dir / f"{record.run_id}.json"
        with path.open("w", encoding="utf-8") as f:
            json.dump(asdict(record), f, indent=2)

    def _load_metadata(self, run_id: str) -> ExperimentRecord:
        path = self.meta_dir / f"{run_id}.json"
        if not path.exists():
            raise KeyError(f"No run found with id: {run_id}")
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)
        return ExperimentRecord(**data)

    def _all_records(self) -> Iterator[ExperimentRecord]:
        for path in self.meta_dir.glob("*.json"):
            try:
                with path.open("r", encoding="utf-8") as f:
                    yield ExperimentRecord(**json.load(f))
            except (json.JSONDecodeError, TypeError):
                # Skip files that are corrupt or do not match the record schema
                continue


# Demo (without actual ML libraries - using mock models)
import random

class MockModel:
    """Simulates a trained ML model."""
    def __init__(self, n_estimators, learning_rate):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.weights = [random.random() for _ in range(100)]

    def predict(self, x):
        return sum(self.weights[:len(x)]) > 0.5


tracker = ExperimentTracker("/tmp/experiments")

# Run 1
run_id_1 = tracker.start_run("GradientBoosting", {"n_estimators": 100, "lr": 0.1})
model_1 = MockModel(100, 0.1)
tracker.log_metrics(run_id_1, {"accuracy": 0.921, "f1": 0.918, "val_loss": 0.089})
tracker.save_model(run_id_1, model_1)

# Run 2
run_id_2 = tracker.start_run("GradientBoosting", {"n_estimators": 500, "lr": 0.05})
model_2 = MockModel(500, 0.05)
tracker.log_metrics(run_id_2, {"accuracy": 0.934, "f1": 0.931, "val_loss": 0.062})
tracker.save_model(run_id_2, model_2)

# Find best
best = tracker.find_best_run("accuracy")
print(f"\nBest run: {best.run_id}")
print(f"Accuracy: {best.metrics['accuracy']}")
print(f"Params: {best.hyperparams}")

# Load best model
best_model = tracker.load_model(best.run_id)
print(f"Model type: {type(best_model).__name__}")
print(f"Weights count: {len(best_model.weights)}")

# List all runs
print("\nAll completed runs:")
for run in tracker.list_runs(status="completed"):
    print(f" {run.run_id} accuracy={run.metrics.get('accuracy', 'N/A')}")

Quick Reference

| Format | Serialize | Deserialize | Handles datetime | Cross-lang | Safe from untrusted |
|---|---|---|---|---|---|
| json | json.dumps(d) | json.loads(s) | No (convert first) | Yes | Yes |
| pickle | pickle.dumps(o) | pickle.loads(b) | Yes (native) | No | NO |
| msgpack | msgpack.packb(d) | msgpack.unpackb(b) | No | Yes | Yes |
| struct | struct.pack(fmt, *v) | struct.unpack(fmt, b) | No | Yes | Yes |
| shelve | db[key] = obj | obj = db[key] | Yes | No | NO |

| Dataclass tool | Purpose |
|---|---|
| @dataclass | Auto-generate __init__, __repr__, __eq__ |
| asdict(obj) | Recursively convert to nested dict |
| astuple(obj) | Convert to tuple (for CSV rows, etc.) |
| fields(cls) | Get field metadata (name, type, default) |
| field(default_factory=list) | Mutable default for list/dict fields |

| struct format char | Python type | Bytes |
|---|---|---|
| b / B | int (signed / unsigned) | 1 |
| h / H | int (signed / unsigned) | 2 |
| i / I | int (signed / unsigned) | 4 |
| q / Q | int (signed / unsigned) | 8 |
| f | float (32-bit) | 4 |
| d | float (64-bit) | 8 |
| 4s | bytes (length 4) | 4 |
| > prefix | big-endian | - |
| < prefix | little-endian | - |
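
The byte counts in the struct table are the standard sizes, which apply whenever the format string starts with a byte-order prefix. A quick way to confirm them is struct.calcsize; the sketch below also shows that a format without a prefix uses native alignment and may therefore come out larger on some platforms.

```python
import struct

# Standard sizes apply whenever the format starts with "<", ">", "=" or "!".
for code in ("b", "B", "h", "H", "i", "I", "q", "Q", "f", "d", "4s"):
    print(f"{code:>2}: {struct.calcsize('<' + code)} byte(s)")

# With an explicit byte order the fields are packed back to back: 1 + 4 = 5 bytes.
packed_size = struct.calcsize("<bi")

# Without a prefix, struct uses native sizes and alignment, so the platform
# may insert padding before the int; the result is at least as large.
native_size = struct.calcsize("bi")
print(packed_size, native_size)
```

For data exchanged with other machines or languages, an explicit prefix keeps the layout identical everywhere.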

Key Takeaways

  • Serialization converts in-memory objects to bytes/text for storage, transmission, or inter-process sharing; deserialization reverses the process
  • pickle can serialize almost any Python object natively, but is a critical security risk - never unpickle data from untrusted sources; it enables arbitrary code execution
  • json is the default for cross-language data exchange; use pickle only within trusted, Python-only pipelines (ML model saving, caching)
  • dataclasses provide a clean, structured way to define serializable data objects; asdict() bridges them to JSON by recursively converting nested dataclasses to dicts
  • Pydantic is the production standard for API models - it combines type validation at instantiation time with native serialization, handling datetime, Decimal, and custom types
  • struct operates at the byte level for binary protocols and file formats defined by external specifications; not a general-purpose serializer
  • Always include a version number in serialized records - schema evolution is inevitable in production systems, and version fields enable graceful migration of old data
© 2026 EngineersOfAI. All rights reserved.