Python Memory Optimization Practice Problems & Exercises
Practice: Memory Optimization
← Back to lessonEasy
Compare memory usage of a regular class vs a __slots__ class for a Point with x, y, z coordinates.
import sys
import tracemalloc
class RegularPoint:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
class SlottedPoint:
__slots__ = ("x", "y", "z")
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
rp = RegularPoint(1.0, 2.0, 3.0)
sp = SlottedPoint(1.0, 2.0, 3.0)
print(f"Regular: {sys.getsizeof(rp)} bytes")
print(f"Slotted: {sys.getsizeof(sp)} bytes")
print(f"Savings per instance: {sys.getsizeof(rp) - sys.getsizeof(sp)} bytes")
# Verify __slots__ prevents dynamic attributes
try:
sp.w = 4.0
print("ERROR: should not allow new attribute")
except AttributeError:
print("__slots__ correctly prevents dynamic attributes")
print("__slots__ class uses less memory per instance")
Solution
import sys
class RegularPoint:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
class SlottedPoint:
__slots__ = ("x", "y", "z")
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
rp = RegularPoint(1.0, 2.0, 3.0)
sp = SlottedPoint(1.0, 2.0, 3.0)
print(f"Regular: {sys.getsizeof(rp)} bytes")
print(f"Slotted: {sys.getsizeof(sp)} bytes")
print(f"Savings per instance: {sys.getsizeof(rp) - sys.getsizeof(sp)} bytes")
try:
sp.w = 4.0
except AttributeError:
print("__slots__ correctly prevents dynamic attributes")
print("__slots__ class uses less memory per instance")
slots mechanics:
- Regular class: each instance has
__dict__(the attribute namespace dict),__weakref__, and the object header. Total ~232 bytes for an empty instance. __slots__class: replaces__dict__with fixed-size descriptors. ~48-56 bytes per instance.- Savings are 100-200 bytes per instance — for 1 million instances: 100-200 MB saved.
- Limitation: cannot add new attributes at runtime;
__dict__is gone. __weakref__is also removed unless explicitly added to__slots__.
Expected Output
__slots__ class uses less memory per instanceHints
Hint 1: __slots__ removes the per-instance __dict__, saving ~200 bytes per instance.
Hint 2: Use sys.getsizeof() to compare instance sizes.
Measure the peak memory difference between a list comprehension and a generator expression for large data.
import tracemalloc
def sum_squares_list(n):
squares = [i * i for i in range(n)]
return sum(squares)
def sum_squares_gen(n):
squares = (i * i for i in range(n))
return sum(squares)
n = 500_000
tracemalloc.start()
sum_squares_list(n)
_, peak_list = tracemalloc.get_traced_memory()
tracemalloc.stop()
tracemalloc.start()
sum_squares_gen(n)
_, peak_gen = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"List peak: {peak_list / 1e6:.1f} MB")
print(f"Generator peak: {peak_gen / 1024:.0f} KB")
print(f"Ratio: {peak_list / peak_gen:.0f}x")
print("Generator uses dramatically less memory than list")
Solution
import tracemalloc
def sum_squares_list(n):
return sum([i * i for i in range(n)])
def sum_squares_gen(n):
return sum(i * i for i in range(n))
n = 500_000
tracemalloc.start()
sum_squares_list(n)
_, peak_list = tracemalloc.get_traced_memory()
tracemalloc.stop()
tracemalloc.start()
sum_squares_gen(n)
_, peak_gen = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"List peak: {peak_list / 1e6:.1f} MB")
print(f"Generator peak: {peak_gen / 1024:.0f} KB")
print(f"Ratio: {peak_list / peak_gen:.0f}x")
print("Generator uses dramatically less memory than list")
Generator memory model:
- List
[i*i for i in range(n)]: allocates n integer objects + n pointer slots in the list array. For n=500k: ~4-20MB depending on integer size. - Generator
(i*i for i in range(n)): stores only the generator state (current i, code pointer). ~100-200 bytes regardless of n. - Use generators whenever you only need to iterate once and don't need random access.
- If you need to iterate multiple times, materialize to a list the first time and reuse it.
Expected Output
Generator uses dramatically less memory than listHints
Hint 1: A list materializes all values at once. A generator computes one at a time.
Hint 2: Use tracemalloc to measure peak memory for both.
Compare memory usage of array.array vs a list for storing 100,000 floats.
import array
import sys
n = 100_000
float_list = [float(i) for i in range(n)]
float_array = array.array("d", range(n)) # "d" = double (C double = 8 bytes)
list_size = sys.getsizeof(float_list) + sum(sys.getsizeof(x) for x in float_list[:1000]) * n // 1000
array_size = sys.getsizeof(float_array)
print(f"List approx size: {list_size / 1e6:.1f} MB")
print(f"array.array size: {array_size / 1e6:.1f} MB")
print(f"Ratio: {list_size / array_size:.1f}x")
print("array is more memory-efficient than list for numeric data")
Solution
import array
import sys
n = 100_000
float_list = [float(i) for i in range(n)]
float_array = array.array("d", range(n))
# List: pointer array + individual Python float objects
list_array_overhead = sys.getsizeof(float_list)
float_obj_size = sys.getsizeof(1.0) # ~24 bytes
total_list = list_array_overhead + float_obj_size * n
array_size = sys.getsizeof(float_array)
print(f"List approx: {total_list / 1e6:.1f} MB")
print(f"array.array: {array_size / 1e6:.1f} MB")
print(f"Ratio: {total_list / array_size:.1f}x")
print("array is more memory-efficient than list for numeric data")
array.array vs list:
- Python list: an array of pointers to Python objects. Each float is a 24-byte heap-allocated object + 8-byte pointer = 32 bytes minimum.
array.array("d"): stores raw C doubles, 8 bytes each. No object overhead.- For 100,000 floats: list ~3.2MB, array ~0.8MB — roughly 4x more efficient.
- Type codes:
"b"int8,"h"int16,"i"int32,"l"int64,"f"float32,"d"float64. - For mathematical work on large numeric arrays, use NumPy (
np.array) — it is faster AND more memory-efficient thanarray.array.
Expected Output
array is more memory-efficient than list for numeric dataHints
Hint 1: array.array stores primitive C types (not Python objects). Bytes per element = C type size.
Hint 2: A Python float in a list = 24-byte object + 8-byte pointer. In array.array("d") = 8 bytes.
Medium
Use struct to pack a list of records into a compact binary buffer and compare sizes.
import struct
import sys
# Each record: id (4 bytes), x (4 bytes float), y (4 bytes float), flags (1 byte)
RECORD_FORMAT = "!IffB" # big-endian: uint32, float, float, uint8
RECORD_SIZE = struct.calcsize(RECORD_FORMAT)
def pack_records(records):
buf = bytearray(len(records) * RECORD_SIZE)
for i, r in enumerate(records):
struct.pack_into(RECORD_FORMAT, buf, i * RECORD_SIZE,
r["id"], r["x"], r["y"], r["flags"])
return bytes(buf)
def unpack_records(buf):
n = len(buf) // RECORD_SIZE
return [
{"id": v[0], "x": v[1], "y": v[2], "flags": v[3]}
for i in range(n)
for v in [struct.unpack_from(RECORD_FORMAT, buf, i * RECORD_SIZE)]
]
records = [{"id": i, "x": float(i), "y": float(i * 2), "flags": i % 256} for i in range(1000)]
packed = pack_records(records)
unpacked = unpack_records(packed)
dict_size = sum(sys.getsizeof(r) + sum(sys.getsizeof(v) for v in r.values()) for r in records[:10]) * 100
struct_size = len(packed)
print(f"Dict list approx: {dict_size / 1024:.1f} KB")
print(f"struct buffer: {struct_size / 1024:.1f} KB")
print(f"Ratio: {dict_size / struct_size:.1f}x")
print("struct packs 100 records into much less space than dicts")
Solution
import struct
import sys
RECORD_FORMAT = "!IffB"
RECORD_SIZE = struct.calcsize(RECORD_FORMAT)
def pack_records(records):
buf = bytearray(len(records) * RECORD_SIZE)
for i, r in enumerate(records):
struct.pack_into(RECORD_FORMAT, buf, i * RECORD_SIZE,
r["id"], r["x"], r["y"], r["flags"])
return bytes(buf)
def unpack_records(buf):
n = len(buf) // RECORD_SIZE
return [
struct.unpack_from(RECORD_FORMAT, buf, i * RECORD_SIZE)
for i in range(n)
]
records = [{"id": i, "x": float(i), "y": float(i * 2), "flags": i % 256} for i in range(1000)]
packed = pack_records(records)
unpacked = unpack_records(packed)
print(f"struct buffer: {len(packed)} bytes ({len(packed)/1024:.1f} KB)")
print(f"Records per byte: {len(records)/len(packed):.3f}")
print(f"Bytes per record: {len(packed)/len(records):.1f} (vs ~200+ for dict)")
print("struct packs 100 records into much less space than dicts")
struct module efficiency:
struct.pack_intoavoids allocating a new bytes object per call — writes directly into a pre-allocated buffer."!IffB"= big-endian!, uint32I(4 bytes), floatf(4 bytes), floatf(4 bytes), uint8B(1 byte) = 13 bytes per record.- A Python dict with 4 string keys + 4 values uses ~400-600 bytes. Struct: 13 bytes. ~40x more compact.
- Use case: network protocols, binary file formats, memory-mapped arrays, inter-process shared memory.
ctypes.Structureis similar but provides C-style struct access with attribute names.
Expected Output
struct packs 100 records into much less space than dictsHints
Hint 1: struct.pack(format, *values) packs values into a binary buffer. Format codes: B=uint8, H=uint16, I=uint32, f=float32.
Hint 2: struct.calcsize(format) tells you the byte size of a packed format.
Use sys.intern to deduplicate a large number of repeated strings and measure memory savings.
import sys
import tracemalloc
N = 50000
CATEGORIES = ["alpha", "beta", "gamma", "delta", "epsilon"]
# Without interning — many duplicate string objects
tracemalloc.start()
without_intern = [CATEGORIES[i % 5] for i in range(N)]
_, peak_no_intern = tracemalloc.get_traced_memory()
tracemalloc.stop()
# With interning — all equal strings share one object
tracemalloc.start()
with_intern = [sys.intern(CATEGORIES[i % 5]) for i in range(N)]
_, peak_intern = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Verify identity sharing
a = sys.intern("hello_world_unique")
b = sys.intern("hello_world_unique")
print(f"Interned strings are same object: {a is b}")
print(f"Without intern peak: {peak_no_intern/1024:.1f} KB")
print(f"With intern peak: {peak_intern/1024:.1f} KB")
print("Interned strings share memory — deduplication verified")
Solution
import sys
import tracemalloc
N = 50000
CATEGORIES = ["alpha", "beta", "gamma", "delta", "epsilon"]
tracemalloc.start()
without_intern = [CATEGORIES[i % 5] for i in range(N)]
_, peak_no_intern = tracemalloc.get_traced_memory()
tracemalloc.stop()
tracemalloc.start()
with_intern = [sys.intern(CATEGORIES[i % 5]) for i in range(N)]
_, peak_intern = tracemalloc.get_traced_memory()
tracemalloc.stop()
a = sys.intern("hello_world_unique")
b = sys.intern("hello_world_unique")
print(f"Interned strings are same object: {a is b}")
print(f"Without intern peak: {peak_no_intern/1024:.1f} KB")
print(f"With intern peak: {peak_intern/1024:.1f} KB")
print("Interned strings share memory — deduplication verified")
String interning:
sys.intern(s)storessin a global table. Returns the same object for all equal interned strings.- Python automatically interns string literals and identifiers (short strings that look like identifiers).
- For large collections of repeated strings (log levels, category names, column names), explicit interning saves significant memory.
a is btests identity (same object in memory). Two equal non-interned strings havea == bbut nota is b.- Interned strings persist in the intern table for the program's lifetime — do not intern dynamic strings that grow without bound.
Expected Output
Interned strings share memory — deduplication verifiedHints
Hint 1: sys.intern(s) returns a canonical version of the string — all interned equal strings are the same object.
Hint 2: Use "is" to verify identity (same object), "==" to verify equality.
Implement lazy attribute loading — expensive data is only loaded when first accessed.
import tracemalloc
class LazyDataModel:
def __init__(self, model_id):
self.model_id = model_id
self._loaded = set()
def __getattr__(self, name):
if name.startswith("_") or name == "model_id":
raise AttributeError(name)
# Simulate expensive load
if name == "embeddings":
data = list(range(10000)) # 10k floats
elif name == "metadata":
data = {f"key_{i}": i for i in range(1000)}
else:
raise AttributeError(f"No attribute {name!r}")
object.__setattr__(self, name, data) # cache in instance dict
self._loaded.add(name)
return data
# Measure: creating many models without accessing their data
tracemalloc.start()
models = [LazyDataModel(i) for i in range(100)]
_, peak_init = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Peak after creating 100 models: {peak_init/1024:.1f} KB")
# Access embeddings on one model
tracemalloc.start()
_ = models[0].embeddings
_, peak_after = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Peak after loading one embedding: {peak_after/1024:.1f} KB")
print("Attributes loaded only when accessed, not on init")
Solution
import tracemalloc
class LazyDataModel:
def __init__(self, model_id):
self.model_id = model_id
self._loaded = set()
def __getattr__(self, name):
if name.startswith("_") or name == "model_id":
raise AttributeError(name)
if name == "embeddings":
data = list(range(10000))
elif name == "metadata":
data = {f"key_{i}": i for i in range(1000)}
else:
raise AttributeError(f"No attribute {name!r}")
object.__setattr__(self, name, data)
self._loaded.add(name)
return data
tracemalloc.start()
models = [LazyDataModel(i) for i in range(100)]
_, peak_init = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Peak after creating 100 models: {peak_init/1024:.1f} KB")
tracemalloc.start()
_ = models[0].embeddings
_, peak_after = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Peak after loading one embedding: {peak_after/1024:.1f} KB")
print("Attributes loaded only when accessed, not on init")
Lazy loading pattern:
__getattr__is only called when normal attribute lookup (instance dict, class dict, MRO) fails.object.__setattr__(self, name, data)stores the result in the instance dict directly — bypassing any custom__setattr__.- After the first access, the attribute lives in
instance.__dict__—__getattr__is not called again (normal lookup succeeds). - Pattern: create 100 model objects cheaply; only pay the memory cost of loading data for models you actually use.
- This is how ORM lazy loading works: accessing a relationship attribute triggers a database query on first access.
Expected Output
Attributes loaded only when accessed, not on initHints
Hint 1: __getattr__ is called only when normal attribute lookup fails — perfect for lazy initialization.
Hint 2: Store loaded data in the instance dict so __getattr__ is not called again for the same attribute.
Use mmap to read specific sections of a large file without loading the entire file into memory.
import mmap
import os
import tempfile
import tracemalloc
# Create a 1MB test file
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
tmpfile = f.name
f.write(b"0123456789" * 100000) # 1MB
# Read with mmap — only requested pages loaded
tracemalloc.start()
with open(tmpfile, "rb") as f:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
# Read only 100 bytes from position 500000
chunk = mm[500000:500100]
size = len(mm)
_, peak_mmap = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Read entire file into memory
tracemalloc.start()
with open(tmpfile, "rb") as f:
data = f.read()
_, peak_full = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"mmap peak: {peak_mmap/1024:.1f} KB")
print(f"full read peak: {peak_full/1024:.1f} KB")
print(f"chunk: {chunk[:20]}...")
print("mmap accesses file data without loading entire file into memory")
os.unlink(tmpfile)
Solution
import mmap
import os
import tempfile
import tracemalloc
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
tmpfile = f.name
f.write(b"0123456789" * 100000)
tracemalloc.start()
with open(tmpfile, "rb") as f:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
chunk = mm[500000:500100]
size = len(mm)
_, peak_mmap = tracemalloc.get_traced_memory()
tracemalloc.stop()
tracemalloc.start()
with open(tmpfile, "rb") as f:
data = f.read()
_, peak_full = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"mmap peak: {peak_mmap/1024:.1f} KB")
print(f"full read peak: {peak_full/1024:.1f} KB")
print(f"chunk: {chunk[:20]}...")
print("mmap accesses file data without loading entire file into memory")
os.unlink(tmpfile)
mmap use cases:
- Large files where you only need specific sections (binary indices, database pages, log files).
- Random access without loading everything:
mm[offset:offset+size]loads only those OS pages. mmap.mmap(fd, 0)maps the entire file. The OS uses virtual memory — actual physical pages are loaded on demand.- Write access:
mmap.mmap(fd, 0, access=mmap.ACCESS_WRITE)— changes are reflected in the file. mmap.ACCESS_COPYcreates a private copy-on-write mapping — writes do not affect the file.
Expected Output
mmap accesses file data without loading entire file into memoryHints
Hint 1: mmap.mmap(fd, 0) maps the entire file. Access it like a bytes object — the OS loads pages on demand.
Hint 2: mmap only loads the pages you actually read — perfect for random access into large files.
Hard
Implement a copy-on-write wrapper that shares data between instances until mutation.
import copy
class CopyOnWrite:
def __init__(self, data):
self._data = data
self._owned = False # False = shared, True = own copy
def _ensure_owned(self):
if not self._owned:
self._data = copy.deepcopy(self._data)
self._owned = True
def get(self, key):
return self._data[key]
def set(self, key, value):
self._ensure_owned()
self._data[key] = value
def __repr__(self):
return f"CoW(owned={self._owned}, data={self._data})"
original_data = {"a": 1, "b": 2, "c": 3}
cow1 = CopyOnWrite(original_data)
cow2 = CopyOnWrite(original_data)
# Both share the same dict initially
print(f"Shared: {cow1._data is cow2._data}")
# Read doesn't trigger copy
_ = cow1.get("a")
print(f"After read: cow1 owned={cow1._owned}, cow2 owned={cow2._owned}")
# Write triggers copy on cow1 only
cow1.set("a", 99)
print(f"After write: cow1 owned={cow1._owned}, cow2 owned={cow2._owned}")
print(f"cow1.a={cow1.get('a')}, cow2.a={cow2.get('a')} (unchanged)")
print("Shared data until mutation; copy triggered on write")
Solution
import copy
class CopyOnWrite:
def __init__(self, data):
self._data = data
self._owned = False
def _ensure_owned(self):
if not self._owned:
self._data = copy.deepcopy(self._data)
self._owned = True
def get(self, key):
return self._data[key]
def set(self, key, value):
self._ensure_owned()
self._data[key] = value
original_data = {"a": 1, "b": 2, "c": 3}
cow1 = CopyOnWrite(original_data)
cow2 = CopyOnWrite(original_data)
print(f"Shared: {cow1._data is cow2._data}")
_ = cow1.get("a")
print(f"After read: cow1 owned={cow1._owned}, cow2 owned={cow2._owned}")
cow1.set("a", 99)
print(f"After write: cow1 owned={cow1._owned}, cow2 owned={cow2._owned}")
print(f"cow1.a={cow1.get('a')}, cow2.a={cow2.get('a')}")
print("Shared data until mutation; copy triggered on write")
Copy-on-write pattern:
- Reads are zero-copy — all instances share the original object.
- Only the first write triggers a deep copy of the data for that instance.
- Use case: function arguments that are "pass by value" semantics with "share until modified" performance.
- Python uses COW in the OS for
fork(): child processes share parent memory pages until they write. - Production:
pandasDataFrames use COW mode since 2.0 to reduce unexpected mutations and memory usage.
Expected Output
Shared data until mutation; copy triggered on writeHints
Hint 1: Store a reference to the original data. On first write, make a copy and work on it.
Hint 2: Use a flag to track whether a copy has been made.
Implement an object pool that reuses expensive-to-create objects instead of allocating new ones each time.
import tracemalloc
class ExpensiveBuffer:
def __init__(self, size=1024):
self.data = bytearray(size)
self.in_use = False
def reset(self):
self.data[:] = b"\x00" * len(self.data)
self.in_use = False
class BufferPool:
def __init__(self, pool_size=10, buffer_size=1024):
self._pool = [ExpensiveBuffer(buffer_size) for _ in range(pool_size)]
self._stats = {"allocations": 0, "reuses": 0}
def acquire(self):
for buf in self._pool:
if not buf.in_use:
buf.in_use = True
self._stats["reuses"] += 1
return buf
# Pool exhausted — allocate new (not ideal)
self._stats["allocations"] += 1
buf = ExpensiveBuffer()
buf.in_use = True
return buf
def release(self, buf):
buf.reset()
# With pooling
pool = BufferPool(pool_size=5)
tracemalloc.start()
for _ in range(100):
buf = pool.acquire()
buf.data[0] = 42
pool.release(buf)
_, peak_pool = tracemalloc.get_traced_memory()
tracemalloc.stop()
# Without pooling
tracemalloc.start()
for _ in range(100):
buf = ExpensiveBuffer()
buf.data[0] = 42
_, peak_no_pool = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Pool reuses: {pool._stats['reuses']} / extra allocs: {pool._stats['allocations']}")
print(f"Pool peak: {peak_pool/1024:.1f} KB")
print(f"No-pool peak: {peak_no_pool/1024:.1f} KB")
print("Pool reuses objects — fewer allocations than create-and-discard")
Solution
import tracemalloc
class ExpensiveBuffer:
def __init__(self, size=1024):
self.data = bytearray(size)
self.in_use = False
def reset(self):
for i in range(len(self.data)):
self.data[i] = 0
self.in_use = False
class BufferPool:
def __init__(self, pool_size=10, buffer_size=1024):
self._pool = [ExpensiveBuffer(buffer_size) for _ in range(pool_size)]
self._stats = {"allocations": 0, "reuses": 0}
def acquire(self):
for buf in self._pool:
if not buf.in_use:
buf.in_use = True
self._stats["reuses"] += 1
return buf
self._stats["allocations"] += 1
buf = ExpensiveBuffer()
buf.in_use = True
return buf
def release(self, buf):
buf.reset()
pool = BufferPool(pool_size=5)
tracemalloc.start()
for _ in range(100):
buf = pool.acquire()
buf.data[0] = 42
pool.release(buf)
_, peak_pool = tracemalloc.get_traced_memory()
tracemalloc.stop()
tracemalloc.start()
for _ in range(100):
buf = ExpensiveBuffer()
buf.data[0] = 42
_, peak_no_pool = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Pool reuses: {pool._stats['reuses']}")
print(f"Pool peak: {peak_pool/1024:.1f} KB")
print(f"No-pool peak: {peak_no_pool/1024:.1f} KB")
print("Pool reuses objects — fewer allocations than create-and-discard")
Object pool benefits:
- Eliminates repeated allocation and deallocation of expensive objects (network buffers, DB connections, worker threads).
- Reduces GC pressure: fewer short-lived objects means fewer GC cycles.
- Predictable memory: pool size is fixed; no unexpected growth.
reset()clears state so released objects are safe to reuse — critical for security (clear sensitive data).- Python's
concurrent.futures.ThreadPoolExecutorandasyncio's connection pools use this pattern.
Expected Output
Pool reuses objects — fewer allocations than create-and-discardHints
Hint 1: A memory pool pre-allocates objects and hands them out. Returned objects are reset and reused.
Hint 2: Track allocations with tracemalloc to show reduced allocation count with pooling.
Compare memory usage of dict, regular class, dataclass, and namedtuple for the same record data.
import sys
import tracemalloc
from collections import namedtuple
from dataclasses import dataclass
RecordNamedTuple = namedtuple("Record", ["id", "x", "y", "label"])
@dataclass
class RecordDataclass:
id: int
x: float
y: float
label: str
class RecordClass:
def __init__(self, id, x, y, label):
self.id = id
self.x = x
self.y = y
self.label = label
@dataclass
class RecordSlotted:
__slots__ = ("id", "x", "y", "label")
id: int
x: float
y: float
label: str
n = 50000
label = "category_A"
def measure_peak(factory):
tracemalloc.start()
records = [factory(i, float(i), float(i * 2), label) for i in range(n)]
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
return peak
results = {
"dict": measure_peak(lambda i, x, y, l: {"id": i, "x": x, "y": y, "label": l}),
"namedtuple": measure_peak(lambda i, x, y, l: RecordNamedTuple(i, x, y, l)),
"dataclass": measure_peak(lambda i, x, y, l: RecordDataclass(i, x, y, l)),
"class": measure_peak(lambda i, x, y, l: RecordClass(i, x, y, l)),
}
for name, peak in sorted(results.items(), key=lambda x: x[1]):
print(f"{name:12s}: {peak/1e6:.1f} MB peak")
print("namedtuple is more memory-efficient than dict or regular class")
Solution
import tracemalloc
from collections import namedtuple
from dataclasses import dataclass
RecordNamedTuple = namedtuple("Record", ["id", "x", "y", "label"])
@dataclass
class RecordDataclass:
id: int
x: float
y: float
label: str
class RecordClass:
def __init__(self, id, x, y, label):
self.id = id
self.x = x
self.y = y
self.label = label
n = 50000
label = "category_A"
def measure_peak(factory):
tracemalloc.start()
records = [factory(i, float(i), float(i * 2), label) for i in range(n)]
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
return peak
results = {
"dict": measure_peak(lambda i, x, y, l: {"id": i, "x": x, "y": y, "label": l}),
"namedtuple": measure_peak(lambda i, x, y, l: RecordNamedTuple(i, x, y, l)),
"dataclass": measure_peak(lambda i, x, y, l: RecordDataclass(i, x, y, l)),
"class": measure_peak(lambda i, x, y, l: RecordClass(i, x, y, l)),
}
for name, peak in sorted(results.items(), key=lambda x: x[1]):
print(f"{name:12s}: {peak/1e6:.1f} MB peak")
print("namedtuple is more memory-efficient than dict or regular class")
Memory ranking (typical, ascending):
namedtuple: tuple subclass — same overhead as tuple (~56 bytes + N * 8-byte pointers).- Regular class with
__slots__: ~56 bytes, no__dict__. - Regular class: ~232 bytes with
__dict__. dataclass: same as regular class by default (has__dict__). Add__slots__=True(Python 3.10+) for slot efficiency.dict: ~200 bytes base + ~50 bytes per key-value pair.
Namedtuple is the most memory-efficient option when you don't need mutability.
Expected Output
namedtuple is more memory-efficient than dict or regular classHints
Hint 1: namedtuple is a tuple subclass — has tuple memory overhead (very low) plus slot-like access.
Hint 2: Compare sys.getsizeof() for dict, regular class, and namedtuple with the same fields.
Process a large simulated dataset in fixed-size chunks to bound peak memory usage.
import tracemalloc
def data_source(total_records):
"""Simulates a large data source — yields records lazily."""
for i in range(total_records):
yield {"id": i, "value": float(i), "tags": [i % 5, i % 7]}
def chunked(iterable, size):
"""Yield successive chunks of `size` items."""
chunk = []
for item in iterable:
chunk.append(item)
if len(chunk) == size:
yield chunk
chunk = []
if chunk:
yield chunk
def process_chunk(chunk):
return sum(r["value"] for r in chunk)
def process_all_chunked(total, chunk_size):
total_value = 0
for chunk in chunked(data_source(total), chunk_size):
total_value += process_chunk(chunk)
return total_value
def process_all_at_once(total):
records = list(data_source(total))
return sum(r["value"] for r in records)
total = 200_000
chunk_size = 1_000
tracemalloc.start()
r1 = process_all_chunked(total, chunk_size)
_, peak_chunked = tracemalloc.get_traced_memory()
tracemalloc.stop()
tracemalloc.start()
r2 = process_all_at_once(total)
_, peak_at_once = tracemalloc.get_traced_memory()
tracemalloc.stop()
assert abs(r1 - r2) < 0.001, "Results must match"
print(f"Chunked peak: {peak_chunked/1e6:.1f} MB")
print(f"All-at-once: {peak_at_once/1e6:.1f} MB")
print(f"Ratio: {peak_at_once/peak_chunked:.1f}x")
print("Chunked processing uses bounded memory regardless of dataset size")
Solution
import tracemalloc
def data_source(total_records):
for i in range(total_records):
yield {"id": i, "value": float(i), "tags": [i % 5, i % 7]}
def chunked(iterable, size):
chunk = []
for item in iterable:
chunk.append(item)
if len(chunk) == size:
yield chunk
chunk = []
if chunk:
yield chunk
def process_chunk(chunk):
return sum(r["value"] for r in chunk)
def process_all_chunked(total, chunk_size):
return sum(process_chunk(chunk) for chunk in chunked(data_source(total), chunk_size))
def process_all_at_once(total):
return sum(r["value"] for r in list(data_source(total)))
total = 200_000
chunk_size = 1_000
tracemalloc.start()
r1 = process_all_chunked(total, chunk_size)
_, peak_chunked = tracemalloc.get_traced_memory()
tracemalloc.stop()
tracemalloc.start()
r2 = process_all_at_once(total)
_, peak_at_once = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Chunked peak: {peak_chunked/1e6:.1f} MB")
print(f"All-at-once: {peak_at_once/1e6:.1f} MB")
print(f"Ratio: {peak_at_once/peak_chunked:.1f}x")
print("Chunked processing uses bounded memory regardless of dataset size")
Chunked processing:
- Peak memory = size of one chunk (1,000 records) regardless of total dataset size.
- All-at-once: peak memory = entire dataset (200,000 records).
- Chunk size tuning: larger chunks → better CPU cache utilization; smaller chunks → lower peak memory.
- Production: tune chunk size to fit comfortably in L3 cache (~4-16MB) for maximum throughput.
- This pattern is used by pandas
read_csv(chunksize=N), SQLAlchemyyield_per(N), and database cursor-based iteration.
Expected Output
Chunked processing uses bounded memory regardless of dataset sizeHints
Hint 1: Process the dataset in fixed-size chunks. Only one chunk lives in memory at a time.
Hint 2: Use a generator to yield chunks lazily — no pre-allocation of the entire dataset.
