Serialization Concepts - pickle, dataclasses, and Format Tradeoffs
Reading time: ~20 minutes | Level: Foundation → Engineering
Here is a vulnerability that has appeared in real production systems - including ML platforms:
import pickle
# This looks harmless. It is not.
with open("model_weights.pkl", "rb") as f:
    model = pickle.load(f)  # Loading from an untrusted source
If model_weights.pkl was crafted by an attacker, this line executes arbitrary code on your machine. Not just reads data - executes code. A pickle file can contain arbitrary Python instructions, and pickle.load() runs them without question.
This is not a theoretical risk. It is a documented attack vector against ML pipelines that download "pretrained models" from the internet.
Serialization is the process of converting in-memory objects to bytes and back. Choosing the wrong format - or using the right format wrong - has consequences from data corruption to remote code execution.
What You Will Learn
- What serialization is: converting objects to bytes and restoring them
- pickle: what it can serialize, protocol versions, and its critical security risks
- Safe uses of pickle and how ML practitioners use it correctly
- Format comparison: pickle vs json vs msgpack vs protobuf
- dataclasses: @dataclass, asdict(), astuple(), fields() for clean serializable objects
- Converting dataclasses to/from JSON for API use
- Pydantic models: validation and serialization together - the production API standard
- struct: binary packing/unpacking for network protocols and file formats
- shelve: persistent dict backed by pickle
- Schema evolution: handling backward compatibility when your data format changes
Prerequisites
- Understanding of Python classes and instances
- Familiarity with json.dumps() and json.loads() (lesson 08)
- Understanding of context managers (lesson 03)
- Basic familiarity with file I/O (lessons 01 and 02)
Mental Model: Serialization as the Object Lifecycle
Serialization formats differ in:
- Which types they can represent
- Speed (encode + decode)
- Size (bytes on wire or disk)
- Cross-language compatibility
- Security guarantees
- Schema support (validation, evolution)
Part 1 - pickle: The Python-Native Serializer
What pickle Can Serialize
pickle can serialize almost any Python object:
import pickle
import datetime
from collections import defaultdict
# All of these work with pickle - many fail with json
objects = [
    {"key": "value"},                  # dict
    [1, 2, 3],                         # list
    (4, 5, 6),                         # tuple (preserved as tuple!)
    {1, 2, 3},                         # set
    datetime.datetime.now(),           # datetime (no custom encoder needed)
    defaultdict(list, {"a": [1, 2]}),  # defaultdict (its factory must be picklable too)
    range(1_000_000),                  # range object (not expanded)
]
# Lambdas are NOT picklable: pickle stores functions by qualified name, and a
# lambda has no name that can be looked up, so pickle.dumps(lambda x: x * 2) fails.
for obj in objects:
    data = pickle.dumps(obj)
    restored = pickle.loads(data)
    print(f"{type(obj).__name__:15} -> {len(data):5} bytes -> {restored!r:.40}")
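One limit is worth seeing directly: pickle stores functions and classes by reference (module plus qualified name), not by value. The sketch below uses math.sqrt as an arbitrary stand-in for "a function that can be found by name" and shows that a lambda, which has no importable name, is rejected.

```python
import math
import pickle

# A named function is pickled as a reference: the bytes contain its module
# and name, not its code.
by_reference = pickle.dumps(math.sqrt)
restored = pickle.loads(by_reference)

# A lambda cannot be looked up by name, so pickling it fails.
try:
    pickle.dumps(lambda x: x * 2)
    lambda_error = None
except (pickle.PicklingError, AttributeError) as exc:
    lambda_error = type(exc).__name__

print(restored is math.sqrt)    # the very same function object comes back
print(b"sqrt" in by_reference)  # the name is stored in the pickle
print(lambda_error)
```

The same rule explains why unpickling needs the original class or function to be importable on the loading side.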
The Core Four Functions
import pickle
# In-memory: bytes ↔ object
data = {"model": "RandomForest", "n_estimators": 100, "accuracy": 0.94}
pickled = pickle.dumps(data) # object → bytes
print(type(pickled)) # <class 'bytes'>
print(len(pickled)) # ~75 bytes (varies by protocol)
restored = pickle.loads(pickled) # bytes → object
print(restored) # {'model': 'RandomForest', ...}
# File-based: file ↔ object
with open("model_meta.pkl", "wb") as f:  # "wb" - binary write!
    pickle.dump(data, f)
with open("model_meta.pkl", "rb") as f:  # "rb" - binary read!
    restored_from_file = pickle.load(f)
print(restored_from_file["accuracy"])  # 0.94
:::warning Always use binary mode with pickle
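pickle files are binary. Always open them with "wb" for writing and "rb" for reading. In text mode ("w" or "r"), pickle.dump() raises a TypeError because it writes bytes, and pickle.load() fails with a TypeError or a UnicodeDecodeError.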
:::
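A minimal sketch of the failure and the fix, using a temporary directory so it can be run anywhere (the file name demo.pkl is arbitrary):

```python
import pickle
import tempfile
from pathlib import Path

path = Path(tempfile.mkdtemp()) / "demo.pkl"

# Text mode: pickle emits bytes, but a text-mode file only accepts str.
try:
    with open(path, "w") as f:
        pickle.dump({"a": 1}, f)
    text_mode_error = None
except TypeError as exc:
    text_mode_error = exc

# Binary mode: the round trip works.
with open(path, "wb") as f:
    pickle.dump({"a": 1}, f)
with open(path, "rb") as f:
    restored = pickle.load(f)

print(type(text_mode_error).__name__)
print(restored)
```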
Protocol Versions
pickle has multiple protocol versions that trade compatibility for efficiency:
import pickle
data = {"key": "value", "numbers": list(range(1000))}
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
    pickled = pickle.dumps(data, protocol=protocol)
    print(f"Protocol {protocol}: {len(pickled):5} bytes")
# Exact byte counts depend on the Python version, so none are shown here.
# Expect protocol 0 (ASCII) to be the largest and the binary protocols to be much smaller.
# Protocol 0: ASCII, human-readable, slowest
# Protocol 1: old binary format, Python 2 compatible
# Protocol 2: efficient pickling of new-style classes, Python 2 compatible
# Protocol 3: explicit bytes support, Python 3.0+
# Protocol 4: very large objects, Python 3.4+
# Protocol 5: out-of-band buffers, Python 3.8+
| Protocol | Use when |
|---|---|
| pickle.HIGHEST_PROTOCOL | New data that only recent Python versions will read - generally the most compact and fastest |
| pickle.DEFAULT_PROTOCOL | The interpreter's default (4 on Python 3.8 through 3.13, 5 from Python 3.14) |
| protocol=2 | Maximum Python 2 compatibility |
| protocol=0 | Human-readable ASCII (debugging) |
Prefer pickle.HIGHEST_PROTOCOL for new systems when every reader runs a Python version that supports it - it is generally the most compact and the fastest:
import pickle
# Use this for new code that only needs to be read by current Python versions
with open("data.pkl", "wb") as f:
    pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
Part 2 - pickle Security: The Critical Warning
:::danger Never unpickle untrusted data
pickle.load() on a malicious file is equivalent to running arbitrary Python code. An attacker can craft a pickle file that, when loaded, executes any code they choose - spawning a shell, exfiltrating files, installing malware. This is not theoretical. It is documented in Python's own documentation: "Warning: The pickle module is not secure. Only unpickle data you trust."
:::
How the Attack Works
# This is what a malicious pickle looks like conceptually
# DO NOT use this code - it is shown purely for understanding
import pickle
import os
class MaliciousPayload:
    def __reduce__(self):
        # __reduce__ is called during pickling.
        # It returns a callable plus arguments that pickle will call at LOAD time
        # to "rebuild" the object. An attacker returns a shell command instead.
        return (os.system, ("echo 'Attacker was here' > /tmp/pwned",))
# When pickled and sent to a victim:
payload = pickle.dumps(MaliciousPayload())
# When the victim calls pickle.loads(payload):
# → os.system("echo 'Attacker was here' > /tmp/pwned") executes
# Replace with any command: rm -rf, curl | bash, etc.
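Because loading is the dangerous step, a pickle can be inspected without loading it. The sketch below uses pickletools.genops, which only walks the opcode stream, to flag opcodes that import or call something at load time. The helper name risky_opcodes and the opcode set are illustrative choices, and the payload class is a benign stand-in that merely calls len("abc"). Treat this as a triage aid, not a security boundary.

```python
import pickle
import pickletools

# Opcodes that import a global or invoke a callable while unpickling.
CODE_RUNNING_OPCODES = {
    "GLOBAL", "STACK_GLOBAL", "REDUCE", "INST", "OBJ",
    "NEWOBJ", "NEWOBJ_EX", "BUILD",
}


def risky_opcodes(data: bytes) -> set:
    """Return the code-running opcodes found in a pickle, without loading it."""
    found = {opcode.name for opcode, _arg, _pos in pickletools.genops(data)}
    return found & CODE_RUNNING_OPCODES


class CallsLenOnLoad:
    """Benign stand-in for a payload: asks pickle to call len("abc") on load."""

    def __reduce__(self):
        return (len, ("abc",))


plain = pickle.dumps({"weights": [0.1, 0.2], "epochs": 3})
with_call = pickle.dumps(CallsLenOnLoad())

print(risky_opcodes(plain))      # plain containers need no imports or calls
print(risky_opcodes(with_call))  # includes REDUCE: something is called on load
```

A file that is supposed to hold only plain data but contains REDUCE or a global lookup deserves suspicion before anything calls pickle.load() on it.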
Safe Uses of pickle
# SAFE: pickling your own objects within your own system
import pickle
from sklearn.ensemble import RandomForestClassifier # type: ignore
import numpy as np
# Train a model
X = np.random.rand(100, 4)
y = np.random.randint(0, 2, 100)
model = RandomForestClassifier(n_estimators=10)
model.fit(X, y)
# Save for later use within the SAME trusted system
with open("model.pkl", "wb") as f:
    pickle.dump(model, f, protocol=pickle.HIGHEST_PROTOCOL)
# Load from your own file - safe because YOU wrote it
with open("model.pkl", "rb") as f:
    loaded_model = pickle.load(f)
predictions = loaded_model.predict(X[:5])
print(predictions)
# NEVER SAFE:
# - Downloading a .pkl from the internet and loading it
# - Accepting pickle data from an HTTP request
# - Loading .pkl files from untrusted users
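If pickle must be read in a setting where the contents are not fully under your control, the standard library lets you restrict which globals may be resolved by overriding Unpickler.find_class. The sketch below follows the pattern shown in the Python documentation; the allow-list and the helper name restricted_loads are illustrative. It narrows the attack surface but does not make pickle a safe format for hostile input.

```python
import builtins
import io
import math
import pickle

# Only these builtins may be looked up while unpickling (illustrative allow-list).
SAFE_BUILTINS = {"range", "complex", "set", "frozenset", "slice"}


class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Called whenever the pickle asks for a global such as os.system.
        if module == "builtins" and name in SAFE_BUILTINS:
            return getattr(builtins, name)
        raise pickle.UnpicklingError(f"global '{module}.{name}' is forbidden")


def restricted_loads(data: bytes):
    """Like pickle.loads(), but refuses any global outside the allow-list."""
    return RestrictedUnpickler(io.BytesIO(data)).load()


allowed = restricted_loads(pickle.dumps([1, 2, range(15)]))

try:
    restricted_loads(pickle.dumps(math.sqrt))  # needs the global math.sqrt
    blocked = None
except pickle.UnpicklingError as exc:
    blocked = str(exc)

print(allowed)
print(blocked)
```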
:::tip ML model saving alternatives
For ML models, consider these alternatives:
- scikit-learn models: joblib.dump() / joblib.load() (more efficient for large NumPy arrays, but still pickle-based, so the same risk applies)
- PyTorch models: torch.save(model.state_dict(), path), then load the weights into a model you construct yourself
- TensorFlow/Keras: model.save() in SavedModel format (not pickle)
- ONNX: language-neutral binary format that does not rely on pickle, so loading it does not execute arbitrary Python code
:::
Part 3 - Format Comparison: Choosing the Right Serializer
| Format | Speed | Size | Cross-lang | Safety | Schema | Human-readable |
|---|---|---|---|---|---|---|
| pickle | Fast | Medium | Python only | UNSAFE* | None | No (binary) |
| json | Medium | Large | Universal | Safe | None | Yes |
| msgpack | Fast | Small | Universal | Safe | None | No (binary) |
| protobuf | Fast | Tiny | Universal | Safe | Yes | No (binary) |
| CSV | Slow | Large | Universal | Safe | None | Yes |
* Unsafe = arbitrary code execution if data is from an untrusted source.
pickle - When to Use
- Saving scikit-learn models within a trusted ML pipeline
- Python-to-Python inter-process communication via multiprocessing.Queue
- Caching expensive computations (e.g., joblib.Memory)
- shelve module (covered below)
json - When to Use
- REST APIs and any cross-language data exchange
- Config files that humans may edit
- Structured logging (JSONL)
- Any data that must survive a Python version upgrade
msgpack - When to Use
# pip install msgpack
import msgpack
data = {"event": "click", "x": 100, "y": 200, "values": list(range(1000))}
# msgpack: binary, fast, small, cross-language
packed = msgpack.packb(data)
unpacked = msgpack.unpackb(packed, raw=False)
import json
json_bytes = json.dumps(data).encode()
print(f"msgpack: {len(packed):6} bytes")
print(f"json: {len(json_bytes):6} bytes")
# msgpack: 2646 bytes
# json: 4940 bytes (≈87% larger)
msgpack is ideal for message queues (Kafka, RabbitMQ), high-frequency telemetry, and anywhere JSON is too verbose.
protobuf - When to Use
Protocol Buffers require defining a schema (.proto file) upfront. They produce the smallest output and are fast, but require code generation:
# Example proto definition:
# message User {
# int32 id = 1;
# string name = 2;
# repeated float scores = 3;
# }
# After code generation:
from user_pb2 import User # type: ignore
user = User(id=42, name="Alice", scores=[95.0, 87.0])
serialized = user.SerializeToString() # Very compact bytes
restored = User.FromString(serialized)
print(restored.name) # Alice
Use protobuf for: gRPC services, internal microservice communication where schema enforcement matters, and data that must be tiny (IoT, mobile).
Part 4 - dataclasses: Clean Serializable Objects
The dataclasses module (Python 3.7+) provides a structured way to define data-holding classes with automatic __init__, __repr__, and __eq__:
from dataclasses import dataclass, field, asdict, astuple, fields
from datetime import datetime, timezone
from typing import Optional
@dataclass
class ModelMetadata:
    model_id: str
    name: str
    version: int
    accuracy: float
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    tags: list[str] = field(default_factory=list)
    parent_id: Optional[str] = None
# Instantiation - __init__ is auto-generated
meta = ModelMetadata(
    model_id="m-001",
    name="SentimentClassifier",
    version=3,
    accuracy=0.924,
    tags=["nlp", "production"],
)
print(meta)
# ModelMetadata(model_id='m-001', name='SentimentClassifier', version=3,
# accuracy=0.924, created_at=datetime.datetime(...), tags=['nlp', 'production'],
# parent_id=None)
# __eq__ is auto-generated and compares EVERY field, including created_at and tags
meta2 = ModelMetadata(model_id="m-001", name="SentimentClassifier", version=3, accuracy=0.924)
print(meta == meta2)  # False: tags differ, and created_at was captured at a different instant
asdict() - Convert to Dictionary
from dataclasses import asdict
import json
meta_dict = asdict(meta)
print(meta_dict)
# {'model_id': 'm-001', 'name': 'SentimentClassifier', 'version': 3,
# 'accuracy': 0.924, 'created_at': datetime.datetime(...), 'tags': ['nlp', 'production'],
# 'parent_id': None}
# created_at is still a datetime here, so json.dumps(meta_dict) needs a converter
# Note: asdict() recursively converts nested dataclasses too!
@dataclass
class TrainingConfig:
    learning_rate: float
    batch_size: int
@dataclass
class ExperimentResult:
    config: TrainingConfig  # Nested dataclass
    final_accuracy: float
result = ExperimentResult(
    config=TrainingConfig(learning_rate=0.001, batch_size=32),
    final_accuracy=0.94,
)
print(asdict(result))
# {'config': {'learning_rate': 0.001, 'batch_size': 32}, 'final_accuracy': 0.94}
# Nested dataclass becomes a nested dict - perfect for JSON
astuple() - Convert to Tuple
from dataclasses import astuple
@dataclass
class Point3D:
    x: float
    y: float
    z: float
p = Point3D(1.0, 2.5, -0.3)
coords = astuple(p)
print(coords)        # (1.0, 2.5, -0.3)
print(type(coords))  # <class 'tuple'>
# Useful for writing rows to CSV
import csv
with open("points.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["x", "y", "z"])
    writer.writerow(astuple(p))
fields() - Introspect the Schema
from dataclasses import fields
for f in fields(ModelMetadata):
    print(f"{f.name:15} type={f.type.__name__ if hasattr(f.type, '__name__') else f.type}")
# model_id type=str
# name type=str
# version type=int
# accuracy type=float
# created_at type=datetime
# tags type=list (list[str] reports the __name__ of its origin type)
# parent_id: how Optional[str] is printed depends on the Python version
Dataclass + JSON Serialization
import json
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class Event:
    event_id: str
    event_type: str
    occurred_at: datetime
    payload: dict
    def to_json(self) -> str:
        d = asdict(self)
        d["occurred_at"] = self.occurred_at.isoformat()  # datetime not JSON-native
        return json.dumps(d)
    @classmethod
    def from_json(cls, json_str: str) -> "Event":
        d = json.loads(json_str)
        d["occurred_at"] = datetime.fromisoformat(d["occurred_at"])
        return cls(**d)
# Usage
event = Event(
    event_id="evt-123",
    event_type="model_deployed",
    occurred_at=datetime(2024, 1, 15, 14, 30),
    payload={"model_id": "m-001", "environment": "production"},
)
json_str = event.to_json()
print(json_str)
# {"event_id": "evt-123", "event_type": "model_deployed", "occurred_at": "2024-01-15T14:30:00",
# "payload": {"model_id": "m-001", "environment": "production"}}
restored = Event.from_json(json_str)
print(restored.occurred_at)        # 2024-01-15 14:30:00
print(type(restored.occurred_at))  # <class 'datetime.datetime'>
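Converting one field by hand works for a single class, but it does not scale to many. A common alternative is a single fallback function passed as json.dumps(default=...), which json calls only for values it cannot encode itself. The sketch below is one way to write it; to_jsonable and Checkpoint are hypothetical names, and the choice to emit sets as sorted lists is an assumption made for this example.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from datetime import date, datetime
from decimal import Decimal


def to_jsonable(obj):
    """Fallback for json.dumps: invoked only for types json cannot encode."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return str(obj)  # keep exact decimal digits
    if isinstance(obj, (set, frozenset)):
        return sorted(obj)  # JSON has no set type
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


@dataclass
class Checkpoint:  # hypothetical example class
    name: str
    saved_at: datetime
    tags: set


ckpt = Checkpoint("epoch-3", datetime(2024, 1, 15, 14, 30), {"best", "nlp"})
encoded = json.dumps(ckpt, default=to_jsonable)
print(encoded)
```

The reverse direction still needs explicit parsing (for example datetime.fromisoformat), because JSON does not record which strings were timestamps.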
Part 5 - Pydantic: Validation + Serialization Together
Pydantic is a de facto standard for API request/response models in Python. It combines type validation with serialization in a single class. The examples below use the Pydantic v2 API (field_validator, model_dump, model_validate):
# pip install pydantic
from pydantic import BaseModel, Field, field_validator
from datetime import datetime
from decimal import Decimal
from typing import Optional
import json
class UserCreate(BaseModel):
    """Request model for creating a new user. Validated on instantiation."""
    username: str = Field(min_length=3, max_length=50, pattern=r"^[a-zA-Z0-9_]+$")
    email: str = Field(pattern=r"^[^@]+@[^@]+\.[^@]+$")
    age: int = Field(ge=13, le=120)  # ge=greater-than-or-equal, le=less-than-or-equal
    balance: Decimal = Field(default=Decimal("0.00"), ge=0)
    @field_validator("username")
    @classmethod
    def username_not_reserved(cls, v):
        reserved = {"admin", "root", "system"}
        if v.lower() in reserved:
            raise ValueError(f"Username '{v}' is reserved")
        return v.lower()  # Normalize to lowercase
class UserResponse(BaseModel):
    """Response model - what the API returns."""
    user_id: int
    username: str
    email: str
    created_at: datetime
    balance: Decimal
    # json_encoders is a carryover from Pydantic v1 and deprecated in v2;
    # model_dump_json() already writes Decimal values as strings.
    model_config = {"json_encoders": {Decimal: str}}
# Validation on creation (the values below are illustrative)
try:
    user = UserCreate(username="Alice_99", email="alice@example.com", age=30)
    print(user)
except Exception as e:
    print(e)
# Invalid data raises ValidationError with detailed messages
try:
    bad = UserCreate(username="ab", email="not-an-email", age=200)
except Exception as e:
    print(e)
# Abridged output:
# 3 validation errors for UserCreate
# username: String should have at least 3 characters
# email: String should match pattern...
# age: Input should be less than or equal to 120
Pydantic Serialization
from pydantic import BaseModel
from datetime import datetime
from decimal import Decimal
class OrderModel(BaseModel):
    order_id: str
    amount: Decimal
    created_at: datetime
    items: list[str]
order = OrderModel(
    order_id="ord-456",
    amount=Decimal("149.99"),
    created_at=datetime(2024, 1, 15, 14, 30),
    items=["Widget A", "Widget B"],
)
# Serialize to dict
d = order.model_dump()
print(d)
# {'order_id': 'ord-456', 'amount': Decimal('149.99'),
# 'created_at': datetime.datetime(2024, 1, 15, 14, 30), 'items': ['Widget A', 'Widget B']}
# Serialize to JSON string (handles datetime and Decimal natively)
json_str = order.model_dump_json()
print(json_str)
# {"order_id":"ord-456","amount":"149.99","created_at":"2024-01-15T14:30:00","items":["Widget A","Widget B"]}
# Deserialize from dict
order2 = OrderModel.model_validate(d)
# Deserialize from JSON string
order3 = OrderModel.model_validate_json(json_str)
print(order3.amount) # 149.99
print(type(order3.amount)) # <class 'decimal.Decimal'>
print(type(order3.created_at)) # <class 'datetime.datetime'>
| Use dataclasses | Use Pydantic |
|---|---|
| Simple data containers, no external input | API request/response models (FastAPI, Flask) |
| Internal application objects (ML features, results) | Config files loaded from YAML/JSON/env vars |
| Zero runtime dependencies required | Data from external sources (user input, APIs) |
| Python 3.7+ stdlib only | Field-level validation (min/max, regex, custom rules) |
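When an internal object needs just a few invariants and no extra dependency, a dataclass can check them in __post_init__, which the generated __init__ calls after assigning the fields. This is a minimal sketch with a hypothetical SplitConfig class. It covers far less than Pydantic (no coercion, no aggregated error report) and is shown only to illustrate the stdlib-only option.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SplitConfig:  # hypothetical internal config object
    train_fraction: float
    batch_size: int = 32

    def __post_init__(self):
        # Dataclasses do not enforce annotations, so check types and ranges by hand.
        if not isinstance(self.batch_size, int) or self.batch_size <= 0:
            raise ValueError(f"batch_size must be a positive int, got {self.batch_size!r}")
        if not 0.0 < self.train_fraction < 1.0:
            raise ValueError(f"train_fraction must be in (0, 1), got {self.train_fraction!r}")


ok = SplitConfig(train_fraction=0.8)

errors = []
for kwargs in ({"train_fraction": 1.5}, {"train_fraction": 0.8, "batch_size": "32"}):
    try:
        SplitConfig(**kwargs)
    except ValueError as exc:
        errors.append(str(exc))

print(ok)
print(errors)
```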
Part 6 - struct: Binary Packing for Protocols and File Formats
The struct module packs Python values into binary byte sequences using C-style format strings. This is essential for reading binary file formats, implementing network protocols, and interfacing with C libraries:
import struct
# Format string characters:
# > = big-endian byte order (network byte order)
# < = little-endian (x86 native)
# i = signed 32-bit integer (4 bytes)
# I = unsigned 32-bit integer (4 bytes)
# f = 32-bit float (4 bytes)
# d = 64-bit double (8 bytes)
# h = signed 16-bit short (2 bytes)
# B = unsigned 8-bit byte (1 byte)
# s = bytes (use "4s" for 4-byte string)
# Pack: Python values → bytes
packet = struct.pack(">IhfB", 1234, -7, 3.14, 255)
print(packet) # b'\x00\x00\x04\xd2\xff\xf9@H\xf5\xc3\xff'
print(len(packet)) # 11 bytes (4 + 2 + 4 + 1)
# Unpack: bytes → Python values
values = struct.unpack(">IhfB", packet)
print(values) # (1234, -7, 3.140000104904175, 255)
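Two details from the format table are easier to see in code: byte order changes the layout of the same value, and a value that does not fit its field is rejected rather than silently truncated. The sketch also uses struct.Struct, which compiles a format once for repeated use; the name HEADER is arbitrary.

```python
import struct

# Compile the format once; a Struct exposes pack(), unpack() and size.
HEADER = struct.Struct(">IhfB")

packed = HEADER.pack(1234, -7, 3.14, 255)
ident, offset, ratio, flags = HEADER.unpack(packed)

# The same unsigned integer in both byte orders.
big = struct.pack(">I", 1234)     # most significant byte first
little = struct.pack("<I", 1234)  # least significant byte first

# 256 does not fit in one unsigned byte, so packing fails loudly.
try:
    struct.pack(">B", 256)
    overflow_error = None
except struct.error as exc:
    overflow_error = exc

print(HEADER.size, big, little)
print(type(overflow_error).__name__)
```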
Real-World: Reading a Binary File Header
Many binary file formats have fixed-size headers. struct lets you read them precisely:
import struct
from pathlib import Path
# Simulate a binary sensor log file format:
# Header: magic (4 bytes) + version (2 bytes) + record_count (4 bytes)
# Records: timestamp (8 bytes double) + sensor_id (4 bytes) + value (4 bytes float)
HEADER_FORMAT = ">4sHI" # big-endian: 4-char string, unsigned short, unsigned int
HEADER_SIZE = struct.calcsize(HEADER_FORMAT) # 10 bytes
RECORD_FORMAT = ">dIf" # big-endian: double, unsigned int, float
RECORD_SIZE = struct.calcsize(RECORD_FORMAT) # 16 bytes
def write_sensor_log(path: str, records: list[tuple]) -> None:
    """Write sensor data to a compact binary file."""
    with open(path, "wb") as f:
        # Write header
        header = struct.pack(HEADER_FORMAT, b"SENS", 1, len(records))
        f.write(header)
        # Write records
        for timestamp, sensor_id, value in records:
            f.write(struct.pack(RECORD_FORMAT, timestamp, sensor_id, value))
def read_sensor_log(path: str) -> list[tuple]:
    """Read binary sensor log, returning list of (timestamp, sensor_id, value)."""
    records = []
    with open(path, "rb") as f:
        # Read and validate header
        header_data = f.read(HEADER_SIZE)
        magic, version, record_count = struct.unpack(HEADER_FORMAT, header_data)
        if magic != b"SENS":
            raise ValueError(f"Invalid file format: expected b'SENS', got {magic!r}")
        print(f"Format version: {version}, Record count: {record_count}")
        # Read records
        for _ in range(record_count):
            record_data = f.read(RECORD_SIZE)
            if len(record_data) < RECORD_SIZE:
                break  # truncated file: stop at the last complete record
            records.append(struct.unpack(RECORD_FORMAT, record_data))
    return records
import time
sample_data = [
    (time.time(), 1001, 23.5),
    (time.time() + 1, 1002, 24.1),
    (time.time() + 2, 1001, 23.8),
]
write_sensor_log("/tmp/sensors.bin", sample_data)
file_size = Path("/tmp/sensors.bin").stat().st_size
print(f"Binary file size: {file_size} bytes")
# Binary file size: 58 bytes (10 header + 3 * 16 records)
restored = read_sensor_log("/tmp/sensors.bin")
# Format version: 1, Record count: 3
print(restored[0][2])  # 23.5
:::tip struct vs alternatives
struct is for raw binary protocols where byte layout is fixed by external specification (network protocols, file formats like PNG/BMP/WAV, C library interfaces). For Python-to-Python binary data, prefer pickle (within trusted systems) or msgpack (cross-language). For human-readable config, use JSON.
:::
Part 7 - shelve: Persistent Dictionary
shelve provides a dictionary-like interface backed by pickle. It persists to disk between program runs:
import shelve
from datetime import datetime
# Depending on the dbm backend, shelve creates one file or several
# (for example *.db, or *.dat / *.bak / *.dir) behind the scenes
with shelve.open("/tmp/app_cache") as db:
    # Write - works much like a dict (keys must be strings)
    db["user:42"] = {"name": "Alice", "score": 95, "tier": "premium"}
    db["user:43"] = {"name": "Bob", "score": 72, "tier": "standard"}
    db["config"] = {"max_users": 1000, "feature_flags": ["new_ui", "beta_api"]}
# Data persists between program runs
with shelve.open("/tmp/app_cache") as db:
    user = db["user:42"]
    print(user)
    # {'name': 'Alice', 'score': 95, 'tier': 'premium'}
    # Can store any picklable object - not just JSON-compatible types
    db["last_run"] = datetime.now()
    print(list(db.keys()))
    # ['user:42', 'user:43', 'config', 'last_run'] (key order is not guaranteed)
:::warning shelve inherits all pickle security risks
Since shelve uses pickle under the hood, the same security warning applies: only open shelve databases you created yourself. A maliciously crafted shelve database can execute arbitrary code when opened.
:::
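One behavior regularly surprises new shelve users: reading a value returns a fresh unpickled copy, so mutating it in place does not change what is stored. The sketch below shows the lost update and two standard fixes, explicit reassignment and writeback=True. It writes to a temporary directory, and the key name flags is arbitrary.

```python
import shelve
import tempfile
from pathlib import Path

db_path = str(Path(tempfile.mkdtemp()) / "cache")

with shelve.open(db_path) as db:
    db["flags"] = ["new_ui"]
    db["flags"].append("beta_api")  # mutates a temporary copy; nothing is saved
    after_lost_update = db["flags"]

    # Fix 1: read, modify, then assign back to the key.
    flags = db["flags"]
    flags.append("beta_api")
    db["flags"] = flags

# Fix 2: writeback=True caches accessed entries and rewrites them on close.
with shelve.open(db_path, writeback=True) as db:
    db["flags"].append("dark_mode")

with shelve.open(db_path) as db:
    final_flags = db["flags"]

print(after_lost_update)
print(final_flags)
```

writeback=True is convenient but keeps every accessed entry in memory and rewrites all of them on close, so explicit reassignment is usually the better default.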
When to Use shelve
Good use cases for shelve:
• Simple caching between program runs (e.g., rate limiter state)
• Small key-value stores that don't justify a full database
• Development-time persistence (quickly saving intermediate results)
• CLI tools that need to remember settings between invocations
Not appropriate for:
• Multi-process or multi-threaded access (no locking)
• Cross-language data sharing
• Large datasets (no indexing, poor performance at scale)
• Any data from external untrusted sources
Part 8 - Schema Evolution: Handling Data Format Changes
Real applications evolve. When you change a data class or serialization format, you must handle old data that was serialized with the previous format.
The Problem
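from dataclasses import dataclass
# Version 1 of your model (deployed in January)
@dataclass
class UserProfileV1:
    user_id: int
    name: str
    email: str
# Saved to disk as pickle / JSON / database
# Version 2 (deployed in March) - added fields
@dataclass
class UserProfileV2:
    user_id: int
    name: str
    email: str
    created_at: str  # NEW FIELD - doesn't exist in V1 data!
    role: str        # NEW FIELD - doesn't exist in V1 data!
Building a UserProfileV2 from V1 data, for example UserProfileV2(**old_record), raises TypeError because created_at and role are missing. Unpickling is subtler: pickle restores the saved attributes without calling __init__, so a V1 pickle loaded under the updated class definition silently lacks the new attributes and fails later with AttributeError.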
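The failure is easy to reproduce with a plain dict standing in for a stored V1 record. The sketch below uses two hypothetical classes: one with required new fields, which rejects the old record, and one that gives the new fields defaults, which accepts it. The default values and the example.com address are placeholders.

```python
from dataclasses import dataclass


@dataclass
class StrictUserProfile:  # new fields are required
    user_id: int
    name: str
    email: str
    created_at: str
    role: str


@dataclass
class TolerantUserProfile:  # new fields carry defaults for old records
    user_id: int
    name: str
    email: str
    created_at: str = "2020-01-01T00:00:00"
    role: str = "standard"


old_record = {"user_id": 42, "name": "Alice", "email": "alice@example.com"}

try:
    StrictUserProfile(**old_record)
    strict_error = None
except TypeError as exc:
    strict_error = exc

migrated = TolerantUserProfile(**old_record)
print(type(strict_error).__name__)
print(migrated)
```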
Strategy 1: JSON with Default Values
JSON copes well with added fields because the loader can fill in missing keys with dict.setdefault() or .get(key, default), so new code keeps reading old records:
import json
def load_user_profile(json_str: str) -> dict:
    """Load user profile, handling missing fields from older formats."""
    data = json.loads(json_str)
    # Apply defaults for fields added in later versions
    data.setdefault("created_at", "2020-01-01T00:00:00")  # V2 addition
    data.setdefault("role", "standard")                   # V2 addition
    data.setdefault("preferences", {})                    # V3 addition
    return data
# V1 JSON (old data on disk); the email address is a placeholder
v1_json = '{"user_id": 42, "name": "Alice", "email": "alice@example.com"}'
# Loads fine - missing fields get defaults
profile = load_user_profile(v1_json)
print(profile)
# {'user_id': 42, 'name': 'Alice', 'email': 'alice@example.com',
# 'created_at': '2020-01-01T00:00:00', 'role': 'standard', 'preferences': {}}
Strategy 2: Versioned pickle with __getstate__/__setstate__
For pickle-based schema evolution, implement __getstate__/__setstate__ (or __reduce__ when you need full control over reconstruction):
import pickle
class UserProfileVersioned:
    """A pickle-serializable class that handles schema evolution."""
    CURRENT_VERSION = 2
    def __init__(self, user_id, name, email, created_at=None, role="standard"):
        self.user_id = user_id
        self.name = name
        self.email = email
        self.created_at = created_at or "2020-01-01T00:00:00"
        self.role = role
        self._version = self.CURRENT_VERSION
    def __getstate__(self):
        """What gets pickled - include version for future migration."""
        return {
            "_version": self._version,
            "user_id": self.user_id,
            "name": self.name,
            "email": self.email,
            "created_at": self.created_at,
            "role": self.role,
        }
    def __setstate__(self, state):
        """Called when unpickling - migrate from old versions."""
        version = state.get("_version", 1)
        # Apply migrations forward
        if version < 2:
            # V1 data: add fields introduced in V2
            state["created_at"] = "2020-01-01T00:00:00"
            state["role"] = "standard"
        # Set all attributes
        self.__dict__.update(state)
        self._version = self.CURRENT_VERSION  # Upgrade to current version
# Simulate unpickling V1 data. pickle.loads() creates the instance without calling
# __init__ and then hands the stored state to __setstate__, so replay those two
# steps with a V1-shaped state (no _version, created_at, or role; placeholder values).
v1_state = {"user_id": 42, "name": "Alice", "email": "alice@example.com"}
restored = UserProfileVersioned.__new__(UserProfileVersioned)
restored.__setstate__(v1_state)
print(restored.role)        # standard (migrated from V1)
print(restored.created_at)  # 2020-01-01T00:00:00 (migrated from V1)
# Current objects round-trip through pickle as usual
current = pickle.loads(pickle.dumps(UserProfileVersioned(43, "Bob", "bob@example.com", role="admin")))
print(current.role)  # admin
Strategy 3: Add a Version Field to All Records
The most robust approach - always include a version number:
import json
def serialize_record(data: dict, version: int = 1) -> str:
    """Always include a version field in serialized records."""
    return json.dumps({"_schema_version": version, **data}, separators=(',', ':'))
def deserialize_record(json_str: str) -> dict:
    """Deserialize and migrate records based on their schema version."""
    data = json.loads(json_str)
    version = data.pop("_schema_version", 1)
    if version == 1:
        # Migrate V1 to current
        data.setdefault("created_at", "2020-01-01T00:00:00")
        data.setdefault("role", "standard")
    # Record which schema version the data was migrated from
    data["_migrated_from"] = version
    return data
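Once there are more than two versions, a single if block becomes hard to maintain. One common arrangement, sketched here under assumed names (MIGRATIONS, load_record, and a V3 that adds preferences as in Strategy 1), is a table of single-step migrations applied in order until the record reaches the current version.

```python
import json

CURRENT_VERSION = 3


def _v1_to_v2(record: dict) -> dict:
    record.setdefault("created_at", "2020-01-01T00:00:00")
    record.setdefault("role", "standard")
    return record


def _v2_to_v3(record: dict) -> dict:
    record.setdefault("preferences", {})
    return record


# Each entry upgrades a record from version N to version N + 1.
MIGRATIONS = {1: _v1_to_v2, 2: _v2_to_v3}


def load_record(json_str: str) -> dict:
    """Parse a record and apply migrations step by step up to CURRENT_VERSION."""
    record = json.loads(json_str)
    version = record.pop("_schema_version", 1)  # unversioned data is treated as V1
    if version > CURRENT_VERSION:
        raise ValueError(f"record version {version} is newer than this code supports")
    while version < CURRENT_VERSION:
        record = MIGRATIONS[version](record)
        version += 1
    record["_schema_version"] = CURRENT_VERSION
    return record


from_v1 = load_record('{"user_id": 42, "name": "Alice"}')
from_v2 = load_record('{"_schema_version": 2, "user_id": 7, "name": "Bob", "role": "admin"}')
print(from_v1)
print(from_v2)
```

Each migration only needs to know about its immediate predecessor, and refusing records from a newer version avoids silently dropping fields this code does not understand.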
Part 9 - Real-World: ML Model Metadata Serialization
Here is a complete pattern for saving and loading ML experiment results, combining dataclasses, JSON, and version management:
import json
from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
@dataclass
class TrainingRun:
    """Complete record of a single model training run."""
    # Identifiers
    run_id: str
    experiment_name: str
    # Hyperparameters
    learning_rate: float
    batch_size: int
    max_epochs: int
    optimizer: str = "adam"
    # Results (filled in after training)
    final_train_loss: Optional[float] = None
    final_val_loss: Optional[float] = None
    best_val_accuracy: Optional[float] = None
    best_epoch: Optional[int] = None
    # Metadata: timezone-aware UTC timestamps in ISO 8601 form
    # (datetime.utcnow() is deprecated since Python 3.12)
    started_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    finished_at: Optional[str] = None
    duration_seconds: Optional[float] = None
    notes: str = ""
    # Schema version for forward compatibility
    _schema_version: int = field(default=2, repr=False)
    def complete(self, train_loss: float, val_loss: float, accuracy: float, epoch: int):
        """Mark run as complete with final metrics."""
        now = datetime.now(timezone.utc)
        started = datetime.fromisoformat(self.started_at)
        self.final_train_loss = round(train_loss, 6)
        self.final_val_loss = round(val_loss, 6)
        self.best_val_accuracy = round(accuracy, 4)
        self.best_epoch = epoch
        self.finished_at = now.isoformat()
        self.duration_seconds = (now - started).total_seconds()
    def save(self, runs_dir: str | Path) -> Path:
        """Save run record to JSON file."""
        runs_dir = Path(runs_dir)
        runs_dir.mkdir(parents=True, exist_ok=True)
        path = runs_dir / f"{self.run_id}.json"
        data = asdict(self)
        with path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        return path
    @classmethod
    def load(cls, path: str | Path) -> "TrainingRun":
        """Load a training run from a JSON file."""
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Remove internal fields before passing to constructor
        data.pop("_schema_version", None)
        return cls(**data)
# Usage
run = TrainingRun(
    run_id="run-20240115-001",
    experiment_name="sentiment_v3",
    learning_rate=0.001,
    batch_size=32,
    max_epochs=100,
)
# Simulate training completing
run.complete(train_loss=0.042, val_loss=0.058, accuracy=0.924, epoch=73)
# Save to disk
path = run.save("/tmp/ml_runs")
print(f"Saved to: {path}")
print(f"Accuracy: {run.best_val_accuracy}")
print(f"Duration: {run.duration_seconds:.2f}s")
# Load back
loaded = TrainingRun.load(path)
print(loaded.experiment_name) # sentiment_v3
print(loaded.best_val_accuracy) # 0.924
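A loader that passes every stored key to the constructor breaks as soon as a file contains a key the class does not know, for example one written by a newer version of the code. One way to tolerate that, sketched with a hypothetical trimmed-down RunSummary class and helper from_dict_tolerant, is to keep only the keys that fields() reports and surface the rest instead of failing.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class RunSummary:  # hypothetical, trimmed stand-in for a run record
    run_id: str
    learning_rate: float
    optimizer: str = "adam"
    notes: str = ""


def from_dict_tolerant(cls, data: dict):
    """Build cls from data, ignoring keys that are not init fields of cls."""
    known = {f.name for f in fields(cls) if f.init}
    kept = {key: value for key, value in data.items() if key in known}
    ignored = sorted(set(data) - known)
    return cls(**kept), ignored


stored = json.loads(
    '{"run_id": "run-001", "learning_rate": 0.001, '
    '"gpu_count": 4, "_schema_version": 3}'
)
summary, ignored_keys = from_dict_tolerant(RunSummary, stored)
print(summary)
print(ignored_keys)
```

Returning the ignored keys lets the caller log them, which makes unexpected schema drift visible rather than silent.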
Interview Questions
Q1: What is serialization and why is it needed?
Answer: Serialization is the process of converting an in-memory Python object into a format (bytes or text) that can be stored on disk, sent over a network, or shared with another process. When a program exits, all in-memory state is lost - serialization is how you persist it. Deserialization is the reverse: converting bytes or text back into live Python objects. You need serialization whenever data must outlive a single process execution or cross a process boundary.
Q2: Why is unpickling untrusted data dangerous?
Answer: pickle's __reduce__ protocol allows an object to specify arbitrary Python code that gets executed during deserialization. When you call pickle.load(), Python runs that code unconditionally, with no sandboxing. An attacker who can craft a pickle file can make Python execute any code: spawn a shell, read/write files, connect to remote servers, or install malware. Python's own documentation states this warning explicitly. The rule is absolute: only unpickle data you yourself pickled from trusted code.
Q3: What does asdict() do with nested dataclasses?
Answer: asdict() recursively converts nested dataclasses to dictionaries. If a dataclass has a field that is itself a dataclass, asdict() converts that inner dataclass to a dict as well, producing a fully nested plain-Python structure. That structure can be passed straight to json.dumps() as long as the leaf values are JSON-native; values such as datetime or Decimal still need converting first. This makes asdict() the standard bridge between dataclasses and JSON.
Q4: What is the advantage of Pydantic over plain dataclasses for API models?
Answer: Pydantic provides validation at instantiation time - if you pass an invalid value (wrong type, out-of-range number, invalid email format), Pydantic raises a ValidationError with detailed field-level messages immediately, before the object is used. Plain dataclasses do not validate - you can assign any value to any field. For API request/response models, Pydantic also handles serialization natively via model_dump() and model_dump_json(), including handling of datetime, Decimal, and custom types. FastAPI is built on Pydantic and uses models for request validation and response serialization automatically.
Q5: When would you use struct instead of json or pickle?
Answer: Use struct when you need to read or write binary data whose byte-level layout is fixed by an external specification - network protocols (TCP/IP headers, custom binary protocols), binary file formats (PNG, WAV, ELF binaries), or C struct interop. struct gives you precise control over byte order (big-endian vs little-endian), data types, and packing. json and pickle are both higher-level Python-centric formats; struct works at the byte level and is the right tool when the format is defined outside Python.
Q6: What is schema evolution and what are the main strategies for handling it?
Answer: Schema evolution is the challenge of reading old serialized data after you have changed the data format. Strategies include:
- JSON with defaults: Use dict.setdefault() or .get(key, default) when deserializing - missing fields from older records get default values, so new code can keep reading old JSON.
- Version numbers: Always include a _schema_version or _v field in serialized records. The deserializer checks this and applies migration logic for old versions.
- __getstate__/__setstate__: For pickle-based objects, implement these methods to control what gets pickled and apply migrations on deserialization.
- Protobuf field numbers: Protocol Buffers handle evolution naturally - adding new fields with new numbers is backward-compatible, and old data simply has those fields as default values when read by newer code.
The fundamental principle: newer code must be able to read older data. Plan for evolution from the start by including version fields and using optional fields with defaults.
Practice Challenges
Beginner: Compare Serialization Formats
Serialize the same data structure using json, pickle, and (if installed) msgpack. Print the size in bytes and verify you can restore the original data from each.
Solution
import json
import pickle
from datetime import datetime
data = {
    "model": "GradientBoosting",
    "params": {
        "n_estimators": 500,
        "learning_rate": 0.05,
        "max_depth": 6,
    },
    "metrics": {
        "accuracy": 0.934,
        "f1_score": 0.921,
        "roc_auc": 0.971,
    },
    "created_at": datetime(2024, 1, 15, 14, 30),  # Not JSON-native!
    "tags": ["production", "v2", "gbm"],
}
print("=== Serialization Format Comparison ===\n")
# ── JSON ─────────────────────────────────────────────────────────────────────
# datetime is not JSON-serializable; convert first
json_safe = {**data, "created_at": data["created_at"].isoformat()}
json_bytes = json.dumps(json_safe).encode("utf-8")
restored_json = json.loads(json_bytes.decode())
# Note: created_at comes back as a string, must convert manually
restored_json["created_at"] = datetime.fromisoformat(restored_json["created_at"])
print(f"JSON: {len(json_bytes):5} bytes")
assert restored_json["metrics"]["accuracy"] == data["metrics"]["accuracy"]
assert restored_json["created_at"] == data["created_at"]
print(" Round-trip: OK")
# ── pickle ────────────────────────────────────────────────────────────────────
pickle_bytes = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
restored_pickle = pickle.loads(pickle_bytes)
print(f"pickle: {len(pickle_bytes):5} bytes")
assert restored_pickle["created_at"] == data["created_at"] # datetime preserved natively!
assert restored_pickle["params"] == data["params"]
print(" Round-trip: OK (datetime preserved as datetime)")
# ── msgpack ───────────────────────────────────────────────────────────────────
try:
import msgpack
# msgpack doesn't handle datetime natively; convert
msgpack_safe = {**data, "created_at": data["created_at"].isoformat()}
msgpack_bytes = msgpack.packb(msgpack_safe, use_bin_type=True)
restored_msgpack = msgpack.unpackb(msgpack_bytes, raw=False)
print(f"msgpack: {len(msgpack_bytes):5} bytes")
assert restored_msgpack["metrics"]["accuracy"] == data["metrics"]["accuracy"]
print(" Round-trip: OK")
except ImportError:
print("msgpack: not installed (pip install msgpack)")
print("\nSize summary:")
print(f" JSON baseline")
print(f" pickle {len(pickle_bytes)/len(json_bytes)*100:.0f}% of JSON size")
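# Results vary with the data: for a small dict of short strings and floats like
# this one, pickle (protocol 5) tends to land close to the JSON size rather than
# far below it. Run the script and compare the printed numbers.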
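The solution converts created_at by hand before and after the JSON round-trip. A possible alternative, sketched below, is to let json do the conversion through its default and object_hook parameters. The "__datetime__" tag and both helper names are conventions invented for this example, not part of the json module.

```python
import json
from datetime import datetime


def encode_extra(obj):
    """Fallback for json.dumps: called only for objects json cannot encode itself."""
    if isinstance(obj, datetime):
        return {"__datetime__": obj.isoformat()}
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def decode_extra(d: dict):
    """object_hook for json.loads: called for every decoded JSON object."""
    if set(d) == {"__datetime__"}:
        return datetime.fromisoformat(d["__datetime__"])
    return d


record = {"model": "GradientBoosting", "created_at": datetime(2024, 1, 15, 14, 30)}
text = json.dumps(record, default=encode_extra)
restored = json.loads(text, object_hook=decode_extra)

print(text)
print(restored == record)  # True
```

The tagged form costs a few extra bytes per value, but the reader no longer needs to know in advance which keys hold timestamps.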
Intermediate: Versioned Serializer
Build a VersionedSerializer that:
- Saves objects to JSON with a _schema_version field
- Loads objects and applies migrations when the version is old
- Demonstrates loading V1 data correctly into the current schema (V3 in the solution below)
Solution
import json
from pathlib import Path

CURRENT_VERSION = 3


def migrate_v1_to_v2(data: dict) -> dict:
    """V1 → V2: Added 'role' field and renamed 'score' to 'reputation_score'."""
    data["role"] = "member"  # Default for old users
    if "score" in data:
        data["reputation_score"] = data.pop("score")
    return data


def migrate_v2_to_v3(data: dict) -> dict:
    """V2 → V3: Added 'preferences' dict and 'is_active' boolean."""
    data["preferences"] = {"theme": "light", "notifications": True}
    data["is_active"] = True
    return data


# Maps a version number to the function that upgrades it by one step
MIGRATIONS = {
    1: migrate_v1_to_v2,
    2: migrate_v2_to_v3,
}


def apply_migrations(data: dict, from_version: int) -> dict:
    """Apply all migrations from from_version to CURRENT_VERSION."""
    version = from_version
    while version < CURRENT_VERSION:
        if version in MIGRATIONS:
            data = MIGRATIONS[version](data)
        version += 1
    return data


def save(path: str | Path, data: dict) -> None:
    """Save data dict with schema version."""
    record = {"_schema_version": CURRENT_VERSION, **data}
    with open(path, "w", encoding="utf-8") as f:
        json.dump(record, f, indent=2)


def load(path: str | Path) -> dict:
    """Load and migrate data to current schema version."""
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)
    # Records written before versioning existed are treated as V1
    version = raw.pop("_schema_version", 1)
    if version < CURRENT_VERSION:
        print(f" Migrating from V{version} to V{CURRENT_VERSION}...")
        raw = apply_migrations(raw, version)
    return raw


# Demo
print("=== Versioned Serializer Demo ===\n")

# Create a V1 file (simulate old data on disk)
# V1 has: user_id, name, email, score (the values here are made up for the demo)
v1_data = {"user_id": 42, "name": "Ada", "email": "ada@example.com", "score": 870}
v1_path = "/tmp/user_v1.json"
with open(v1_path, "w", encoding="utf-8") as f:
    json.dump({"_schema_version": 1, **v1_data}, f, indent=2)
print("V1 file contents:")
print(Path(v1_path).read_text(encoding="utf-8"))

# Load V1 data - migrations apply automatically
print("\nLoading V1 data (auto-migrating to V3):")
migrated = load(v1_path)
print(json.dumps(migrated, indent=2))
# After V1→V2: role="member", score renamed to reputation_score
# After V2→V3: preferences dict added, is_active=True

# Verify migrations applied correctly
assert migrated["role"] == "member"
assert "reputation_score" in migrated
assert "score" not in migrated
assert migrated["is_active"] is True
assert "preferences" in migrated

# Save in current format
save("/tmp/user_v3.json", migrated)
print("\nLoading V3 file (no migration needed):")
v3 = load("/tmp/user_v3.json")
print(f" role: {v3['role']}, is_active: {v3['is_active']}")
print("All assertions passed.")
Advanced: ML Experiment Tracker with Pickle + JSON Hybrid
Build an experiment tracker that stores:
- Model objects using pickle (since scikit-learn models are not JSON-serializable)
- Metadata and metrics using JSON (for human readability and cross-tool access)
The tracker should support: saving, loading, searching by metric, and listing all experiments.
Solution
import json
import pickle
import uuid
from dataclasses import dataclass, asdict, field
from datetime import datetime
from pathlib import Path
from typing import Optional, Iterator


@dataclass
class ExperimentRecord:
    """JSON-serializable metadata for one experiment."""
    run_id: str
    model_class: str
    hyperparams: dict
    metrics: dict = field(default_factory=dict)
    started_at: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
    finished_at: Optional[str] = None
    status: str = "running"  # running | completed | failed
    notes: str = ""
class ExperimentTracker:
    """
    Hybrid tracker: JSON for metadata (human-readable),
    pickle for model objects (Python-native).
    """

    def __init__(self, base_dir: str | Path):
        self.base_dir = Path(base_dir)
        self.meta_dir = self.base_dir / "metadata"
        self.models_dir = self.base_dir / "models"
        self.meta_dir.mkdir(parents=True, exist_ok=True)
        self.models_dir.mkdir(parents=True, exist_ok=True)

    def start_run(self, model_class: str, hyperparams: dict, notes: str = "") -> str:
        """Register a new experiment run. Returns run_id."""
        run_id = f"run-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:6]}"
        record = ExperimentRecord(
            run_id=run_id,
            model_class=model_class,
            hyperparams=hyperparams,
            notes=notes,
        )
        self._save_metadata(record)
        return run_id

    def log_metrics(self, run_id: str, metrics: dict) -> None:
        """Update metrics for a running experiment."""
        record = self._load_metadata(run_id)
        record.metrics.update({k: round(float(v), 6) for k, v in metrics.items()})
        self._save_metadata(record)

    def save_model(self, run_id: str, model) -> None:
        """Save trained model as pickle. Mark run as completed."""
        model_path = self.models_dir / f"{run_id}.pkl"
        with model_path.open("wb") as f:
            pickle.dump(model, f, protocol=pickle.HIGHEST_PROTOCOL)
        record = self._load_metadata(run_id)
        record.status = "completed"
        record.finished_at = datetime.utcnow().isoformat() + "Z"
        self._save_metadata(record)
        print(f"Model saved: {model_path} ({model_path.stat().st_size} bytes)")

    def load_model(self, run_id: str):
        """Load a trained model from pickle. ONLY use with your own runs."""
        model_path = self.models_dir / f"{run_id}.pkl"
        if not model_path.exists():
            raise FileNotFoundError(f"No model found for run {run_id}")
        with model_path.open("rb") as f:
            return pickle.load(f)  # Safe: we wrote this file ourselves

    def find_best_run(self, metric: str, higher_is_better: bool = True) -> Optional[ExperimentRecord]:
        """Find the run with the best value for a given metric."""
        best = None
        best_value = None
        for record in self._all_records():
            if metric not in record.metrics:
                continue
            value = record.metrics[metric]
            if best_value is None:
                best, best_value = record, value
            elif higher_is_better and value > best_value:
                best, best_value = record, value
            elif not higher_is_better and value < best_value:
                best, best_value = record, value
        return best

    def list_runs(self, status: Optional[str] = None) -> list[ExperimentRecord]:
        """List all experiment records, optionally filtered by status."""
        records = list(self._all_records())
        if status:
            records = [r for r in records if r.status == status]
        return sorted(records, key=lambda r: r.started_at, reverse=True)

    def _save_metadata(self, record: ExperimentRecord) -> None:
        path = self.meta_dir / f"{record.run_id}.json"
        with path.open("w", encoding="utf-8") as f:
            json.dump(asdict(record), f, indent=2)

    def _load_metadata(self, run_id: str) -> ExperimentRecord:
        path = self.meta_dir / f"{run_id}.json"
        if not path.exists():
            raise KeyError(f"No run found with id: {run_id}")
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)
        return ExperimentRecord(**data)

    def _all_records(self) -> Iterator[ExperimentRecord]:
        for path in self.meta_dir.glob("*.json"):
            try:
                with path.open("r", encoding="utf-8") as f:
                    yield ExperimentRecord(**json.load(f))
            except (json.JSONDecodeError, TypeError):
                # Skip files that are corrupt or do not match the record layout
                continue
# Demo (without actual ML libraries - using mock models)
import random


class MockModel:
    """Simulates a trained ML model."""

    def __init__(self, n_estimators, learning_rate):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.weights = [random.random() for _ in range(100)]

    def predict(self, x):
        return sum(self.weights[:len(x)]) > 0.5


tracker = ExperimentTracker("/tmp/experiments")

# Run 1
run_id_1 = tracker.start_run("GradientBoosting", {"n_estimators": 100, "lr": 0.1})
model_1 = MockModel(100, 0.1)
tracker.log_metrics(run_id_1, {"accuracy": 0.921, "f1": 0.918, "val_loss": 0.089})
tracker.save_model(run_id_1, model_1)

# Run 2
run_id_2 = tracker.start_run("GradientBoosting", {"n_estimators": 500, "lr": 0.05})
model_2 = MockModel(500, 0.05)
tracker.log_metrics(run_id_2, {"accuracy": 0.934, "f1": 0.931, "val_loss": 0.062})
tracker.save_model(run_id_2, model_2)

# Find best
best = tracker.find_best_run("accuracy")
print(f"\nBest run: {best.run_id}")
print(f"Accuracy: {best.metrics['accuracy']}")
print(f"Params: {best.hyperparams}")

# Load best model
best_model = tracker.load_model(best.run_id)
print(f"Model type: {type(best_model).__name__}")
print(f"Weights count: {len(best_model.weights)}")

# List all runs
print("\nAll completed runs:")
for run in tracker.list_runs(status="completed"):
    print(f" {run.run_id} accuracy={run.metrics.get('accuracy', 'N/A')}")
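The tracker's load_model trusts every file in its models directory. If that directory could ever be written to by someone else, one mitigation described in the Python pickle documentation is to subclass pickle.Unpickler and override find_class so that only allow-listed globals can be resolved. The sketch below assumes a tiny allow-list chosen for illustration; a real scikit-learn model would need a much longer one. It narrows the attack surface but does not make untrusted pickles safe.

```python
import io
import pickle

# Hypothetical allow-list: the only (module, name) globals a pickle may reference.
ALLOWED_GLOBALS = {
    ("collections", "OrderedDict"),
}


class RestrictedUnpickler(pickle.Unpickler):
    """Refuses to resolve any global that is not on the allow-list."""

    def find_class(self, module, name):
        if (module, name) in ALLOWED_GLOBALS:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"global '{module}.{name}' is forbidden")


def restricted_loads(data: bytes):
    """Drop-in replacement for pickle.loads that enforces the allow-list."""
    return RestrictedUnpickler(io.BytesIO(data)).load()


# Plain containers and numbers reference no globals, so they load normally.
print(restricted_loads(pickle.dumps({"accuracy": 0.934, "tags": ["v2"]})))

# A classic hand-written malicious pickle: "call os.system('echo pwned')".
malicious = b"cos\nsystem\n(S'echo pwned'\ntR."
try:
    restricted_loads(malicious)
except pickle.UnpicklingError as exc:
    print(f"blocked: {exc}")  # blocked: global 'os.system' is forbidden
```

The lookup of os.system is rejected before the payload can call anything, which is the point of intercepting find_class rather than inspecting the result afterwards.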
Quick Reference
| Format | Serialize | Deserialize | Handles datetime | Cross-lang | Safe from untrusted |
|---|---|---|---|---|---|
| json | json.dumps(d) | json.loads(s) | No (convert first) | Yes | Yes |
| pickle | pickle.dumps(o) | pickle.loads(b) | Yes (native) | No | NO |
| msgpack | msgpack.packb(d) | msgpack.unpackb(b) | No | Yes | Yes |
| struct | struct.pack(fmt, *v) | struct.unpack(fmt, b) | No | Yes | Yes |
| shelve | db[key] = obj | obj = db[key] | Yes | No | NO |

| Dataclass tool | Purpose |
|---|---|
| @dataclass | Auto-generate __init__, __repr__, __eq__ |
| asdict(obj) | Recursively convert to nested dict |
| astuple(obj) | Convert to tuple (for CSV rows, etc.) |
| fields(cls) | Get field metadata (name, type, default) |
| field(default_factory=list) | Mutable default for list/dict fields |
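
The tools in the table above can be seen together in one short sketch. The Params and Run classes are hypothetical examples, not types used elsewhere in this lesson.

```python
from dataclasses import dataclass, field, fields, asdict, astuple


@dataclass
class Params:
    n_estimators: int = 100
    learning_rate: float = 0.1


@dataclass
class Run:
    run_id: str
    params: Params = field(default_factory=Params)
    tags: list[str] = field(default_factory=list)  # fresh list per instance


run = Run("run-001", Params(500, 0.05), ["gbm"])

# asdict recurses into nested dataclasses, producing JSON-ready dicts.
print(asdict(run))
# {'run_id': 'run-001', 'params': {'n_estimators': 500, 'learning_rate': 0.05}, 'tags': ['gbm']}

# astuple recurses the same way, but yields positional values.
print(astuple(run))  # ('run-001', (500, 0.05), ['gbm'])

# fields() exposes the declared schema in definition order.
print([f.name for f in fields(Run)])  # ['run_id', 'params', 'tags']

# default_factory avoids the shared-mutable-default trap.
print(Run("a").tags is Run("b").tags)  # False
```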

| struct format char | Python type | Bytes (standard size) |
|---|---|---|
| b / B | int (signed / unsigned) | 1 |
| h / H | int (signed / unsigned) | 2 |
| i / I | int (signed / unsigned) | 4 |
| q / Q | int (signed / unsigned) | 8 |
| f | float (32-bit) | 4 |
| d | float (64-bit) | 8 |
| 4s | bytes (length 4) | 4 |
| > prefix | big-endian | - |
| < prefix | little-endian | - |
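
The sizes in the struct table can be checked with struct.calcsize. The short sketch below also shows how the byte-order prefix changes the output bytes and why an explicit prefix matters for padding; it is an illustration, and the unprefixed result depends on your platform.

```python
import struct

# Standard sizes apply whenever the format starts with <, >, = or !.
sizes = {code: struct.calcsize("<" + code) for code in "bhiqfd"}
print(sizes)  # {'b': 1, 'h': 2, 'i': 4, 'q': 8, 'f': 4, 'd': 8}

# Same value, two byte orders.
big = struct.pack(">I", 1).hex()
little = struct.pack("<I", 1).hex()
print(big)     # 00000001
print(little)  # 01000000

# With a prefix, fields are packed back to back: 1 + 4 bytes.
packed_size = struct.calcsize("<bi")
print(packed_size)  # 5

# Without a prefix, native alignment may insert padding (commonly 8 in total).
print(struct.calcsize("bi"))
```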
Key Takeaways
- Serialization converts in-memory objects to bytes/text for storage, transmission, or inter-process sharing; deserialization reverses the process
- pickle can serialize almost any Python object natively, but is a critical security risk - never unpickle data from untrusted sources; it enables arbitrary code execution
- json is the default for cross-language data exchange; use pickle only within trusted, Python-only pipelines (ML model saving, caching)
- dataclasses provide a clean, structured way to define serializable data objects; asdict() bridges them to JSON by recursively converting nested dataclasses to dicts
- Pydantic is the production standard for API models - it combines type validation at instantiation time with native serialization, handling datetime, Decimal, and custom types
- struct operates at the byte level for binary protocols and file formats defined by external specifications; not a general-purpose serializer
- Always include a version number in serialized records - schema evolution is inevitable in production systems, and version fields enable graceful migration of old data
