Memory Optimization - Fitting More in Less

Predict the memory usage of these two classes:

import sys

class PointRegular:
    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

class PointSlots:
    __slots__ = ('x', 'y', 'z')

    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

regular = PointRegular(1.0, 2.0, 3.0)
slotted = PointSlots(1.0, 2.0, 3.0)

print(sys.getsizeof(regular))  # ?
print(sys.getsizeof(slotted))  # ?

On CPython 3.11 (64-bit), the results look like this (exact figures vary between CPython versions and builds):

48 # PointRegular - but this EXCLUDES the __dict__!
sys.getsizeof(regular.__dict__) # 104 bytes for the instance dict
# Total: 48 + 104 = 152 bytes per instance

56 # PointSlots - no __dict__, this IS the total
# Savings: 152 - 56 = 96 bytes per instance (63% reduction)

At one million instances, that is 96 MB saved - just by adding one line of code. But __slots__ has trade-offs that most tutorials never mention. This lesson covers when to use it, when to avoid it, and six other techniques for fitting more data in less memory.

What You Will Learn

  • How __slots__ works internally and its inheritance gotchas
  • How weakref prevents circular reference memory leaks
  • When to use the array module instead of lists
  • How struct.pack compresses data for binary protocols
  • How memory-mapped files let you process files larger than RAM
  • Object pooling and the flyweight pattern for reducing allocation overhead
  • Real-world techniques for processing millions of records efficiently

Prerequisites

  • Completed Lessons 1-4 (profiling, cProfile, line_profiler, caching)
  • Understanding of CPython's memory model (reference counting, __dict__, gc)
  • Familiarity with sys.getsizeof and pympler.asizeof from Lesson 3

Part 1 - __slots__ at Scale

How __slots__ Works

Every regular Python object has a __dict__ - a dictionary that stores its instance attributes. This dict is flexible (you can add any attribute at runtime) but expensive: on CPython 3.11, an empty __dict__ consumes about 64 bytes, and it grows with each attribute.

__slots__ replaces the __dict__ with fixed-offset storage. Instead of a hash table lookup, attribute access becomes a direct memory offset - like a C struct.
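To make the "fixed-offset storage" idea concrete, here is a minimal sketch that inspects a slotted class on CPython. The class name `Vec` is hypothetical, and the descriptor type name shown is a CPython implementation detail rather than a language guarantee.

```python
class Vec:
    __slots__ = ('x', 'y')

    def __init__(self, x, y):
        self.x = x
        self.y = y

v = Vec(1.0, 2.0)

# Each slot name becomes a descriptor object stored on the class (CPython detail).
slot_kind = type(Vec.__dict__['x']).__name__
print(slot_kind)                 # member_descriptor

# Instances carry no per-object dictionary.
print(hasattr(v, '__dict__'))    # False

# Attribute access goes through the descriptor, which reads a fixed offset.
print(Vec.x.__get__(v, Vec))     # 1.0
```

The descriptor lives once on the class, so the per-instance cost is just one pointer per slot.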

Memory Savings at Scale

import sys
import time
from pympler import asizeof

class SensorReading:
    def __init__(self, timestamp, sensor_id, value, unit):
        self.timestamp = timestamp
        self.sensor_id = sensor_id
        self.value = value
        self.unit = unit

class SensorReadingSlots:
    __slots__ = ('timestamp', 'sensor_id', 'value', 'unit')

    def __init__(self, timestamp, sensor_id, value, unit):
        self.timestamp = timestamp
        self.sensor_id = sensor_id
        self.value = value
        self.unit = unit

# Compare at scale
n = 1_000_000

start = time.perf_counter()
regular_list = [SensorReading(i, i % 100, i * 0.1, "celsius")
                for i in range(n)]
t_regular = time.perf_counter() - start

start = time.perf_counter()
slotted_list = [SensorReadingSlots(i, i % 100, i * 0.1, "celsius")
                for i in range(n)]
t_slotted = time.perf_counter() - start

print(f"Regular: {asizeof.asizeof(regular_list) / 1024 / 1024:.0f} MB, "
      f"{t_regular:.2f}s")
print(f"Slotted: {asizeof.asizeof(slotted_list) / 1024 / 1024:.0f} MB, "
      f"{t_slotted:.2f}s")

# Typical output:
# Regular: 210 MB, 1.45s
# Slotted: 120 MB, 1.10s
# Savings: ~43% memory, ~24% faster creation

Inheritance Gotchas

__slots__ interacts with inheritance in ways that surprise even experienced engineers:

# Gotcha 1: Parent without __slots__ negates child's __slots__
class Base:
    pass  # Has __dict__

class Child(Base):
    __slots__ = ('x', 'y')

c = Child()
c.x = 1
c.y = 2
c.z = 3  # This WORKS because Base provides __dict__
# x and y live in slots, but every instance still carries a __dict__,
# so the memory savings are lost

# Gotcha 2: Multiple inheritance from two bases with non-empty __slots__
class A:
    __slots__ = ('x',)

class B:
    __slots__ = ('x',)  # Any non-empty __slots__ here conflicts, whatever the names

# class C(A, B):  # TypeError: multiple bases have instance lay-out conflict
#     __slots__ = ()
# At most one base may have non-empty __slots__; the others need __slots__ = ()

# Gotcha 3: Correct slots inheritance
class BaseSlots:
    __slots__ = ('x', 'y')

class DerivedSlots(BaseSlots):
    __slots__ = ('z',)  # Only add NEW slots; x, y are inherited

d = DerivedSlots()
d.x = 1  # From BaseSlots
d.y = 2  # From BaseSlots
d.z = 3  # From DerivedSlots
# d.w = 4  # AttributeError - no __dict__, can't add arbitrary attrs

:::danger The __slots__ Inheritance Rule
For __slots__ to actually save memory, every class in the MRO (other than object) must define __slots__. If any ancestor omits it, all instances get a __dict__, and the memory savings disappear. This includes forgetting __slots__ = () on intermediate base classes.
:::
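One way to enforce this rule is to walk the MRO and report any class that does not define `__slots__`. The helper name `classes_missing_slots` and the example classes below are hypothetical; this is a sketch of a check you might put in a unit test, not a complete audit (a class that lists `'__dict__'` in its slots would pass it and still carry a dict).

```python
def classes_missing_slots(cls):
    """Return names of classes in cls.__mro__ (except object) without their own __slots__."""
    return [c.__name__ for c in cls.__mro__
            if c is not object and '__slots__' not in vars(c)]

class Base:
    pass                      # no __slots__: instances get a __dict__

class Leaky(Base):
    __slots__ = ('x',)

class SlimBase:
    __slots__ = ()            # empty slots keep the chain dict-free

class Slim(SlimBase):
    __slots__ = ('x',)

print(classes_missing_slots(Leaky))   # ['Base']
print(classes_missing_slots(Slim))    # []
print(hasattr(Leaky(), '__dict__'), hasattr(Slim(), '__dict__'))  # True False
```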

When NOT to Use __slots__

# Don't use __slots__ when:

# 1. You need dynamic attributes (e.g., ORMs, plugins)
class DynamicModel:
    """ORM models need to set arbitrary attributes from DB columns."""
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)  # Requires __dict__

# 2. You use __dict__ explicitly
class Config:
    def to_dict(self):
        return self.__dict__  # Fails with __slots__

# 3. You have few instances (< 1000)
# The complexity is not worth saving a few KB

# 4. You must stay compatible with pickle protocol 0 or 1 (legacy code)
# Protocol 2 and later (the default in Python 3) pickle slotted objects
# without help; the old protocols need __getstate__/__setstate__

Making __slots__ Compatible with Serialization

class SlottedSerializable:
    __slots__ = ('name', 'value', 'metadata')

    def __init__(self, name, value, metadata=None):
        self.name = name
        self.value = value
        self.metadata = metadata

    def __getstate__(self):
        """Support for pickle."""
        return {slot: getattr(self, slot) for slot in self.__slots__
                if hasattr(self, slot)}

    def __setstate__(self, state):
        for slot, value in state.items():
            setattr(self, slot, value)

    def to_dict(self):
        """JSON-compatible serialization."""
        return {slot: getattr(self, slot) for slot in self.__slots__}

    @classmethod
    def from_dict(cls, data):
        return cls(**{k: v for k, v in data.items() if k in cls.__slots__})
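One caveat when combining this pattern with inheritance: `cls.__slots__` lists only the slots declared on that class, not the inherited ones. A minimal sketch of a helper that collects slots across the MRO follows; `all_slots`, `BaseRecord`, and `UserRecord` are hypothetical names used only for illustration.

```python
def all_slots(cls):
    """Collect slot names from every class in the MRO, base classes first."""
    names = []
    for klass in reversed(cls.__mro__):
        slots = vars(klass).get('__slots__', ())
        if isinstance(slots, str):   # __slots__ = 'name' declares a single slot
            slots = (slots,)
        names.extend(slots)
    return names

class BaseRecord:
    __slots__ = ('id',)

class UserRecord(BaseRecord):
    __slots__ = ('name',)

    def __init__(self, id, name):
        self.id = id
        self.name = name

    def to_dict(self):
        # Skip slots that were never assigned instead of raising AttributeError.
        return {n: getattr(self, n) for n in all_slots(type(self)) if hasattr(self, n)}

u = UserRecord(7, 'ada')
print(UserRecord.__slots__)   # ('name',)
print(u.to_dict())            # {'id': 7, 'name': 'ada'}
```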

Part 2 - weakref: Breaking Reference Cycles

weakref creates references to objects that do not prevent garbage collection. When the last strong reference to an object is dropped, the object is collected - even if weak references still exist.

The Problem: Strong Reference Cycles

import gc
import sys

class Node:
    def __init__(self, name):
        self.name = name
        self.parent = None
        self.children = []

    def add_child(self, child):
        self.children.append(child)
        child.parent = self  # Strong reference back to parent

# Create a parent-child cycle
parent = Node("root")
child = Node("leaf")
parent.add_child(child)

# Reference count for parent: 2 (variable + child.parent)
print(sys.getrefcount(parent) - 1)  # 2 (getrefcount adds 1)

del parent
# parent is NOT collected! child.parent still references it.
# The cycle (parent -> children -> child -> parent) keeps both alive.
# Python's cyclic GC will eventually collect them, but:
# 1. GC runs are expensive
# 2. Objects with __del__ in cycles may never be collected (pre-3.4)
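To observe this behaviour directly, the following sketch builds a small cycle, drops every variable, and checks that the objects survive until the cyclic collector runs. It assumes CPython; `TreeNode` is a hypothetical stand-in for the `Node` class above, and automatic collection is disabled only to keep the experiment deterministic.

```python
import gc
import weakref

class TreeNode:
    def __init__(self, name):
        self.name = name
        self.parent = None
        self.children = []

gc.collect()      # start from a clean slate
gc.disable()      # keep the automatic collector out of the experiment
try:
    root = TreeNode("root")
    leaf = TreeNode("leaf")
    root.children.append(leaf)
    leaf.parent = root            # strong back-reference closes the cycle

    probe = weakref.ref(root)     # observes root without keeping it alive
    del root, leaf
    alive_before = probe() is not None   # the cycle keeps both nodes alive

    unreachable = gc.collect()    # the cyclic collector finds and frees the cycle
    alive_after = probe() is not None
finally:
    gc.enable()

print(alive_before, alive_after, unreachable >= 2)   # True False True
```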

The Fix: weakref

import weakref

class Node:
    def __init__(self, name):
        self.name = name
        self._parent_ref = None  # Will hold a weakref
        self.children = []

    @property
    def parent(self):
        if self._parent_ref is not None:
            return self._parent_ref()  # Dereference the weakref
        return None

    def add_child(self, child):
        self.children.append(child)
        child._parent_ref = weakref.ref(self)  # Weak reference to parent

# Now the cycle is broken
parent = Node("root")
child = Node("leaf")
parent.add_child(child)

print(child.parent.name)  # "root" - works fine

del parent
# parent IS collected immediately (no strong ref cycle)
print(child.parent)  # None - the weakref returns None

# You can also set a callback when the referent is collected
def on_collected(ref):
    print("Object was garbage collected")

parent = Node("root")
weak = weakref.ref(parent, on_collected)
del parent  # Prints: "Object was garbage collected"

WeakValueDictionary: Caches That Don't Leak

import gc
import weakref

class ObjectCache:
    """
    Cache that does not prevent garbage collection.
    Entries disappear automatically when the cached objects
    are no longer referenced elsewhere.
    """

    def __init__(self):
        self._cache = weakref.WeakValueDictionary()

    def get_or_create(self, key, factory):
        obj = self._cache.get(key)
        if obj is not None:
            return obj

        obj = factory(key)
        self._cache[key] = obj
        return obj

    @property
    def size(self):
        return len(self._cache)


class ExpensiveResource:
    def __init__(self, resource_id):
        self.resource_id = resource_id
        self.data = bytearray(1_000_000)  # 1 MB per resource


cache = ObjectCache()

# Create and cache resources
r1 = cache.get_or_create("db_conn_1", ExpensiveResource)
r2 = cache.get_or_create("db_conn_2", ExpensiveResource)
print(cache.size)  # 2

# Resources stay cached as long as r1, r2 exist
r3 = cache.get_or_create("db_conn_1", ExpensiveResource)  # Cache hit
print(r3 is r1)  # True

# When external references are dropped, cache entries vanish
del r1, r3
gc.collect()
print(cache.size)  # 1 - db_conn_1 was collected

WeakSet: Observer Pattern Without Leaks

import gc
import weakref

class EventBus:
    """
    Event bus using WeakSet for subscribers.
    Subscribers are automatically removed when garbage collected.
    """

    def __init__(self):
        self._subscribers = weakref.WeakSet()

    def subscribe(self, handler):
        self._subscribers.add(handler)

    def publish(self, event):
        # WeakSet automatically removes dead references
        for subscriber in list(self._subscribers):
            subscriber.handle(event)

    @property
    def subscriber_count(self):
        return len(self._subscribers)


class EventHandler:
    def __init__(self, name):
        self.name = name

    def handle(self, event):
        print(f" {self.name} received: {event}")


bus = EventBus()
h1 = EventHandler("handler_1")
h2 = EventHandler("handler_2")

bus.subscribe(h1)
bus.subscribe(h2)
print(bus.subscriber_count)  # 2

bus.publish("test_event")
# handler_1 received: test_event
# handler_2 received: test_event

del h1
gc.collect()
print(bus.subscriber_count)  # 1 - h1 was automatically removed

bus.publish("another_event")
# handler_2 received: another_event

:::tip When to Use weakref
Use weakref when you have secondary references that should not keep objects alive:

  • Parent references in tree structures (children hold weak refs to parents)
  • Caches that should not prevent GC (use WeakValueDictionary)
  • Observer/callback registrations (use WeakSet or WeakMethod)
  • Circular reference prevention in any bidirectional graph
:::
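WeakMethod is mentioned above but not shown. The sketch below illustrates why it exists: a plain weak reference to a bound method dies immediately, while `weakref.WeakMethod` follows the lifetime of the instance. The `Listener` class is hypothetical, and the final result relies on CPython's reference counting freeing the object at once.

```python
import weakref

class Listener:
    def __init__(self):
        self.seen = []

    def on_event(self, event):
        self.seen.append(event)

listener = Listener()

# A plain weakref to a bound method dies at once: the bound-method object
# is a temporary that nothing else references.
plain = weakref.ref(listener.on_event)
print(plain() is None)          # True

# WeakMethod tracks the underlying instance and function instead.
method_ref = weakref.WeakMethod(listener.on_event)
callback = method_ref()         # re-creates the bound method while listener lives
callback("ping")
seen_snapshot = list(listener.seen)
print(seen_snapshot)            # ['ping']

del callback, listener          # drop every strong reference to the instance
print(method_ref() is None)     # True on CPython
```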

Part 3 - Compact Data Structures

array Module: Typed Arrays

Python lists store pointers to arbitrary PyObjects. The array module stores raw C values - no per-element PyObject overhead.

import array
import sys

# List of 1 million integers
int_list = list(range(1_000_000))
print(f"list: {sys.getsizeof(int_list) / 1024 / 1024:.1f} MB")
# ~8.0 MB (pointers) + ~28 MB (int objects) = ~36 MB total

# array of 1 million integers (signed int, 4 bytes each)
int_array = array.array('i', range(1_000_000))
print(f"array: {sys.getsizeof(int_array) / 1024 / 1024:.1f} MB")
# ~3.8 MB (raw 4-byte ints, no PyObject overhead)

# Savings: ~90% for integer data

Type codes for array:

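| Code | C Type | Python Type | Size (bytes) |
| --- | --- | --- | --- |
| 'b' | signed char | int | 1 |
| 'B' | unsigned char | int | 1 |
| 'h' | signed short | int | 2 |
| 'H' | unsigned short | int | 2 |
| 'i' | signed int | int | 4 |
| 'I' | unsigned int | int | 4 |
| 'l' | signed long | int | 4-8 |
| 'q' | signed long long | int | 8 |
| 'f' | float | float | 4 |
| 'd' | double | float | 8 |
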
import array
import sys

# Sensor readings: 4 bytes per value in the array, vs an 8-byte pointer
# plus a 24-byte float object per value in a list
# Note: 'f' stores single precision (about 7 significant digits)
readings = array.array('f', [23.5, 24.1, 22.8, 25.0, 23.9] * 200_000)
print(f"array.array('f'): {sys.getsizeof(readings) / 1024 / 1024:.1f} MB")
# ~3.8 MB

readings_list = [23.5, 24.1, 22.8, 25.0, 23.9] * 200_000
print(f"list: {sys.getsizeof(readings_list) / 1024 / 1024:.1f} MB")
# ~7.6 MB (pointers only, floats are separate objects)

# array supports standard operations
readings.append(26.0)
readings.extend([27.0, 28.0])
avg = sum(readings) / len(readings)

# But: each element is boxed into a Python float on access
# For numerical computation, use NumPy instead (covered in Lesson 6)
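Two properties of typed arrays are worth seeing in action: values outside the C type's range are rejected, and the raw buffer can be round-tripped through bytes. This is a small sketch using only the standard `array` API; the variable names are illustrative, and it assumes the usual 2-byte unsigned short.

```python
import array

counts = array.array('H', [0, 1, 65535])   # unsigned short
print(counts.itemsize)                      # 2 on mainstream platforms

# Values outside the C type's range are rejected rather than truncated.
try:
    counts.append(70_000)
    overflowed = False
except OverflowError:
    overflowed = True
print(overflowed)                           # True

# The raw buffer round-trips through bytes, e.g. for writing to a file.
raw = counts.tobytes()
restored = array.array('H')
restored.frombytes(raw)
print(restored == counts, len(raw) == counts.itemsize * len(counts))  # True True
```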

struct Module: Binary Packing

struct packs Python values into compact binary representations - essential for binary protocols, file formats, and network communication.

import struct
import sys

# A sensor packet: timestamp (double), sensor_id (unsigned short),
# value (float), status (unsigned byte)
# Without struct: a dict or object consumes ~200+ bytes
# With struct: packed into exactly 15 bytes

# '<' selects little-endian standard sizes with no alignment padding.
# Without it, native alignment pads this layout (typically to 17 bytes).
fmt = '<d H f B'  # double, unsigned short, float, unsigned byte
packed = struct.pack(fmt, 1709722800.0, 42, 23.5, 1)
print(f"Packed size: {len(packed)} bytes")  # 15 bytes
print(f"Hex: {packed.hex()}")

# Unpack
timestamp, sensor_id, value, status = struct.unpack(fmt, packed)
print(f"timestamp={timestamp}, sensor_id={sensor_id}, "
      f"value={value}, status={status}")

# At scale: 1 million sensor readings
n = 1_000_000

# As dicts: ~200 MB
dicts = [{'timestamp': float(i), 'sensor_id': i % 100,
          'value': i * 0.1, 'status': 1} for i in range(n)]

# As packed binary: ~15 MB
packed_data = bytearray()
for i in range(n):
    packed_data.extend(struct.pack(fmt, float(i), i % 100, i * 0.1, 1))

print(f"Dicts: {sys.getsizeof(dicts) / 1024 / 1024:.0f} MB (shallow)")
print(f"Packed: {len(packed_data) / 1024 / 1024:.0f} MB")
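The packed size depends on the byte-order prefix of the format string. The sketch below compares native alignment (no prefix) with the standard, unpadded layout selected by `'<'`. Only the `'<'` size is fixed by the struct specification; the native size is platform-dependent.

```python
import struct

native_size = struct.calcsize('dHfB')    # native alignment: may include padding
packed_size = struct.calcsize('<dHfB')   # standard sizes, no padding: 8 + 2 + 4 + 1

print(native_size, packed_size)          # packed_size is 15; native_size varies by platform

record = struct.pack('<dHfB', 1709722800.0, 42, 23.5, 1)
timestamp, sensor_id, value, status = struct.unpack('<dHfB', record)
print(len(record), sensor_id, value, status)   # 15 42 23.5 1
```

If a file or wire format must be read on other machines, an explicit prefix (`'<'`, `'>'`, or `'!'`) keeps the layout identical everywhere.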

Batch Packing for Performance

import struct

# Pack many records at once for better performance
fmt_single = '<d H f B'  # '<' = no alignment padding, 15 bytes per record
size_single = struct.calcsize(fmt_single)
n = 100_000

records = [(float(i), i % 100, i * 0.1, 1) for i in range(n)]

# Slow: struct.pack() per record plus bytearray.extend()

# Option 1: one columnar format string (all timestamps, then all ids, ...)
fmt_batch = f'<{n}d {n}H {n}f {n}B'

# Option 2: struct.pack_into with a pre-allocated buffer (record-by-record layout)
buffer = bytearray(size_single * n)
for i, (ts, sid, val, status) in enumerate(records):
    struct.pack_into(fmt_single, buffer, i * size_single, ts, sid, val, status)

print(f"Buffer size: {len(buffer) / 1024 / 1024:.1f} MB")
# 1.4 MB for 100,000 records
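A precompiled `struct.Struct` avoids looking up the format string on every call, and `iter_unpack` decodes a whole buffer without manual offset arithmetic. The following is a minimal sketch; the format uses an explicit `'<'` prefix so each record is exactly 15 bytes, and the sample values are chosen to be exactly representable as 32-bit floats.

```python
import struct

record_struct = struct.Struct('<dHfB')      # format string is parsed once
records = [(float(i), i % 100, i * 0.5, 1) for i in range(1000)]

buffer = bytearray(record_struct.size * len(records))
for index, record in enumerate(records):
    record_struct.pack_into(buffer, index * record_struct.size, *record)

# iter_unpack walks the buffer record by record without manual offsets.
decoded = list(record_struct.iter_unpack(buffer))
print(len(buffer), decoded[3])   # 15000 (3.0, 3, 1.5, 1)
```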

Part 4 - Memory-Mapped Files (mmap)

Memory-mapped files let you treat a file on disk as if it were a byte array in memory. The operating system handles paging data in and out of RAM, so you can work with files larger than available RAM.

import mmap
import os

# Create a sample data file
filename = '/tmp/large_data.bin'
with open(filename, 'wb') as f:
    for i in range(1_000_000):
        f.write(f"record_{i:08d}\n".encode())

file_size = os.path.getsize(filename)
print(f"File size: {file_size / 1024 / 1024:.1f} MB")

# Memory-mapped access - the file is NOT loaded into RAM all at once
with open(filename, 'r+b') as f:
    mm = mmap.mmap(f.fileno(), 0)  # 0 = map entire file

    # Random access - like a byte array
    print(mm[0:20])     # First 20 bytes
    print(mm[100:120])  # Bytes 100-120
    print(mm[-20:])     # Last 20 bytes

    # Search
    pos = mm.find(b"record_00050000")
    if pos != -1:
        print(f"Found at offset {pos}: {mm[pos:pos+20]}")

    # You can also write
    mm[0:6] = b"RECORD"  # Modify in place (writes to file)

    mm.close()

# Clean up
os.unlink(filename)

Processing a Large CSV with mmap

import mmap
import os

def count_lines_mmap(filepath: str) -> int:
    """Count lines in a file using mmap - works for files larger than RAM."""
    with open(filepath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        count = 0
        while mm.readline():
            count += 1
        mm.close()
    return count

def search_in_large_file(filepath: str, pattern: bytes) -> list[int]:
    """Find all occurrences of a pattern in a large file."""
    positions = []
    with open(filepath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        pos = 0
        while True:
            pos = mm.find(pattern, pos)
            if pos == -1:
                break
            positions.append(pos)
            pos += 1
        mm.close()
    return positions

:::note When mmap Shines

  • Random access to large files: mmap gives O(1) access to any byte offset
  • Shared memory between processes: multiple processes can mmap the same file
  • Files larger than RAM: the OS pages data in/out automatically
  • Read-only analytics: scanning a 10 GB log file without loading it all

When mmap does NOT help: sequential processing of a file from start to finish - regular buffered I/O with open() is just as fast and simpler.
:::
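The random-access case can be sketched with fixed-width records: the byte offset of record `i` is simply `i * record_size`, so any record is reachable without reading the ones before it. The record layout (`'<qd'`: an int64 id and a float64 value) and the helper `read_record` are assumptions made for this example.

```python
import mmap
import os
import struct
import tempfile

RECORD = struct.Struct('<qd')   # hypothetical layout: int64 id, float64 value

# Write 1000 fixed-width records to a temporary file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    for i in range(1000):
        tmp.write(RECORD.pack(i, i * 1.5))
    path = tmp.name

def read_record(mm, index):
    """Decode record `index` straight from the mapping at offset index * size."""
    return RECORD.unpack_from(mm, index * RECORD.size)

try:
    with open(path, 'rb') as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        first = read_record(mm, 0)
        middle = read_record(mm, 500)
        last = read_record(mm, 999)
finally:
    os.unlink(path)

print(first, middle, last)   # (0, 0.0) (500, 750.0) (999, 1498.5)
```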

Part 5 - Object Pooling

Object creation in Python involves allocation, initialization, and (eventually) deallocation. For objects that are created and destroyed frequently, pooling avoids this overhead by reusing objects.

from collections import deque
from contextlib import contextmanager

class ObjectPool:
    """
    Generic object pool that reuses instances instead of creating new ones.

    Objects are checked out, used, and returned to the pool.
    The pool grows on demand but recycles returned objects.
    """

    def __init__(self, factory, max_size=100, reset_func=None):
        self._factory = factory
        self._pool = deque(maxlen=max_size)
        self._max_size = max_size
        self._reset = reset_func
        self._stats = {'created': 0, 'reused': 0, 'returned': 0}

    def acquire(self):
        """Get an object from the pool (or create a new one)."""
        try:
            obj = self._pool.popleft()
            if self._reset:
                self._reset(obj)
            self._stats['reused'] += 1
            return obj
        except IndexError:
            self._stats['created'] += 1
            return self._factory()

    def release(self, obj):
        """Return an object to the pool for reuse."""
        if len(self._pool) < self._max_size:
            self._pool.append(obj)
            self._stats['returned'] += 1

    @contextmanager
    def checkout(self):
        """Context manager for automatic acquire/release."""
        obj = self.acquire()
        try:
            yield obj
        finally:
            self.release(obj)

    @property
    def stats(self):
        return dict(self._stats)


# Example: buffer pool for data processing
def create_buffer():
    return bytearray(1024 * 1024)  # 1 MB buffer

def reset_buffer(buf):
    buf[:] = b'\x00' * len(buf)  # Zero out

buffer_pool = ObjectPool(
    factory=create_buffer,
    max_size=10,
    reset_func=reset_buffer,
)

# Process 1000 items, reusing buffers
for i in range(1000):
    with buffer_pool.checkout() as buf:
        # Use the buffer
        buf[0:4] = i.to_bytes(4, 'big')
        # ... process data ...

print(buffer_pool.stats)
# {'created': 1, 'reused': 999, 'returned': 1000}
# Only 1 buffer was ever created!

Connection Pool Pattern

import queue
import threading
from contextlib import contextmanager

class ConnectionPool:
    """
    Thread-safe connection pool with timeout.
    """

    def __init__(self, create_connection, max_connections=10, timeout=30):
        self._create = create_connection
        self._pool = queue.Queue(maxsize=max_connections)
        self._max = max_connections
        self._timeout = timeout  # Default wait (seconds) for a free connection
        self._current_count = 0
        self._lock = threading.Lock()

    @contextmanager
    def connection(self, timeout=None):
        conn = self._acquire(timeout)
        try:
            yield conn
        except Exception:
            # Connection might be corrupted after an error
            self._discard(conn)
            raise
        else:
            self._release(conn)

    def _acquire(self, timeout=None):
        # Try to get from pool first
        try:
            return self._pool.get_nowait()
        except queue.Empty:
            pass

        # Create new if under limit
        with self._lock:
            if self._current_count < self._max:
                self._current_count += 1
                try:
                    return self._create()
                except Exception:
                    self._current_count -= 1  # Creation failed; free the reserved slot
                    raise

        # Wait for one to be returned
        wait = self._timeout if timeout is None else timeout
        try:
            return self._pool.get(timeout=wait)
        except queue.Empty:
            raise TimeoutError("Connection pool exhausted")

    def _release(self, conn):
        try:
            self._pool.put_nowait(conn)
        except queue.Full:
            self._discard(conn)

    def _discard(self, conn):
        with self._lock:
            self._current_count -= 1
        try:
            conn.close()
        except Exception:
            pass

Part 6 - The Flyweight Pattern

The flyweight pattern shares common state across many objects to reduce memory. It is ideal when many objects share identical attribute values.

import random

class Color:
    """Flyweight: shared color objects."""
    _cache = {}

    def __new__(cls, r, g, b):
        key = (r, g, b)
        if key not in cls._cache:
            instance = super().__new__(cls)
            instance.r = r
            instance.g = g
            instance.b = b
            cls._cache[key] = instance
        return cls._cache[key]

    def __repr__(self):
        return f"Color({self.r}, {self.g}, {self.b})"

class PlainColor:
    """Baseline without sharing: one object per pixel."""
    def __init__(self, r, g, b):
        self.r = r
        self.g = g
        self.b = b

def random_channel():
    return random.randint(0, 15) * 17  # 16 levels per channel

# Without flyweight: 1 million Color objects = 1 million allocations
# With flyweight: only unique colors are allocated

# 1 million pixels, but at most 4096 (16^3) unique colors
colors_normal = [PlainColor(random_channel(), random_channel(), random_channel())
                 for _ in range(1_000_000)]

colors_flyweight = [Color(random_channel(), random_channel(), random_channel())
                    for _ in range(1_000_000)]

print(f"Unique flyweight colors: {len(Color._cache)}")
# Up to 4096 unique colors (16^3), but 1M references share them
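The standard library already applies the same idea to strings: `sys.intern` returns one canonical object per distinct string value, which is useful when millions of records repeat a small vocabulary (such as the `"celsius"` unit earlier in this lesson). A minimal sketch, with illustrative variable names:

```python
import sys

# Build equal strings at runtime so they start out as separate objects.
units = [''.join(['cel', 'sius']) for _ in range(3)]
distinct_before = len({id(u) for u in units})

# sys.intern maps every equal string to one shared object.
interned = [sys.intern(u) for u in units]
distinct_after = len({id(u) for u in interned})

print(distinct_before, distinct_after)   # 3 1
```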

Flyweight with Immutable Data

from dataclasses import dataclass

@dataclass(frozen=True)
class CurrencyFlyweight:
    """Immutable flyweight for currency data."""
    code: str
    name: str
    symbol: str
    decimal_places: int

    _cache = {}

    @classmethod
    def get(cls, code: str) -> 'CurrencyFlyweight':
        if code not in cls._cache:
            currencies = {
                'USD': ('US Dollar', '\u0024', 2),
                'EUR': ('Euro', '\u20ac', 2),
                'GBP': ('British Pound', '\u00a3', 2),
                'JPY': ('Japanese Yen', '\u00a5', 0),
                'BTC': ('Bitcoin', '\u20bf', 8),
            }
            if code not in currencies:
                raise ValueError(f"Unknown currency: {code}")
            name, symbol, decimals = currencies[code]
            cls._cache[code] = cls(code, name, symbol, decimals)
        return cls._cache[code]

# 1 million transactions, all sharing the same currency instances
class Transaction:
    __slots__ = ('amount', 'currency', 'description')

    def __init__(self, amount, currency_code, description):
        self.amount = amount
        self.currency = CurrencyFlyweight.get(currency_code)
        self.description = description

# All USD transactions share the same CurrencyFlyweight instance
t1 = Transaction(100.0, 'USD', 'Purchase 1')
t2 = Transaction(200.0, 'USD', 'Purchase 2')
print(t1.currency is t2.currency)  # True - same object

Part 7 - Real-World: Processing Millions of Records

Putting it all together - here is how to process a large dataset with minimal memory:

import struct
import mmap
import os
import array
from collections import defaultdict

# Scenario: process 10 million sensor readings from a binary file.
# Each reading: timestamp (8 bytes), sensor_id (2 bytes),
# value (4 bytes), status (1 byte) = 15 bytes
# Total file: ~150 MB

RECORD_FMT = '<d H f B'  # '<' disables alignment padding
RECORD_SIZE = struct.calcsize(RECORD_FMT)  # 15 bytes

def generate_test_data(filepath: str, n_records: int):
    """Generate test binary data file."""
    import random
    with open(filepath, 'wb') as f:
        for i in range(n_records):
            record = struct.pack(
                RECORD_FMT,
                1700000000.0 + i,            # timestamp
                random.randint(0, 999),      # sensor_id
                random.uniform(15.0, 35.0),  # value (temperature)
                random.randint(0, 3),        # status
            )
            f.write(record)

def process_with_mmap(filepath: str) -> dict:
    """
    Process records using mmap - handles files larger than RAM.
    Memory usage: ~constant regardless of file size.
    """
    file_size = os.path.getsize(filepath)
    n_records = file_size // RECORD_SIZE

    # Accumulators - use array for memory efficiency
    sensor_sums = defaultdict(float)
    sensor_counts = defaultdict(int)
    status_counts = array.array('I', [0, 0, 0, 0])  # 4 status codes

    with open(filepath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

        for i in range(n_records):
            offset = i * RECORD_SIZE
            timestamp, sensor_id, value, status = struct.unpack_from(
                RECORD_FMT, mm, offset
            )

            sensor_sums[sensor_id] += value
            sensor_counts[sensor_id] += 1
            status_counts[status] += 1

        mm.close()

    # Compute averages
    sensor_averages = {
        sid: sensor_sums[sid] / sensor_counts[sid]
        for sid in sensor_sums
    }

    return {
        'n_records': n_records,
        'n_sensors': len(sensor_averages),
        'status_distribution': list(status_counts),
        'avg_temp_range': (
            min(sensor_averages.values()),
            max(sensor_averages.values()),
        ),
    }


def process_chunked(filepath: str, chunk_size: int = 10_000) -> dict:
    """
    Process records in chunks - good balance of speed and memory.
    """
    file_size = os.path.getsize(filepath)
    n_records = file_size // RECORD_SIZE
    chunk_bytes = chunk_size * RECORD_SIZE

    sensor_sums = defaultdict(float)
    sensor_counts = defaultdict(int)
    status_counts = [0, 0, 0, 0]

    with open(filepath, 'rb') as f:
        while True:
            chunk = f.read(chunk_bytes)
            if not chunk:
                break

            n_in_chunk = len(chunk) // RECORD_SIZE
            for i in range(n_in_chunk):
                offset = i * RECORD_SIZE
                _, sensor_id, value, status = struct.unpack_from(
                    RECORD_FMT, chunk, offset
                )
                sensor_sums[sensor_id] += value
                sensor_counts[sensor_id] += 1
                status_counts[status] += 1

    sensor_averages = {
        sid: sensor_sums[sid] / sensor_counts[sid]
        for sid in sensor_sums
    }

    return {
        'n_records': n_records,
        'n_sensors': len(sensor_averages),
        'status_distribution': status_counts,
    }


# Benchmark both approaches
if __name__ == '__main__':
    import time

    filepath = '/tmp/sensor_data.bin'
    n = 1_000_000  # 1M records for demo

    print("Generating test data...")
    generate_test_data(filepath, n)
    print(f"File size: {os.path.getsize(filepath) / 1024 / 1024:.1f} MB")

    print("\nProcessing with mmap...")
    start = time.perf_counter()
    result1 = process_with_mmap(filepath)
    t1 = time.perf_counter() - start
    print(f" Time: {t1:.2f}s, Records: {result1['n_records']:,}")

    print("\nProcessing with chunked reads...")
    start = time.perf_counter()
    result2 = process_chunked(filepath)
    t2 = time.perf_counter() - start
    print(f" Time: {t2:.2f}s, Records: {result2['n_records']:,}")

    os.unlink(filepath)

Key Takeaways

  • __slots__ saves 40-60% memory per instance: but only if every class in the inheritance chain defines __slots__. One missing ancestor reintroduces __dict__.
  • weakref breaks reference cycles: use WeakValueDictionary for caches that should not prevent GC, and WeakSet for observer patterns.
  • array module stores raw C values: 80-90% smaller than lists for numeric data, but elements are boxed on access.
  • struct.pack creates compact binary representations: 10-20x smaller than dicts for fixed-schema records.
  • mmap handles files larger than RAM: the OS manages paging, giving you random access to arbitrary file offsets with constant memory.
  • Object pooling avoids allocation overhead: reuse expensive objects (connections, buffers) instead of creating and destroying them.
  • The flyweight pattern shares identical state: when many objects have the same attribute values, share the common object instead of duplicating it.

Graded Practice Challenges

Level 1 - Predict the Output

Question 1: What happens when you try to add a new attribute to a slotted class?

class Point:
    __slots__ = ('x', 'y')

p = Point()
p.x = 1
p.y = 2
p.z = 3

Answer

AttributeError: 'Point' object has no attribute 'z'. Classes with __slots__ do not have a __dict__, so you cannot add attributes not listed in __slots__. This is actually a benefit - it catches typos like point.X = 5 immediately instead of silently creating a new attribute.

Question 2: What does weakref.ref(obj)() return after del obj?

Answer

It returns None. When the referent object is garbage collected, dereferencing the weak reference (by calling it) returns None. This is how you check if a weakly-referenced object still exists:

ref = weakref.ref(obj)
target = ref()
if target is not None:
    # Object still alive
    target.do_something()

Question 3: How much memory does array.array('f', [0.0] * 1_000_000) use compared to [0.0] * 1_000_000?

Answer
  • array.array('f', ...): approximately 4 MB (1 million * 4 bytes per float32, plus small overhead)
  • [0.0] * 1_000_000: approximately 8 MB for the list (pointers). CPython does not intern floats, but list repetition copies references rather than objects, so all 1M slots point to the same single 0.0 object and the total stays at about 8 MB (just the pointer array). With diverse float values, each float object costs 24 bytes, pushing total memory to ~32 MB.

The array is 2-8x more memory efficient depending on value diversity.
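The difference between a repeated value and diverse values can be checked by counting distinct object identities. This is a sketch on a smaller list; the 24-byte float size applies to 64-bit CPython, so the final line is printed without an expected value.

```python
import sys

zeros = [0.0] * 1_000
shared_count = len({id(x) for x in zeros})
print(shared_count)                       # 1: every slot points at the same float

varied = [i * 0.5 for i in range(1_000)]
varied_count = len({id(x) for x in varied})
print(varied_count)                       # 1000: one float object per element

per_float = sys.getsizeof(1.5)            # size of one float object
list_only = sys.getsizeof(varied)         # pointer array plus list header
print(per_float, list_only + per_float * len(varied))
```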

Level 2 - Debug Challenge

This class uses __slots__ but is not saving any memory. Find and fix the problem:

class BaseModel:
    def __init__(self, id, created_at):
        self.id = id
        self.created_at = created_at

class User(BaseModel):
    __slots__ = ('name', 'email', 'role')

    def __init__(self, id, created_at, name, email, role):
        super().__init__(id, created_at)
        self.name = name
        self.email = email
        self.role = role

Answer

BaseModel does not define __slots__, so it has a __dict__. When User inherits from BaseModel, every User instance gets a __dict__ from BaseModel in addition to the __slots__ from User. The __slots__ on User are not saving any memory because the __dict__ allows arbitrary attributes anyway.

Fix:

class BaseModel:
    __slots__ = ('id', 'created_at')  # Add __slots__ to base class

    def __init__(self, id, created_at):
        self.id = id
        self.created_at = created_at

class User(BaseModel):
    __slots__ = ('name', 'email', 'role')  # Only NEW attributes

    def __init__(self, id, created_at, name, email, role):
        super().__init__(id, created_at)
        self.name = name
        self.email = email
        self.role = role

Now User instances have no __dict__ and use slot-based storage for all 5 attributes.

Level 3 - Design Challenge

You need to process a 50 GB Apache access log file on a machine with 8 GB RAM. The task:

  1. Count unique IP addresses
  2. Find the top 10 most-requested URLs
  3. Calculate requests per second over time

Design the data structures and processing pipeline. Specify memory usage bounds for each component.

Solution Sketch

Architecture: Single-pass streaming with mmap + compact accumulators.

import mmap
from collections import Counter

def process_access_log(filepath: str):
    # Component 1: Unique IPs - use a set
    # Worst case: ~10M unique IPs * ~50 bytes = ~500 MB (fits in 8 GB)
    # Optimization: store as packed 4-byte integers for IPv4
    unique_ips = set()  # or use array of packed ints

    # Component 2: URL counts - Counter (bounded by unique URLs)
    url_counts = Counter()  # Typically < 100K unique URLs

    # Component 3: Requests per second - dict of timestamps
    rps = Counter()  # Key: second-granularity timestamp

    with open(filepath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        while True:
            line = mm.readline()
            if not line:
                break

            # Parse Apache combined log format
            # 192.168.1.1 - - [01/Jan/2024:00:00:01 +0000] "GET /path HTTP/1.1" ...
            parts = line.split(b' ')
            if len(parts) < 7:
                continue

            ip = parts[0]
            unique_ips.add(ip)  # ~50 bytes per IP stored as a bytes object

            url = parts[6]  # The requested path
            url_counts[url] += 1

            # Extract timestamp (second granularity)
            timestamp_part = parts[3]  # [01/Jan/2024:00:00:01
            rps[timestamp_part] += 1

        mm.close()

    return {
        'unique_ips': len(unique_ips),
        'top_urls': url_counts.most_common(10),
        'peak_rps': max(rps.values(), default=0),  # default avoids ValueError on an empty log
        'avg_rps': sum(rps.values()) / len(rps) if rps else 0,
    }
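The "packed 4-byte integers" optimization mentioned in the code comments can be sketched with the standard `ipaddress` module. The helper `ipv4_to_int` is a hypothetical name, and the example assumes IPv4-only input and a 4-byte C unsigned int for the `'I'` typecode. A set of Python ints still pays for one int object per entry, so the real saving comes from moving the deduplicated values into a typed array.

```python
import array
import ipaddress

def ipv4_to_int(text: bytes) -> int:
    """Convert b'192.168.1.1' to its 32-bit integer form (ValueError if invalid)."""
    return int(ipaddress.IPv4Address(text.decode('ascii')))

raw_ips = [b'192.168.1.1', b'10.0.0.7', b'192.168.1.1']
unique_ips = {ipv4_to_int(ip) for ip in raw_ips}

print(len(unique_ips))                          # 2
print(ipv4_to_int(b'192.168.1.1'))              # 3232235777
print(str(ipaddress.IPv4Address(3232235777)))   # 192.168.1.1

# After deduplication, a typed array stores each address in itemsize bytes.
packed = array.array('I', sorted(unique_ips))
print(len(packed), packed.itemsize)
```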

Memory budget:

  • mmap: ~0 MB (OS handles paging)
  • unique_ips set: ~500 MB worst case
  • url_counts Counter: ~10 MB (assuming 100K unique URLs)
  • rps Counter: ~50 MB (one entry per second of log coverage)
  • Total: ~560 MB peak - well within 8 GB

If memory is tighter: Replace the set with a HyperLogLog for approximate unique counting (~12 KB), or use a Bloom filter for approximate duplicate detection (~18 MB for 10M IPs at a 0.1% false-positive rate). Both trade exactness for memory.
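As a sanity check on Bloom filter sizing, the textbook formulas are m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. The function name `bloom_parameters` is hypothetical, and the sketch computes sizes only; it does not implement the filter itself.

```python
import math

def bloom_parameters(n_items: int, false_positive_rate: float):
    """Return (bits, hash_count) from the standard Bloom filter sizing formulas."""
    bits = math.ceil(-n_items * math.log(false_positive_rate) / math.log(2) ** 2)
    hashes = max(1, round(bits / n_items * math.log(2)))
    return bits, hashes

bits, hashes = bloom_parameters(10_000_000, 0.001)
print(f"{bits / 8 / 1e6:.1f} MB, {hashes} hash functions")   # 18.0 MB, 10 hash functions
```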

What's Next

These memory optimization techniques work within pure Python. But for numerical workloads, the biggest performance gain comes from escaping Python's loop overhead entirely. In Vectorization with NumPy, you will learn to replace Python loops with C-level array operations that run 10-100x faster.

© 2026 EngineersOfAI. All rights reserved.