Memory Optimization - Fitting More in Less
Predict the memory usage of these two classes:
```python
import sys

class PointRegular:
    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

class PointSlots:
    __slots__ = ('x', 'y', 'z')

    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

regular = PointRegular(1.0, 2.0, 3.0)
slotted = PointSlots(1.0, 2.0, 3.0)
print(sys.getsizeof(regular))  # ?
print(sys.getsizeof(slotted))  # ?
```
On CPython 3.11 (64-bit; exact figures vary between CPython versions):

```python
sys.getsizeof(regular)           # 48  - PointRegular, but this EXCLUDES the __dict__!
sys.getsizeof(regular.__dict__)  # 104 - the instance dict
# Total: 48 + 104 = 152 bytes per instance

sys.getsizeof(slotted)           # 56  - PointSlots: no __dict__, so this IS the total
# Savings: 152 - 56 = 96 bytes per instance (63% reduction)
```
At one million instances, that is 96 MB saved - just by adding one line of code. But __slots__ has trade-offs that most tutorials never mention. This lesson covers when to use it, when to avoid it, and six other techniques for fitting more data in less memory.
What You Will Learn
- How __slots__ works internally and its inheritance gotchas
- How weakref prevents circular reference memory leaks
- When to use the array module instead of lists
- How struct.pack compresses data for binary protocols
- How memory-mapped files let you process files larger than RAM
- Object pooling and the flyweight pattern for reducing allocation overhead
- Real-world techniques for processing millions of records efficiently
Prerequisites
- Completed Lessons 1-4 (profiling, cProfile, line_profiler, caching)
- Understanding of CPython's memory model (reference counting, __dict__, gc)
- Familiarity with sys.getsizeof and pympler.asizeof from Lesson 3
Part 1 - __slots__ at Scale
How __slots__ Works
Every regular Python object has a __dict__ - a dictionary that stores its instance attributes. This dict is flexible (you can add any attribute at runtime) but expensive: on CPython 3.11, an empty __dict__ consumes about 64 bytes, and it grows with each attribute.
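__slots__ replaces the per-instance __dict__ with fixed-offset storage. Each name listed in __slots__ becomes a descriptor on the class that reads and writes a fixed offset inside the instance, so attribute access skips the per-instance hash table and the layout resembles a C struct.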
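A quick way to see this mechanism, using the PointSlots class from the opening example (redefined so the snippet runs on its own): each slot name appears on the class as a member_descriptor, and instances have no __dict__ at all.

```python
class PointSlots:
    __slots__ = ('x', 'y', 'z')

    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

p = PointSlots(1.0, 2.0, 3.0)

# Each slot is a descriptor stored on the class, not on the instance
descriptor = PointSlots.__dict__['x']
print(type(descriptor).__name__)  # member_descriptor

# The descriptor reads the value from its fixed offset in the instance
print(descriptor.__get__(p, PointSlots))  # 1.0

# No per-instance dictionary exists
print(hasattr(p, '__dict__'))  # False
```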
Memory Savings at Scale
```python
import time
from pympler import asizeof

class SensorReading:
    def __init__(self, timestamp, sensor_id, value, unit):
        self.timestamp = timestamp
        self.sensor_id = sensor_id
        self.value = value
        self.unit = unit

class SensorReadingSlots:
    __slots__ = ('timestamp', 'sensor_id', 'value', 'unit')

    def __init__(self, timestamp, sensor_id, value, unit):
        self.timestamp = timestamp
        self.sensor_id = sensor_id
        self.value = value
        self.unit = unit

# Compare at scale
n = 1_000_000

start = time.perf_counter()
regular_list = [SensorReading(i, i % 100, i * 0.1, "celsius")
                for i in range(n)]
t_regular = time.perf_counter() - start

start = time.perf_counter()
slotted_list = [SensorReadingSlots(i, i % 100, i * 0.1, "celsius")
                for i in range(n)]
t_slotted = time.perf_counter() - start

print(f"Regular: {asizeof.asizeof(regular_list) / 1024 / 1024:.0f} MB, "
      f"{t_regular:.2f}s")
print(f"Slotted: {asizeof.asizeof(slotted_list) / 1024 / 1024:.0f} MB, "
      f"{t_slotted:.2f}s")

# Typical output (varies by machine and Python version):
# Regular: 210 MB, 1.45s
# Slotted: 120 MB, 1.10s
# Savings: ~43% memory, ~24% faster creation
```
Inheritance Gotchas
__slots__ interacts with inheritance in ways that surprise even experienced engineers:
```python
# Gotcha 1: Parent without __slots__ negates child's __slots__
class Base:
    pass  # Has __dict__

class Child(Base):
    __slots__ = ('x', 'y')

c = Child()
c.x = 1
c.y = 2
c.z = 3  # This WORKS because Base provides __dict__
# x and y live in slots, but every instance still carries a __dict__,
# so the memory savings are lost

# Gotcha 2: Multiple inheritance with non-empty __slots__ in more than one base
class A:
    __slots__ = ('x',)

class B:
    __slots__ = ('x',)  # Any non-empty __slots__ here conflicts, even with a different name

# class C(A, B):  # TypeError: multiple bases have instance lay-out conflict
#     __slots__ = ()

# Gotcha 3: Correct slots inheritance
class BaseSlots:
    __slots__ = ('x', 'y')

class DerivedSlots(BaseSlots):
    __slots__ = ('z',)  # Only add NEW slots; x, y are inherited

d = DerivedSlots()
d.x = 1  # From BaseSlots
d.y = 2  # From BaseSlots
d.z = 3  # From DerivedSlots
# d.w = 4  # AttributeError - no __dict__, can't add arbitrary attrs
```
:::danger The __slots__ Inheritance Rule
For __slots__ to actually save memory, every class in the MRO (other than object itself) must define __slots__. If any ancestor omits it, all instances get a __dict__, and the memory savings disappear. This includes forgetting __slots__ = () on intermediate base classes.
:::
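Because one forgotten ancestor silently brings __dict__ back, it can help to inspect the MRO directly. A minimal sketch; classes_missing_slots is a hypothetical helper, not a standard-library function:

```python
def classes_missing_slots(cls):
    """Return classes in cls.__mro__ (except object) that do not define __slots__ themselves."""
    return [klass for klass in cls.__mro__
            if klass is not object and '__slots__' not in vars(klass)]

class Base:  # forgot __slots__
    pass

class Child(Base):
    __slots__ = ('x', 'y')

class Intermediate:
    __slots__ = ()  # empty, but still keeps __dict__ out

class Leaf(Intermediate):
    __slots__ = ('x',)

print([k.__name__ for k in classes_missing_slots(Child)])  # ['Base']
print(classes_missing_slots(Leaf))                          # []
print(hasattr(Child(), '__dict__'), hasattr(Leaf(), '__dict__'))  # True False
```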
When NOT to Use __slots__
```python
# Don't use __slots__ when:

# 1. You need dynamic attributes (e.g., ORMs, plugins)
class DynamicModel:
    """ORM models need to set arbitrary attributes from DB columns."""
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)  # Requires __dict__

# 2. You use __dict__ explicitly
class Config:
    def to_dict(self):
        return self.__dict__  # AttributeError with __slots__ (no __dict__)

# 3. You have few instances (< 1000)
# The complexity is not worth saving a few KB

# 4. You must pickle with the legacy protocols 0 or 1
# Those protocols need __getstate__/__setstate__ on slotted classes;
# protocol 2 and later (the Python 3 default) handle __slots__ automatically
```
Making __slots__ Compatible with Serialization
```python
class SlottedSerializable:
    __slots__ = ('name', 'value', 'metadata')

    def __init__(self, name, value, metadata=None):
        self.name = name
        self.value = value
        self.metadata = metadata

    def __getstate__(self):
        """Support for pickle."""
        return {slot: getattr(self, slot) for slot in self.__slots__
                if hasattr(self, slot)}

    def __setstate__(self, state):
        for slot, value in state.items():
            setattr(self, slot, value)

    def to_dict(self):
        """JSON-compatible serialization."""
        return {slot: getattr(self, slot) for slot in self.__slots__}

    @classmethod
    def from_dict(cls, data):
        return cls(**{k: v for k, v in data.items() if k in cls.__slots__})
```
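With the default pickle protocol (2 or higher in Python 3), a slotted object round-trips without any custom methods; explicit __getstate__/__setstate__ are mainly useful when you want to control exactly which state is saved. A quick check with a throwaway class (Reading is an illustrative name):

```python
import pickle

class Reading:
    __slots__ = ('sensor_id', 'value')

    def __init__(self, sensor_id, value):
        self.sensor_id = sensor_id
        self.value = value

original = Reading(42, 23.5)
data = pickle.dumps(original)  # default protocol, no __getstate__ defined
restored = pickle.loads(data)
print(restored.sensor_id, restored.value)  # 42 23.5
print(hasattr(restored, '__dict__'))       # False - still a slotted object
```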
Part 2 - weakref: Breaking Reference Cycles
weakref creates references to objects that do not prevent garbage collection. When the last strong reference to an object is dropped, the object is collected - even if weak references still exist.
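A minimal sketch of that behavior, plus one interaction with Part 1 worth knowing: instances of a class with __slots__ cannot be weakly referenced unless '__weakref__' is listed among the slots. The class names are illustrative.

```python
import weakref

class Plain:
    pass

class Slotted:
    __slots__ = ('x',)

class SlottedWeak:
    __slots__ = ('x', '__weakref__')  # opt back in to weak references

obj = Plain()
ref = weakref.ref(obj)
print(ref() is obj)  # True - the referent is alive
del obj              # drop the only strong reference
print(ref())         # None - CPython frees the object immediately

try:
    weakref.ref(Slotted())
except TypeError as exc:
    # Slotted instances have no weakref slot by default
    print("TypeError:", exc)

s = SlottedWeak()
print(weakref.ref(s)() is s)  # True
```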
The Problem: Strong Reference Cycles
```python
import gc
import sys

class Node:
    def __init__(self, name):
        self.name = name
        self.parent = None
        self.children = []

    def add_child(self, child):
        self.children.append(child)
        child.parent = self  # Strong reference back to parent

# Create a parent-child cycle
parent = Node("root")
child = Node("leaf")
parent.add_child(child)

# Reference count for parent: 2 (variable + child.parent)
print(sys.getrefcount(parent) - 1)  # 2 (getrefcount adds 1 for its own argument)

del parent
# parent is NOT freed: child.parent still references it.
del child
# Now no variable refers to either node, but the cycle
# (parent -> children -> child -> parent) keeps both refcounts above zero.
# Only Python's cyclic GC can reclaim them, and:
# 1. GC runs are expensive
# 2. Objects with __del__ in cycles may never be collected (pre-3.4)
gc.collect()  # Finds and frees the unreachable pair
```
The Fix: weakref
```python
import weakref

class Node:
    def __init__(self, name):
        self.name = name
        self._parent_ref = None  # Will hold a weakref
        self.children = []

    @property
    def parent(self):
        if self._parent_ref is not None:
            return self._parent_ref()  # Dereference the weakref
        return None

    def add_child(self, child):
        self.children.append(child)
        child._parent_ref = weakref.ref(self)  # Weak reference to parent

# Now the cycle is broken
parent = Node("root")
child = Node("leaf")
parent.add_child(child)
print(child.parent.name)  # "root" - works fine

del parent
# parent IS collected immediately (no strong ref cycle)
print(child.parent)  # None - the weakref returns None

# You can also set a callback that runs when the referent is collected
def on_collected(ref):
    print("Object was garbage collected")

parent = Node("root")
weak = weakref.ref(parent, on_collected)
del parent  # Prints: "Object was garbage collected"
```
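For cleanup actions, the standard library also offers weakref.finalize, which registers a function to run when an object is collected (or at interpreter exit if it is still alive). A minimal sketch with an illustrative Resource class:

```python
import weakref

class Resource:
    pass

events = []
res = Resource()
# The callback and its arguments must not reference res, or res would never die
finalizer = weakref.finalize(res, events.append, "resource released")
print(finalizer.alive)  # True

del res  # last strong reference gone; CPython collects res immediately
print(finalizer.alive)  # False
print(events)           # ['resource released']
```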
WeakValueDictionary: Caches That Don't Leak
```python
import gc
import weakref

class ObjectCache:
    """
    Cache that does not prevent garbage collection.
    Entries disappear automatically when the cached objects
    are no longer referenced elsewhere.
    """
    def __init__(self):
        self._cache = weakref.WeakValueDictionary()

    def get_or_create(self, key, factory):
        obj = self._cache.get(key)
        if obj is not None:
            return obj
        obj = factory(key)
        self._cache[key] = obj
        return obj

    @property
    def size(self):
        return len(self._cache)

class ExpensiveResource:
    def __init__(self, resource_id):
        self.resource_id = resource_id
        self.data = bytearray(1_000_000)  # 1 MB per resource

cache = ObjectCache()

# Create and cache resources
r1 = cache.get_or_create("db_conn_1", ExpensiveResource)
r2 = cache.get_or_create("db_conn_2", ExpensiveResource)
print(cache.size)  # 2

# Resources stay cached as long as r1, r2 exist
r3 = cache.get_or_create("db_conn_1", ExpensiveResource)  # Cache hit
print(r3 is r1)  # True

# When external references are dropped, cache entries vanish
del r1, r3
gc.collect()
print(cache.size)  # 1 - db_conn_1 was collected
```
WeakSet: Observer Pattern Without Leaks
```python
import gc
import weakref

class EventBus:
    """
    Event bus using WeakSet for subscribers.
    Subscribers are automatically removed when garbage collected.
    """
    def __init__(self):
        self._subscribers = weakref.WeakSet()

    def subscribe(self, handler):
        self._subscribers.add(handler)

    def publish(self, event):
        # WeakSet automatically removes dead references;
        # iterate over a copy so the set cannot change mid-loop
        for subscriber in list(self._subscribers):
            subscriber.handle(event)

    @property
    def subscriber_count(self):
        return len(self._subscribers)

class EventHandler:
    def __init__(self, name):
        self.name = name

    def handle(self, event):
        print(f"  {self.name} received: {event}")

bus = EventBus()
h1 = EventHandler("handler_1")
h2 = EventHandler("handler_2")
bus.subscribe(h1)
bus.subscribe(h2)
print(bus.subscriber_count)  # 2

bus.publish("test_event")
#   handler_1 received: test_event
#   handler_2 received: test_event
# (WeakSet iteration order is not guaranteed)

del h1
gc.collect()
print(bus.subscriber_count)  # 1 - h1 was automatically removed

bus.publish("another_event")
#   handler_2 received: another_event
```
:::tip When to Use weakref
Use weakref when you have secondary references that should not keep objects alive:
- Parent references in tree structures (children hold weak refs to parents)
- Caches that should not prevent GC (use WeakValueDictionary)
- Observer/callback registrations (use WeakSet or WeakMethod)
- Circular reference prevention in any bidirectional graph
:::
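WeakMethod matters because bound methods are created on the fly: a plain weak reference to obj.method dies as soon as the temporary bound-method object does. A minimal sketch (Handler is an illustrative class):

```python
import weakref

class Handler:
    def on_event(self, event):
        return f"handled {event}"

h = Handler()

# h.on_event builds a temporary bound-method object that nothing keeps alive,
# so a plain weakref to it is dead almost immediately
plain = weakref.ref(h.on_event)
print(plain())  # None

# WeakMethod weakly references the instance and the function separately
# and rebuilds the bound method on demand while h is alive
wm = weakref.WeakMethod(h.on_event)
print(wm()("ping"))  # handled ping

del h
print(wm())  # None - the instance is gone
```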
Part 3 - Compact Data Structures
array Module: Typed Arrays
Python lists store pointers to arbitrary PyObjects. The array module stores raw C values - no per-element PyObject overhead.
```python
import array
import sys

# List of 1 million integers
int_list = list(range(1_000_000))
print(f"list: {sys.getsizeof(int_list) / 1024 / 1024:.1f} MB")
# Prints ~7.6 MB: getsizeof is shallow and counts only the 8-byte pointers.
# The 28-byte int objects add ~27 MB more, for ~34 MB in total.

# array of 1 million integers (signed int, 4 bytes each)
int_array = array.array('i', range(1_000_000))
print(f"array: {sys.getsizeof(int_array) / 1024 / 1024:.1f} MB")
# ~3.8 MB (raw 4-byte ints, no PyObject overhead)
# Savings: ~90% for integer data
```
Type codes for array:
| Code | C Type | Python Type | Size (bytes) |
|---|---|---|---|
| 'b' | signed char | int | 1 |
| 'B' | unsigned char | int | 1 |
| 'h' | signed short | int | 2 |
| 'H' | unsigned short | int | 2 |
| 'i' | signed int | int | 4 |
| 'I' | unsigned int | int | 4 |
| 'l' | signed long | int | 4 (Windows) or 8 (64-bit Linux/macOS) |
| 'q' | signed long long | int | 8 |
| 'f' | float | float | 4 |
| 'd' | double | float | 8 |
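The type code fixes both the element size and the allowed value range: out-of-range integers are rejected, and 'f' silently rounds to 32-bit precision. A short sketch:

```python
import array

shorts = array.array('h', [1, 2, 3])
print(shorts.itemsize)  # 2 bytes per element

try:
    shorts.append(40_000)  # signed short range is -32768..32767
except OverflowError as exc:
    print("OverflowError:", exc)

floats = array.array('f', [24.1])
print(floats[0])          # close to, but not exactly, 24.1 (float32 precision)
print(floats[0] == 24.1)  # False
```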
```python
import array
import sys

# Sensor readings stored as C floats: 4 bytes each, at float32 precision.
# A list needs an 8-byte pointer per element plus a 24-byte float object.
readings = array.array('f', [23.5, 24.1, 22.8, 25.0, 23.9] * 200_000)
print(f"array.array('f'): {sys.getsizeof(readings) / 1024 / 1024:.1f} MB")
# ~3.8 MB

readings_list = [23.5, 24.1, 22.8, 25.0, 23.9] * 200_000
print(f"list: {sys.getsizeof(readings_list) / 1024 / 1024:.1f} MB")
# ~7.6 MB for the pointers alone (shallow). Here the repetition reuses just
# 5 float objects; with 1M distinct values they would add ~23 MB more.

# array supports standard list operations
readings.append(26.0)
readings.extend([27.0, 28.0])
avg = sum(readings) / len(readings)

# But: each element is boxed into a Python float on access
# For numerical computation, use NumPy instead (covered in Lesson 6)
```
struct Module: Binary Packing
struct packs Python values into compact binary representations - essential for binary protocols, file formats, and network communication.
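The format prefix decides whether struct inserts C alignment padding between fields, which is why the record formats in this part start with '<'. A quick check (the native result depends on the platform's C alignment rules; 17 is what typical 64-bit platforms give):

```python
import struct

# Native mode (the default, same as '@') pads fields to their C alignment
print(struct.calcsize('dHfB'))   # 17: two pad bytes before the 4-byte float
# '<', '>', '=' and '!' use standard sizes with no padding
print(struct.calcsize('<dHfB'))  # 15 = 8 + 2 + 4 + 1
print(struct.calcsize('>dHfB'))  # 15 as well; only the byte order differs
```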
```python
import struct
import sys

# A sensor packet: timestamp (double), sensor_id (unsigned short),
# value (float), status (unsigned byte)
# Without struct: a dict or object consumes ~200+ bytes
# With struct: packed into exactly 15 bytes
# '<' = little-endian with standard sizes and no alignment padding
# (the native default would insert 2 pad bytes and produce 17)
fmt = '<dHfB'  # double, unsigned short, float, unsigned byte
packed = struct.pack(fmt, 1709722800.0, 42, 23.5, 1)
print(f"Packed size: {len(packed)} bytes")  # 15 bytes
print(f"Hex: {packed.hex()}")

# Unpack
timestamp, sensor_id, value, status = struct.unpack(fmt, packed)
print(f"timestamp={timestamp}, sensor_id={sensor_id}, "
      f"value={value}, status={status}")

# At scale: 1 million sensor readings
n = 1_000_000

# As dicts: ~200 MB in total (dicts plus the objects they hold)
dicts = [{'timestamp': float(i), 'sensor_id': i % 100,
          'value': i * 0.1, 'status': 1} for i in range(n)]

# As packed binary: ~15 MB
packed_data = bytearray()
for i in range(n):
    packed_data.extend(struct.pack(fmt, float(i), i % 100, i * 0.1, 1))

print(f"Dicts: {sys.getsizeof(dicts) / 1024 / 1024:.0f} MB (shallow: list pointers only)")
print(f"Packed: {len(packed_data) / 1024 / 1024:.0f} MB")
```
Batch Packing for Performance
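```python
import struct

# Little-endian, no padding: 8 + 2 + 4 + 1 = 15 bytes per record
fmt_single = '<dHfB'
size_single = struct.calcsize(fmt_single)

n = 100_000
records = [(float(i), i % 100, i * 0.1, 1) for i in range(n)]

# Option 1: pack_into writes each record straight into a pre-allocated
# buffer, avoiding a temporary bytes object per record
buffer = bytearray(size_single * n)
for i, (ts, sid, val, status) in enumerate(records):
    struct.pack_into(fmt_single, buffer, i * size_single, ts, sid, val, status)

print(f"Buffer size: {len(buffer) / 1024 / 1024:.1f} MB")
# 1.4 MB for 100,000 records

# Option 2: a single pack call with a repeated format. Note that this gives a
# COLUMNAR layout (all timestamps, then all sensor IDs, ...), not row-by-row.
timestamps, sensor_ids, values, statuses = zip(*records)
fmt_batch = f'<{n}d{n}H{n}f{n}B'
columnar = struct.pack(fmt_batch, *timestamps, *sensor_ids, *values, *statuses)
print(len(columnar) == len(buffer))  # True: same size, different byte order of fields
```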
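When the same format is used for every record, struct.Struct compiles it once, and Struct.iter_unpack reads a packed buffer back record by record. A minimal sketch with three made-up rows:

```python
import struct

record = struct.Struct('<dHfB')  # compile the format once and reuse it
print(record.size)  # 15

rows = [(1709722800.0 + i, i, 20.0 + i, 1) for i in range(3)]

# Write rows into a pre-allocated buffer
buf = bytearray(record.size * len(rows))
for i, row in enumerate(rows):
    record.pack_into(buf, i * record.size, *row)

# Read them back; the buffer length must be a multiple of record.size
decoded = list(record.iter_unpack(buf))
print(decoded[2])  # (1709722802.0, 2, 22.0, 1)
```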
Part 4 - Memory-Mapped Files (mmap)
Memory-mapped files let you treat a file on disk as if it were a byte array in memory. The operating system handles paging data in and out of RAM, so you can work with files larger than available RAM.
```python
import mmap
import os

# Create a sample data file
filename = '/tmp/large_data.bin'
with open(filename, 'wb') as f:
    for i in range(1_000_000):
        f.write(f"record_{i:08d}\n".encode())

file_size = os.path.getsize(filename)
print(f"File size: {file_size / 1024 / 1024:.1f} MB")

# Memory-mapped access - the file is NOT loaded into RAM all at once
with open(filename, 'r+b') as f:
    mm = mmap.mmap(f.fileno(), 0)  # 0 = map entire file

    # Random access - like a byte array
    print(mm[0:20])     # First 20 bytes
    print(mm[100:120])  # Bytes 100-119
    print(mm[-20:])     # Last 20 bytes

    # Search
    pos = mm.find(b"record_00050000")
    if pos != -1:
        print(f"Found at offset {pos}: {mm[pos:pos+20]}")

    # You can also write
    mm[0:6] = b"RECORD"  # Modify in place (writes to file)

    mm.close()

# Clean up
os.unlink(filename)
```
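Two refinements to the example above: an mmap object works as a context manager, so the mapping is closed even if an exception occurs, and fixed-width records turn "find record N" into one offset computation. A portable sketch that uses a temporary file instead of /tmp:

```python
import mmap
import os
import tempfile

RECORD_LEN = 16  # len(b"record_00000000\n")

# Write 1,000 fixed-width text records to a temporary file
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as f:
    for i in range(1000):
        f.write(f"record_{i:08d}\n".encode())

with open(path, 'rb') as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
    # Record N starts at byte N * RECORD_LEN
    n = 500
    record = mm[n * RECORD_LEN:(n + 1) * RECORD_LEN]
    print(record)  # b'record_00000500\n'

os.unlink(path)
```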
Processing a Large CSV with mmap
```python
import mmap
import os

def count_lines_mmap(filepath: str) -> int:
    """Count lines in a file using mmap - works for files larger than RAM."""
    with open(filepath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        count = 0
        while mm.readline():
            count += 1
        mm.close()
    return count

def search_in_large_file(filepath: str, pattern: bytes) -> list[int]:
    """Find all occurrences of a pattern in a large file."""
    positions = []
    with open(filepath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        pos = 0
        while True:
            pos = mm.find(pattern, pos)
            if pos == -1:
                break
            positions.append(pos)
            pos += 1
        mm.close()
    return positions
```
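count_lines_mmap makes one Python-level readline() call per line. A possible alternative, sketched as a hypothetical helper, slices the map in large chunks (each slice copies only that chunk into a bytes object) and lets bytes.count scan in C:

```python
import mmap
import os
import tempfile

def count_lines_chunked(filepath: str, chunk_size: int = 1 << 20) -> int:
    """Count newline bytes. Unlike count_lines_mmap, a final line
    without a trailing newline is not counted."""
    if os.path.getsize(filepath) == 0:
        return 0  # an empty file cannot be memory-mapped
    count = 0
    with open(filepath, 'rb') as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        for start in range(0, len(mm), chunk_size):
            count += mm[start:start + chunk_size].count(b'\n')
    return count

# Usage on a small temporary CSV
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as f:
    f.write(b"a,b\n1,2\n3,4\n")
lines = count_lines_chunked(path)
print(lines)  # 3
os.unlink(path)
```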
:::note When mmap Shines
- Random access to large files: mmap gives O(1) access to any byte offset
- Shared memory between processes: multiple processes can mmap the same file
- Files larger than RAM: the OS pages data in/out automatically
- Read-only analytics: scanning a 10 GB log file without loading it all
When mmap does NOT help: sequential processing of a file from start to finish - regular buffered I/O with open() is just as fast and simpler.
:::
Part 5 - Object Pooling
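Object creation in Python involves allocation, initialization, and (eventually) deallocation. For objects that are expensive to build (large buffers, network connections) and are created and destroyed frequently, pooling avoids this overhead by reusing instances. For small, cheap objects it rarely pays off, because CPython's allocator already recycles small memory blocks efficiently.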
```python
from collections import deque
from contextlib import contextmanager

class ObjectPool:
    """
    Generic object pool that reuses instances instead of creating new ones.
    Objects are checked out, used, and returned to the pool.
    The pool grows on demand but recycles returned objects.
    Not thread-safe: the length check and append in release() are separate steps.
    """
    def __init__(self, factory, max_size=100, reset_func=None):
        self._factory = factory
        self._pool = deque(maxlen=max_size)
        self._max_size = max_size
        self._reset = reset_func
        self._stats = {'created': 0, 'reused': 0, 'returned': 0}

    def acquire(self):
        """Get an object from the pool (or create a new one)."""
        try:
            obj = self._pool.popleft()
        except IndexError:
            self._stats['created'] += 1
            return self._factory()
        if self._reset:
            self._reset(obj)
        self._stats['reused'] += 1
        return obj

    def release(self, obj):
        """Return an object to the pool for reuse."""
        if len(self._pool) < self._max_size:
            self._pool.append(obj)
            self._stats['returned'] += 1

    @contextmanager
    def checkout(self):
        """Context manager for automatic acquire/release."""
        obj = self.acquire()
        try:
            yield obj
        finally:
            self.release(obj)

    @property
    def stats(self):
        return dict(self._stats)

# Example: buffer pool for data processing
def create_buffer():
    return bytearray(1024 * 1024)  # 1 MB buffer

def reset_buffer(buf):
    buf[:] = b'\x00' * len(buf)  # Zero out

buffer_pool = ObjectPool(
    factory=create_buffer,
    max_size=10,
    reset_func=reset_buffer,
)

# Process 1000 items, reusing buffers
for i in range(1000):
    with buffer_pool.checkout() as buf:
        # Use the buffer
        buf[0:4] = i.to_bytes(4, 'big')
        # ... process data ...

print(buffer_pool.stats)
# {'created': 1, 'reused': 999, 'returned': 1000}
# Only 1 buffer was ever created!
```
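If several threads share a pool, one option is to guard the free list with a lock. A minimal sketch; LockedObjectPool is a hypothetical name, it assumes the factory never returns None, and it runs the factory and reset calls outside the lock so slow construction does not block other threads:

```python
import threading
from collections import deque
from contextlib import contextmanager

class LockedObjectPool:
    """Object pool whose free list is protected by a lock."""

    def __init__(self, factory, max_size=100, reset_func=None):
        self._factory = factory
        self._free = deque()
        self._max_size = max_size
        self._reset = reset_func
        self._lock = threading.Lock()

    def acquire(self):
        with self._lock:
            obj = self._free.popleft() if self._free else None
        if obj is None:
            return self._factory()  # create outside the lock
        if self._reset:
            self._reset(obj)
        return obj

    def release(self, obj):
        with self._lock:  # check and append happen atomically
            if len(self._free) < self._max_size:
                self._free.append(obj)

    @contextmanager
    def checkout(self):
        obj = self.acquire()
        try:
            yield obj
        finally:
            self.release(obj)

# Usage: four threads sharing one pool of 1 KB buffers
pool = LockedObjectPool(lambda: bytearray(1024), max_size=4)

def worker():
    for _ in range(1000):
        with pool.checkout() as buf:
            buf[0] = 1

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(pool._free) <= 4)  # True - never more than max_size idle objects
```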
Connection Pool Pattern
```python
import queue
import threading
from contextlib import contextmanager

class ConnectionPool:
    """
    Thread-safe connection pool with timeout.
    """
    def __init__(self, create_connection, max_connections=10, timeout=30):
        self._create = create_connection
        self._pool = queue.Queue(maxsize=max_connections)
        self._max = max_connections
        self._timeout = timeout  # Default wait when the pool is exhausted
        self._current_count = 0
        self._lock = threading.Lock()

    @contextmanager
    def connection(self, timeout=None):
        conn = self._acquire(timeout)
        try:
            yield conn
        except Exception:
            # Connection might be corrupted after an error
            self._discard(conn)
            raise
        else:
            self._release(conn)

    def _acquire(self, timeout=None):
        # Try to get from pool first
        try:
            return self._pool.get_nowait()
        except queue.Empty:
            pass

        # Reserve a slot under the lock, create the connection outside it
        with self._lock:
            can_create = self._current_count < self._max
            if can_create:
                self._current_count += 1
        if can_create:
            try:
                return self._create()
            except Exception:
                with self._lock:
                    self._current_count -= 1  # Give the slot back
                raise

        # Wait for one to be returned
        wait = self._timeout if timeout is None else timeout
        try:
            return self._pool.get(timeout=wait)
        except queue.Empty:
            raise TimeoutError("Connection pool exhausted") from None

    def _release(self, conn):
        try:
            self._pool.put_nowait(conn)
        except queue.Full:
            self._discard(conn)

    def _discard(self, conn):
        with self._lock:
            self._current_count -= 1
        try:
            conn.close()
        except Exception:
            pass
```
Part 6 - The Flyweight Pattern
The flyweight pattern shares common state across many objects to reduce memory. It is ideal when many objects share identical attribute values.
```python
import random

class Color:
    """Flyweight: one shared instance per distinct (r, g, b)."""
    _cache = {}

    def __new__(cls, r, g, b):
        key = (r, g, b)
        if key not in cls._cache:
            instance = super().__new__(cls)
            instance.r = r
            instance.g = g
            instance.b = b
            cls._cache[key] = instance
        return cls._cache[key]

    def __repr__(self):
        return f"Color({self.r}, {self.g}, {self.b})"

class PlainColor:
    """No sharing: every pixel gets its own object."""
    def __init__(self, r, g, b):
        self.r = r
        self.g = g
        self.b = b

# Without flyweight: 1 million pixels = 1 million color objects
# With flyweight: only the unique colors are allocated
# 16 levels per channel gives at most 16^3 = 4096 unique colors
def random_rgb():
    return (random.randint(0, 15) * 17,
            random.randint(0, 15) * 17,
            random.randint(0, 15) * 17)

colors_normal = [PlainColor(*random_rgb()) for _ in range(1_000_000)]
colors_flyweight = [Color(*random_rgb()) for _ in range(1_000_000)]

print(f"Unique flyweight colors: {len(Color._cache)}")
# Up to 4096 unique colors (16^3), but 1M references share them
```
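Python applies the same idea to strings through sys.intern, which returns one canonical object per distinct string value. That can help when millions of records repeat a handful of strings parsed at runtime, such as unit or region names read from a file. A small sketch:

```python
import sys

# Strings built at runtime are separate objects even when equal
a = ''.join(['us', '-east-1'])
b = ''.join(['us', '-east-1'])
print(a == b, a is b)  # True False (on CPython)

# sys.intern maps every equal string to one shared object
a = sys.intern(a)
b = sys.intern(b)
print(a is b)  # True
```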
Flyweight with Immutable Data
```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CurrencyFlyweight:
    """Immutable flyweight for currency data."""
    code: str
    name: str
    symbol: str
    decimal_places: int

    _cache = {}  # No annotation, so dataclass treats it as a plain class attribute

    @classmethod
    def get(cls, code: str) -> 'CurrencyFlyweight':
        if code not in cls._cache:
            currencies = {
                'USD': ('US Dollar', '$', 2),
                'EUR': ('Euro', '\u20ac', 2),
                'GBP': ('British Pound', '\u00a3', 2),
                'JPY': ('Japanese Yen', '\u00a5', 0),
                'BTC': ('Bitcoin', '\u20bf', 8),
            }
            if code not in currencies:
                raise ValueError(f"Unknown currency: {code}")
            name, symbol, decimals = currencies[code]
            cls._cache[code] = cls(code, name, symbol, decimals)
        return cls._cache[code]

# 1 million transactions, all sharing the same currency instances
class Transaction:
    __slots__ = ('amount', 'currency', 'description')

    def __init__(self, amount, currency_code, description):
        self.amount = amount
        self.currency = CurrencyFlyweight.get(currency_code)
        self.description = description

# All USD transactions share the same CurrencyFlyweight instance
t1 = Transaction(100.0, 'USD', 'Purchase 1')
t2 = Transaction(200.0, 'USD', 'Purchase 2')
print(t1.currency is t2.currency)  # True - same object
```
Part 7 - Real-World: Processing Millions of Records
Putting it all together - here is how to process a large dataset with minimal memory:
```python
import array
import mmap
import os
import random
import struct
from collections import defaultdict

# Scenario: process 10 million sensor readings from a binary file.
# Each reading: timestamp (8 bytes), sensor_id (2 bytes),
# value (4 bytes), status (1 byte) = 15 bytes
# Total file: ~150 MB
RECORD_FMT = '<dHfB'  # '<' = standard sizes, no alignment padding
RECORD_SIZE = struct.calcsize(RECORD_FMT)  # 15 bytes (native 'dHfB' would be 17)

def generate_test_data(filepath: str, n_records: int):
    """Generate test binary data file."""
    with open(filepath, 'wb') as f:
        for i in range(n_records):
            record = struct.pack(
                RECORD_FMT,
                1700000000.0 + i,            # timestamp
                random.randint(0, 999),      # sensor_id
                random.uniform(15.0, 35.0),  # value (temperature)
                random.randint(0, 3),        # status
            )
            f.write(record)

def process_with_mmap(filepath: str) -> dict:
    """
    Process records using mmap - handles files larger than RAM.
    Memory usage: ~constant regardless of file size.
    """
    file_size = os.path.getsize(filepath)
    n_records = file_size // RECORD_SIZE

    # Accumulators: per-sensor dicts are bounded by the number of sensors,
    # and status counts fit in a fixed-size array
    sensor_sums = defaultdict(float)
    sensor_counts = defaultdict(int)
    status_counts = array.array('I', [0, 0, 0, 0])  # 4 status codes

    with open(filepath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        for i in range(n_records):
            offset = i * RECORD_SIZE
            timestamp, sensor_id, value, status = struct.unpack_from(
                RECORD_FMT, mm, offset
            )
            sensor_sums[sensor_id] += value
            sensor_counts[sensor_id] += 1
            status_counts[status] += 1
        mm.close()

    # Compute averages
    sensor_averages = {
        sid: sensor_sums[sid] / sensor_counts[sid]
        for sid in sensor_sums
    }

    return {
        'n_records': n_records,
        'n_sensors': len(sensor_averages),
        'status_distribution': list(status_counts),
        'avg_temp_range': (
            min(sensor_averages.values()),
            max(sensor_averages.values()),
        ),
    }

def process_chunked(filepath: str, chunk_size: int = 10_000) -> dict:
    """
    Process records in chunks - good balance of speed and memory.
    """
    file_size = os.path.getsize(filepath)
    n_records = file_size // RECORD_SIZE
    chunk_bytes = chunk_size * RECORD_SIZE  # whole records only, none split

    sensor_sums = defaultdict(float)
    sensor_counts = defaultdict(int)
    status_counts = [0, 0, 0, 0]

    with open(filepath, 'rb') as f:
        while True:
            chunk = f.read(chunk_bytes)
            if not chunk:
                break
            n_in_chunk = len(chunk) // RECORD_SIZE
            for i in range(n_in_chunk):
                offset = i * RECORD_SIZE
                _, sensor_id, value, status = struct.unpack_from(
                    RECORD_FMT, chunk, offset
                )
                sensor_sums[sensor_id] += value
                sensor_counts[sensor_id] += 1
                status_counts[status] += 1

    sensor_averages = {
        sid: sensor_sums[sid] / sensor_counts[sid]
        for sid in sensor_sums
    }

    return {
        'n_records': n_records,
        'n_sensors': len(sensor_averages),
        'status_distribution': status_counts,
    }

# Benchmark both approaches
if __name__ == '__main__':
    import time

    filepath = '/tmp/sensor_data.bin'
    n = 1_000_000  # 1M records for demo

    print("Generating test data...")
    generate_test_data(filepath, n)
    print(f"File size: {os.path.getsize(filepath) / 1024 / 1024:.1f} MB")

    print("\nProcessing with mmap...")
    start = time.perf_counter()
    result1 = process_with_mmap(filepath)
    t1 = time.perf_counter() - start
    print(f"  Time: {t1:.2f}s, Records: {result1['n_records']:,}")

    print("\nProcessing with chunked reads...")
    start = time.perf_counter()
    result2 = process_chunked(filepath)
    t2 = time.perf_counter() - start
    print(f"  Time: {t2:.2f}s, Records: {result2['n_records']:,}")

    os.unlink(filepath)
```
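The per-record loop in process_with_mmap can also be written with struct.Struct.iter_unpack, which accepts the mmap directly because mmap objects support the buffer protocol. A sketch under the assumption that the file holds only whole 15-byte '<dHfB' records (iter_unpack raises struct.error otherwise); process_with_iter_unpack is a hypothetical name, and the usage writes its own tiny file:

```python
import mmap
import os
import struct
import tempfile
from collections import defaultdict

RECORD = struct.Struct('<dHfB')  # timestamp, sensor_id, value, status

def process_with_iter_unpack(filepath: str) -> dict:
    """Return the average value per sensor_id."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    with open(filepath, 'rb') as f, \
         mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # Requires len(mm) to be a multiple of RECORD.size
        for _, sensor_id, value, _ in RECORD.iter_unpack(mm):
            sums[sensor_id] += value
            counts[sensor_id] += 1
    return {sid: sums[sid] / counts[sid] for sid in sums}

# Usage: two readings for sensor 7, one for sensor 9
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as f:
    for ts, sid, val in [(1.0, 7, 20.0), (2.0, 7, 22.0), (3.0, 9, 30.0)]:
        f.write(RECORD.pack(ts, sid, val, 0))
averages = process_with_iter_unpack(path)
os.unlink(path)
print(averages)  # {7: 21.0, 9: 30.0}
```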
Key Takeaways
- __slots__ saves roughly 40-65% memory per instance, but only if every class in the inheritance chain defines __slots__. One missing ancestor reintroduces __dict__.
- weakref breaks reference cycles: use WeakValueDictionary for caches that should not prevent GC, and WeakSet for observer patterns.
- The array module stores raw C values: 80-90% smaller than lists for numeric data, but elements are boxed on access.
- struct.pack creates compact binary representations: 10-20x smaller than dicts for fixed-schema records.
- mmap handles files larger than RAM: the OS manages paging, giving you random access to arbitrary file offsets with roughly constant process memory.
- Object pooling avoids allocation overhead: reuse expensive objects (connections, buffers) instead of creating and destroying them.
- The flyweight pattern shares identical state: when many objects have the same attribute values, share the common object instead of duplicating it.
Graded Practice Challenges
Level 1 - Predict the Output
Question 1: What happens when you try to add a new attribute to a slotted class?
```python
class Point:
    __slots__ = ('x', 'y')

p = Point()
p.x = 1
p.y = 2
p.z = 3
```
Answer
AttributeError: 'Point' object has no attribute 'z'. Classes with __slots__ do not have a __dict__, so you cannot add attributes not listed in __slots__. This is actually a benefit - it catches typos like point.X = 5 immediately instead of silently creating a new attribute.
Question 2: What does weakref.ref(obj)() return after del obj?
Answer
It returns None, assuming no other strong references to the object remain. When the referent object is garbage collected, dereferencing the weak reference (by calling it) returns None. This is how you check if a weakly-referenced object still exists:

```python
ref = weakref.ref(obj)
target = ref()
if target is not None:
    # Object still alive
    target.do_something()
```
Question 3: How much memory does array.array('f', [0.0] * 1_000_000) use compared to [0.0] * 1_000_000?
Answer
- array.array('f', ...): approximately 4 MB (1 million * 4 bytes per float32, plus small overhead)
- [0.0] * 1_000_000: approximately 8 MB for the list's pointer array. CPython does not intern floats, but list repetition copies the same reference, so all 1M slots point to the single 0.0 object created by the literal, and the total is about 8 MB. With diverse float values, each float object costs 24 bytes, pushing total memory to ~32 MB.

The array is 2-8x more memory efficient depending on value diversity.
Level 2 - Debug Challenge
This class uses __slots__ but is not saving any memory. Find and fix the problem:
```python
class BaseModel:
    def __init__(self, id, created_at):
        self.id = id
        self.created_at = created_at

class User(BaseModel):
    __slots__ = ('name', 'email', 'role')

    def __init__(self, id, created_at, name, email, role):
        super().__init__(id, created_at)
        self.name = name
        self.email = email
        self.role = role
```
Answer
BaseModel does not define __slots__, so it has a __dict__. When User inherits from BaseModel, every User instance gets a __dict__ from BaseModel in addition to the __slots__ from User. The __slots__ on User are not saving any memory because the __dict__ allows arbitrary attributes anyway.
Fix:
```python
class BaseModel:
    __slots__ = ('id', 'created_at')  # Add __slots__ to base class

    def __init__(self, id, created_at):
        self.id = id
        self.created_at = created_at

class User(BaseModel):
    __slots__ = ('name', 'email', 'role')  # Only NEW attributes

    def __init__(self, id, created_at, name, email, role):
        super().__init__(id, created_at)
        self.name = name
        self.email = email
        self.role = role
```
Now User instances have no __dict__ and use slot-based storage for all 5 attributes.
Level 3 - Design Challenge
You need to process a 50 GB Apache access log file on a machine with 8 GB RAM. The task:
- Count unique IP addresses
- Find the top 10 most-requested URLs
- Calculate requests per second over time
Design the data structures and processing pipeline. Specify memory usage bounds for each component.
Solution Sketch
Architecture: Single-pass streaming with mmap + compact accumulators.
```python
import mmap
from collections import Counter

def process_access_log(filepath: str):
    # Component 1: Unique IPs - use a set
    # Worst case: ~10M unique IPs * ~75 bytes (bytes object + set slot) = ~750 MB
    # Optimization: store IPv4 addresses as integers instead of bytes objects
    unique_ips = set()

    # Component 2: URL counts - Counter (bounded by unique URLs)
    url_counts = Counter()  # Typically < 100K unique URLs

    # Component 3: Requests per second - Counter keyed by timestamp
    rps = Counter()  # Key: second-granularity timestamp

    with open(filepath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        while True:
            line = mm.readline()
            if not line:
                break

            # Parse Apache combined log format
            # 192.168.1.1 - - [01/Jan/2024:00:00:01 +0000] "GET /path HTTP/1.1" ...
            parts = line.split(b' ')
            if len(parts) < 7:
                continue

            ip = parts[0]
            unique_ips.add(ip)  # ~15 bytes of text, ~45-50 bytes as a bytes object

            url = parts[6]  # The requested path
            url_counts[url] += 1

            # Extract timestamp (second granularity)
            timestamp_part = parts[3]  # [01/Jan/2024:00:00:01
            rps[timestamp_part] += 1
        mm.close()

    return {
        'unique_ips': len(unique_ips),
        'top_urls': url_counts.most_common(10),
        'peak_rps': max(rps.values()) if rps else 0,
        'avg_rps': sum(rps.values()) / len(rps) if rps else 0,
    }
```
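The comment about storing IPv4 addresses as integers could be implemented with the standard ipaddress module. A minimal sketch (ip_to_int is a hypothetical helper): the set still holds one int object per entry, but each int is smaller than the corresponding bytes object, and malformed addresses raise ValueError.

```python
import ipaddress
import sys

def ip_to_int(ip: bytes) -> int:
    """Convert an IPv4 address like b'192.168.1.1' to a 32-bit integer."""
    return int(ipaddress.IPv4Address(ip.decode('ascii')))

raw = b'192.168.1.1'
packed = ip_to_int(raw)
print(packed)  # 3232235777
print(sys.getsizeof(raw), sys.getsizeof(packed))  # the int is the smaller object

unique_ips = set()
for line_ip in [b'10.0.0.1', b'10.0.0.2', b'10.0.0.1']:
    unique_ips.add(ip_to_int(line_ip))
print(len(unique_ips))  # 2
```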
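Memory budget (rough CPython estimates):
- mmap: ~0 MB of Python heap (file pages live in the OS page cache, which the OS can reclaim)
- unique_ips set: ~750 MB worst case (~45-50 bytes per bytes object plus the set's hash table)
- url_counts Counter: ~10 MB (assuming 100K unique URLs)
- rps Counter: ~50 MB, assuming about a week of log coverage (roughly 100 bytes per distinct second; a year of logs would need about 3 GB, so key by minute or by integer epoch seconds in that case)
- Total: ~0.8 GB peak - well within 8 GB

If memory is tighter: Replace the set with a HyperLogLog for approximate unique counting (~12 KB), or use a Bloom filter, which answers "seen before?" approximately (about 18 MB for 10M IPs at a 0.1% false-positive rate; false positives make the resulting unique count a slight undercount).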
What's Next
These memory optimization techniques work within pure Python. But for numerical workloads, the biggest performance gain comes from escaping Python's loop overhead entirely. In Vectorization with NumPy, you will learn to replace Python loops with C-level array operations that run 10-100x faster.
