CSV Handling - Reading and Writing Tabular Data
Reading time: ~18 minutes | Level: Foundation → Engineering
Here is a behavior that catches almost every developer the first time:
import csv
# Write a row containing a comma inside a field
with open("/tmp/test.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["Alice", "New York, NY", "Engineer"])
# Read it back
with open("/tmp/test.csv", newline="") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)
        print(f"Fields: {len(row)}")
Output:
['Alice', 'New York, NY', 'Engineer']
Fields: 3
The second field, "New York, NY", contains a comma - yet the reader correctly produces 3 fields, not 4. This works because csv.writer automatically quotes values that contain the delimiter, and csv.reader understands that quoting. Let the csv module handle quoting and escaping; never split CSV lines manually with str.split(",").
What You Will Learn
- What CSV is: the RFC 4180 standard, its limitations, and common real-world variations
- csv.reader and csv.writer: the low-level row-oriented interface
- csv.DictReader and csv.DictWriter: the dictionary interface - the production standard
- CSV dialects: how to handle tab-separated, semicolon-separated, and custom formats
- csv.Sniffer: automatically detecting delimiters and quoting from an unknown file
- Encoding: why you must use encoding='utf-8-sig' to handle Excel-generated files
- Quoting modes: QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE
- Streaming large CSV files row by row versus loading everything into memory
- When the csv module is enough and when to reach for pandas
Prerequisites
- Python file I/O: open(), context managers (with statement)
- Basic Python data structures: lists and dictionaries
- Understanding of Python iteration and for loops
- Knowledge of string encoding (UTF-8 vs ASCII) is helpful
What Is CSV?
CSV stands for "comma-separated values." It is one of the oldest and most universal data exchange formats - supported by every spreadsheet application, database system, and data pipeline tool in existence.
RFC 4180 CSV Format Rules:
- Each record on a separate line, ending with CRLF (\r\n)
- Last record may or may not have a trailing line break
- First record MAY be a header row
- Fields separated by commas
- Fields containing commas, double-quotes, or line breaks MUST be enclosed in double-quotes
- A double-quote inside a quoted field is escaped as two double-quotes ("")
name,location,bio
Alice,"New York, NY","Software engineer, 10 years"
Bob,"London","He said ""hello"" and left."
Carol,"Austin","
Multi-line bio"
Common real-world variations (NOT RFC 4180): semicolon delimiter (European Excel exports), tab delimiter (TSV), single quotes, LF-only line endings, missing quoting around fields with embedded commas.
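To see these rules in action, the sample above can be parsed from an in-memory buffer. This is a minimal sketch: using io.StringIO in place of a real file is an assumption made only to keep the example self-contained.

```python
import csv
import io

# The RFC 4180 sample from above, held in memory instead of on disk.
# newline="" keeps the embedded line break exactly as written.
sample = (
    'name,location,bio\r\n'
    'Alice,"New York, NY","Software engineer, 10 years"\r\n'
    'Bob,"London","He said ""hello"" and left."\r\n'
    'Carol,"Austin","\r\nMulti-line bio"\r\n'
)

rows = list(csv.reader(io.StringIO(sample, newline="")))

for row in rows:
    # Every record has exactly 3 fields, despite the embedded
    # commas, doubled quotes, and line break
    print(len(row), row)
```

Four physical-looking records in, four logical rows out: the quoted line break in Carol's bio stays inside one field instead of starting a new record.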
:::warning Never Parse CSV With str.split(",")
CSV fields can contain commas inside quoted strings, newlines, and escaped double-quotes. Manual splitting with line.split(",") breaks on all of these. Always use the csv module - in CPython its parser is implemented in C and handles RFC 4180 quoting, making it both correct and fast.
:::
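The difference is easy to see side by side. The following sketch parses one line both ways; the sample line is illustrative only.

```python
import csv

line = 'Alice,"New York, NY",Engineer'

# Naive splitting ignores quoting and cuts the quoted field in two
naive = line.split(",")

# csv.reader accepts any iterable of lines, so a one-element list works
parsed = next(csv.reader([line]))

print(naive)   # ['Alice', '"New York', ' NY"', 'Engineer']
print(parsed)  # ['Alice', 'New York, NY', 'Engineer']
```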
Part 1 - csv.reader: Low-Level Row Iteration
csv.reader wraps a file object and yields each row as a list of strings.
import csv
# Sample CSV file (pretend this is on disk)
# name,department,salary,start_date
# Alice,Engineering,95000,2020-03-15
# Bob,Marketing,72000,2021-07-01
# Carol,Engineering,110000,2019-01-20
with open("employees.csv", newline="") as f:
reader = csv.reader(f)
for row in reader:
print(row)
# ['name', 'department', 'salary', 'start_date']
# ['Alice', 'Engineering', '95000', '2020-03-15']
# ['Bob', 'Marketing', '72000', '2021-07-01']
# ['Carol', 'Engineering', '110000', '2019-01-20']
:::tip Always Open CSV Files with newline=""
The Python documentation requires opening CSV files with newline="". This disables universal newlines translation and lets the csv module handle line endings itself. Without it, embedded newlines inside quoted fields get mangled on Windows.
:::
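One way to see why the file layer should stay out of the way is to look at what csv.writer actually emits. In this sketch, io.StringIO(newline="") stands in for a file opened with newline="" (an assumption made so the raw output can be inspected without touching disk).

```python
import csv
import io

# io.StringIO(newline="") behaves like open(..., newline=""):
# whatever the writer emits is stored without translation.
buf = io.StringIO(newline="")
writer = csv.writer(buf)  # default "excel" dialect
writer.writerow(["id", "note"])
writer.writerow([1, "line one\nline two"])

raw = buf.getvalue()
print(repr(raw))
# 'id,note\r\n1,"line one\nline two"\r\n'

# Reading the same text back recovers the embedded newline intact
round_trip = list(csv.reader(io.StringIO(raw, newline="")))
```

The writer already terminates each record with \r\n on its own. If the file object also translated newlines, those terminators would be rewritten a second time.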
Skipping the Header Row
import csv
with open("employees.csv", newline="") as f:
    reader = csv.reader(f)
    header = next(reader)  # Consume the header row so the loop starts at the data
    print(f"Columns: {header}")
    for row in reader:
        name, dept, salary, start = row
        print(f"{name} earns ${int(salary):,} in {dept}")
# Columns: ['name', 'department', 'salary', 'start_date']
# Alice earns $95,000 in Engineering
# Bob earns $72,000 in Marketing
# Carol earns $110,000 in Engineering
Reading Into a List (Small Files)
import csv
def load_csv(filepath):
"""Load a small CSV file into a list of lists."""
with open(filepath, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
return list(reader)
rows = load_csv("employees.csv")
header = rows[0]
data = rows[1:]
print(f"{len(data)} employees, columns: {header}")
Part 2 - csv.writer: Writing Rows
csv.writer wraps a file object and provides a writerow() method that handles all quoting and escaping for you.
import csv
# Write a new CSV file
employees = [
["name", "department", "salary", "start_date"], # header
["Alice", "Engineering", 95000, "2020-03-15"],
["Bob", "Marketing", 72000, "2021-07-01"],
["Carol", "Engineering", 110000, "2019-01-20"],
]
with open("employees_out.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(employees) # writerows() = multiple rows at once
# The output file:
# name,department,salary,start_date
# Alice,Engineering,95000,2020-03-15
# Bob,Marketing,72000,2021-07-01
# Carol,Engineering,110000,2019-01-20
Handling Special Characters Automatically
import csv
# csv.writer handles quoting automatically
tricky_data = [
["id", "name", "notes"],
[1, "Alice", "Works on NLP, ML"], # comma in field
[2, "Bob", 'Said "hello" to everyone'], # quotes in field
[3, "Carol", "Multi-line\nnotes here"], # newline in field
]
with open("tricky.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(tricky_data)
# File contents:
# id,name,notes
# 1,Alice,"Works on NLP, ML"
# 2,Bob,"Said ""hello"" to everyone"
# 3,Carol,"Multi-line
# notes here"
# Round-trip: read it back and verify
with open("tricky.csv", newline="", encoding="utf-8") as f:
reader = csv.reader(f)
for row in reader:
print(row)
# ['id', 'name', 'notes']
# ['1', 'Alice', 'Works on NLP, ML']
# ['2', 'Bob', 'Said "hello" to everyone']
# ['3', 'Carol', 'Multi-line\nnotes here']
Part 3 - csv.DictReader: The Production Standard
csv.DictReader reads each row as a dict keyed by the header row (an OrderedDict in Python 3.6 and 3.7, a regular dict since Python 3.8). This is the preferred interface for production code because:
- No positional indexing - row["salary"] instead of row[2]
- Column additions/removals do not break your indexing
- Self-documenting code
- Robust to column reordering
import csv
with open("employees.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    # reader.fieldnames is populated lazily: the header row is read on first
    # access to fieldnames or on the first iteration, whichever comes first
    for row in reader:
        # row is a dict: {'name': 'Alice', 'department': 'Engineering', ...}
        print(f"{row['name']:10} dept={row['department']:15} salary={row['salary']}")
# Alice      dept=Engineering     salary=95000
# Bob        dept=Marketing       salary=72000
# Carol      dept=Engineering     salary=110000
Accessing Column Names
import csv
with open("employees.csv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    # Accessing fieldnames reads the header row if it has not been read yet;
    # no data row is consumed, so there is no need to call next() first
    print(reader.fieldnames)  # ['name', 'department', 'salary', 'start_date']
Custom Header When File Has No Header Row
import csv
# If the CSV has no header, provide fieldnames manually
fieldnames = ["id", "name", "email", "signup_date"]
with open("users_no_header.csv", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f, fieldnames=fieldnames)
for row in reader:
print(f"{row['name']} <{row['email']}>")
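Headerless files are often ragged as well. csv.DictReader takes two further parameters, restkey and restval, that control what happens when a row has more or fewer fields than fieldnames. The sketch below uses hypothetical in-memory data and a made-up "_extra" key name to show both cases.

```python
import csv
import io

# Hypothetical ragged input: row 2 is short, row 3 has a surplus field
raw = (
    "1,Alice,alice@example.com\n"
    "2,Bob\n"
    "3,Carol,carol@example.com,2024-01-05,extra\n"
)
fieldnames = ["id", "name", "email", "signup_date"]

reader = csv.DictReader(
    io.StringIO(raw, newline=""),
    fieldnames=fieldnames,
    restkey="_extra",  # surplus values are collected in a list under this key
    restval="",        # missing trailing columns are filled with this value
)
rows = list(reader)

print(rows[1])            # Bob's row: email and signup_date are ""
print(rows[2]["_extra"])  # ['extra']
```

Without these arguments the defaults are None for both, so short rows get None values and surplus fields land under the key None.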
Part 4 - csv.DictWriter: Writing Dicts to CSV
csv.DictWriter is the writer counterpart to DictReader. You provide a list of fieldnames, then write dicts.
import csv
employees = [
{"name": "Alice", "department": "Engineering", "salary": 95000, "start_date": "2020-03-15"},
{"name": "Bob", "department": "Marketing", "salary": 72000, "start_date": "2021-07-01"},
{"name": "Carol", "department": "Engineering", "salary": 110000, "start_date": "2019-01-20"},
]
fieldnames = ["name", "department", "salary", "start_date"]
with open("employees_out.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader() # Write the header row
writer.writerows(employees) # Write all data rows
Handling Extra or Missing Fields
import csv
fieldnames = ["name", "department", "salary"]
# Row with an extra field
row_with_extra = {"name": "Dave", "department": "HR", "salary": 68000, "age": 35}
with open("out.csv", "w", newline="", encoding="utf-8") as f:
# extrasaction="ignore" silently drops fields not in fieldnames
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerow(row_with_extra)
# Row with a missing field - use restval to fill missing values
row_with_missing = {"name": "Eve", "department": "Design"}
with open("out.csv", "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, restval="N/A")
writer.writerow(row_with_missing)
# Eve,Design,N/A
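It helps to know what happens without extrasaction="ignore". The default is extrasaction="raise", which rejects any dict carrying keys that are not in fieldnames. A minimal sketch, writing to an in-memory buffer rather than a file:

```python
import csv
import io

buf = io.StringIO(newline="")
writer = csv.DictWriter(buf, fieldnames=["name", "department", "salary"])
writer.writeheader()

row = {"name": "Dave", "department": "HR", "salary": 68000, "age": 35}
error = None
try:
    writer.writerow(row)  # default extrasaction="raise"
except ValueError as exc:
    error = str(exc)
    print(f"Rejected: {error}")

# Only the header made it into the output
print(repr(buf.getvalue()))
```

Raising by default is usually what you want in a pipeline: an unexpected key often signals a schema change upstream that should not be dropped silently.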
Part 5 - Dialects: Handling Non-Standard CSV Formats
A dialect is a named collection of formatting parameters. Python's csv module ships with three built-in dialects:
| Dialect | delimiter | lineterminator | quoting | Notes |
|---|---|---|---|---|
| csv.excel (default) | , | \r\n | QUOTE_MINIMAL | quotechar='"', doublequote=True, skipinitialspace=False |
| csv.excel_tab | \t | \r\n | QUOTE_MINIMAL | TSV files |
| csv.unix_dialect | , | \n | QUOTE_ALL | Unix line endings |
Using Built-in Dialects
import csv
# Read a tab-separated file
with open("data.tsv", newline="", encoding="utf-8") as f:
reader = csv.reader(f, dialect="excel-tab")
for row in reader:
print(row)
# Or equivalently, use delimiter parameter directly
with open("data.tsv", newline="", encoding="utf-8") as f:
reader = csv.reader(f, delimiter="\t")
for row in reader:
print(row)
Registering a Custom Dialect
import csv
# European CSV: semicolons as delimiter, comma as decimal separator
# e.g., "Alice";1.234,56;"Senior Engineer"
csv.register_dialect(
"european",
delimiter=";",
quotechar='"',
doublequote=True,
skipinitialspace=True,
lineterminator="\r\n",
)
with open("european_data.csv", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f, dialect="european")
for row in reader:
print(row)
# Pipe-separated with single quotes
csv.register_dialect(
"pipe_separated",
delimiter="|",
quotechar="'",
doublequote=False,
escapechar="\\",
)
# List all registered dialects
print(csv.list_dialects())
# ['excel', 'excel-tab', 'unix', 'european', 'pipe_separated']
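A quick way to check that a custom dialect behaves as intended is a round trip through memory. The sketch below uses the same parameters as the pipe-separated dialect above, registered under a separate, hypothetical name ("pipe_demo") so it does not depend on earlier code.

```python
import csv
import io

# Same parameters as "pipe_separated" above; the name is illustrative only
csv.register_dialect(
    "pipe_demo",
    delimiter="|",
    quotechar="'",
    doublequote=False,
    escapechar="\\",
)

# Fields that contain the delimiter and the quote character
rows = [["id", "comment"], ["1", "a|b"], ["2", "it's"]]

buf = io.StringIO(newline="")
csv.writer(buf, dialect="pipe_demo").writerows(rows)

# Read the text back with the same dialect
buf.seek(0)
round_trip = list(csv.reader(buf, dialect="pipe_demo"))

print(buf.getvalue())
print(round_trip == rows)  # True
```

If the reader and writer disagree on any dialect parameter, this comparison is usually the first thing to fail, which makes it a cheap regression test.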
Part 6 - csv.Sniffer: Auto-Detecting the Dialect
When you receive a CSV file from an external system and do not know its format, csv.Sniffer can detect the delimiter and quoting style automatically.
import csv
def read_unknown_csv(filepath, sample_bytes=4096):
    """
    Read a CSV file whose dialect is unknown.
    Sniff the first N characters to detect the format, then read the rest.
    (The file is opened in text mode, so read() counts characters, not bytes.)
    """
    with open(filepath, newline="", encoding="utf-8") as f:
        sample = f.read(sample_bytes)
        f.seek(0)  # Reset to the beginning
        sniffer = csv.Sniffer()
        try:
            dialect = sniffer.sniff(sample)
            has_header = sniffer.has_header(sample)
        except csv.Error:
            # Sniffer failed - fall back to default
            dialect = csv.excel
            has_header = True
        print(f"Detected: delimiter={dialect.delimiter!r} "
              f"quotechar={dialect.quotechar!r} "
              f"has_header={has_header}")
        # Returns a list of dicts when a header is detected, else a list of lists
        reader = csv.DictReader(f, dialect=dialect) if has_header else csv.reader(f, dialect=dialect)
        return list(reader)
# Sniffing works well for standard delimiters (comma, tab, semicolon, pipe)
# It can be fooled by short files or highly regular data
# Always add a fallback for production use
:::warning Sniffer Has Limits
csv.Sniffer works best on files with at least a few hundred bytes of varied data. It can be confused by:
- Files with only one column (no delimiter to detect)
- Files where the header and data rows have different patterns
- Short files with only one or two rows
For production pipelines where you control the source, explicitly specify the dialect rather than sniffing.
:::
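These limits can be softened by restricting the candidate delimiters and always supplying a fallback. The helper below is a sketch under those assumptions; sniff_delimiter, its parameters, and the sample strings are hypothetical names for illustration, not part of the csv module.

```python
import csv

def sniff_delimiter(sample, candidates=",;\t|", default=","):
    """Return the detected delimiter, or `default` if sniffing fails."""
    try:
        # `delimiters` limits the characters the Sniffer may choose from
        return csv.Sniffer().sniff(sample, delimiters=candidates).delimiter
    except csv.Error:
        return default

semicolon_sample = "name;age;city\nAlice;30;Paris\nBob;25;Berlin\n"
tab_sample = "name\tage\tcity\nAlice\t30\tParis\nBob\t25\tBerlin\n"
single_column = "name\nAlice\nBob\n"  # no delimiter to find

print(repr(sniff_delimiter(semicolon_sample)))  # ';'
print(repr(sniff_delimiter(tab_sample)))        # '\t'
print(repr(sniff_delimiter(single_column)))     # ',' (fallback)
```

The single-column case is the one called out in the warning: the Sniffer raises csv.Error, and the helper quietly returns the default instead.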
Part 7 - Encoding: The Excel BOM Problem
Excel saves UTF-8 CSV files with a BOM (Byte Order Mark) at the start - the bytes \xef\xbb\xbf that signal UTF-8 encoding. Python's standard utf-8 codec decodes them as the character U+FEFF and leaves it attached to the first field name, producing a corrupted header like '\ufeffname' instead of 'name'.
import csv
# The problem
with open("excel_export.csv", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
first_row = next(reader)
print(list(first_row.keys()))
# ['\ufeffname', 'department', 'salary'] <- BOM in first key!
# This breaks: first_row['name'] raises KeyError
# first_row['\ufeffname'] would work but is terrible
import csv
# The fix: use utf-8-sig codec, which strips the BOM automatically
with open("excel_export.csv", newline="", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
first_row = next(reader)
print(list(first_row.keys()))
# ['name', 'department', 'salary'] <- clean!
| Encoding | When to use |
|---|---|
| 'utf-8' | Your code controls the format; files from non-Excel sources |
| 'utf-8-sig' | Reading Excel exports (strips BOM); writing files for Excel (adds BOM) |
| 'latin-1' / 'cp1252' | Old database exports, pre-2010 Excel files; try if UnicodeDecodeError with utf-8 |
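The first two rows of the table can be verified without an actual Excel file. This sketch simulates an Excel-style export as bytes in memory (an assumption standing in for a real file) and decodes it with both codecs; header_for is a hypothetical helper.

```python
import csv
import io

# Simulate an Excel-style export: UTF-8 bytes with a leading BOM
excel_bytes = "name,department\r\nAlice,Engineering\r\n".encode("utf-8-sig")

def header_for(encoding):
    """Decode the bytes with the given codec and return the DictReader header."""
    text = io.TextIOWrapper(io.BytesIO(excel_bytes), encoding=encoding, newline="")
    return csv.DictReader(text).fieldnames

plain = header_for("utf-8")      # BOM survives as U+FEFF in the first name
clean = header_for("utf-8-sig")  # BOM is stripped

print(plain)  # ['\ufeffname', 'department']
print(clean)  # ['name', 'department']

# utf-8-sig is also safe for input that has no BOM at all
no_bom = "name,department\r\n".encode("utf-8").decode("utf-8-sig")
```

Because utf-8-sig reads BOM-less UTF-8 unchanged, it is a reasonable default when you cannot tell in advance whether a file came from Excel.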
Writing Excel-Compatible CSV
import csv
data = [
{"name": "Alice", "notes": "Müller - Unicode test"},
{"name": "Bob", "notes": "Résumé with accents"},
]
# Write with BOM so Excel opens it correctly without encoding dialog
with open("for_excel.csv", "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=["name", "notes"])
writer.writeheader()
writer.writerows(data)
Part 8 - Quoting Modes
The quoting parameter controls when csv.writer adds quote characters around fields:
import csv
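data = [
    ["name", "salary", "notes"],
    ["Alice", 95000, "Senior engineer"],  # salary is an int, not a str
    ["Bob", 72000, ""],                   # empty field
]
# QUOTE_NONNUMERIC decides by Python type: a str such as "95000" would be quoted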
# QUOTE_MINIMAL (default): only quote when necessary
with open("minimal.csv", "w", newline="") as f:
writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
writer.writerows(data)
# name,salary,notes
# Alice,95000,Senior engineer
# Bob,72000,
# QUOTE_ALL: quote every field, always
with open("all.csv", "w", newline="") as f:
writer = csv.writer(f, quoting=csv.QUOTE_ALL)
writer.writerows(data)
# "name","salary","notes"
# "Alice","95000","Senior engineer"
# "Bob","72000",""
# QUOTE_NONNUMERIC: quote all non-numeric fields
# Reader with this mode converts unquoted fields to float
with open("nonnumeric.csv", "w", newline="") as f:
writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
writer.writerows(data)
# "name","salary","notes"
# "Alice",95000,"Senior engineer"
# "Bob",72000,""
# QUOTE_NONE: never add quotes; use escapechar instead
with open("none.csv", "w", newline="") as f:
writer = csv.writer(f, quoting=csv.QUOTE_NONE, escapechar="\\")
writer.writerows(data)
# name,salary,notes
# Alice,95000,Senior engineer
# Bob,72000,
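The reader side of QUOTE_NONNUMERIC is worth seeing once, because it changes the types you get back. A minimal sketch with in-memory text (the sample rows are illustrative):

```python
import csv
import io

text = (
    '"name","salary","notes"\r\n'
    '"Alice",95000,"Senior engineer"\r\n'
    '"Bob",72000.5,""\r\n'
)

reader = csv.reader(io.StringIO(text, newline=""), quoting=csv.QUOTE_NONNUMERIC)
rows = list(reader)
print(rows[1])  # ['Alice', 95000.0, 'Senior engineer']

# An unquoted field that is not a number cannot be converted
try:
    list(csv.reader(io.StringIO("Alice,95000\r\n", newline=""),
                    quoting=csv.QUOTE_NONNUMERIC))
    failed = False
except ValueError:
    failed = True
print(failed)  # True
```

Note that unquoted values always come back as float, never int, so this mode only suits files written with the same convention.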
Part 9 - Streaming Large CSV Files
When a CSV file is larger than available RAM, loading it with list(reader) is not an option. Stream it row by row.
The Wrong Way (Loads Everything Into Memory)
import csv
# BAD for large files - loads entire CSV into RAM
with open("huge_sales_data.csv", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
all_rows = list(reader) # 10GB file = 10GB+ in RAM = OOM crash
total = sum(float(row["amount"]) for row in all_rows)
The Right Way: Process Row by Row
import csv
def compute_total_revenue(filepath):
"""
Compute total revenue from a large CSV file.
Streams row by row - O(1) memory regardless of file size.
"""
total = 0.0
row_count = 0
error_count = 0
with open(filepath, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
try:
amount = float(row["amount"])
total += amount
row_count += 1
except (ValueError, KeyError) as e:
error_count += 1
# Log and continue - don't abort on one bad row
print(f"Processed {row_count:,} rows ({error_count} errors)")
print(f"Total revenue: ${total:,.2f}")
return total
# This works on a 50GB file with 64MB of RAM
Chunked Processing for Batch Operations
import csv
from itertools import islice
def process_in_chunks(filepath, chunk_size=1000):
"""
Process a large CSV in chunks of N rows.
Useful for batch database inserts or API calls.
"""
with open(filepath, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
while True:
# Read next chunk_size rows
chunk = list(islice(reader, chunk_size))
if not chunk:
break
yield chunk
print(f"Processed chunk of {len(chunk)} rows")
# Usage
for batch in process_in_chunks("large_users.csv", chunk_size=500):
# Insert batch into database in a single transaction
insert_users_batch(batch) # Your DB function
Part 10 - CSV vs pandas: Know When to Switch
| Use csv module | Use pandas |
|---|---|
| Simple read/write of tabular data | Filtering, grouping, or aggregating data |
| No numpy/pandas dependency (minimal envs) | Joining/merging multiple tables |
| Streaming very large files row by row | Statistical analysis or computation |
| Simple ETL (extract, transform, write) | Data cleaning (fillna, dropna, type inference) |
| Reading config or lookup tables | Plotting or visualization |
| Just moving data around, no computation | Reading multiple formats (xlsx, parquet, JSON) |
| Startup time matters (stdlib, no heavy third-party import) | Machine learning feature engineering |
import csv
import pandas as pd
# csv module: read a file and compute a total - perfectly fine
total = 0.0
with open("sales.csv", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
total += float(row["amount"])
print(f"Total: ${total:,.2f}")
# pandas: same operation - more concise but heavier dependency
df = pd.read_csv("sales.csv")
print(f"Total: ${df['amount'].sum():,.2f}")
# pandas shines for complex operations the csv module cannot do cleanly
df = pd.read_csv("sales.csv")
summary = (
df.groupby("department")["amount"]
.agg(["sum", "mean", "count"])
.sort_values("sum", ascending=False)
)
print(summary)
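For comparison, here is roughly what the same group-by summary looks like with only the standard library. This is a sketch on hypothetical in-memory data with the same department and amount columns; it works, but the bookkeeping is manual.

```python
import csv
import io
from collections import defaultdict

# Hypothetical sales data with the same columns used above
raw = "department,amount\nEngineering,100.0\nMarketing,40.0\nEngineering,50.0\n"

totals = defaultdict(float)
counts = defaultdict(int)
for row in csv.DictReader(io.StringIO(raw, newline="")):
    totals[row["department"]] += float(row["amount"])
    counts[row["department"]] += 1

# (department, sum, mean, count), sorted by sum descending
summary = sorted(
    ((dept, totals[dept], totals[dept] / counts[dept], counts[dept])
     for dept in totals),
    key=lambda item: item[1],
    reverse=True,
)
for dept, total, mean, count in summary:
    print(f"{dept:12} sum={total:8.2f} mean={mean:8.2f} count={count}")
```

Once a second grouping key, a join, or missing-value handling enters the picture, this style grows quickly, which is the point at which pandas tends to pay for its dependency.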
Part 11 - Real-World: ETL Pipeline
Here is a complete, production-quality ETL (extract-transform-load) script that reads a raw database export CSV, cleans and validates each row, and writes a cleaned output CSV:
import csv
import re
import sys
from datetime import datetime
from pathlib import Path
def parse_date(value, formats=("%Y-%m-%d", "%m/%d/%Y", "%d-%m-%Y")):
    """Try multiple date formats; return ISO string or None."""
    for fmt in formats:
        try:
            return datetime.strptime(value.strip(), fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return None
def clean_phone(value):
"""Normalize phone numbers to +1XXXXXXXXXX or None."""
digits = re.sub(r"\D", "", value)
if len(digits) == 10:
return f"+1{digits}"
elif len(digits) == 11 and digits[0] == "1":
return f"+{digits}"
return None
def validate_email(value):
"""Basic email validation."""
pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
return value.strip().lower() if re.match(pattern, value.strip()) else None
class ETLPipeline:
"""
CSV ETL pipeline: read raw export, validate/transform, write clean output.
Streams row by row - handles files of any size.
"""
INPUT_FIELDS = ["id", "full_name", "email", "phone", "signup_date", "plan"]
OUTPUT_FIELDS = ["id", "first_name", "last_name", "email", "phone",
"signup_date", "plan", "is_valid"]
def __init__(self, input_path, output_path, error_path=None):
self.input_path = Path(input_path)
self.output_path = Path(output_path)
self.error_path = Path(error_path) if error_path else None
self.stats = {
"total": 0,
"valid": 0,
"invalid": 0,
"errors": [],
}
def transform_row(self, row):
"""
Validate and transform one row.
Returns (cleaned_row, list_of_errors).
"""
errors = []
cleaned = {}
# ID
cleaned["id"] = row.get("id", "").strip()
if not cleaned["id"].isdigit():
errors.append("id: must be a positive integer")
# Name: split into first/last
full_name = row.get("full_name", "").strip()
parts = full_name.split(None, 1) # split on first whitespace
cleaned["first_name"] = parts[0] if parts else ""
cleaned["last_name"] = parts[1] if len(parts) > 1 else ""
if not cleaned["first_name"]:
errors.append("full_name: empty or missing")
# Email
email = validate_email(row.get("email", ""))
if email:
cleaned["email"] = email
else:
cleaned["email"] = ""
errors.append(f"email: invalid ({row.get('email', '')})")
# Phone (optional)
phone_raw = row.get("phone", "").strip()
if phone_raw:
phone = clean_phone(phone_raw)
cleaned["phone"] = phone or ""
if not phone:
errors.append(f"phone: invalid format ({phone_raw})")
else:
cleaned["phone"] = ""
# Date
date_raw = row.get("signup_date", "").strip()
date_iso = parse_date(date_raw)
cleaned["signup_date"] = date_iso or ""
if not date_iso:
errors.append(f"signup_date: unrecognized format ({date_raw})")
# Plan
valid_plans = {"free", "starter", "professional", "enterprise"}
plan = row.get("plan", "").strip().lower()
cleaned["plan"] = plan if plan in valid_plans else ""
if plan not in valid_plans:
errors.append(f"plan: unknown value ({plan})")
cleaned["is_valid"] = "true" if not errors else "false"
return cleaned, errors
def run(self):
"""Execute the ETL pipeline."""
with (
open(self.input_path, newline="", encoding="utf-8-sig") as infile,
open(self.output_path, "w", newline="", encoding="utf-8") as outfile,
):
reader = csv.DictReader(infile)
writer = csv.DictWriter(
outfile,
fieldnames=self.OUTPUT_FIELDS,
extrasaction="ignore",
)
writer.writeheader()
# Optional error file
error_writer = None
error_file = None
if self.error_path:
error_file = open(self.error_path, "w", newline="", encoding="utf-8")
error_writer = csv.DictWriter(
error_file,
fieldnames=self.INPUT_FIELDS + ["_errors"],
extrasaction="ignore",
)
error_writer.writeheader()
try:
for row in reader:
self.stats["total"] += 1
cleaned, errors = self.transform_row(row)
writer.writerow(cleaned)
if errors:
self.stats["invalid"] += 1
if error_writer:
error_row = dict(row)
error_row["_errors"] = "; ".join(errors)
error_writer.writerow(error_row)
else:
self.stats["valid"] += 1
finally:
if error_file:
error_file.close()
self._print_summary()
def _print_summary(self):
s = self.stats
pct_valid = 100 * s["valid"] / s["total"] if s["total"] else 0
print(f"ETL complete:")
print(f" Input: {self.input_path}")
print(f" Output: {self.output_path}")
print(f" Total: {s['total']:,} rows")
print(f" Valid: {s['valid']:,} ({pct_valid:.1f}%)")
print(f" Invalid: {s['invalid']:,}")
if self.error_path:
print(f" Errors: {self.error_path}")
# Run the pipeline
if __name__ == "__main__":
pipeline = ETLPipeline(
input_path="raw_users_export.csv",
output_path="cleaned_users.csv",
error_path="validation_errors.csv",
)
pipeline.run()
# Example output (numbers depend on the input file):
# ETL complete:
# Input: raw_users_export.csv
# Output: cleaned_users.csv
# Total: 45,231 rows
# Valid: 43,888 (97.0%)
# Invalid: 1,343
# Errors: validation_errors.csv
Interview Questions
Q1: Why should you never parse CSV files with line.split(",")?
Answer: RFC 4180 CSV allows fields to contain commas, double-quotes, and newlines as long as the field is enclosed in double-quotes. str.split(",") has no concept of quoting - it splits on every comma, producing the wrong number of fields for any quoted value containing a comma.
Example: Alice,"New York, NY",Engineer has 3 fields. line.split(",") produces ['Alice', '"New York', ' NY"', 'Engineer'] - 4 elements, with the quoted field cut in two and its quote characters left in place. The csv module's parser, implemented in C in CPython, handles the RFC 4180 quoting rules and these edge cases correctly.
Q2: Why must CSV files be opened with newline="", and what happens if you omit it?
Answer: When a text file is opened without newline="", Python applies universal newlines translation: on reading, \r\n and \r are converted to \n on every platform, and on writing, \n is converted to the platform's line separator (\r\n on Windows). This is helpful for ordinary text files but breaks CSV handling in two ways: (1) on writing, csv.writer already emits \r\n for the default excel dialect, so on Windows the translation turns each terminator into \r\r\n and the file appears to contain blank rows; (2) on reading, line breaks embedded inside quoted fields are rewritten before the csv parser sees them, so the field value no longer matches what was stored.
Opening with newline="" disables the translation and passes line endings through to the csv module untouched, which then handles them according to the dialect (\r\n for the excel dialect). The Python documentation explicitly asks for this.
Q3: What is the difference between csv.reader and csv.DictReader, and when should you use each?
Answer: csv.reader yields each row as a list of strings. Column access is positional: row[0], row[2]. This is fragile - if columns are reordered or new columns are inserted, all your index references break.
csv.DictReader yields each row as a dictionary keyed by the header row values. Column access is by name: row["salary"]. This is robust to column reordering and additions, and produces self-documenting code.
Use csv.reader only when: the file has no header row and you are processing every column uniformly, or you need maximum performance for a very tight loop. Use csv.DictReader for all production code that accesses specific named columns.
Q4: What is the Excel BOM problem with CSV files, and how do you fix it?
Answer: Excel saves UTF-8 CSV files with a BOM (Byte Order Mark) - the bytes \xef\xbb\xbf prepended to the file to signal UTF-8 encoding. When you open such a file with encoding='utf-8', Python decodes the BOM as the character U+FEFF and leaves it at the start of the first field, which corrupts the first column name. reader.fieldnames becomes ['\ufeffname', 'department', ...] instead of ['name', 'department', ...], which breaks any lookup on the first field.
Fix: open the file with encoding='utf-8-sig'. This codec strips the BOM when reading (and reads BOM-less UTF-8 unchanged) and writes a BOM when writing. It is the correct encoding for any CSV file that originates from Excel.
Q5: What are the four CSV quoting modes and when would you use each?
Answer:
- csv.QUOTE_MINIMAL (default): quote fields only when they contain the delimiter, quotechar, or line terminator. Produces the most compact output.
- csv.QUOTE_ALL: quote every field unconditionally. Useful when the receiving system is strict about format or cannot detect types correctly.
- csv.QUOTE_NONNUMERIC: quote all non-numeric fields; leave integers and floats unquoted. When used in csv.reader, unquoted fields are automatically converted to float. Useful for data where numeric type must be preserved.
- csv.QUOTE_NONE: never quote fields. Requires an escapechar to be set so the delimiter can appear in field values. Rarely used - produces output that may be ambiguous.
Q6: How do you efficiently process a 50GB CSV file in Python without running out of memory?
Answer: Use the csv module's iterator protocol - csv.DictReader is an iterator that reads one row at a time without loading the entire file. Never call list(reader) on large files.
import csv
with open("huge.csv", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
process(row) # O(1) memory - one row at a time
For batch operations (e.g., inserting into a database in chunks), use itertools.islice(reader, chunk_size) to collect N rows at a time without loading everything. The key is that csv.DictReader reads from the file lazily - each call to next() reads and parses one record (usually one line, more if a quoted field contains line breaks). This works for files of any size as long as individual rows fit in memory, which holds for any reasonable CSV.
Practice Challenges
Beginner - CSV Round-Trip
Write a create_sample_csv(filepath, num_rows) function that generates a sample CSV with columns id, name, score (random float 0-100), and grade (A/B/C/D/F based on score). Then write a read_and_summarize(filepath) function that reads it back and prints the average score and grade distribution. Use csv.DictWriter to write and csv.DictReader to read.
Solution
import csv
import random
from collections import Counter
def make_random_name():
"""Generate a random two-word name."""
first_names = ["Alice", "Bob", "Carol", "Dave", "Eve", "Frank", "Grace", "Hank"]
last_names = ["Smith", "Jones", "Brown", "Davis", "Wilson", "Moore", "Taylor"]
return f"{random.choice(first_names)} {random.choice(last_names)}"
def score_to_grade(score):
"""Convert numeric score to letter grade."""
if score >= 90: return "A"
if score >= 80: return "B"
if score >= 70: return "C"
if score >= 60: return "D"
return "F"
def create_sample_csv(filepath, num_rows=100, seed=42):
"""
Generate a sample CSV file with student data.
Args:
filepath: output file path
num_rows: number of student rows to generate
seed: random seed for reproducibility
"""
random.seed(seed)
fieldnames = ["id", "name", "score", "grade"]
with open(filepath, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for i in range(1, num_rows + 1):
score = round(random.uniform(45.0, 100.0), 2)
writer.writerow({
"id": i,
"name": make_random_name(),
"score": score,
"grade": score_to_grade(score),
})
print(f"Created {filepath} with {num_rows} rows")
def read_and_summarize(filepath):
"""
Read the sample CSV and compute summary statistics.
Returns:
dict with keys: avg_score, grade_distribution, total_rows
"""
scores = []
grades = []
with open(filepath, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
scores.append(float(row["score"]))
grades.append(row["grade"])
grade_dist = Counter(grades)
avg = sum(scores) / len(scores) if scores else 0
print(f"\nSummary of {filepath}:")
print(f" Total students: {len(scores)}")
print(f" Average score: {avg:.2f}")
print(f" Grade distribution:")
for grade in ["A", "B", "C", "D", "F"]:
count = grade_dist.get(grade, 0)
pct = 100 * count / len(scores) if scores else 0
bar = "#" * int(pct / 2)
print(f" {grade}: {count:3d} ({pct:4.1f}%) {bar}")
return {
"avg_score": avg,
"grade_distribution": dict(grade_dist),
"total_rows": len(scores),
}
# Demo
if __name__ == "__main__":
path = "/tmp/students.csv"
create_sample_csv(path, num_rows=200)
stats = read_and_summarize(path)
# Example output (exact numbers depend on the seed and Python version):
# Created /tmp/students.csv with 200 rows
#
# Summary of /tmp/students.csv:
# Total students: 200
# Average score: 75.43
# Grade distribution:
# A: 38 (19.0%) #########
# B: 42 (21.0%) ##########
# C: 45 (22.5%) ###########
# D: 36 (18.0%) #########
# F: 39 (19.5%) #########
Intermediate - Multi-Format CSV Merger
Write a function merge_csv_files(input_files, output_file) that:
- Accepts a list of CSV file paths that may use different delimiters (comma, tab, semicolon)
- Uses csv.Sniffer to auto-detect each file's dialect
- Requires all files to have the same column names (case-insensitive) - raises ValueError if they differ
- Merges all rows into a single output CSV with a consistent format
- Adds a source_file column to each row indicating which input file it came from
Solution
import csv
import os
from pathlib import Path
def detect_dialect(filepath, sample_bytes=8192):
"""
Detect the CSV dialect of a file.
Falls back to excel dialect if sniffing fails.
"""
with open(filepath, newline="", encoding="utf-8-sig") as f:
sample = f.read(sample_bytes)
sniffer = csv.Sniffer()
try:
dialect = sniffer.sniff(sample, delimiters=",\t;|")
return dialect
except csv.Error:
return csv.excel
def get_fieldnames(filepath, dialect):
"""Read the header row of a CSV file."""
with open(filepath, newline="", encoding="utf-8-sig") as f:
reader = csv.reader(f, dialect=dialect)
header = next(reader)
return [col.strip() for col in header]
def merge_csv_files(input_files, output_file):
"""
Merge multiple CSV files with potentially different dialects.
All files must have the same columns (case-insensitive).
Args:
input_files: list of input file paths
output_file: path to write merged output
Returns:
dict with stats: {'total_rows': int, 'files_merged': int}
Raises:
ValueError: if input files have inconsistent column names
FileNotFoundError: if any input file is missing
"""
if not input_files:
raise ValueError("No input files provided")
# Phase 1: detect dialects and validate column consistency
file_info = []
reference_cols = None
for filepath in input_files:
if not os.path.exists(filepath):
raise FileNotFoundError(f"Input file not found: {filepath}")
dialect = detect_dialect(filepath)
cols = get_fieldnames(filepath, dialect)
cols_normalized = [c.lower() for c in cols]
if reference_cols is None:
reference_cols = cols_normalized
reference_file = filepath
elif cols_normalized != reference_cols:
raise ValueError(
f"Column mismatch:\n"
f" {reference_file}: {reference_cols}\n"
f" {filepath}: {cols_normalized}"
)
file_info.append({
"path": filepath,
"dialect": dialect,
"delimiter": dialect.delimiter,
})
print(f" {Path(filepath).name}: delimiter={dialect.delimiter!r} "
f"cols={len(cols)}")
# Use normalized (lowercase) field names from the first file
output_fieldnames = reference_cols + ["source_file"]
total_rows = 0
# Phase 2: merge all files into output
with open(output_file, "w", newline="", encoding="utf-8") as outf:
writer = csv.DictWriter(
outf,
fieldnames=output_fieldnames,
extrasaction="ignore",
)
writer.writeheader()
for info in file_info:
source_name = Path(info["path"]).name
file_rows = 0
with open(info["path"], newline="", encoding="utf-8-sig") as inf:
reader = csv.DictReader(inf, dialect=info["dialect"])
for row in reader:
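# Normalize keys the same way get_fieldnames does (strip + lowercase);
# skip the None key that DictReader uses for surplus fields in a row
normalized = {k.strip().lower(): v for k, v in row.items() if k is not None}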
normalized["source_file"] = source_name
writer.writerow(normalized)
file_rows += 1
total_rows += 1
print(f" Merged {file_rows:,} rows from {source_name}")
stats = {"total_rows": total_rows, "files_merged": len(input_files)}
print(f"\nMerge complete: {total_rows:,} total rows → {output_file}")
return stats
# Demo
if __name__ == "__main__":
import tempfile
# Create test files with different dialects
with tempfile.TemporaryDirectory() as tmpdir:
# File 1: comma-separated
f1 = os.path.join(tmpdir, "sales_q1.csv")
with open(f1, "w", newline="") as f:
w = csv.writer(f)
w.writerow(["name", "amount", "date"])
w.writerow(["Alice", "1200.00", "2024-01-15"])
w.writerow(["Bob", "850.50", "2024-01-20"])
# File 2: tab-separated
f2 = os.path.join(tmpdir, "sales_q2.tsv")
with open(f2, "w", newline="") as f:
w = csv.writer(f, dialect="excel-tab")
w.writerow(["name", "amount", "date"])
w.writerow(["Carol", "2100.00", "2024-04-10"])
w.writerow(["Dave", "975.25", "2024-05-01"])
# File 3: semicolon-separated
f3 = os.path.join(tmpdir, "sales_q3.csv")
with open(f3, "w", newline="") as f:
csv.register_dialect("semi", delimiter=";")
w = csv.writer(f, dialect="semi")
w.writerow(["name", "amount", "date"])
w.writerow(["Eve", "3400.00", "2024-07-22"])
output = os.path.join(tmpdir, "merged.csv")
print("Merging files:")
stats = merge_csv_files([f1, f2, f3], output)
# Verify output
with open(output, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
print(f" {row}")
# Output:
# Merging files:
# sales_q1.csv: delimiter=',' cols=3
# sales_q2.tsv: delimiter='\t' cols=3
# sales_q3.csv: delimiter=';' cols=3
# Merged 2 rows from sales_q1.csv
# Merged 2 rows from sales_q2.tsv
# Merged 1 rows from sales_q3.csv
#
# Merge complete: 5 total rows → /tmp/.../merged.csv
# {'name': 'Alice', 'amount': '1200.00', ..., 'source_file': 'sales_q1.csv'}
# ...
Advanced - Streaming CSV Transformer With Schema Validation
Build a CSVTransformer class that:
- Accepts a schema dict mapping column names to type callables (e.g., {"salary": int, "active": bool_from_str})
- Streams the input CSV row by row (no full load into memory)
- Applies the schema to convert types, collecting per-column error counts
- Writes valid rows to an output CSV and invalid rows to a separate error CSV
- Supports a transform_fn hook: an optional function applied to each valid row after type conversion
- Prints a final report showing type conversion success rates per column
Solution
import csv
from typing import Callable
from collections import defaultdict
def bool_from_str(value: str) -> bool:
"""Convert string boolean representations to Python bool."""
cleaned = value.strip().lower()
if cleaned in ("true", "1", "yes", "on"):
return True
if cleaned in ("false", "0", "no", "off", ""):
return False
raise ValueError(f"Cannot convert {value!r} to bool")
def nullable_int(value: str) -> int | None:
"""Convert string to int, returning None for empty strings."""
if not value.strip():
return None
return int(value.strip())
def nullable_float(value: str) -> float | None:
"""Convert string to float, returning None for empty strings."""
if not value.strip():
return None
return float(value.strip().replace(",", "")) # Handle thousands separators
class CSVTransformer:
"""
Streaming CSV transformer with schema validation and error segregation.
Processes files of arbitrary size in O(1) memory.
"""
def __init__(
self,
schema: dict[str, Callable],
transform_fn: Callable | None = None,
input_encoding: str = "utf-8-sig",
output_encoding: str = "utf-8",
):
"""
Args:
schema: dict mapping column names to type conversion callables
transform_fn: optional function(row_dict) -> row_dict applied after conversion
input_encoding: encoding for input file (utf-8-sig handles Excel BOM)
output_encoding: encoding for output files
"""
self.schema = schema
self.transform_fn = transform_fn
self.input_encoding = input_encoding
self.output_encoding = output_encoding
def transform(
self,
input_path: str,
output_path: str,
error_path: str,
) -> dict:
"""
Run the transformation.
Args:
input_path: path to input CSV
output_path: path to write valid, transformed rows
error_path: path to write rows with type conversion errors
Returns:
stats dict with per-column error counts and totals
"""
stats = {
"total_rows": 0,
"valid_rows": 0,
"error_rows": 0,
"column_errors": defaultdict(int), # column -> error count
}
with (
open(input_path, newline="", encoding=self.input_encoding) as infile,
open(output_path, "w", newline="", encoding=self.output_encoding) as outfile,
open(error_path, "w", newline="", encoding=self.output_encoding) as errfile,
):
reader = csv.DictReader(infile)
if reader.fieldnames is None:
raise ValueError("Input CSV has no header row")
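# Output: same columns as input (converted values are written back as strings).
# Caveat: with extrasaction="ignore", keys added by transform_fn (e.g. salary_band)
# are dropped unless they are appended to fieldnames here.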
out_writer = csv.DictWriter(
outfile,
fieldnames=reader.fieldnames,
extrasaction="ignore",
)
out_writer.writeheader()
# Error file: original columns + error column
err_writer = csv.DictWriter(
errfile,
fieldnames=reader.fieldnames + ["_conversion_errors"],
extrasaction="ignore",
)
err_writer.writeheader()
for row in reader:
stats["total_rows"] += 1
converted = {}
row_errors = []
# Apply schema conversions
for col, raw_value in row.items():
if col in self.schema:
converter = self.schema[col]
try:
converted[col] = converter(raw_value)
except (ValueError, TypeError, AttributeError) as e:  # AttributeError: short rows yield None values
converted[col] = None
stats["column_errors"][col] += 1
row_errors.append(f"{col}: {e}")
else:
converted[col] = raw_value # No schema - pass through as string
if row_errors:
stats["error_rows"] += 1
error_row = dict(row)
error_row["_conversion_errors"] = "; ".join(row_errors)
err_writer.writerow(error_row)
else:
# Apply optional transform hook
if self.transform_fn:
try:
converted = self.transform_fn(converted)
except Exception as e:
stats["error_rows"] += 1
error_row = dict(row)
error_row["_conversion_errors"] = f"transform_fn: {e}"
err_writer.writerow(error_row)
continue
# Write converted values back as strings for CSV output
out_row = {k: ("" if v is None else str(v)) for k, v in converted.items()}
out_writer.writerow(out_row)
stats["valid_rows"] += 1
self._print_report(stats)
return stats
def _print_report(self, stats: dict):
total = stats["total_rows"]
valid = stats["valid_rows"]
errors = stats["error_rows"]
pct_valid = 100 * valid / total if total else 0
pct_errors = 100 * errors / total if total else 0  # avoids reporting 100% errors for an empty file
print("\nTransformation Report")
print(f" Total rows: {total:,}")
print(f" Valid rows: {valid:,} ({pct_valid:.1f}%)")
print(f" Error rows: {errors:,} ({pct_errors:.1f}%)")
if stats["column_errors"]:
print(f"\n Column-level errors:")
for col, count in sorted(stats["column_errors"].items(), key=lambda x: -x[1]):
pct = 100 * count / total if total else 0
print(f" {col}: {count:,} errors ({pct:.1f}%)")
# Demo usage
if __name__ == "__main__":
import tempfile, os
sample_csv = """id,name,salary,active,signup_date,score
1,Alice,95000,true,2020-03-15,87.5
2,Bob,INVALID,false,2021-07-01,92.1
3,Carol,110000,yes,2019-01-20,78.3
4,Dave,68000,maybe,2022-11-30,INVALID
5,Eve,88500,true,,95.0
"""
def add_salary_band(row):
"""Add a salary_band field based on salary."""
salary = row.get("salary")
if salary is None:
row["salary_band"] = "unknown"
elif salary >= 100_000:
row["salary_band"] = "senior"
elif salary >= 75_000:
row["salary_band"] = "mid"
else:
row["salary_band"] = "junior"
return row
schema = {
"id": int,
"salary": nullable_int,
"active": bool_from_str,
"score": nullable_float,
}
with tempfile.TemporaryDirectory() as tmpdir:
input_path = os.path.join(tmpdir, "employees.csv")
output_path = os.path.join(tmpdir, "employees_clean.csv")
error_path = os.path.join(tmpdir, "employees_errors.csv")
with open(input_path, "w") as f:
f.write(sample_csv)
transformer = CSVTransformer(
schema=schema,
transform_fn=add_salary_band,
)
stats = transformer.transform(input_path, output_path, error_path)
print("\nClean output:")
with open(output_path, newline="") as f:
for row in csv.DictReader(f):
print(f" {row}")
print("\nError rows:")
with open(error_path, newline="") as f:
for row in csv.DictReader(f):
print(f" {row}")
# Expected output:
# Transformation Report
# Total rows: 5
# Valid rows: 3 (60.0%)
# Error rows: 2 (40.0%)
#
# Column-level errors:
# salary: 1 errors (20.0%)
# active: 1 errors (20.0%)
# score: 1 errors (20.0%)
#
# Clean output:
# {'id': '1', 'name': 'Alice', 'salary': '95000', ...}
# {'id': '3', 'name': 'Carol', 'salary': '110000', ...}
# {'id': '5', 'name': 'Eve', 'salary': '88500', ...}
#
# Error rows:
# {'id': '2', ..., '_conversion_errors': "salary: invalid literal for int()..."}
# {'id': '4', ..., '_conversion_errors': "active: Cannot convert 'maybe' to bool..."}
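One detail of the solution above is worth checking in isolation: the output writer takes its fieldnames from the input header and uses extrasaction="ignore", so a key added by a transform hook (such as salary_band) never reaches the output file. The sketch below is a minimal illustration, not part of the original solution; the write_rows helper and the sample row are hypothetical names introduced only for this demo.

```python
import csv
import io


def write_rows(rows, fieldnames):
    """Write dict rows to an in-memory CSV and return the resulting text."""
    buf = io.StringIO(newline="")
    writer = csv.DictWriter(buf, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


input_fieldnames = ["id", "salary"]
# A row as it might look after a transform hook added an extra key
rows = [{"id": 1, "salary": 95000, "salary_band": "mid"}]

# Fieldnames taken from the input header only: the extra key is silently dropped
dropped = write_rows(rows, input_fieldnames)

# Fieldnames extended with the new column: the value is kept
kept = write_rows(rows, input_fieldnames + ["salary_band"])

print(repr(dropped))  # 'id,salary\r\n1,95000\r\n'
print(repr(kept))     # 'id,salary,salary_band\r\n1,95000,mid\r\n'
```

If a hook is expected to add columns, one option is to pass the extra column names to the writer explicitly rather than relying on the input header.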
Quick Reference
| Operation | Code | Notes |
|---|---|---|
| Read rows as lists | csv.reader(f) | Yields list[str] per row |
| Read rows as dicts | csv.DictReader(f) | Yields dict per row - preferred |
| Skip header | next(reader) | Reads and discards first row |
| Write rows | csv.writer(f).writerow([...]) | Handles quoting automatically |
| Write many rows | writer.writerows([[...], ...]) | Single call, multiple rows |
| Write dicts | csv.DictWriter(f, fieldnames=[...]) | Dict-based writer |
| Write header | writer.writeheader() | Must call before writerows |
| Tab-separated | csv.reader(f, delimiter='\t') | Or use dialect="excel-tab" |
| Custom dialect | csv.register_dialect("name", delimiter=";") | Register once, reuse |
| Auto-detect | csv.Sniffer().sniff(sample) | Returns dialect object |
| Has header? | csv.Sniffer().has_header(sample) | Returns bool |
| Excel BOM fix | encoding='utf-8-sig' | On open() call |
| Excel output | encoding='utf-8-sig' | Adds BOM, Excel reads it |
| Quote all | quoting=csv.QUOTE_ALL | Every field quoted |
| Quote minimal | quoting=csv.QUOTE_MINIMAL | Default - minimal quoting |
| Quote none | quoting=csv.QUOTE_NONE, escapechar='\\' | No quotes, use escape |
| Stream large | for row in reader: | Never list(reader) for big files |
| Chunk large | itertools.islice(reader, N) | Process N rows at a time |
| Extra fields | extrasaction="ignore" | In DictWriter - drop extra keys |
| Missing fields | restval="N/A" | In DictWriter - fill missing keys |
| List dialects | csv.list_dialects() | All registered dialect names |
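The "Chunk large" row can be sketched as follows. This is a minimal example assuming you want fixed-size batches (for instance, for bulk inserts); iter_chunks is a hypothetical helper name, and the in-memory sample stands in for a real file.

```python
import csv
import io
from itertools import islice


def iter_chunks(reader, chunk_size):
    """Yield lists of up to chunk_size rows until the reader is exhausted."""
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


# In-memory stand-in for a large file (hypothetical sample data)
sample = io.StringIO(
    "id,name\r\n1,Alice\r\n2,Bob\r\n3,Carol\r\n4,Dave\r\n5,Eve\r\n",
    newline="",
)
reader = csv.DictReader(sample)

# Only one chunk is held in memory at a time
chunk_sizes = [len(chunk) for chunk in iter_chunks(reader, 2)]
print(chunk_sizes)  # [2, 2, 1]
```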
Key Takeaways
- Never parse CSV with str.split(",") - the csv module handles quoted commas, embedded newlines, and double-quote escaping correctly; string splitting does none of these
- Always open CSV files with newline="" - this prevents Python's universal newlines translation from corrupting embedded newlines inside quoted fields
- csv.DictReader is the production standard - it accesses columns by name rather than position, making code robust to column reordering and additions
- Use encoding='utf-8-sig' when reading files exported from Excel - it strips the BOM that would otherwise corrupt the first column name
- csv.Sniffer can auto-detect delimiters and quoting from a sample, but always provide a fallback dialect for production code where sniffing might fail
- Stream large CSV files row by row with for row in reader: - never call list(reader) on files that might be large; iterating is O(1) memory
- The csv module is the right tool for simple ETL, data moving, and configuration files; reach for pandas when you need filtering, grouping, aggregation, or joins on the data
- csv.DictWriter with extrasaction="ignore" and restval handles the common cases of rows with extra or missing fields gracefully
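Two of these takeaways are easy to verify for yourself. The sketch below simulates a BOM-prefixed export in memory (the sample bytes and the first_row helper are assumptions made for the demo) and compares the header read with utf-8 against utf-8-sig, then shows what a naive split does to a quoted field.

```python
import csv
import io

# Simulated export: UTF-8 bytes with a leading BOM, plus a quoted comma
raw = 'name,city\r\nAlice,"New York, NY"\r\n'.encode("utf-8-sig")


def first_row(data, encoding):
    """Decode the bytes with the given encoding and return the first data row."""
    text = io.TextIOWrapper(io.BytesIO(data), encoding=encoding, newline="")
    return next(csv.DictReader(text))


with_bom = first_row(raw, "utf-8")         # BOM stays glued to the first header
without_bom = first_row(raw, "utf-8-sig")  # BOM is stripped on decode

print(list(with_bom))       # ['\ufeffname', 'city']
print(list(without_bom))    # ['name', 'city']
print(without_bom["city"])  # New York, NY

# Naive splitting breaks the quoted field into two pieces and keeps the quotes
naive = 'Alice,"New York, NY"'.split(",")
print(naive)  # ['Alice', '"New York', ' NY"']
```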
