Zero-Copy and Data Transfer
The Production Scenario
A large-scale image classification pipeline processes 50,000 training images per second on a cluster of eight A100 GPUs. The data loading team has tuned the DataLoader with 16 workers and a prefetch buffer. GPU utilization sits at 62%. The profiler shows 38% of training time is idle - GPUs waiting. The CPUs are not the bottleneck. The network is not saturated. The culprit is something that does not show up obviously in most profilers: the data is being copied four times between disk and GPU memory, when it could be copied once.
First, the raw JPEG bytes are read from disk into a kernel buffer. Then they are copied from the kernel buffer into a Python bytes object in userspace. Then the decoded NumPy array is copied into a PyTorch tensor. Then the tensor is copied from regular pageable host memory into GPU memory via the PCIe bus - a slow, CPU-mediated transfer. Four copies. Eliminate two of them and GPU utilization jumps to 87%. That is a 40% throughput improvement on the same hardware, no new equipment required.
This is not an exotic optimization. Every production ML system at scale hits this wall. The data engineers who understand the memory hierarchy - kernel buffer, userspace buffer, pinned memory, device memory - are the ones who extract the last 20-30% of performance from existing hardware. The others restart the training job and wonder why it is slow.
The story of zero-copy techniques is also a story about the abstraction cost of modern software. Operating systems add copy boundaries to enforce security and isolation. Every time you cross from kernel space to user space, you pay a copy tax. Every time you cross from CPU memory to GPU memory without pinning, you pay a page-fault tax. Zero-copy is the discipline of identifying those boundaries and eliminating the unnecessary ones.
In this lesson you will learn every major zero-copy technique: from the Linux sendfile() syscall that makes web servers fast, to the Python memoryview protocol that lets you slice bytes without allocation, to PyTorch's pinned memory that doubles your GPU data transfer rate, to Apache Arrow Flight which lets model training pipelines consume data from remote servers without deserializing it.
Why This Exists
The fundamental problem is that CPUs, GPUs, storage, and networks all have separate address spaces. Moving data between them requires copying - unless you build specific hardware and software paths to avoid it.
In 1994, a web server sending a file would: (1) read the file into kernel page cache, (2) copy from kernel cache to userspace buffer via read(), (3) copy from userspace buffer back into the kernel socket buffer via write(). Two kernel crossings, two copies of data that had not changed at all. Linus Torvalds added sendfile() in Linux 2.2 in 1998, which allows the kernel to move data directly from page cache to socket buffer - eliminating both userspace copies and the two syscalls that went with them.
The same pattern appears in GPU computing. PyTorch by default allocates CPU tensors in regular pageable memory. When you call .cuda(), the CUDA runtime must first pin the pages (prevent the OS from swapping them), then initiate a DMA transfer to the GPU. Allocating in pinned memory from the start eliminates the page-locking step and allows asynchronous transfers - your CPU can prepare the next batch while the GPU trains on the current one.
Understanding zero-copy means understanding where the actual copies happen, which requires knowing the hardware memory hierarchy.
Historical Context
The concept of zero-copy I/O in operating systems dates to the early 1990s. The first practical implementations were driven by the web serving workload: HTTP servers like Apache needed to serve large static files, and the traditional read/write path was measurably inefficient.
The Linux sendfile() system call (1998) and FreeBSD's sendfile() (1995, earlier) were the first widely-adopted zero-copy primitives. Nginx, which serves roughly 34% of the internet today, relies heavily on sendfile(). Kafka, the distributed messaging system at the core of virtually every large-scale ML pipeline, uses sendfile() for log segment transfers - one reason a single Kafka broker can sustain 800 MB/s throughput on commodity hardware.
The mmap() interface predates sendfile() - it appeared in SunOS 4.0 (1988) and was standardized in POSIX.1-2001. Memory-mapped files allow processes to treat file contents as directly addressable memory, with the kernel handling the I/O lazily via page faults. Databases like PostgreSQL and LMDB use mmap extensively.
Python's buffer protocol was formalized in PEP 3118 (2006). The memoryview type gives Python code access to the buffer protocol, allowing zero-copy slicing of bytes, bytearray, array.array, and NumPy arrays. NumPy's view semantics (ndarray.base tracking) were designed from the beginning to allow slicing without allocation.
CUDA pinned (page-locked) memory has been a feature of CUDA since version 1.0 (2007). The performance difference between pageable and pinned transfers was documented in NVIDIA's early CUDA Best Practices Guide and remains one of the first optimizations any serious CUDA programmer learns.
Apache Arrow was designed by Wes McKinney (creator of pandas) and Jacques Nadeau (Dremio), first released in 2016. Its core insight was that a common in-memory columnar format would allow data to be shared between processes and across networks without serialization. Arrow Flight (2019) extends this to a gRPC-based RPC framework specifically designed for high-throughput columnar data transfer.
Core Concepts
The Traditional I/O Path and Its Copies
To understand what zero-copy eliminates, you must first understand what the traditional path does.
When a user-space program calls read(fd, buf, n):
- The kernel issues a DMA transfer from disk to the kernel page cache
- The kernel copies from page cache to the user-space buffer
buf - Control returns to user space
When the program then calls write(sockfd, buf, n):
- The kernel copies from user-space
bufto the kernel socket buffer - The kernel issues a DMA transfer from socket buffer to the network card
This path has 2 CPU copies (kernel-to-user, user-to-kernel) and 2 DMA transfers. The CPU copies are the problem: they consume CPU cycles, pollute caches, and require data to pass through user space even when user space does nothing with it.
A["Disk / Storage<br/>DMA read"]:::teal
B["Kernel Page Cache<br/>in-kernel buffer"]:::blue
C["User Space Buffer<br/>buf in RAM"]:::orange
D["Kernel Socket Buffer<br/>send queue"]:::blue
E["Network Card<br/>DMA write"]:::teal
A -->|"DMA transfer"| B
B -->|"CPU copy - read() syscall"| C
C -->|"CPU copy - write() syscall"| D
D -->|"DMA transfer"| E
classDef blue fill:#dbeafe,color:#1e293b,stroke:#2563eb
classDef teal fill:#ccfbf1,color:#134e4a,stroke:#14b8a6
classDef orange fill:#ffedd5,color:#7c2d12,stroke:#ea580c
classDef green fill:#dcfce7,color:#14532d,stroke:#16a34a
classDef purple fill:#ede9fe,color:#4c1d95,stroke:#7c3aed
classDef red fill:#fee2e2,color:#7f1d1d,stroke:#dc2626
The sendfile() syscall collapses the middle two steps. Data moves from page cache directly to the socket buffer, or - on hardware that supports scatter-gather DMA - the socket descriptor is updated to reference the page cache entries directly, and no byte-level copy happens at all.
sendfile(), splice(), and tee()
sendfile(out_fd, in_fd, offset, count) is a Linux syscall that moves data from in_fd (a file) to out_fd (a socket) entirely within the kernel. On hardware that supports scatter-gather DMA, the kernel does not even copy within kernel memory - it just updates the socket descriptor to point to the existing page cache entries. This is the "true zero-copy" path.
splice(fd_in, off_in, fd_out, off_out, len, flags) generalizes sendfile() - it can move data between arbitrary file descriptors (including pipes). splice() uses a kernel pipe buffer as the connection point and moves data without copying using the "page-stealing" technique. Page references are transferred between kernel data structures; the actual bytes never move.
tee(fd_in, fd_out, len, flags) duplicates data between two pipes without consuming it from the input. This is useful for logging pipelines that need to observe data in-flight without interrupting the main data flow.
For ML data pipelines, these syscalls matter most in the data serving layer. A data server sending training batches to GPU workers over a socket can use sendfile() to serve pre-batched binary data at disk-bandwidth speeds. Kafka achieves its extraordinary throughput this way - the broker reads log segments and forwards them to consumers without the data ever reaching broker user space.
mmap() for Zero-Copy File Reading
mmap(addr, length, prot, flags, fd, offset) maps a file directly into the process's virtual address space. Pages are not loaded immediately - they are loaded on demand via page faults. When you read from a memory-mapped region, you are reading directly from the kernel page cache; there is no additional copy into a user-space buffer.
For ML use cases, mmap is excellent for:
- Reading large model checkpoints (map the file, access only the weights you need)
- Memory-mapped datasets (LMDB, Arrow memory-mapped IPC files)
- Sharing data between multiple processes reading the same file (they share physical pages)
The safetensors format (used by Hugging Face) is designed around mmap. Tensors are stored at predictable, aligned offsets in the file. When you load a model with safetensors, the library mmaps the file and constructs tensor objects pointing directly into the mmap region. Two Python processes loading the same model file share the same physical RAM pages via the page cache - no duplication.
import mmap
import struct
import numpy as np
# Reading a large binary file with mmap - zero copy into user buffer
def read_tensor_mmap(filepath: str, offset: int, shape: tuple, dtype=np.float32):
"""Read a tensor from a binary file using mmap - no intermediate copy."""
itemsize = np.dtype(dtype).itemsize
n_bytes = int(np.prod(shape)) * itemsize
with open(filepath, 'rb') as f:
# Map the entire file into virtual address space
mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
# Create a numpy array backed by the mmap region - no copy
# The array's memory IS the mmap region
arr = np.frombuffer(mm, dtype=dtype, count=int(np.prod(shape)), offset=offset)
arr = arr.reshape(shape)
# Note: arr is valid only while mm is open
# To get a standalone array, call arr.copy()
return arr, mm # caller must keep mm alive
# Memory-mapped reading of multiple tensors from a checkpoint
def load_checkpoint_mmap(filepath: str, tensor_manifest: dict):
"""
Load model checkpoint using mmap.
tensor_manifest: {name: (offset, shape, dtype)}
"""
import os
file_size = os.path.getsize(filepath)
with open(filepath, 'rb') as f:
mm = mmap.mmap(f.fileno(), file_size, access=mmap.ACCESS_READ)
tensors = {}
for name, (offset, shape, dtype) in tensor_manifest.items():
arr = np.frombuffer(mm, dtype=dtype,
count=int(np.prod(shape)),
offset=offset).reshape(shape)
# This is a view into the mmap - no allocation, no copy
tensors[name] = arr
# Tensors reference mm - keep mm alive
return tensors, mm
# Demonstrate: two processes sharing same physical pages
# Process A and Process B both mmap the same file
# The OS gives them different virtual addresses pointing to
# the same physical pages in the page cache
# Memory usage: 1x the file size, not 2x
Python memoryview and the Buffer Protocol
Python's buffer protocol is a C-level interface that objects can implement to expose their underlying memory as a raw byte buffer. Objects that support the buffer protocol include: bytes, bytearray, array.array, memoryview itself, and NumPy ndarrays.
memoryview(obj) creates a view object that exposes the buffer. The key property: slicing a memoryview does not copy data. It returns a new memoryview that references the same underlying memory at a different offset.
import struct
import numpy as np
# Large buffer - simulating a received network packet or file chunk
data = bytearray(b'\x00' * 1_000_000)
# Traditional approach: every slice copies
chunk = data[1000:2000] # allocates 1000 bytes, copies data
print(f"type(chunk): {type(chunk)}") # <class 'bytearray'>
# Buffer protocol: no copy
mv = memoryview(data)
chunk_view = mv[1000:2000] # no allocation, no copy
print(f"type(chunk_view): {type(chunk_view)}") # <class 'memoryview'>
print(f"chunk_view.nbytes: {chunk_view.nbytes}") # 1000
# The memory backing chunk_view IS the memory in `data`
# You can pass memoryview to any function that accepts buffers:
# sock.send(chunk_view) - sends those 1000 bytes directly
# f.write(chunk_view) - writes without intermediate copy
# hashlib.md5(chunk_view) - hashes without copying
# Reading struct fields from a large binary buffer without copying
mv = memoryview(data)
# Unpack 4-byte header fields from offset 0
packed = struct.pack('>IHHI', 0xDEADBEEF, 42, 1000, 0xFF)
data[0:16] = packed + bytes(16 - len(packed))
magic, version, length, flags = struct.unpack_from('>IHHI', mv[0:16])
print(f"magic: {magic:#010x}, version: {version}, length: {length}")
# NumPy can create arrays from memoryview without copying
arr = np.frombuffer(mv[100:1100], dtype=np.float32)
# arr.base is the memoryview; no data was copied
print(f"arr.base is mv sliced: the array shares memory with data buffer")
print(f"arr.shape: {arr.shape}") # (250,)
# Practical: parsing a binary ML feature file without copies
def parse_feature_batch_zero_copy(raw_bytes: bytes, n_features: int, n_samples: int):
"""
Parse a batch of float32 features from raw bytes.
Returns a numpy array backed by the original bytes - no allocation.
"""
mv = memoryview(raw_bytes)
# Skip 8-byte header
data_view = mv[8:]
arr = np.frombuffer(data_view, dtype=np.float32,
count=n_samples * n_features)
return arr.reshape(n_samples, n_features)
# The returned array shares memory with raw_bytes
# If raw_bytes is a bytes object from recv(), this is truly zero copy
# Memory efficiency comparison
import sys
n = 1_000_000
buf = bytearray(n)
# Traditional: slice copies n bytes
traditional_slice = buf[0:n]
print(f"Traditional copy size: {sys.getsizeof(traditional_slice)} bytes")
# Zero-copy: memoryview slice
mv = memoryview(buf)
zero_copy_view = mv[0:n]
print(f"Memoryview view size: {sys.getsizeof(zero_copy_view)} bytes (just metadata)")
NumPy Views vs Copies
NumPy's most powerful zero-copy feature is the view: an ndarray that shares memory with another array. Every basic slice in NumPy returns a view, not a copy.
The ndarray.base attribute tells you if an array owns its data or is a view. np.shares_memory(a, b) tests whether two arrays share any memory at all.
import numpy as np
# Allocate a large array - one allocation, one block of memory
original = np.zeros((1000, 1000), dtype=np.float32)
# original.base is None - it owns its data
# These operations return views (shared storage, no copy):
row = original[42] # shape (1000,)
submatrix = original[10:20, 50:100] # shape (10, 50)
transposed = original.T # shape (1000, 1000), non-contiguous
col = original[:, 5] # shape (1000,)
every_other = original[::2] # shape (500, 1000)
print(f"row.base is original: {row.base is original}") # True
print(f"submatrix.base is original: {submatrix.base is original}") # True
print(f"np.shares_memory(row, original): {np.shares_memory(row, original)}") # True
# The stride system explains how views work
# For C-contiguous float32 array of shape (1000, 1000):
# stride[0] = 1000 * 4 = 4000 bytes (one row)
# stride[1] = 4 bytes (one element)
print(f"original.strides: {original.strides}") # (4000, 4)
# Transposed: strides swap
print(f"transposed.strides: {transposed.strides}") # (4, 4000)
# Sliced: strides multiply
print(f"every_other.strides: {every_other.strides}") # (8000, 4)
# Storage offset tells you where in the buffer the view starts
print(f"submatrix.__array_interface__['data'][0] - original.__array_interface__['data'][0]:")
offset = submatrix.__array_interface__['data'][0] - original.__array_interface__['data'][0]
print(f" {offset} bytes") # 10*4000 + 50*4 = 40200
# Operations that FORCE a copy:
copied = original.copy() # explicit copy
flattened = original.flatten() # always copies
fancy_indexed = original[[0, 5, 99]] # fancy indexing always copies
ascontiguous = np.ascontiguousarray(original.T) # T is non-contiguous; copy to make contiguous
boolean_mask = original[original > 0.5] # boolean indexing always copies
print(f"copied.base is None: {copied.base is None}") # True
print(f"flattened.base is None: {flattened.base is None}") # True
print(f"fancy_indexed.base is None: {fancy_indexed.base is None}") # True
# ravel() vs flatten() - important distinction
raveled = original.ravel()
print(f"ravel returns view for contiguous: {raveled.base is original}") # True
print(f"flatten always copies: {original.flatten().base is None}") # True
# Reshape: view if contiguous, copy if not
reshaped = original.reshape(1000000)
print(f"reshape of contiguous is view: {reshaped.base is original}") # True
original_T = original.T # Fortran-contiguous
reshaped_T = original_T.reshape(1000000) # Forces a copy
print(f"reshape of non-contiguous: {reshaped_T.base is original_T}") # False
# Utility function for production code
def is_view(original: np.ndarray, result: np.ndarray) -> bool:
return np.shares_memory(original, result)
def check_operation(original, operation_fn, name):
result = operation_fn(original)
status = "VIEW (zero-copy)" if is_view(original, result) else f"COPY ({result.nbytes / 1e6:.1f} MB)"
print(f" {name}: {status}")
arr = np.zeros((1000, 1000), dtype=np.float32) # 4 MB
print("Operation analysis:")
check_operation(arr, lambda a: a[0:100], "basic slice") # VIEW
check_operation(arr, lambda a: a[[0, 1, 2]], "fancy index") # COPY
check_operation(arr, lambda a: a.reshape(-1), "reshape (contiguous)")# VIEW
check_operation(arr, lambda a: a.T.reshape(-1), "reshape (non-contiguous)") # COPY
check_operation(arr, lambda a: a.ravel(), "ravel") # VIEW
check_operation(arr, lambda a: a.flatten(), "flatten") # COPY
check_operation(arr, lambda a: a.T, "transpose") # VIEW
The performance implication: in a data pipeline processing 1000 batches of 10,000 float32 features at 128 dimensions, returning views instead of copies eliminates 512 MB of allocations per batch. At 50 batches per second, that is 25 GB/s of allocation pressure eliminated - memory the GC never has to chase.
PyTorch Tensor Storage and Sharing
PyTorch tensors separate tensor metadata (shape, stride, dtype, device) from tensor storage (the raw data buffer). Multiple tensors can share the same storage, just like NumPy views.
import torch
# Tensor views in PyTorch
base = torch.randn(1000, 1000)
# These operations return views (shared storage):
row = base[42] # view
t = base.t() # transpose - view (non-contiguous)
narrow = base.narrow(0, 10, 100) # view
squeezed = base.unsqueeze(0) # view - adds dimension
expanded = base[None, :] # view
# Checking storage sharing
print(f"base storage ptr: {base.storage().data_ptr()}")
print(f"row storage ptr: {row.storage().data_ptr()}")
print(f"Same storage: {base.storage().data_ptr() == row.storage().data_ptr()}") # True
print(f"row.storage_offset(): {row.storage_offset()}") # 42000 (42 * 1000 elements)
# is_contiguous() determines if reshape creates a view or copy
print(f"base.is_contiguous(): {base.is_contiguous()}") # True
print(f"t.is_contiguous(): {t.is_contiguous()}") # False
# t.view(500, 2000) would RAISE RuntimeError - must use .reshape()
# .reshape() returns view if possible, copy otherwise
reshaped_t = t.reshape(500, 2000) # copies (t is non-contiguous)
print(f"reshaped_t.data_ptr() == base.data_ptr(): {reshaped_t.data_ptr() == base.data_ptr()}") # False
reshaped_base = base.reshape(500, 2000) # view (base is contiguous)
print(f"reshaped_base.data_ptr() == base.data_ptr(): {reshaped_base.data_ptr() == base.data_ptr()}") # True
# torch.from_numpy: zero-copy bridge between NumPy and PyTorch
import numpy as np
np_arr = np.random.randn(1000, 128).astype(np.float32)
torch_tensor = torch.from_numpy(np_arr) # shares memory, zero copy
print(f"Shares memory: {np_arr.ctypes.data == torch_tensor.data_ptr()}") # True
np_arr[0, 0] = 999.0
print(f"Change propagates: {torch_tensor[0, 0].item():.1f}") # 999.0
# torch.tensor() - ALWAYS copies
torch_copy = torch.tensor(np_arr) # always copies
print(f"tensor() shares memory: {np_arr.ctypes.data == torch_copy.data_ptr()}") # False
CUDA Pinned (Page-Locked) Memory
When PyTorch calls .cuda() on a CPU tensor in regular (pageable) memory, the CUDA runtime must: (1) pin the pages temporarily, (2) initiate the DMA transfer, (3) unpin the pages. This sequence adds latency and prevents the transfer from being made asynchronous.
Pinned (page-locked) memory bypasses this. When you allocate pinned memory, the OS promises never to swap those pages. The CUDA DMA engine can transfer directly and asynchronously, while the CPU prepares the next batch.
The formula for PCIe bandwidth:
For pageable memory transfer: typical 6-8 GB/s on PCIe 4.0 x16 (well below the 32 GB/s theoretical maximum, due to the pin/unpin overhead). For pinned memory transfer: 12-16 GB/s, and it can overlap with GPU computation.
import torch
import time
import numpy as np
def benchmark_transfer(use_pinned: bool, size_mb: int = 256, n_trials: int = 25):
"""Benchmark host-to-device transfer speed for pinned vs pageable memory."""
n_elements = (size_mb * 1024 * 1024) // 4 # float32, 4 bytes each
times = []
for _ in range(n_trials):
if use_pinned:
# pin_memory=True allocates in page-locked memory
cpu_tensor = torch.randn(n_elements).pin_memory()
else:
cpu_tensor = torch.randn(n_elements)
torch.cuda.synchronize()
start = time.perf_counter()
# non_blocking=True only works if tensor is pinned
gpu_tensor = cpu_tensor.cuda(non_blocking=use_pinned)
torch.cuda.synchronize() # wait for async transfer to complete
end = time.perf_counter()
times.append(end - start)
del gpu_tensor
avg_s = np.mean(times[5:]) # skip warmup iterations
bandwidth_gbps = (size_mb / 1024) / avg_s
return avg_s * 1000, bandwidth_gbps
if torch.cuda.is_available():
pageable_ms, pageable_bw = benchmark_transfer(use_pinned=False)
pinned_ms, pinned_bw = benchmark_transfer(use_pinned=True)
print(f"Pageable: {pageable_ms:.2f} ms ({pageable_bw:.1f} GB/s)")
print(f"Pinned: {pinned_ms:.2f} ms ({pinned_bw:.1f} GB/s)")
print(f"Speedup: {pageable_ms / pinned_ms:.2f}x")
# Checking if a tensor is pinned
t = torch.randn(1000).pin_memory()
print(f"t.is_pinned(): {t.is_pinned()}") # True
# The recommended DataLoader configuration for GPU training
from torch.utils.data import DataLoader, TensorDataset
X = torch.randn(10000, 512)
y = torch.zeros(10000, dtype=torch.long)
dataset = TensorDataset(X, y)
loader = DataLoader(
dataset,
batch_size=256,
pin_memory=True, # All batches returned in pinned memory
num_workers=4, # Parallel data loading
persistent_workers=True, # Workers survive between epochs
prefetch_factor=2, # Each worker prefetches 2 batches
)
# In the training loop:
for inputs, labels in loader:
# non_blocking=True: transfer starts, CPU moves on immediately
# GPU receives data while CPU preps next batch
inputs = inputs.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
# ... training step (overlap with async H2D transfer)
# At this point, both inputs and labels are safely on GPU
# (PyTorch stream ordering ensures the transfer is complete before use)
Pinned memory is not free. The OS cannot use pinned pages for other purposes, including satisfying other processes' memory needs. If you pin too much, you create system-wide memory pressure. A safe rule: pin at most 50% of system RAM, and only in long-running training processes, not in per-request serving code.
DMA Transfers and GPUDirect
DMA (Direct Memory Access) allows hardware components to read and write system memory without CPU involvement. The CPU sets up the transfer by programming the DMA controller with source address, destination address, and transfer length, then the transfer proceeds while the CPU continues executing instructions.
GPUDirect P2P (peer-to-peer) extends this concept between GPUs. On systems where two GPUs share a PCIe switch (or are connected via NVLink), they can transfer data directly between their HBM memories without bouncing through system RAM. A tensor on GPU 0 can be copied to GPU 1 without involving the CPU or system memory.
GPUDirect RDMA goes further: InfiniBand and RoCE network cards can read from or write to GPU memory directly, bypassing the CPU and system RAM entirely. This is what enables near-memory-bandwidth distributed training over InfiniBand in DGX clusters.
import torch
# Check P2P access availability
if torch.cuda.device_count() >= 2:
can_p2p_01 = torch.cuda.can_device_access_peer(0, 1)
can_p2p_10 = torch.cuda.can_device_access_peer(1, 0)
print(f"GPU 0 -> GPU 1 P2P: {can_p2p_01}")
print(f"GPU 1 -> GPU 0 P2P: {can_p2p_10}")
# Direct tensor copy between GPUs (uses P2P if available)
t_gpu0 = torch.randn(1000, 1000, device='cuda:0')
# .to('cuda:1') uses P2P if available, falls through system memory if not
t_gpu1 = t_gpu0.to('cuda:1')
# Explicit copy via streams - both GPUs must be synchronized
stream = torch.cuda.Stream(device='cuda:1')
with torch.cuda.stream(stream):
t_gpu1_async = t_gpu0.to('cuda:1', non_blocking=True)
stream.synchronize()
# Shared CUDA tensors across Python processes
import torch.multiprocessing as mp
import os
def worker_process(shared_tensor, result_queue):
"""Worker that reads a shared GPU tensor without copying it."""
total = shared_tensor.sum().item()
result_queue.put(total)
if __name__ == '__main__' and torch.cuda.is_available():
# Create a GPU tensor and share it across processes
shared = torch.randn(100, 100, device='cuda:0')
shared.share_memory_() # moves to shared memory (CPU tensors only)
# For GPU tensors, use CUDA IPC:
# shared_handle = shared.storage().share_cuda_()
# Other processes can reconstruct the tensor from the handle
# This is zero-copy - they access the same GPU memory
ctx = mp.get_context('spawn')
result_queue = ctx.Queue()
p = ctx.Process(target=worker_process, args=(shared.cpu().share_memory_(), result_queue))
p.start()
p.join()
print(f"Worker computed: {result_queue.get():.4f}")
print(f"Main computed: {shared.cpu().sum().item():.4f}")
Apache Arrow and Zero-Copy Columnar Data
Apache Arrow defines a standard in-memory columnar format with a crucial property: the on-wire format is identical to the in-memory format. When you receive an Arrow RecordBatch over the network, no deserialization is required - you directly cast the received bytes to an Arrow array.
Compare this to protobuf: received bytes must be decoded into a message struct, which allocates new objects for every field. For a 1000-feature float32 vector, protobuf allocates roughly 1000 Python float objects. Arrow allocates zero new objects - the received bytes are the data.
Arrow Flight is a gRPC-based RPC system built on Arrow, specifically designed for bulk data transfer. A training pipeline that uses Arrow Flight to receive batches from a remote feature server gets the data already in a format that is one zero-copy step from a PyTorch tensor.
import pyarrow as pa
import pyarrow.flight as flight
import numpy as np
import torch
# --- Arrow zero-copy fundamentals ---
def arrow_zero_copy_demo():
"""Demonstrate Arrow's buffer-sharing with NumPy."""
# Create a numpy array
data = np.random.randn(1_000_000).astype(np.float32)
data_ptr = data.ctypes.data
# Wrap in Arrow array - no copy if aligned
arrow_array = pa.array(data)
buf = arrow_array.buffers()[1] # data buffer
recovered = np.frombuffer(buf, dtype=np.float32)
print(f"NumPy array data ptr: {data_ptr}")
print(f"Arrow buffer data ptr: {recovered.ctypes.data}")
# May or may not be the same depending on alignment, but
# Arrow avoids copying when possible
# torch.from_numpy after Arrow conversion: zero extra copies
arr_back = arrow_array.to_pydict() # this does create Python objects
# Better: use to_numpy() which gives a numpy array
np_from_arrow = np.array(arrow_array) # this IS a copy for safety
# For true zero-copy: access the buffer directly
np_zero_copy = np.frombuffer(arrow_array.buffers()[1], dtype=np.float32)
torch_tensor = torch.from_numpy(np_zero_copy) # zero copy - shares Arrow buffer
print(f"torch_tensor.data_ptr() == np_zero_copy.ctypes.data: "
f"{torch_tensor.data_ptr() == np_zero_copy.ctypes.data}") # True
# --- Arrow Flight server for ML batch serving ---
class TrainingDataServer(flight.FlightServerBase):
"""
Flight server that serves training batches.
Client receives Arrow RecordBatches directly from server memory - no deserialization.
"""
def __init__(self, host: str = "localhost", port: int = 8815):
location = flight.Location.for_grpc_tcp(host, port)
super().__init__(location)
self._schema = pa.schema([
pa.field('features', pa.list_(pa.float32())),
pa.field('label', pa.int32()),
])
self._batches = self._create_batches(n_batches=50, batch_size=512, n_features=128)
def _create_batches(self, n_batches, batch_size, n_features):
batches = []
for i in range(n_batches):
features_flat = np.random.randn(batch_size * n_features).astype(np.float32)
labels = np.random.randint(0, 10, size=batch_size).astype(np.int32)
# Build a FixedSizeList array for the feature vectors
features_list = pa.FixedSizeListArray.from_arrays(
pa.array(features_flat),
list_size=n_features
)
batch = pa.record_batch(
[features_list, pa.array(labels)],
schema=self._schema
)
batches.append(batch)
return batches
def do_get(self, context, ticket):
# Return a RecordBatchReader over our batch list
reader = pa.RecordBatchReader.from_batches(self._schema, self._batches)
return flight.RecordBatchStream(reader)
def list_flights(self, context, criteria):
descriptor = flight.FlightDescriptor.for_command(b"training")
info = flight.FlightInfo(
self._schema,
descriptor,
[flight.FlightEndpoint(flight.Ticket(b"training"),
[flight.Location.for_grpc_tcp("localhost", 8815)])],
-1, len(self._batches)
)
yield info
# --- Arrow Flight client ---
def stream_training_batches(host: str = "localhost", port: int = 8815):
"""Consume training batches from a Flight server as PyTorch tensors."""
client = flight.FlightConnect(f"grpc://{host}:{port}")
reader = client.do_get(flight.Ticket(b"training"))
for chunk in reader:
batch = chunk.data # pa.RecordBatch
# Convert to numpy - FixedSizeList -> numpy requires a bit of work
features_col = batch.column('features')
# Access the underlying float buffer directly
n = len(features_col)
list_size = features_col.type.list_size
flat_buf = features_col.values.buffers()[1]
features_np = np.frombuffer(flat_buf, dtype=np.float32).reshape(n, list_size)
labels_np = batch.column('label').to_pylist()
labels_np = np.array(labels_np, dtype=np.int64)
# torch.from_numpy: zero copy from numpy to torch
features_t = torch.from_numpy(features_np.copy()) # copy needed: Arrow buffer lifetime
labels_t = torch.from_numpy(labels_np)
yield features_t, labels_t
arrow_zero_copy_demo()
The Zero-Copy Pipeline for ML Training
DISK["Object Store<br/>or local SSD"]:::teal
KPC["Kernel Page Cache<br/>shared by all processes"]:::blue
UM["User Memory<br/>pageable heap"]:::orange
PM["Pinned Memory<br/>page-locked, DMA-ready"]:::green
GPU["GPU HBM<br/>device memory"]:::purple
GPU2["GPU 2<br/>P2P transfer"]:::purple
DISK -->|"DMA (no CPU)"| KPC
KPC -->|"mmap() - zero copy"| UM
KPC -->|"read() - one copy"| UM
UM -->|"pin_memory=True<br/>(DataLoader)"| PM
PM -->|"async DMA<br/>14-16 GB/s"| GPU
UM -->|"pageable transfer<br/>6-8 GB/s + pin overhead"| GPU
GPU -->|"GPUDirect P2P"| GPU2
classDef blue fill:#dbeafe,color:#1e293b,stroke:#2563eb
classDef teal fill:#ccfbf1,color:#134e4a,stroke:#14b8a6
classDef orange fill:#ffedd5,color:#7c2d12,stroke:#ea580c
classDef green fill:#dcfce7,color:#14532d,stroke:#16a34a
classDef purple fill:#ede9fe,color:#4c1d95,stroke:#7c3aed
classDef red fill:#fee2e2,color:#7f1d1d,stroke:#dc2626
Production Engineering Notes
Measuring Copy Overhead in Practice
Before optimizing, measure. Python's tracemalloc shows allocation counts but not transfer time. Use NVIDIA Nsight Systems (nsys) to see PCIe transfer time alongside GPU kernel time. On Linux, perf stat -e cache-misses can reveal when copying is thrashing CPU caches.
import torch
import time
import numpy as np
class TransferProfiler:
"""Tracks host-to-device transfer time in training loops."""
def __init__(self):
self.transfer_times_ms = []
def timed_to_cuda(self, tensor: torch.Tensor, non_blocking: bool = True) -> torch.Tensor:
if not torch.cuda.is_available():
return tensor
torch.cuda.synchronize()
t0 = time.perf_counter()
result = tensor.cuda(non_blocking=non_blocking)
if not non_blocking:
torch.cuda.synchronize()
else:
torch.cuda.synchronize() # sync to get accurate timing
self.transfer_times_ms.append((time.perf_counter() - t0) * 1000)
return result
def report(self):
if not self.transfer_times_ms:
print("No transfers recorded")
return
arr = np.array(self.transfer_times_ms)
print(f"H2D Transfer stats over {len(arr)} batches:")
print(f" Mean: {arr.mean():.2f} ms")
print(f" P50: {np.percentile(arr, 50):.2f} ms")
print(f" P99: {np.percentile(arr, 99):.2f} ms")
print(f" Total: {arr.sum():.0f} ms")
The /dev/shm Problem in Docker
PyTorch DataLoader with num_workers > 0 uses shared memory (/dev/shm) to pass tensors from worker processes to the main process. Docker containers default to 64 MB of /dev/shm. A single batch of 512 images at 224x224x3 float32 is 154 MB. When shared memory runs out, PyTorch silently falls back to a slower pipe-based transfer.
Always set --shm-size=8g (or larger) when running GPU training containers.
# Docker run
docker run --gpus all --shm-size=8g my-training-image python train.py
# docker-compose
services:
trainer:
shm_size: '8gb'
deploy:
resources:
reservations:
devices:
- capabilities: [gpu]
Arrow Flight vs gRPC for Feature Serving
For ML feature serving where the payload is structured numeric data (feature vectors, embeddings, model outputs), Arrow Flight consistently outperforms standard gRPC+protobuf in benchmarks by 3-10x for large payloads. The reason is architectural: protobuf encodes numeric arrays as repeated fields, which requires deserializing each element individually. Arrow encodes them as flat binary buffers, which require no element-wise deserialization.
For small payloads (under 10 KB), the difference is negligible. For training data batches (1+ MB per batch), Arrow Flight is the right choice.
Common Mistakes
torch.from_numpy() then .clone(): torch.from_numpy(arr) creates a zero-copy tensor that shares the numpy array's memory. Calling .clone() immediately after defeats the entire purpose and wastes a full copy. Remove .clone() unless you actually need independent data. The correct pattern is torch.from_numpy(arr) (shares memory) or torch.tensor(arr) (always copies - use when you want an independent copy).
Forgetting non_blocking=True with pinned memory: Pinned memory enables asynchronous transfers, but only if you pass non_blocking=True to .cuda(). Without it, the transfer is synchronous and you gain only the higher bandwidth, not the pipeline overlap. The full benefit comes from pin_memory=True in DataLoader combined with non_blocking=True in .cuda(). Getting either half wrong halves the benefit.
NumPy copy-vs-view confusion with operations: arr[::2] is a view. arr.flatten() is a copy. arr.reshape(-1) is usually a view but not always (depends on contiguity). arr.T.reshape(-1) is always a copy. Never assume an operation returns a view without checking np.shares_memory(). Getting this wrong in a tight loop silently degrades performance without any error.
Arrow "zero-copy" applies to in-process buffer reuse, not network transfers: Arrow's zero-copy property means no deserialization marshaling overhead - the binary format on the wire is the same as in memory. But bytes still travel over the network. The benefit over protobuf is that no additional allocation or parsing happens after receipt. For in-process use (passing data between threads or mapping IPC files), Arrow is truly zero allocation. For Arrow Flight transfers, it means the received buffer IS the array data, not that no transfer occurred.
Interview Q&A
Q: What is the difference between zero-copy and DMA, and why does pinned memory matter for GPU transfers?
A: DMA (Direct Memory Access) is a hardware mechanism where a peripheral device can transfer data to/from system memory without CPU involvement. The CPU programs the DMA controller with source address, destination address, and transfer size, then continues executing. The DMA engine performs the actual byte movement in parallel.
Pinned (page-locked) memory matters because DMA requires the physical addresses of source memory to remain stable throughout the transfer. Regular pageable memory can have its physical pages moved or swapped by the OS at any time - this would corrupt a DMA transfer in progress.
When PyTorch transfers a tensor from pageable CPU memory to GPU, the CUDA driver must first copy the tensor to a temporary internal pinned buffer, then initiate the DMA. This is why pageable transfers are slower (extra copy) and cannot be asynchronous (must wait for the internal copy).
With pinned memory, the DMA starts immediately from the original allocation, can run asynchronously while the CPU prepares the next batch, and achieves near-theoretical PCIe bandwidth. The non_blocking=True parameter makes the asynchronous nature explicit: the .cuda() call returns before the transfer completes, and the CUDA stream ordering system ensures the GPU waits for the transfer before using the data in a kernel.
Q: Explain how NumPy views work internally and how to detect if a slice creates a copy or a view.
A: Every NumPy ndarray has two parts: a metadata header (shape, stride, dtype, a pointer to the data buffer, and a storage offset) and a data buffer (a contiguous block of bytes). A view creates a new metadata header pointing to the same data buffer, possibly at a different offset and with different strides, but without copying any bytes.
The stride system is what makes views flexible. For a C-contiguous 2D float32 array of shape (1000, 500), strides are (2000, 4): moving one row forward advances 2000 bytes, one column advances 4 bytes. A slice arr[10:20, 50:100] creates a new header: shape (10, 50), same strides (2000, 4), but a storage offset of 10*2000 + 50*4 = 20200 bytes. No bytes moved.
To detect copy vs view: use np.shares_memory(original, result) - returns True if they share any bytes in their data buffers. Check result.base: if not None, it points to the owner of the data. If result.base is None, the result owns its data (it is a copy, or the original).
Rules of thumb: basic slicing (integers and slice notation) always returns a view. Fancy indexing (arrays/lists of indices) always copies. Boolean indexing always copies. reshape() returns a view if the array is contiguous, copies otherwise. ravel() returns a view if contiguous, copies otherwise. flatten() always copies.
Q: A training pipeline uses DataLoader with pin_memory=True and non_blocking=True but GPU utilization is still only 70%. What would you investigate?
A: I would investigate in this order.
First, confirm pinning is working. Add assert inputs.is_pinned() in the training loop. Custom collate_fn functions often silently convert tensors to regular memory. Verify the DataLoader workers are not bottlenecked on preprocessing (add timing to __getitem__).
Second, determine whether the bottleneck is transfer-bound or compute-bound. Use nvidia-smi dmon -s u to watch SM utilization vs PCIe bandwidth utilization simultaneously. If PCIe is saturated but SM is low, you need faster data (more workers, cached data in RAM). If PCIe is underutilized, the problem is elsewhere.
Third, look for implicit synchronization points. Any .item() call, print(tensor), torch.cuda.synchronize(), or loss logging that calls .item() on a GPU tensor inserts a full pipeline stall. This is the most common cause of GPU underutilization in otherwise well-configured pipelines.
Fourth, check the compute-to-data ratio. If a single training step takes 5 ms and transferring one batch takes 8 ms, no amount of async optimization will give you 100% utilization - you need to reduce batch transfer time (reduce batch size, reduce feature dimensionality, use BF16 instead of FP32) or increase step time (larger model, more computation per step).
Fifth, look at the prefetch queue. With prefetch_factor=2 and 4 workers, there are 8 batches in flight simultaneously. If the GPU processes batches faster than workers can prepare them, increase num_workers or prefetch_factor.
Q: What is Apache Arrow's zero-copy property and when does it actually apply?
A: Apache Arrow defines an in-memory binary format for columnar data arrays. The on-wire format for Arrow IPC is the same as the in-memory format - there is no additional encoding/decoding step. An Arrow float32 array is stored as a raw byte buffer: 4 bytes per element, no framing, no field tags, no length prefixes for elements. When you receive this buffer over a network, you can immediately interpret it as float32 values by casting the pointer.
The zero-copy property applies concretely in these situations:
First, mmap'd Arrow IPC files. When you open an Arrow IPC file with a memory-mapped file reader, the resulting array objects point directly into the mmap'd pages. The data was never copied into a separate buffer. Loading a 1 GB Arrow file with mmap uses 1 GB of page cache, shared across all processes that mmap the same file.
Second, torch.from_numpy(np_array) where np_array was obtained from arrow_array.buffers()[1] - this is two views deep into the same memory: Arrow buffer to numpy ndarray to torch tensor, no copies.
Third, Arrow Flight batch reception. The RecordBatch objects returned by the Flight reader reference the raw received bytes. No deserialization step allocates new objects per element. Compare to protobuf's repeated float field, where every float is unpacked into a Python float object.
The zero-copy property does NOT mean network transfers are free. It means that after the bytes arrive, no additional allocation or per-element work is required to use them. For small payloads this distinction is minor. For 512-element float32 feature vectors (2 KB), protobuf would allocate 512 Python objects per batch; Arrow allocates zero.
Q: How does mmap() compare to read() for accessing large model checkpoint files?
A: The core difference is when and how pages enter the process's address space.
With read(), the kernel copies file data into a buffer you provide. The data goes through two representations: the kernel's page cache, and your buffer. For sequential file reads, the kernel's read-ahead algorithm preloads upcoming pages efficiently, making read() competitive for pure sequential access.
With mmap(), the file is mapped into the process's virtual address space. Pages are loaded on demand via page faults - when you dereference a virtual address in the mapped region for the first time, the CPU triggers a page fault, the kernel loads that page from the page cache (or from disk), and execution resumes. Subsequent accesses to the same page are in-memory and very fast.
For large model checkpoints (a 7B parameter model is roughly 14 GB at float16), mmap has significant advantages. You can access specific tensors without reading the whole file. Hugging Face safetensors stores tensor metadata (names, shapes, offsets) in a small JSON header at the start of the file, then stores tensors at fixed offsets. A loader can mmap the file, parse the header (tiny read), and construct tensor objects pointing into the mmap region - touching only the pages containing the tensors actually needed.
The second major advantage of mmap for model serving: multiple Python processes loading the same checkpoint file share the same physical pages via the page cache. Two model serving replicas on the same machine load a 14 GB model; total physical RAM used for weights is 14 GB, not 28 GB. With read(), each process gets its own private copy.
The disadvantage of mmap: on systems with memory pressure (many processes competing for RAM), page eviction under mmap can cause repeated page faults as the kernel evicts and reloads the same pages. For streaming large datasets that do not fit in RAM, read() with explicit buffering and madvise(MADV_SEQUENTIAL) can outperform mmap because the kernel's sequential readahead is more aggressive than the demand-paging behavior of mmap.
Q: Explain the sendfile() syscall and where it appears in ML data infrastructure.
A: sendfile(out_fd, in_fd, offset, count) is a Linux syscall that transfers data from a file descriptor to a socket entirely within the kernel, without the data ever entering user space. On hardware supporting scatter-gather DMA, the kernel does not even perform a kernel-level copy - it updates the socket buffer descriptor to point to the existing page cache pages. The network card's DMA engine reads directly from the page cache. No CPU copies occur at all.
The performance difference is large. A 1 GB file sent with read()/write() requires two CPU copies: 1 GB kernel-to-user, then 1 GB user-to-kernel. Those copies consume CPU cycles, pollute the L3 cache with data the CPU never inspects, and add latency. sendfile() eliminates both.
Where this appears in ML infrastructure:
Kafka uses sendfile() (via Java's FileChannel.transferTo()) to deliver log segments from broker to consumer. When a training pipeline reads from Kafka, the batch data flows from the broker's disk directly to the consumer's socket buffer without the broker's CPU touching the bytes. This is the primary reason a single Kafka broker can sustain 600-800 MB/s throughput while remaining CPU-idle.
Nginx and CDN systems use sendfile() to serve static files at disk bandwidth. If you serve pre-tokenized training data from a simple HTTP server (a common pattern for distributed training on heterogeneous clusters), using Nginx as the data server gives you sendfile() semantics for free.
Custom data servers for ML training (the pattern of a dedicated "data service" that preloads and serves batches) should be written in Go or C++ and use sendfile()/splice() for batch delivery. Python does expose os.sendfile(out_fd, in_fd, offset, count), but the overhead of the Python interpreter at high batch rates typically warrants a native implementation.
