Working with Directories - Navigation, Traversal, and File Operations
Reading time: ~20 minutes | Level: Foundation → Engineering
Here is a behavior that surprises most Python developers the first time they see it:
import shutil
# Delete a directory and everything in it - instantly, permanently
shutil.rmtree("/path/to/my_project")
No confirmation. No recycle bin. No recovery. If you accidentally pass the wrong path - your home directory, /, a mounted drive - the deletion is immediate and irreversible on most systems.
This is why understanding directory operations at engineering depth matters. The same tools that make Python powerful for build systems, dataset management, and log rotation are exactly as powerful at destroying data.
This is the capstone lesson of the file-handling module. Everything you have learned - pathlib (lesson 04), the os module (lesson 05), context managers (lesson 03), and file I/O (lessons 01–02) - comes together here to handle real-world filesystem operations safely and effectively.
What You Will Learn
- Directory concepts: working directory, absolute vs relative paths, and path resolution
- Creating directories safely: os.makedirs() vs Path.mkdir(parents=True, exist_ok=True)
- Listing directory contents: os.listdir() vs os.scandir() vs Path.iterdir() - and the performance differences
- Recursive traversal: os.walk() with all its parameters; Path.rglob() as a cleaner alternative
- Copying: shutil.copy(), shutil.copy2(), shutil.copytree() and what metadata each preserves
- Moving and renaming: shutil.move() vs Path.rename() and cross-filesystem behavior
- Deleting safely: os.unlink(), os.rmdir(), shutil.rmtree() with guards
- Temporary directories: tempfile.mkdtemp() and tempfile.TemporaryDirectory()
- Disk usage: shutil.disk_usage(), os.stat(), summing a directory tree
- File globbing: glob.glob() vs Path.glob() vs Path.rglob()
Prerequisites
- pathlib.Path basics (lesson 04 - especially Path.iterdir(), Path.glob(), p.parent, p.name)
- os module basics (lesson 05 - os.getcwd(), os.environ, os.stat())
- Context managers with the with statement (lesson 03)
- File reading and writing (lessons 01 and 02)
Mental Model: The Filesystem as a Tree
/                               # Root - absolute paths start here
├── home/
│   └── alice/                  # Home directory
│       ├── projects/           # Working directory (cwd)
│       │   ├── app/            # Subdirectory
│       │   │   ├── main.py
│       │   │   └── utils.py
│       │   └── tests/
│       │       └── test_main.py
│       └── data/
│           ├── train.csv
│           └── test.csv
- Absolute path: /home/alice/projects/app/main.py
- Relative path (from projects/): app/main.py
- Parent of app/: /home/alice/projects/
- Name of main.py: main.py - Stem: main - Suffix: .py
Part 1 - Directory Concepts and Path Resolution
Working Directory
The working directory (cwd) is the reference point for all relative paths in your process:
import os
from pathlib import Path
# Get the current working directory
cwd = os.getcwd()
print(cwd)
# /home/alice/projects
# pathlib equivalent
cwd_path = Path.cwd()
print(cwd_path)
# /home/alice/projects
# Relative paths resolve against cwd
relative = Path("app/main.py")
absolute = relative.resolve()
print(absolute)
# /home/alice/projects/app/main.py
# Check if a path is absolute
print(Path("/tmp/file.txt").is_absolute()) # True
print(Path("app/main.py").is_absolute()) # False
Path Resolution and Canonicalization
from pathlib import Path
# resolve() makes a path absolute, follows symlinks, and collapses ".." components
p = Path("../../etc/passwd")
print(p.resolve())
# The result depends on cwd: /etc/passwd when cwd is /home/alice,
# but /home/etc/passwd when cwd is /home/alice/projects
# Parts of a path
p = Path("/home/alice/projects/app/main.py")
print(p.parts) # ('/', 'home', 'alice', 'projects', 'app', 'main.py')
print(p.parent) # /home/alice/projects/app
print(p.parents[2]) # /home/alice
print(p.name) # main.py
print(p.stem) # main
print(p.suffix) # .py
# Build paths safely with / (no string concatenation!)
base = Path("/home/alice/projects")
config = base / "config" / "settings.json"
print(config)
# /home/alice/projects/config/settings.json
:::tip Always use Path / operator for path construction
Never concatenate paths as strings: base + "/" + "subdir" hard-codes the separator (Windows natively uses \), produces double slashes when base already ends in one, and leaves you with a plain string that has none of Path's methods. Use pathlib's / operator - it handles all platforms correctly.
:::
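A small sketch of the difference, using pathlib's pure path classes so it behaves the same on any operating system. The directory names are illustrative only.

```python
from pathlib import PurePosixPath, PureWindowsPath

# String concatenation: a trailing slash on the base yields a double slash
base_str = "/home/alice/"
joined_str = base_str + "/" + "projects"
print(joined_str)  # /home/alice//projects

# The / operator inserts exactly one separator, in the target platform's style
posix_path = PurePosixPath("/home/alice/") / "projects" / "app"
windows_path = PureWindowsPath("C:/Users/alice") / "projects" / "app"
print(posix_path)    # /home/alice/projects/app
print(windows_path)  # C:\Users\alice\projects\app
```

In application code you would normally use Path, which picks the right flavor for the current platform; the pure classes are used here only to show both flavors side by side.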
Part 2 - Creating Directories
os.makedirs() vs Path.mkdir()
Both create a directory tree, but have different APIs and defaults:
import os
from pathlib import Path
# os.makedirs - creates all intermediate directories
os.makedirs("/tmp/demo/a/b/c")
# Creates: /tmp/demo/, /tmp/demo/a/, /tmp/demo/a/b/, /tmp/demo/a/b/c/
# Raises FileExistsError if the directory already exists (by default)
try:
    os.makedirs("/tmp/demo/a/b/c")
except FileExistsError:
    print("Already exists")
# exist_ok=True prevents the error
os.makedirs("/tmp/demo/a/b/c", exist_ok=True) # No error if exists
# pathlib equivalent - cleaner, more Pythonic
path = Path("/tmp/demo2/x/y/z")
path.mkdir(parents=True, exist_ok=True)
# parents=True → create all missing intermediate directories
# exist_ok=True → don't raise if directory already exists
| Situation | Command |
|---|---|
| One directory, parent guaranteed to exist | Path("subdir").mkdir() |
| Directory + all missing parents | Path("a/b/c").mkdir(parents=True) |
| Create if not exists, continue if it does (production standard) | Path("dir").mkdir(parents=True, exist_ok=True) |
| Set Unix permissions at creation time | Path("dir").mkdir(mode=0o755) |
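The rows above differ mainly in which error they avoid. The sketch below, run inside a throwaway temporary directory, triggers each failure mode once; the variable names are illustrative.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "a" / "b"

    # Without parents=True, a missing parent raises FileNotFoundError
    try:
        target.mkdir()
        missing_parent_error = None
    except FileNotFoundError as exc:
        missing_parent_error = type(exc).__name__

    # parents=True creates the whole chain
    target.mkdir(parents=True)

    # Without exist_ok=True, an existing directory raises FileExistsError
    try:
        target.mkdir(parents=True)
        exists_error = None
    except FileExistsError as exc:
        exists_error = type(exc).__name__

    # exist_ok=True makes the call idempotent
    target.mkdir(parents=True, exist_ok=True)
    created = target.is_dir()

print(missing_parent_error, exists_error, created)
# FileNotFoundError FileExistsError True
```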
Real-World: Project Scaffolding
from pathlib import Path
def scaffold_ml_project(name: str, base_dir: str = ".") -> Path:
    """
    Create a standard ML project directory structure.
    Idempotent - safe to call multiple times.
    """
    root = Path(base_dir) / name
    directories = [
        "data/raw",
        "data/processed",
        "data/external",
        "notebooks",
        "src/features",
        "src/models",
        "src/evaluation",
        "tests",
        "configs",
        "outputs/models",
        "outputs/figures",
        "outputs/reports",
    ]
    for directory in directories:
        (root / directory).mkdir(parents=True, exist_ok=True)
    # Create placeholder files
    (root / "README.md").touch()
    (root / ".gitignore").write_text(
        "data/raw/\n*.pkl\n__pycache__/\n.env\noutputs/models/\n"
    )
    gitkeep_dirs = ["data/raw", "data/processed", "outputs/models"]
    for d in gitkeep_dirs:
        (root / d / ".gitkeep").touch()
    print(f"Project '{name}' scaffolded at {root.resolve()}")
    return root
project = scaffold_ml_project("sentiment_analysis", "/tmp")
# Project 'sentiment_analysis' scaffolded at /tmp/sentiment_analysis
Part 3 - Listing Directory Contents
Three Approaches, Different Trade-offs
import os
from pathlib import Path
directory = "/tmp/demo"
os.makedirs(directory, exist_ok=True)
# Create some test files
for name in ["a.txt", "b.py", "c.json"]:
    Path(directory, name).write_text("test")
(Path(directory) / "subdir").mkdir(exist_ok=True)
os.listdir() - Simplest, Returns Names Only
names = os.listdir(directory)
print(names)
# ['a.txt', 'subdir', 'b.py', 'c.json'] - order not guaranteed
# To get full paths, you must join manually
for name in os.listdir(directory):
    full_path = os.path.join(directory, name)
    print(full_path)
os.scandir() - Fast, Returns DirEntry Objects
with os.scandir(directory) as entries:
    for entry in entries:
        print(f"{entry.name:20} is_file={entry.is_file()} is_dir={entry.is_dir()}")
        # entry.stat() returns size, mtime, etc.
        # On Unix the first call costs one syscall; the result is then cached on the entry
        stat = entry.stat()
        print(f"  size={stat.st_size} bytes")
# a.txt                is_file=True is_dir=False
#   size=4 bytes
# subdir               is_file=False is_dir=True
#   size=64 bytes      (directory sizes are platform-dependent)
:::tip os.scandir() is significantly faster than os.listdir() for file metadata
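os.listdir() only returns names - if you then call os.stat() on each, that's a separate system call per file. os.scandir() returns DirEntry objects that carry the file type from the OS directory scan, so is_file() and is_dir() usually need no extra system call. stat() is free on Windows and costs one system call per entry on Unix, after which the result is cached on the entry. On a directory with 10,000 files, os.scandir() can be several times faster for operations that need file attributes.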
:::
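As an illustration of the pattern this tip describes, here is a minimal sketch that sums file sizes in one pass over os.scandir(). The helper name total_file_size is an assumption made for this example, not a standard-library function.

```python
import os
import tempfile
from pathlib import Path

def total_file_size(directory: str) -> int:
    """Sum the sizes of regular files directly inside `directory` (non-recursive)."""
    total = 0
    with os.scandir(directory) as entries:
        for entry in entries:
            # is_file() normally uses type info gathered during the scan itself
            if entry.is_file(follow_symlinks=False):
                total += entry.stat(follow_symlinks=False).st_size
    return total

with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "a.bin").write_bytes(b"x" * 100)
    Path(tmp, "b.bin").write_bytes(b"x" * 250)
    Path(tmp, "subdir").mkdir()  # directories are skipped
    scandir_total = total_file_size(tmp)

print(scandir_total)  # 350
```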
Path.iterdir() - Most Pythonic
from pathlib import Path
for entry in Path(directory).iterdir():
    print(f"{entry.name:20} is_file={entry.is_file()} size={entry.stat().st_size}")
# Filtering with type checks
python_files = [p for p in Path(directory).iterdir() if p.suffix == ".py"]
directories_only = [p for p in Path(directory).iterdir() if p.is_dir()]
| Method | Returns | Metadata cache | Best for |
|---|---|---|---|
| os.listdir() | list[str] | No | Simple name lists |
| os.scandir() | DirEntry iterator | Yes (fast) | Large dirs + stat info |
| Path.iterdir() | Path iterator | No | Modern code |
Part 4 - Recursive Traversal
os.walk() - The Classic Workhorse
os.walk() generates (dirpath, dirnames, filenames) tuples for every directory in the tree:
import os
from pathlib import Path
# Create a test tree
for p in ["/tmp/tree/a/1", "/tmp/tree/a/2", "/tmp/tree/b"]:
    os.makedirs(p, exist_ok=True)
for p in ["/tmp/tree/root.txt", "/tmp/tree/a/1/file1.py", "/tmp/tree/b/data.csv"]:
    Path(p).write_text("test")
for dirpath, dirnames, filenames in os.walk("/tmp/tree"):
    print(f"DIR: {dirpath}")
    for filename in filenames:
        full_path = os.path.join(dirpath, filename)
        print(f"  FILE: {full_path}")
# DIR: /tmp/tree
#   FILE: /tmp/tree/root.txt
# DIR: /tmp/tree/a
# DIR: /tmp/tree/a/1
#   FILE: /tmp/tree/a/1/file1.py
# DIR: /tmp/tree/a/2
# DIR: /tmp/tree/b
#   FILE: /tmp/tree/b/data.csv
Controlling os.walk() Traversal
import os
# topdown=True (default): process parent before children
# Modifying dirnames in-place skips subtrees
for dirpath, dirnames, filenames in os.walk("/tmp/tree", topdown=True):
    # Skip hidden directories (starting with .)
    dirnames[:] = [d for d in dirnames if not d.startswith(".")]
    # Skip cache, VCS, and dependency directories by exact name
    dirnames[:] = [d for d in dirnames if d not in {"__pycache__", ".git", "node_modules"}]
    for filename in filenames:
        print(os.path.join(dirpath, filename))
:::warning Modifying dirnames in-place
When topdown=True, modifying dirnames in-place with dirnames[:] = [...] prevents os.walk() from descending into the removed directories. Assigning a new list (dirnames = [...]) does nothing - os.walk() has already captured the reference.
:::
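The difference between the two assignments is easy to verify. The following sketch builds a tiny tree in a temporary directory and walks it both ways; the directory name skip_me and the helper visited_dirs are made up for the demonstration.

```python
import os
import tempfile
from pathlib import Path

def visited_dirs(root: str, in_place: bool) -> set[str]:
    """Return the names of the directories os.walk() visits under root."""
    seen = set()
    for dirpath, dirnames, _filenames in os.walk(root):
        seen.add(os.path.basename(dirpath))
        kept = [d for d in dirnames if d != "skip_me"]
        if in_place:
            dirnames[:] = kept   # mutates the list os.walk() holds
        else:
            dirnames = kept      # rebinds a local name only
    return seen

with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "keep").mkdir()
    Path(tmp, "skip_me", "nested").mkdir(parents=True)
    pruned = visited_dirs(tmp, in_place=True)
    not_pruned = visited_dirs(tmp, in_place=False)

print("skip_me" in pruned)      # False
print("skip_me" in not_pruned)  # True
```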
os.walk() with Error Handling
import os
def walk_safe(root: str):
    """Walk a directory tree, logging OS errors (e.g. permission denied) without stopping."""
    def handle_error(error):
        # onerror receives the OSError raised while listing a directory
        print(f"  Skipped {error.filename}: {error.strerror}")
    for dirpath, dirnames, filenames in os.walk(root, onerror=handle_error):
        dirnames[:] = [d for d in dirnames if not d.startswith(".")]
        for filename in filenames:
            yield os.path.join(dirpath, filename)
# Count Python files in a project
python_count = sum(1 for f in walk_safe("/home/alice/projects") if f.endswith(".py"))
print(f"Python files: {python_count}")
Path.rglob() - The Modern Alternative
Path.rglob() is cleaner and more expressive for glob-pattern matching:
from pathlib import Path
root = Path("/tmp/tree")
# All Python files, recursively
python_files = list(root.rglob("*.py"))
print(python_files)
# [PosixPath('/tmp/tree/a/1/file1.py')]
# All CSV files anywhere in the tree
csv_files = list(root.rglob("*.csv"))
# All files (any name) - ** matches any depth
all_files = [p for p in root.rglob("*") if p.is_file()]
# All directories
all_dirs = [p for p in root.rglob("*") if p.is_dir()]
| Use os.walk() | Use Path.rglob() |
|---|---|
| Need files AND directory structure simultaneously | Pattern-matching (*.py, **/*.json, test_*.py) |
| Need to modify traversal (dirnames[:] = ... to skip subtrees) | Just need a flat list of matching files |
| Need dirpath context (relative path calculations) | Cleaner, more readable code |
| Maximum performance on very large trees | Integration with pathlib Path objects |
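To make the first and third rows concrete, here is a rough sketch that runs both tools over the same temporary tree: os.walk() for a per-directory summary, Path.rglob() for a flat list of matches. The file and directory names are invented for the example.

```python
import os
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "src" / "pkg").mkdir(parents=True)
    (root / "src" / "main.py").write_text("# main")
    (root / "src" / "pkg" / "a.py").write_text("# a")
    (root / "src" / "pkg" / "b.py").write_text("# b")

    # os.walk(): one tuple per directory, so per-directory summaries come naturally
    files_per_dir = {
        Path(dirpath).relative_to(root).as_posix(): len(filenames)
        for dirpath, _dirnames, filenames in os.walk(root)
    }

    # Path.rglob(): a flat stream of matches, well suited to "find all *.py"
    py_names = sorted(p.name for p in root.rglob("*.py"))

print(files_per_dir)  # {'.': 0, 'src': 1, 'src/pkg': 2}
print(py_names)       # ['a.py', 'b.py', 'main.py']
```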
Part 5 - Copying Files and Directories
shutil.copy() vs shutil.copy2() vs shutil.copytree()
import shutil
from pathlib import Path
# Setup
src = Path("/tmp/source_file.txt")
src.write_text("Hello, World!")
Path("/tmp/dest_dir").mkdir(exist_ok=True)
shutil.copy() - Content + Permissions
# Copy file content and permissions (chmod bits)
# Does NOT copy metadata (mtime, atime)
dest = shutil.copy("/tmp/source_file.txt", "/tmp/dest_dir/copy1.txt")
print(dest) # /tmp/dest_dir/copy1.txt
# Can also copy into a directory (preserves filename)
shutil.copy("/tmp/source_file.txt", "/tmp/dest_dir/")
# Creates: /tmp/dest_dir/source_file.txt
shutil.copy2() - Content + Permissions + Metadata
import os
import shutil
# copy2 also copies mtime and atime (file timestamp metadata)
# This is closest to the Unix `cp -p` command
shutil.copy2("/tmp/source_file.txt", "/tmp/dest_dir/copy2.txt")
src_stat = os.stat("/tmp/source_file.txt")
dst_stat = os.stat("/tmp/dest_dir/copy2.txt")
print(f"Source mtime: {src_stat.st_mtime}")
print(f"copy2 mtime: {dst_stat.st_mtime}") # Same as source
shutil.copytree() - Entire Directory Tree
# Create a source directory tree
import shutil
from pathlib import Path
src_tree = Path("/tmp/src_tree")
(src_tree / "subdir").mkdir(parents=True, exist_ok=True)
(src_tree / "main.py").write_text("# main")
(src_tree / "subdir" / "utils.py").write_text("# utils")
(src_tree / ".git").mkdir(exist_ok=True) # Hidden dir
# Basic copy - raises FileExistsError if /tmp/dst_tree already exists
shutil.copytree("/tmp/src_tree", "/tmp/dst_tree")
# Ignore patterns - skip .git, __pycache__, *.pyc
shutil.copytree(
    "/tmp/src_tree",
    "/tmp/dst_tree_clean",
    ignore=shutil.ignore_patterns(".git", "__pycache__", "*.pyc", "*.egg-info"),
)
# dirs_exist_ok=True (Python 3.8+): copy into existing directory
shutil.copytree(
    "/tmp/src_tree",
    "/tmp/existing_dest",
    dirs_exist_ok=True,
)
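| Function | Content | Permissions | Timestamps | Symlinks |
|---|---|---|---|---|
| shutil.copy() | Yes | Yes | No | Followed (target content copied) |
| shutil.copy2() | Yes | Yes | Yes | Followed (target content copied) |
| shutil.copyfile() | Yes | No | No | Followed (target content copied) |
| shutil.copytree() | Yes | Yes | Yes* | Followed* |
* copytree() copies each file with copy2() by default, which is why timestamps are preserved; pass copy_function= to change that. With the default symlinks=False it copies the content of linked files; symlinks=True recreates the links instead.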
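A short sketch of what copy_function= changes in practice: the same tree is copied twice, once with the default and once with shutil.copy, and the resulting modification times are compared. The backdated timestamp is an arbitrary value chosen for the demonstration.

```python
import os
import shutil
import tempfile
from pathlib import Path

OLD_MTIME = 1_000_000_000  # arbitrary timestamp in 2001, for illustration

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp, "src")
    src.mkdir()
    (src / "data.txt").write_text("payload")
    os.utime(src / "data.txt", (OLD_MTIME, OLD_MTIME))  # backdate the source file

    # Default copy_function is shutil.copy2: timestamps are carried over
    shutil.copytree(src, Path(tmp, "with_copy2"))
    # copy_function=shutil.copy: content and permission bits only
    shutil.copytree(src, Path(tmp, "with_copy"), copy_function=shutil.copy)

    mtime_copy2 = Path(tmp, "with_copy2", "data.txt").stat().st_mtime
    mtime_copy = Path(tmp, "with_copy", "data.txt").stat().st_mtime

print(abs(mtime_copy2 - OLD_MTIME) < 1)  # True: timestamp preserved
print(mtime_copy > OLD_MTIME + 1000)     # True: timestamp is the time of the copy
```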
Part 6 - Moving and Renaming
Path.rename() - Atomic on Same Filesystem
from pathlib import Path
# Rename within same directory
p = Path("/tmp/old_name.txt")
p.write_text("content")
p.rename("/tmp/new_name.txt")
# p still points to old (now non-existent) path
# Rename is atomic on most Unix filesystems (same filesystem)
# This means the operation either fully completes or doesn't happen at all
# Move to different directory
src = Path("/tmp/data.csv")
src.write_text("col1,col2\n1,2")
Path("/tmp/archive").mkdir(exist_ok=True)  # rename() fails if the target directory is missing
src.rename("/tmp/archive/data.csv")
shutil.move() - Cross-Filesystem Safe
import shutil
# shutil.move works across filesystems (e.g., /tmp → /home)
# On same filesystem: uses os.rename (atomic, fast)
# Across filesystems: copies then deletes (not atomic)
shutil.move("/tmp/source.txt", "/home/alice/destination.txt")
# Move entire directory tree
shutil.move("/tmp/old_dir", "/home/alice/new_dir")
# Move into an existing directory (preserves filename)
shutil.move("/tmp/data.csv", "/home/alice/data/")
# Creates: /home/alice/data/data.csv
:::warning Path.rename() vs shutil.move()
Path.rename() raises OSError if the source and destination are on different filesystems. shutil.move() handles cross-filesystem moves transparently by falling back to copy+delete. In production code that may move files across mounts, always use shutil.move().
:::
Part 7 - Deleting Files and Directories
The Deletion Hierarchy
import os
import shutil
from pathlib import Path
# Level 1: Delete a single file
os.unlink("/tmp/file.txt") # Raises FileNotFoundError if absent
Path("/tmp/file.txt").unlink() # Same, pathlib style
Path("/tmp/file.txt").unlink(missing_ok=True) # Python 3.8+: no error if absent
# Level 2: Delete an empty directory
os.rmdir("/tmp/empty_dir") # Raises OSError if not empty
Path("/tmp/empty_dir").rmdir() # Same, raises if not empty
# Level 3: Delete a directory tree (NON-RECOVERABLE)
shutil.rmtree("/tmp/full_dir") # Deletes everything recursively
:::danger shutil.rmtree() is permanent and irreversible
shutil.rmtree() deletes the directory and everything inside it immediately, bypassing the trash/recycle bin. There is no undo. A typo in the path argument can destroy important data. Always add safety guards before calling it.
:::
Safe Deletion Patterns
import shutil
from pathlib import Path
def safe_rmtree(path: str | Path, require_prefix: str | None = None) -> None:
    """
    Delete a directory tree with safety guards.
    Args:
        path: Directory to delete.
        require_prefix: If provided, raises ValueError if path is not
            inside this directory (safety guard for production).
    """
    path = Path(path).resolve()
    if require_prefix:
        prefix = Path(require_prefix).resolve()
        # Compare path components, not strings:
        # "/tmp_backup".startswith("/tmp") is True, but /tmp_backup is not under /tmp
        if not path.is_relative_to(prefix):  # Python 3.9+
            raise ValueError(
                f"Safety check failed: {path} is not under required prefix {prefix}\n"
                f"This guard prevents accidental deletion of important directories."
            )
    if not path.exists():
        return  # Nothing to do
    if not path.is_dir():
        raise ValueError(f"{path} is not a directory")
    # Final check: never delete root or home
    forbidden = {Path("/"), Path.home()}
    if path in forbidden:
        raise ValueError(f"Refusing to delete protected path: {path}")
    shutil.rmtree(path)
    print(f"Deleted: {path}")
# Usage
safe_rmtree("/tmp/build_output", require_prefix="/tmp")
# This would raise ValueError - won't delete /home/alice
# safe_rmtree("/home/alice", require_prefix="/tmp")
shutil.rmtree() with onerror Callback
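On Windows, files marked read-only cause rmtree to fail. Use onerror to handle this. Since Python 3.12, onerror is deprecated in favor of onexc, which takes the same three arguments but passes the exception instance instead of an exc_info tuple: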
import shutil
import os
import stat
def remove_readonly(func, path, excinfo):
    """onerror callback: remove read-only flag and retry deletion."""
    os.chmod(path, stat.S_IWRITE)
    func(path)
# Use on Windows or when read-only files may exist
shutil.rmtree("/tmp/directory", onerror=remove_readonly)
Part 8 - Temporary Directories
Temporary directories are essential for safe file processing - write to a temp location, verify success, then move to the final destination.
tempfile.TemporaryDirectory() - Automatic Cleanup
import tempfile
from pathlib import Path
# Context manager: directory is automatically deleted when the `with` block exits
with tempfile.TemporaryDirectory() as tmp_dir:
    tmp_path = Path(tmp_dir)
    print(f"Temp dir: {tmp_path}")
    # /tmp/tmpXYZABC123 (random suffix)
    # Do work in the temp directory
    (tmp_path / "processing.csv").write_text("id,value\n1,100\n2,200")
    (tmp_path / "output").mkdir()
    (tmp_path / "output" / "results.json").write_text('{"count": 2}')
    # All temp files accessible within the block
    for p in tmp_path.rglob("*"):
        if p.is_file():
            print(f"  {p.relative_to(tmp_path)}")
    #   processing.csv
    #   output/results.json
# After the block: tmp_dir and everything in it is deleted automatically
print(Path(tmp_dir).exists()) # False
tempfile.mkdtemp() - Manual Cleanup
Use when you need the temp directory to outlive the current scope:
import tempfile
import shutil
from pathlib import Path
# Creates temp dir and returns path - YOU are responsible for cleanup
tmp_dir = tempfile.mkdtemp(prefix="myapp_", suffix="_build")
print(tmp_dir)
# /tmp/myapp_XXXXXX_build
tmp_path = Path(tmp_dir)
try:
    # Do work
    (tmp_path / "artifact.bin").write_bytes(b"\x00" * 1024)
    # ... build process ...
    final_dest = Path("/home/alice/releases/artifact.bin")
    final_dest.parent.mkdir(parents=True, exist_ok=True)  # copy2 needs the target directory to exist
    shutil.copy2(tmp_path / "artifact.bin", final_dest)
finally:
    # Always clean up, even if an exception occurred
    shutil.rmtree(tmp_dir, ignore_errors=True)
Real-World Pattern: Atomic File Write via Temp File
Write to temp, then rename - ensures readers never see a partial file:
import tempfile
import json
import shutil
from pathlib import Path
def atomic_write_json(data: dict, dest_path: str | Path) -> None:
    """
    Write JSON to dest_path atomically.
    Writes to a temp file first, then renames to final path.
    Readers never see a partial file; the rename is atomic on POSIX.
    """
    dest = Path(dest_path)
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Create temp file in the same directory (same filesystem = atomic rename)
    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        suffix=".tmp",
        dir=dest.parent,
        delete=False,
    ) as tmp_f:
        tmp_path = Path(tmp_f.name)
        try:
            json.dump(data, tmp_f, indent=2)
            tmp_f.flush()
            # Close before renaming: Windows cannot rename a file that is still open
            tmp_f.close()
            # replace() overwrites an existing dest on every platform (rename() raises on Windows)
            # On POSIX it is atomic - readers see either the old file or the new one
            tmp_path.replace(dest)
        except Exception:
            tmp_f.close()
            tmp_path.unlink(missing_ok=True)  # Clean up on failure
            raise
atomic_write_json({"status": "ok", "count": 42}, "/tmp/output/results.json")
Part 9 - File Globbing
glob.glob() and glob.iglob()
import glob
# glob.glob() - returns a list
py_files = glob.glob("/tmp/project/**/*.py", recursive=True)
print(py_files)
# ['/tmp/project/main.py', '/tmp/project/src/utils.py', ...]
# glob.iglob() - returns a generator (memory-efficient for large trees)
for path in glob.iglob("/tmp/project/**/*.py", recursive=True):
    print(path)
# Patterns:
# * = any number of characters within a path component (not /)
# ** = any number of path components (with recursive=True)
# ? = exactly one character
# [abc] = one character from the set
# {a,b} = NOT supported by glob or pathlib - use multiple calls or filter results by suffix
# Common patterns:
glob.glob("/tmp/**/*.py", recursive=True) # All .py files, any depth
glob.glob("/tmp/data/test_*.csv") # Files matching test_*.csv
glob.glob("/tmp/logs/app.[0-9][0-9][0-9]") # Numbered log files
Path.glob() and Path.rglob()
from pathlib import Path
root = Path("/tmp/project")
# Path.glob() - relative to root, returns Path objects (not strings)
py_files = list(root.glob("*.py")) # Only in root
py_all = list(root.glob("**/*.py")) # All depths (equivalent to rglob)
py_rglob = list(root.rglob("*.py")) # Cleaner syntax for recursive
# Filter by multiple criteria
large_csvs = [
    p for p in root.rglob("*.csv")
    if p.stat().st_size > 1_000_000  # Larger than 1 MB
]
# Pattern: files modified in the last 24 hours
import time
recent = [
    p for p in root.rglob("*")
    if p.is_file() and (time.time() - p.stat().st_mtime) < 86400
]
| Method | Returns | Patterns | Recursive |
|---|---|---|---|
| glob.glob() | list[str] | Shell globs | recursive=True |
| glob.iglob() | iterator | Shell globs | recursive=True |
| Path.glob() | iterator | Shell globs | Use ** in pattern |
| Path.rglob(pat) | iterator | Shell globs | Always recursive |
Recommendation: Use Path.rglob() for new code - returns Path objects (not strings), no recursive= flag needed.
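Because brace patterns such as *.{csv,json} are not available, matching several extensions at once usually means filtering on the suffix. A minimal sketch, assuming a helper named find_by_suffix that is not part of pathlib:

```python
import tempfile
from pathlib import Path

def find_by_suffix(root: Path, suffixes: set[str]) -> list[Path]:
    """Return files under root whose extension is in `suffixes` (case-insensitive)."""
    return sorted(
        p for p in root.rglob("*")
        if p.is_file() and p.suffix.lower() in suffixes
    )

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()
    (root / "train.csv").write_text("a,b")
    (root / "nested" / "config.JSON").write_text("{}")
    (root / "notes.txt").write_text("ignore me")
    matches = [
        p.relative_to(root).as_posix()
        for p in find_by_suffix(root, {".csv", ".json"})
    ]

print(matches)  # ['nested/config.JSON', 'train.csv']
```

This walks the tree once regardless of how many extensions you ask for, whereas one rglob() call per extension walks it once per pattern.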
Part 10 - Disk Usage
shutil.disk_usage() - Free Space Check
import shutil
usage = shutil.disk_usage("/")
print(f"Total: {usage.total / 1e9:.1f} GB")
print(f"Used: {usage.used / 1e9:.1f} GB")
print(f"Free: {usage.free / 1e9:.1f} GB")
print(f"Usage: {usage.used / usage.total * 100:.1f}%")
# Total: 250.0 GB
# Used: 187.3 GB
# Free: 62.7 GB
# Usage: 74.9%
def check_disk_space(path: str, required_bytes: int) -> bool:
    """Return True if path has at least required_bytes of free space."""
    usage = shutil.disk_usage(path)
    return usage.free >= required_bytes
# Check before a large operation
if not check_disk_space("/tmp", 5 * 1024**3):  # 5 GB
    raise RuntimeError("Insufficient disk space for operation")
Summing a Directory Tree
from pathlib import Path
def directory_size(path: str | Path) -> int:
    """Return total size in bytes of all files in a directory tree."""
    root = Path(path)
    return sum(
        f.stat().st_size
        for f in root.rglob("*")
        if f.is_file()
    )
def format_size(bytes_count: int) -> str:
    """Human-readable file size."""
    for unit in ["B", "KB", "MB", "GB", "TB"]:
        if bytes_count < 1024:
            return f"{bytes_count:.1f} {unit}"
        bytes_count /= 1024
    return f"{bytes_count:.1f} PB"
size = directory_size("/tmp/project")
print(f"Project size: {format_size(size)}")
# Project size: 2.3 MB
Part 11 - Putting It All Together: Real-World Patterns
Pattern 1: Log Rotation
import os
import gzip
import shutil
from datetime import datetime
from pathlib import Path
def rotate_logs(log_dir: str | Path, max_size_mb: float = 100, keep_days: int = 30) -> None:
    """
    Rotate large log files by compressing them.
    Delete compressed logs older than keep_days.
    """
    log_dir = Path(log_dir)
    max_size = max_size_mb * 1024 * 1024
    cutoff_time = datetime.now().timestamp() - (keep_days * 86400)
    # Compress large log files
    for log_file in log_dir.glob("*.log"):
        if log_file.stat().st_size <= max_size:
            continue
        # Compress: app.log → app_20240115_143000.log.gz
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_name = log_file.stem + f"_{timestamp}.log.gz"
        archive_path = log_dir / archive_name
        with log_file.open("rb") as f_in:
            with gzip.open(archive_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
        # Truncate original (don't delete - let it continue receiving writes)
        log_file.write_bytes(b"")
        print(f"Rotated: {log_file.name} → {archive_name}")
    # Delete old compressed logs
    for gz_file in log_dir.glob("*.log.gz"):
        if gz_file.stat().st_mtime < cutoff_time:
            gz_file.unlink()
            print(f"Deleted old log: {gz_file.name}")
Pattern 2: Dataset Management
import shutil
import json
from pathlib import Path
from datetime import datetime, timezone
class DatasetManager:
    """
    Manages versioned dataset directories for ML pipelines.
    Uses pathlib + shutil for all filesystem operations.
    """
    def __init__(self, base_dir: str | Path):
        self.base = Path(base_dir)
        self.raw = self.base / "raw"
        self.processed = self.base / "processed"
        self.archive = self.base / "archive"
        for d in [self.raw, self.processed, self.archive]:
            d.mkdir(parents=True, exist_ok=True)
    def ingest(self, source_path: str | Path, dataset_name: str) -> Path:
        """Copy a raw data file into the managed raw directory."""
        source = Path(source_path)
        if not source.exists():
            raise FileNotFoundError(f"Source not found: {source}")
        dest = self.raw / dataset_name / source.name
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, dest)
        # Write metadata sidecar
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime
        ingested_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        meta = {
            "source": str(source.resolve()),
            "ingested_at": ingested_at,
            "size_bytes": dest.stat().st_size,
        }
        (dest.parent / "metadata.json").write_text(
            json.dumps(meta, indent=2), encoding="utf-8"
        )
        return dest
    def process(self, dataset_name: str, version: str) -> Path:
        """
        Stub for a processing step - creates versioned output directory.
        In real usage, this would call the actual transformation pipeline.
        """
        input_dir = self.raw / dataset_name
        output_dir = self.processed / dataset_name / version
        output_dir.mkdir(parents=True, exist_ok=True)
        return output_dir
    def archive_version(self, dataset_name: str, version: str) -> Path:
        """Move a processed dataset version to archive."""
        source = self.processed / dataset_name / version
        dest = self.archive / dataset_name / version
        if not source.exists():
            raise FileNotFoundError(f"Processed version not found: {source}")
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source), str(dest))
        print(f"Archived: {dataset_name}/{version} → {dest}")
        return dest
    def summary(self) -> dict:
        """Return a summary of all datasets and their total sizes."""
        result = {}
        for dataset_dir in self.raw.iterdir():
            if not dataset_dir.is_dir():
                continue
            size = sum(f.stat().st_size for f in dataset_dir.rglob("*") if f.is_file())
            result[dataset_dir.name] = {"size_bytes": size, "location": str(dataset_dir)}
        return result
# Usage
manager = DatasetManager("/tmp/datasets")
# Ingest a raw file
test_csv = Path("/tmp/train.csv")
test_csv.write_text("id,label,text\n1,pos,great\n2,neg,terrible\n")
ingested = manager.ingest(test_csv, "sentiment_v1")
print(f"Ingested to: {ingested}")
# Create a processed version
output = manager.process("sentiment_v1", "v1.0")
(output / "train_processed.csv").write_text("id,label,embedding\n1,1,[0.1,0.2]\n")
print(json.dumps(manager.summary(), indent=2))
Pattern 3: Build System File Operations
import shutil
import hashlib
from pathlib import Path
from typing import Iterator
def find_changed_files(src_dir: Path, dest_dir: Path) -> Iterator[Path]:
    """
    Yield source files that are missing from dest or whose content differs.
    Uses content hashing for reliable change detection.
    """
    for src_file in src_dir.rglob("*"):
        if not src_file.is_file():
            continue
        rel = src_file.relative_to(src_dir)
        dest_file = dest_dir / rel
        if not dest_file.exists() or not files_identical(src_file, dest_file):
            yield src_file
def files_identical(a: Path, b: Path) -> bool:
    """Compare two files by size first, then by content hash."""
    def md5(path: Path) -> str:
        h = hashlib.md5()
        with path.open("rb") as f:
            # Read in 8 KB chunks so large files never load fully into memory
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)
        return h.hexdigest()
    # The size check is cheap and short-circuits before any hashing
    return a.stat().st_size == b.stat().st_size and md5(a) == md5(b)
def incremental_copy(src_dir: str | Path, dest_dir: str | Path) -> int:
    """
    Copy only changed files from src to dest.
    Returns the number of files copied.
    """
    src = Path(src_dir)
    dest = Path(dest_dir)
    copied = 0
    for src_file in find_changed_files(src, dest):
        rel = src_file.relative_to(src)
        dest_file = dest / rel
        dest_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src_file, dest_file)
        print(f"  Copied: {rel}")
        copied += 1
    return copied
# Usage
Path("/tmp/build/src").mkdir(parents=True, exist_ok=True)
Path("/tmp/build/src/index.html").write_text("<html>Hello</html>")
Path("/tmp/build/src/style.css").write_text("body { margin: 0; }")
n = incremental_copy("/tmp/build/src", "/tmp/build/dist")
print(f"Copied {n} changed files")
Interview Questions
Q1: What is the difference between os.makedirs() and Path.mkdir(parents=True)?
Answer: Both create a directory and all missing intermediate parent directories. The key differences are API style and defaults. os.makedirs(path, exist_ok=True) is the functional style and always creates parents; Path(path).mkdir(parents=True, exist_ok=True) is the object-oriented pathlib style and creates only the final directory unless you pass parents=True. Functionally they are equivalent when called with those arguments, and both return None. The pathlib version is preferred in modern Python code because it works directly on Path objects, integrates with the rest of pathlib's API, and is more readable. Without exist_ok=True, both raise FileExistsError if the directory already exists.
Q2: What is the difference between os.scandir() and os.listdir()? When should you use each?
Answer: os.listdir() returns a list of filename strings and requires an additional os.stat() call per file to get metadata (size, type). os.scandir() returns an iterator of DirEntry objects that carry the file type from the OS-level directory scan, so is_file() and is_dir() usually need no extra syscall. stat() is free on Windows and costs one syscall per entry on Unix, after which the result is cached on the entry. On directories with thousands of files where you need metadata, os.scandir() can be several times faster. Use os.listdir() when you only need file names. Use os.scandir() when you need file attributes as well.
Q3: How do you modify os.walk() to skip certain subdirectories?
Answer: When using os.walk(topdown=True) (the default), you can modify dirnames in-place using slice assignment: dirnames[:] = [d for d in dirnames if d not in skip_set]. This prevents os.walk() from descending into the excluded directories. The critical detail is in-place modification - dirnames[:] = [...] works; dirnames = [...] (rebinding the name) does nothing because os.walk() has already captured a reference to the original list.
Q4: What is the difference between shutil.copy() and shutil.copy2()?
Answer: Both copy file content and permission bits. shutil.copy2() additionally copies file metadata - specifically the access time (atime) and modification time (mtime). This makes copy2() equivalent to cp -p in Unix. Use copy2() when you need to preserve timestamps - for example, when archiving files or creating backups where the original modification time matters. Use copy() for standard copies where timestamp preservation is not required.
Q5: Why is shutil.rmtree() dangerous, and what guard would you add in production code?
Answer: shutil.rmtree() immediately and permanently deletes a directory and all its contents without prompting and without moving to a recycle bin. A typo in the path can destroy important data. Two common production guards:
- Prefix validation: verify the path is inside a known safe base directory before calling rmtree.
- Forbidden path check: explicitly check that the path is not /, $HOME, or other critical paths.
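import shutil
from pathlib import Path
def safe_rmtree(path, require_prefix):
    resolved = Path(path).resolve()
    prefix = Path(require_prefix).resolve()
    # Compare path components: a string startswith() would accept /tmp_backup for /tmp
    if not resolved.is_relative_to(prefix):  # Python 3.9+
        raise ValueError(f"Unsafe deletion: {resolved} is not under {prefix}")
    shutil.rmtree(resolved)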
Q6: When should you use tempfile.TemporaryDirectory() vs tempfile.mkdtemp()?
Answer: Use TemporaryDirectory() as a context manager whenever the temp directory is only needed within a limited scope - it guarantees automatic cleanup even if exceptions occur. Use mkdtemp() when the temp directory must outlive the current scope (e.g., a background worker creates it, a different function cleans it up). With mkdtemp(), you are responsible for calling shutil.rmtree() in a finally block or atexit handler. In practice, TemporaryDirectory() covers 90% of use cases and is strongly preferred because it prevents temp directory leaks.
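The two lifetimes can be contrasted in a few lines. This is a sketch only; make_session_dir is a hypothetical helper that registers cleanup with atexit, which is one of the options mentioned above.

```python
import atexit
import shutil
import tempfile
from pathlib import Path

def make_session_dir(prefix: str = "session_") -> Path:
    """Create a temp dir that lives until interpreter exit (hypothetical helper)."""
    tmp_dir = Path(tempfile.mkdtemp(prefix=prefix))
    # The registered cleanup runs at normal interpreter shutdown
    atexit.register(shutil.rmtree, tmp_dir, ignore_errors=True)
    return tmp_dir

# Short-lived work: the context manager cleans up as soon as the block exits
with tempfile.TemporaryDirectory() as scoped:
    scoped_path = Path(scoped)
    (scoped_path / "scratch.txt").write_text("temporary")
scoped_gone = not scoped_path.exists()

# Long-lived work: the directory outlives the function that created it
session_dir = make_session_dir()
session_alive = session_dir.is_dir()

print(scoped_gone, session_alive)  # True True
```

Note that atexit handlers do not run if the process is killed by a signal or exits through os._exit(), so long-running services often add their own periodic cleanup as well.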
Practice Challenges
Beginner: Directory Statistics
Write a function that takes a directory path and returns a dict with: total file count, total size in bytes, a breakdown of file counts by extension, and the 5 largest files.
Solution
from collections import Counter
from pathlib import Path


def directory_stats(path: str | Path) -> dict:
    """
    Return statistics about all files in a directory tree.
    """
    root = Path(path)
    if not root.exists():
        raise FileNotFoundError(f"Directory not found: {root}")
    if not root.is_dir():
        raise ValueError(f"Not a directory: {root}")

    files = [f for f in root.rglob("*") if f.is_file()]
    sizes = [(f, f.stat().st_size) for f in files]
    total_size = sum(s for _, s in sizes)
    extension_counts = Counter(f.suffix.lower() or "(no ext)" for f in files)
    top_5_largest = sorted(sizes, key=lambda x: x[1], reverse=True)[:5]

    return {
        "total_files": len(files),
        "total_size_bytes": total_size,
        "total_size_human": format_size(total_size),
        "extensions": dict(extension_counts.most_common()),
        "largest_files": [
            {"path": str(f.relative_to(root)), "size_bytes": s, "size_human": format_size(s)}
            for f, s in top_5_largest
        ],
    }


def format_size(n: int) -> str:
    """Format a byte count using binary (1024-based) units."""
    for unit in ["B", "KB", "MB", "GB"]:
        if n < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} TB"
# Demo
import json
from pathlib import Path

# Create a test directory
base = Path("/tmp/stats_demo")
(base / "src").mkdir(parents=True, exist_ok=True)
(base / "data").mkdir(exist_ok=True)
(base / "src" / "main.py").write_bytes(b"x" * 1024)
(base / "src" / "utils.py").write_bytes(b"x" * 512)
(base / "data" / "train.csv").write_bytes(b"x" * 50000)
(base / "data" / "test.csv").write_bytes(b"x" * 10000)
(base / "README.md").write_bytes(b"x" * 2048)

stats = directory_stats("/tmp/stats_demo")
print(json.dumps(stats, indent=2))
# {
#   "total_files": 5,
#   "total_size_bytes": 63584,
#   "total_size_human": "62.1 KB",
#   "extensions": {".csv": 2, ".py": 2, ".md": 1},
#   "largest_files": [
#     {"path": "data/train.csv", "size_bytes": 50000, "size_human": "48.8 KB"},
#     ...
#   ]
# }
Intermediate: Incremental Backup
Build an IncrementalBackup class that:
- backup(src, dest): copies only new or modified files from src to dest
- restore(backup_dir, dest): restores files from a backup to a destination
- Uses file modification time to detect changes
- Maintains a manifest file (manifest.json) listing what was backed up and when
Solution
import json
import shutil
from datetime import datetime, timezone
from pathlib import Path


class IncrementalBackup:
    """
    Incremental file backup using modification time comparison.
    Maintains a JSON manifest for auditing and restore.
    """
    MANIFEST_FILE = "backup_manifest.json"

    def __init__(self, backup_root: str | Path):
        self.backup_root = Path(backup_root)
        self.backup_root.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _utc_now() -> str:
        # datetime.utcnow() is deprecated since Python 3.12; use an aware
        # datetime and keep the trailing "Z" in the manifest.
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    def _manifest_path(self, backup_dir: Path) -> Path:
        return backup_dir / self.MANIFEST_FILE

    def _load_manifest(self, backup_dir: Path) -> dict:
        path = self._manifest_path(backup_dir)
        if path.exists():
            with path.open("r", encoding="utf-8") as f:
                return json.load(f)
        return {"files": {}, "created_at": None, "last_backup": None}

    def _save_manifest(self, backup_dir: Path, manifest: dict) -> None:
        with self._manifest_path(backup_dir).open("w", encoding="utf-8") as f:
            json.dump(manifest, f, indent=2)

    def backup(self, src: str | Path, name: str) -> dict:
        """
        Back up source directory to backup_root/name/.
        Only copies files that are new or modified since last backup.
        Returns a summary dict.
        """
        src = Path(src).resolve()
        dest = self.backup_root / name
        dest.mkdir(parents=True, exist_ok=True)
        manifest = self._load_manifest(dest)
        if manifest["created_at"] is None:
            manifest["created_at"] = self._utc_now()

        copied = 0
        skipped = 0
        for src_file in src.rglob("*"):
            if not src_file.is_file():
                continue
            rel = str(src_file.relative_to(src))
            src_stat = src_file.stat()  # one stat() call serves mtime and size
            last_mtime = manifest["files"].get(rel, {}).get("mtime", 0)
            if src_stat.st_mtime <= last_mtime:
                skipped += 1
                continue

            # Copy changed file
            dest_file = dest / rel
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src_file, dest_file)
            manifest["files"][rel] = {
                "mtime": src_stat.st_mtime,
                "size": src_stat.st_size,
                "backed_up_at": self._utc_now(),
            }
            copied += 1

        manifest["last_backup"] = self._utc_now()
        self._save_manifest(dest, manifest)
        summary = {"copied": copied, "skipped": skipped, "dest": str(dest)}
        print(f"Backup complete: {copied} copied, {skipped} skipped → {dest}")
        return summary

    def restore(self, name: str, restore_to: str | Path) -> int:
        """
        Restore all files from a backup to restore_to directory.
        Returns the number of files restored.
        """
        backup_dir = self.backup_root / name
        if not backup_dir.exists():
            raise FileNotFoundError(f"Backup '{name}' not found at {backup_dir}")
        dest = Path(restore_to)
        dest.mkdir(parents=True, exist_ok=True)
        manifest = self._load_manifest(backup_dir)

        restored = 0
        for rel_path in manifest["files"]:
            src_file = backup_dir / rel_path
            dest_file = dest / rel_path
            if not src_file.exists():
                print(f"  Warning: backup file missing: {rel_path}")
                continue
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src_file, dest_file)
            restored += 1

        print(f"Restored {restored} files to {dest}")
        return restored
# Demo
backup_mgr = IncrementalBackup("/tmp/backups")
# Create source directory
src = Path("/tmp/source_project")
(src / "src").mkdir(parents=True, exist_ok=True)
(src / "data").mkdir(exist_ok=True)
(src / "src" / "app.py").write_text("# app v1")
(src / "data" / "config.json").write_text('{"version": 1}')
# First backup - copies everything
summary = backup_mgr.backup(src, "project_v1")
print(summary) # {'copied': 2, 'skipped': 0, ...}
# Modify one file
(src / "src" / "app.py").write_text("# app v2 - updated")
# Second backup - only copies the changed file
summary = backup_mgr.backup(src, "project_v1")
print(summary) # {'copied': 1, 'skipped': 1, ...}
# Restore to a new location
restored = backup_mgr.restore("project_v1", "/tmp/restored_project")
print(f"Restored {restored} files")
# Verify
print(Path("/tmp/restored_project/src/app.py").read_text())
# # app v2 - updated
Advanced: File System Watcher (Polling-Based)
Build a DirectoryWatcher that monitors a directory for changes (new files, modified files, deleted files) using periodic polling. Run it as a background thread and emit events to a callback.
This is the capstone challenge - it ties together directory scanning with Path.glob(), threading, pathlib, and callback patterns.
Solution
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable


@dataclass
class FileEvent:
    event_type: str  # "created", "modified", "deleted"
    path: str
    size: int = 0
    mtime: float = 0.0


class DirectoryWatcher:
    """
    Polls a directory for file changes and calls a callback for each event.
    Note: For production use, prefer the `watchdog` library which uses
    OS-level inotify/FSEvents/ReadDirectoryChanges instead of polling.
    This polling implementation demonstrates the core concept clearly.
    """

    def __init__(
        self,
        directory: str | Path,
        callback: Callable[[FileEvent], None],
        interval_seconds: float = 1.0,
        recursive: bool = True,
    ):
        self.directory = Path(directory)
        self.callback = callback
        self.interval = interval_seconds
        self.recursive = recursive
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._snapshot: dict[str, tuple[float, int]] = {}  # path → (mtime, size)

    def _scan(self) -> dict[str, tuple[float, int]]:
        """Scan directory and return {path: (mtime, size)} for all files."""
        result = {}
        pattern = "**/*" if self.recursive else "*"
        for p in self.directory.glob(pattern):
            if not p.is_file():
                continue
            try:
                stat = p.stat()
                result[str(p)] = (stat.st_mtime, stat.st_size)
            except OSError:
                pass  # File deleted between glob and stat
        return result

    def _poll(self) -> None:
        """Main polling loop - runs in background thread."""
        self._snapshot = self._scan()
        # Event.wait() sleeps for the interval but returns True as soon as
        # stop() is called, so shutdown does not wait out a full sleep.
        while not self._stop_event.wait(self.interval):
            current = self._scan()
            current_keys = set(current)
            prev_keys = set(self._snapshot)

            # Created: in current but not in previous
            for path in current_keys - prev_keys:
                mtime, size = current[path]
                self.callback(FileEvent("created", path, size, mtime))

            # Deleted: in previous but not in current
            for path in prev_keys - current_keys:
                self.callback(FileEvent("deleted", path))

            # Modified: in both but mtime or size changed
            for path in current_keys & prev_keys:
                prev_mtime, prev_size = self._snapshot[path]
                curr_mtime, curr_size = current[path]
                if curr_mtime != prev_mtime or curr_size != prev_size:
                    self.callback(FileEvent("modified", path, curr_size, curr_mtime))

            self._snapshot = current

    def start(self) -> None:
        """Start watching in a background thread."""
        if self._thread and self._thread.is_alive():
            raise RuntimeError("Watcher is already running")
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._poll, daemon=True, name="DirWatcher")
        self._thread.start()
        print(f"Watching: {self.directory}")

    def stop(self) -> None:
        """Stop the watcher and wait for the thread to finish."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        print("Watcher stopped.")

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args):
        self.stop()
# Demo
import shutil
import time
from pathlib import Path

watch_dir = Path("/tmp/watched")
watch_dir.mkdir(exist_ok=True)
events = []

def on_event(event: FileEvent) -> None:
    print(f"  [{event.event_type.upper():8}] {Path(event.path).name}"
          + (f" ({event.size} bytes)" if event.size else ""))
    events.append(event)

print("=== Directory Watcher Demo ===\n")
with DirectoryWatcher(watch_dir, on_event, interval_seconds=0.5) as watcher:
    time.sleep(0.6)  # Let first scan complete
    print("Creating files...")
    (watch_dir / "data.csv").write_text("id,value\n1,100")
    time.sleep(0.6)
    print("Modifying file...")
    (watch_dir / "data.csv").write_text("id,value\n1,100\n2,200")
    time.sleep(0.6)
    print("Creating another file...")
    (watch_dir / "config.json").write_text('{"key": "value"}')
    time.sleep(0.6)
    print("Deleting file...")
    (watch_dir / "data.csv").unlink()
    time.sleep(0.6)

print(f"\nTotal events captured: {len(events)}")
for e in events:
    print(f"  {e.event_type}: {Path(e.path).name}")

# Clean up
shutil.rmtree(watch_dir)
Quick Reference
| Operation | os / glob | pathlib | shutil / tempfile |
|---|---|---|---|
| Current directory | os.getcwd() | Path.cwd() | - |
| Make directory | os.makedirs(p, exist_ok=True) | Path(p).mkdir(parents=True, exist_ok=True) | - |
| List contents | os.listdir(p) | Path(p).iterdir() | - |
| List with stats | os.scandir(p) | Path(p).iterdir() + .stat() | - |
| Recursive walk | os.walk(p) | Path(p).rglob("*") | - |
| Copy file | - | - | shutil.copy2(src, dst) |
| Copy tree | - | - | shutil.copytree(src, dst) |
| Move/rename | os.rename(src, dst) | Path(src).rename(dst) | shutil.move(src, dst) |
| Delete file | os.unlink(p) | Path(p).unlink() | - |
| Delete empty dir | os.rmdir(p) | Path(p).rmdir() | - |
| Delete tree | - | - | shutil.rmtree(p) |
| Disk usage | - | - | shutil.disk_usage(p) |
| File size | os.stat(p).st_size | Path(p).stat().st_size | - |
| Temp dir (auto) | - | - | tempfile.TemporaryDirectory() |
| Temp dir (manual) | - | - | tempfile.mkdtemp() |
| Glob pattern | glob.glob(p, recursive=True) | Path(p).glob(pat) / .rglob(pat) | - |
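The "List with stats" row is where os.scandir() earns its place: each DirEntry can answer is_dir() and is_file() without a separate path lookup in most cases. As an illustration, here is a minimal sketch of summing a directory tree with it; tree_size is a hypothetical helper name, and skipping symlinks is a deliberate simplification.

```python
import os
import tempfile
from pathlib import Path


def tree_size(path) -> int:
    """Total size in bytes of regular files under path (symlinks are skipped).

    Hypothetical helper written for this example.
    """
    total = 0
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_dir(follow_symlinks=False):
                total += tree_size(entry.path)
            elif entry.is_file(follow_symlinks=False):
                total += entry.stat(follow_symlinks=False).st_size
    return total


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()
    (root / "a.bin").write_bytes(b"x" * 1024)
    (root / "nested" / "b.bin").write_bytes(b"x" * 512)

    size = tree_size(root)
    print(size)  # 1536
```

Using scandir() as a context manager closes the underlying directory handle promptly, which matters when recursing through large trees.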
Key Takeaways
- Always use Path.mkdir(parents=True, exist_ok=True) for directory creation - it is idempotent and safe to call even when the directory already exists
- os.scandir() is significantly faster than os.listdir() when you also need file metadata - each DirEntry caches its is_file(), is_dir(), and stat() results, and the file-type checks usually come straight from the OS-level directory scan
- os.walk() is the right tool when you need directory structure context; use dirnames[:] = [...] (in-place!) to prune subtrees; use Path.rglob() when you just need matching files
- shutil.copy2() preserves timestamps (closest to cp -p); shutil.copy() does not; shutil.copytree() copies entire trees and supports ignore=shutil.ignore_patterns(...)
- shutil.rmtree() is permanent and irreversible - always add path prefix validation before calling it in production code
- tempfile.TemporaryDirectory() as a context manager is the preferred pattern for temporary directories - it guarantees cleanup even if exceptions occur
- Path.rename() is atomic on the same filesystem on POSIX systems - use it with a temp file for atomic writes; use shutil.move() for cross-filesystem moves
- This module's tools - pathlib, os, context managers, file I/O, and directory operations - combine to handle the full spectrum of real-world filesystem work: scaffolding, log rotation, dataset management, build systems, and backup tools
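The atomic-write takeaway can be sketched as follows. Two assumptions are worth flagging: atomic_write_text is a hypothetical helper, and the sketch swaps in os.replace() for Path.rename() because os.replace() also overwrites an existing destination on Windows, where a plain rename would fail.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(target, text: str) -> None:
    """Write text so readers see either the old file or the new one, never half.

    Hypothetical helper written for this example.
    """
    target = Path(target)
    # Create the temp file next to the target so both are on one filesystem;
    # a rename across filesystems would not be atomic.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=target.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())  # push the data to disk before the swap
        os.replace(tmp_name, target)
    except BaseException:
        # Leave no stray temp file behind if anything went wrong.
        Path(tmp_name).unlink(missing_ok=True)
        raise


with tempfile.TemporaryDirectory() as tmp:
    cfg = Path(tmp) / "settings.json"
    atomic_write_text(cfg, '{"version": 1}')
    atomic_write_text(cfg, '{"version": 2}')

    final_text = cfg.read_text(encoding="utf-8")
    leftovers = sorted(p.name for p in Path(tmp).iterdir())
    print(final_text, leftovers)
```

Placing the temp file in the target's own directory is the key design choice: it keeps the final swap a same-filesystem rename.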
