Project 01 - Concurrent Web Scraper
Objective
Build a concurrent web scraper that fetches and parses a list of URLs in parallel, producing structured output including each URL's HTTP status, page title, word count, and outbound links. You will implement the scraper twice - once using concurrent.futures.ThreadPoolExecutor and once using asyncio with aiohttp - and compare the two approaches on real-world data.
This project tests your ability to design a concurrent system that is polite (rate-limiting, respect for domains), resilient (retry with backoff, timeout handling), and correct (no data races, clean error reporting on partial failures).
What to Build
Core Requirements
Your scraper must accept a list of URLs (from a file or command-line arguments) and produce a structured result for each URL:
from dataclasses import dataclass

@dataclass
class ScrapeResult:
    url: str
    status: int | None       # HTTP status code, None on connection error
    title: str | None        # <title> tag content, None if not found
    word_count: int          # word count of visible body text
    links_found: list[str]   # all <a href="..."> absolute URLs on the page
    error: str | None        # error message if fetch failed, else None
    fetched_at: str          # ISO-8601 timestamp when fetch completed
    duration_ms: int         # time from request start to response complete
Implementation A - ThreadPoolExecutor
File: scraper_threads.py
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from typing import Callable

def scrape_url(url: str, timeout: float = 10.0) -> ScrapeResult:
    """Fetch and parse a single URL synchronously. Called from a thread."""
    ...

def scrape_all_threaded(
    urls: list[str],
    max_workers: int = 10,
    timeout: float = 10.0,
    on_result: Callable[[ScrapeResult], None] | None = None,
) -> list[ScrapeResult]:
    """
    Scrape all URLs using a ThreadPoolExecutor.

    Args:
        urls: URLs to scrape.
        max_workers: Maximum concurrent threads.
        timeout: Per-request timeout in seconds.
        on_result: Optional callback invoked as each result completes.

    Returns:
        List of ScrapeResult, one per URL, in completion order.
    """
    ...
Implementation B - asyncio + aiohttp
File: scraper_async.py
import asyncio
import aiohttp
from typing import Callable, Awaitable

async def scrape_url_async(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
    timeout: float = 10.0,
) -> ScrapeResult:
    """Fetch and parse a single URL asynchronously."""
    ...

async def scrape_all_async(
    urls: list[str],
    concurrency: int = 10,
    timeout: float = 10.0,
    on_result: Callable[[ScrapeResult], Awaitable[None]] | None = None,
) -> list[ScrapeResult]:
    """
    Scrape all URLs using asyncio and aiohttp.

    Args:
        urls: URLs to scrape.
        concurrency: Maximum concurrent in-flight requests (semaphore limit).
        timeout: Per-request timeout in seconds.
        on_result: Optional async callback invoked as each result completes.

    Returns:
        List of ScrapeResult, one per URL, in completion order.
    """
    ...
Technical Requirements
Requirement 1 - Configurable Concurrency Limit
Both implementations must accept a max_workers (threads) or concurrency (async) parameter that caps how many requests are in flight simultaneously.
- For the threaded version: ThreadPoolExecutor(max_workers=max_workers)
- For the async version: asyncio.Semaphore(concurrency) - acquire before each request, release after
Enforce the limit strictly. A scraper launched with concurrency=5 must never have more than 5 simultaneous HTTP connections open.
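As a rough illustration of the async half of this requirement, the sketch below caps in-flight work with asyncio.Semaphore and records the peak number of simultaneous "requests". The names run_bounded and fake_fetch are hypothetical, and asyncio.sleep stands in for the real HTTP call, so this shows only the limiting pattern under those assumptions.

```python
import asyncio


async def run_bounded(n_tasks: int, concurrency: int) -> int:
    """Run n_tasks fake fetches and return the peak number in flight."""
    semaphore = asyncio.Semaphore(concurrency)
    active = 0
    peak = 0

    async def fake_fetch(i: int) -> int:
        nonlocal active, peak
        # Acquire before the "request"; async with releases even on error.
        async with semaphore:
            active += 1
            peak = max(peak, active)
            try:
                await asyncio.sleep(0.01)  # stand-in for session.get(...)
                return i
            finally:
                active -= 1

    await asyncio.gather(*(fake_fetch(i) for i in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print("peak in flight:", asyncio.run(run_bounded(n_tasks=20, concurrency=5)))
```

All 20 coroutines are started at once, but only those holding the semaphore reach the simulated request, which is why the peak cannot exceed the limit.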
Requirement 2 - Retry with Exponential Backoff
Both implementations must retry failed requests up to a configurable number of times. A failure is any of:
- Connection error (ConnectionError, aiohttp.ClientConnectionError)
- Timeout (requests.Timeout, asyncio.TimeoutError)
- HTTP 5xx response (server errors - worth retrying)
Do not retry HTTP 4xx responses (404, 403, etc.) - these are permanent client errors.
def with_retry(fn, max_retries: int = 3, base_delay: float = 1.0):
    """
    Call fn(), retrying up to max_retries times after the first attempt
    (at most max_retries + 1 calls), with exponential backoff between attempts.

    Backoff: base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)

    Returns the first successful result, or raises the last exception.
    """
The async version must use asyncio.sleep() for delays, not time.sleep().
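To make the backoff formula concrete, here is a small worked sketch that computes the delay range for each attempt. The helper names backoff_delay and backoff_bounds are illustrative, not part of the required interface, and attempt is assumed to be 0-based as in the formula above.

```python
import random


def backoff_delay(attempt: int, base_delay: float = 1.0, rng=random) -> float:
    """Delay before the next try after 0-based `attempt`, with jitter."""
    return base_delay * (2 ** attempt) * rng.uniform(0.5, 1.5)


def backoff_bounds(attempt: int, base_delay: float = 1.0) -> tuple[float, float]:
    """Smallest and largest delay the formula can produce for this attempt."""
    nominal = base_delay * (2 ** attempt)
    return (0.5 * nominal, 1.5 * nominal)


if __name__ == "__main__":
    for attempt in range(3):
        low, high = backoff_bounds(attempt)
        print(f"attempt {attempt}: between {low:.1f}s and {high:.1f}s")
```

With base_delay=1.0 the three waits fall in 0.5-1.5 s, 1-3 s, and 2-6 s, so a URL that fails every time with max_retries=3 spends between 3.5 s and 10.5 s sleeping in total.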
Requirement 3 - Per-Request Timeout
Every HTTP request must have a timeout. A request that takes longer than timeout seconds must be cancelled and recorded as a timeout error - it must not block the entire scrape.
# Threaded version:
response = requests.get(url, timeout=timeout)  # one value used for both the connect and the read timeout (not a total deadline)

# Async version:
async with asyncio.timeout(timeout):  # Python 3.11+, or asyncio.wait_for for 3.10
    async with session.get(url) as response:
        content = await response.text()
Requirement 4 - Domain-Level Rate Limiting
To avoid hammering any single server, your scraper must enforce a minimum delay between successive requests to the same domain. The delay is configurable (default: 1 second).
from urllib.parse import urlparse
import time
import threading

class DomainRateLimiter:
    """Thread-safe per-domain rate limiter."""

    def __init__(self, delay_seconds: float = 1.0):
        self._last_request: dict[str, float] = {}
        self._lock = threading.Lock()
        self.delay = delay_seconds

    def wait(self, url: str) -> None:
        """Block until it is polite to fetch this URL's domain."""
        domain = urlparse(url).netloc
        with self._lock:
            last = self._last_request.get(domain, 0.0)
            elapsed = time.monotonic() - last
            wait_time = max(0.0, self.delay - elapsed)
            # Reserve this request's slot so the next caller waits behind it.
            self._last_request[domain] = time.monotonic() + wait_time
        # Sleep outside the lock so requests to other domains are not held up.
        if wait_time > 0:
            time.sleep(wait_time)  # OK in threaded version - blocks only this thread
For the async version, replace time.sleep with await asyncio.sleep and use asyncio.Lock instead of threading.Lock.
Requirement 5 - HTML Parsing
Extract the following from each successfully fetched page. HTML parsing must be isolated in parser.py and shared between both implementations:
| Field | How to extract |
|---|---|
| title | First <title> tag text, stripped of whitespace. None if absent. |
| word_count | Count of whitespace-separated tokens in all visible text (exclude <script>, <style>, <head> content). |
| links_found | All <a href="..."> URLs, resolved to absolute URLs using urllib.parse.urljoin. Deduplicated. Maximum 100 per page. |
Use Python's built-in html.parser (HTMLParser) or BeautifulSoup with html.parser as the backend. Do not use lxml (requires a C extension that may not be available in all environments).
# parser.py
from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse

def parse_page(html: str, base_url: str) -> dict:
    """
    Parse an HTML page and return extracted fields.

    Returns:
        {
            "title": str | None,
            "word_count": int,
            "links": list[str],  # absolute URLs, max 100
        }
    """
    ...
Requirement 6 - Structured Output
Both implementations must support writing results to:
- JSON - a single array of ScrapeResult objects as dicts
- CSV - one row per result, columns matching ScrapeResult fields
# output.py
import json
import csv
from dataclasses import asdict

def write_json(results: list[ScrapeResult], path: str) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump([asdict(r) for r in results], f, indent=2)

def write_csv(results: list[ScrapeResult], path: str) -> None:
    # Explicit UTF-8 so non-ASCII page titles do not depend on the platform default.
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=[
            "url", "status", "title", "word_count",
            "links_found", "error", "fetched_at", "duration_ms",
        ])
        writer.writeheader()
        for r in results:
            row = asdict(r)
            row["links_found"] = len(r.links_found)  # store count, not the list
            writer.writerow(row)
Requirement 7 - Graceful Error Handling
The scraper must never crash because one URL fails. All errors must be captured in ScrapeResult.error and reported in the final output. Categories to handle:
| Error type | status | error message |
|---|---|---|
| Connection refused / DNS failure | None | "ConnectionError: <detail>" |
| Request timeout | None | "Timeout after <N>s" |
| HTTP 4xx | actual status code | "HTTP 404: Not Found" |
| HTTP 5xx (after max retries) | actual status code | "HTTP 503: Service Unavailable (after 3 retries)" |
| HTML parse error | actual status code | "ParseError: <detail>" |
word_count and links_found are 0 and [] respectively when an error occurs.
Acceptance Criteria
Your implementation passes when all of the following are true:
- Scraping 50 URLs with max_workers=10 (threaded) or concurrency=10 (async) completes faster than scraping them serially (measure with time.perf_counter).
- At most max_workers (or concurrency) HTTP connections are open simultaneously at any point during the scrape. (Verify by logging connection open/close events.)
- A URL that times out records status=None and error starting with "Timeout" - the scraper does not hang or crash.
- A URL returning HTTP 500 is retried up to max_retries times, with a delay between retries. After max retries, error contains the final HTTP status.
- A URL returning HTTP 404 is not retried - it records status=404 immediately.
- Two requests to the same domain are separated by at least domain_delay seconds (within ±100 ms tolerance). Verify with request timestamps in the output.
- title is extracted correctly from a page with a <title> tag.
- word_count excludes text inside <script> and <style> tags.
- links_found contains only absolute URLs (starting with http:// or https://).
- links_found contains at most 100 URLs per page.
- All results are written to a valid JSON file. json.load() on the output file succeeds without error.
- All results are written to a valid CSV file. csv.DictReader on the output file yields one row per input URL.
- The scraper completes when every URL has either succeeded or exhausted its retries - it does not hang indefinitely.
- A mix of successful, errored, and timed-out URLs in the input produces a complete result list with one entry per URL.
Scraper Flow
CLI Interface
Both implementations must be runnable from the command line with the same interface:
# Threaded
python scraper_threads.py urls.txt \
--max-workers 10 \
--timeout 8 \
--max-retries 3 \
--domain-delay 1.0 \
--output-json results.json \
--output-csv results.csv
# Async
python scraper_async.py urls.txt \
--concurrency 10 \
--timeout 8 \
--max-retries 3 \
--domain-delay 1.0 \
--output-json results.json \
--output-csv results.csv
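A possible argparse setup for the threaded script is sketched below. The flag names mirror the commands above; the positional name urls_file, the defaults, and the one-URL-per-line format with # comments handled by the hypothetical read_urls helper are assumptions. The async script would swap --max-workers for --concurrency.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Concurrent web scraper (threaded)")
    parser.add_argument("urls_file", help="text file with one URL per line")
    parser.add_argument("--max-workers", type=int, default=10)
    parser.add_argument("--timeout", type=float, default=10.0)
    parser.add_argument("--max-retries", type=int, default=3)
    parser.add_argument("--domain-delay", type=float, default=1.0)
    parser.add_argument("--output-json", default=None)
    parser.add_argument("--output-csv", default=None)
    return parser


def read_urls(lines) -> list[str]:
    """Keep non-empty, non-comment lines with surrounding whitespace removed."""
    stripped = (line.strip() for line in lines)
    return [s for s in stripped if s and not s.startswith("#")]


if __name__ == "__main__":
    args = build_parser().parse_args()
    with open(args.urls_file, encoding="utf-8") as f:
        urls = read_urls(f)
    print(f"{len(urls)} URLs, max_workers={args.max_workers}")
```

argparse turns --max-workers into args.max_workers, so the parsed values can be passed straight to scrape_all_threaded.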
Both must print a summary when complete:
Scrape complete in 12.4s
Total URLs: 50
Succeeded: 43
Failed: 5 (HTTP errors)
Timed out: 2
Avg duration: 847ms
Fastest: 112ms (https://example.com)
Slowest: 7823ms (https://slow-site.example)
Hints
How do I extract visible text while excluding script and style content?
Subclass HTMLParser and track whether you are inside a tag you want to skip:
from html.parser import HTMLParser

class VisibleTextExtractor(HTMLParser):
    # Only tags that have a closing tag belong here. Void elements such as
    # <meta> and <link> never trigger handle_endtag, so listing them would
    # leave _skip_depth above zero and suppress all later text.
    SKIP_TAGS = {"script", "style", "head", "noscript"}

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self._text_parts: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag.lower() in self.SKIP_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag.lower() in self.SKIP_TAGS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0:
            stripped = data.strip()
            if stripped:
                self._text_parts.append(stripped)

    def get_word_count(self) -> int:
        full_text = " ".join(self._text_parts)
        return len(full_text.split())

def count_words(html: str) -> int:
    extractor = VisibleTextExtractor()
    extractor.feed(html)
    return extractor.get_word_count()
How do I resolve relative links to absolute URLs?
Use urllib.parse.urljoin. It correctly handles relative paths, protocol-relative URLs, and absolute URLs:
from urllib.parse import urljoin, urlparse

def resolve_links(hrefs: list[str], base_url: str) -> list[str]:
    absolute = []
    for href in hrefs:
        if not href or href.startswith("#") or href.startswith("mailto:"):
            continue
        resolved = urljoin(base_url, href)
        parsed = urlparse(resolved)
        if parsed.scheme in ("http", "https"):
            absolute.append(resolved)
    return list(dict.fromkeys(absolute))[:100]  # deduplicate, cap at 100
urljoin("https://example.com/news/", "../about") → "https://example.com/about". It handles all the edge cases you would otherwise miss.
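A few more cases, shown as a quick sketch against the same base URL, illustrate what urljoin does with each kind of href. The hrefs and the BASE constant are made-up examples.

```python
from urllib.parse import urljoin

BASE = "https://example.com/news/"

# href as found in the page -> absolute URL urljoin produces
CASES = {
    "../about": "https://example.com/about",                     # parent directory
    "story.html": "https://example.com/news/story.html",         # relative path
    "/contact": "https://example.com/contact",                   # root-relative
    "//cdn.example.com/app.js": "https://cdn.example.com/app.js",  # protocol-relative
    "https://other.example/page": "https://other.example/page",  # already absolute
}

if __name__ == "__main__":
    for href, expected in CASES.items():
        resolved = urljoin(BASE, href)
        print(f"{href!r:32} -> {resolved}  (matches: {resolved == expected})")
```

Note that the trailing slash on the base matters: without it, "news" would be treated as a file name and relative paths would resolve one level higher.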
How do I implement exponential backoff correctly for both threaded and async versions?
Factor the retry logic into a function that accepts a callable, so the same interface works for both sync and async:
# retry.py
import time
import random
import asyncio

def with_retry_sync(fn, max_retries: int = 3, base_delay: float = 1.0):
    last_exc = None
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as exc:
            last_exc = exc
            if attempt == max_retries:
                break
            delay = base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)
            time.sleep(delay)
    raise last_exc

async def with_retry_async(coro_fn, max_retries: int = 3, base_delay: float = 1.0):
    """coro_fn must be a zero-argument callable that returns a coroutine."""
    last_exc = None
    for attempt in range(max_retries + 1):
        try:
            return await coro_fn()
        except Exception as exc:
            last_exc = exc
            if attempt == max_retries:
                break
            delay = base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)
            await asyncio.sleep(delay)
    raise last_exc

# Usage in threaded version:
result = with_retry_sync(lambda: requests.get(url, timeout=timeout), max_retries=3)

# Usage in async version:
result = await with_retry_async(
    lambda: session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)),
    max_retries=3,
)
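Two details need care with the helpers above: they retry on every exception, and requests.get returns normally for a 5xx response instead of raising, so a server error would never be retried. One possible arrangement, sketched here with hypothetical names (RetryableHTTPError, check_status, retry_on) and with jitter left out for brevity, is to convert 5xx statuses into a dedicated exception and retry only on a chosen set of exception types.

```python
import time


class RetryableHTTPError(Exception):
    """Raised for responses worth retrying (HTTP 5xx)."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def check_status(status: int) -> int:
    """Raise for 5xx so the retry loop tries again; pass everything else through."""
    if 500 <= status <= 599:
        raise RetryableHTTPError(status)
    return status  # 2xx, 3xx and 4xx are final answers


def retry_on(fn, retryable: tuple, max_retries: int = 3,
             base_delay: float = 1.0, sleep=time.sleep):
    """Call fn(); retry only when it raises one of the `retryable` types."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except retryable:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            sleep(base_delay * (2 ** attempt))  # jitter omitted in this sketch


if __name__ == "__main__":
    statuses = iter([503, 503, 200])  # simulated server: two failures, then OK
    final = retry_on(lambda: check_status(next(statuses)),
                     (RetryableHTTPError,), base_delay=0.01)
    print("final status:", final)
```

In the threaded scraper, fn would wrap requests.get and call check_status(response.status_code), and the retryable tuple would also list the connection and timeout exceptions from Requirement 2.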
How do I implement the async domain rate limiter correctly?
The async version needs asyncio.Lock (not threading.Lock) and asyncio.sleep (not time.sleep):
import asyncio
import time
from urllib.parse import urlparse

class AsyncDomainRateLimiter:
    def __init__(self, delay_seconds: float = 1.0):
        self._last_request: dict[str, float] = {}
        self._locks: dict[str, asyncio.Lock] = {}
        self.delay = delay_seconds

    def _get_lock(self, domain: str) -> asyncio.Lock:
        if domain not in self._locks:
            self._locks[domain] = asyncio.Lock()
        return self._locks[domain]

    async def wait(self, url: str) -> None:
        domain = urlparse(url).netloc
        lock = self._get_lock(domain)
        async with lock:
            last = self._last_request.get(domain, 0.0)
            elapsed = time.monotonic() - last
            wait_time = max(0.0, self.delay - elapsed)
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            self._last_request[domain] = time.monotonic()
The async with lock ensures only one coroutine at a time reads and updates _last_request[domain], preventing race conditions where two coroutines targeting the same domain both read the same last value and both decide they don't need to wait.
How do I use as_completed to stream results as they finish?
concurrent.futures.as_completed yields futures as they complete, not in submission order:
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone

def scrape_all_threaded(urls, max_workers=10, timeout=10.0, on_result=None):
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(scrape_url, url, timeout): url
            for url in urls
        }
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
            except Exception as exc:
                # Safety net: scrape_url should normally return an error result itself.
                result = ScrapeResult(
                    url=url, status=None, title=None, word_count=0,
                    links_found=[], error=str(exc),
                    fetched_at=datetime.now(timezone.utc).isoformat(),
                    duration_ms=0,
                )
            results.append(result)
            if on_result:
                on_result(result)
    return results
For the async version, use asyncio.as_completed (Python 3.10+) or wrap each coroutine in a task and use asyncio.gather(..., return_exceptions=True) if you don't need streaming.
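A minimal sketch of the streaming variant with asyncio.as_completed follows. fake_scrape and the jobs mapping of URL to simulated delay are hypothetical stand-ins for scrape_url_async; the sketch assumes each coroutine catches its own errors and returns a result, because the awaitables yielded by as_completed do not say which input they came from.

```python
import asyncio


async def fake_scrape(url: str, delay: float) -> dict:
    """Stand-in for scrape_url_async: sleeps, then returns a result dict."""
    await asyncio.sleep(delay)
    return {"url": url, "error": None}


async def scrape_all(jobs: dict[str, float], on_result=None) -> list[dict]:
    # Create tasks first so every fetch starts running immediately.
    tasks = [asyncio.create_task(fake_scrape(url, delay))
             for url, delay in jobs.items()]
    results = []
    # as_completed yields awaitables in the order the tasks finish.
    for next_done in asyncio.as_completed(tasks):
        result = await next_done
        results.append(result)
        if on_result is not None:
            await on_result(result)  # async callback, as in scrape_all_async
    return results


if __name__ == "__main__":
    async def show(result: dict) -> None:
        print("done:", result["url"])

    asyncio.run(scrape_all(
        {"https://slow.example": 0.1, "https://fast.example": 0.0},
        on_result=show,
    ))
```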
How do I measure concurrent connections to verify the semaphore is working?
Wrap the fetch function with a counter guarded by a threading.Lock (used for observing, not for limiting):
import threading

_active_connections = 0
_max_observed = 0
_counter_lock = threading.Lock()

def tracked_scrape_url(url, timeout):
    global _active_connections, _max_observed
    with _counter_lock:
        _active_connections += 1
        _max_observed = max(_max_observed, _active_connections)
    try:
        return scrape_url(url, timeout)
    finally:
        with _counter_lock:
            _active_connections -= 1

# After scraping:
print(f"Peak concurrent connections: {_max_observed}")
assert _max_observed <= max_workers
Extension Challenges
These are optional. Attempt them only after all acceptance criteria pass.
Extension A - Politeness: sitemap.xml Parsing
Before scraping a domain, fetch its sitemap.xml (if it exists) and use it to discover URLs rather than requiring them to be provided manually:
- Fetch https://{domain}/sitemap.xml
- Parse the XML and extract all <loc> URLs
- Deduplicate against the input URL list
- Respect <changefreq> hints - skip URLs with changefreq=never that were already scraped recently (store results in a SQLite cache keyed by URL + last-scraped date)
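The parse and deduplicate steps can be sketched with the standard library's xml.etree.ElementTree. This assumes the sitemap uses the standard sitemaps.org namespace and is a plain URL set rather than a sitemap index; extract_sitemap_urls is a hypothetical helper, and fetching and changefreq handling are left out.

```python
import xml.etree.ElementTree as ET

# Namespace used by standard sitemap files (assumed here).
SITEMAP_NS = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}


def extract_sitemap_urls(xml_text: str, already_known: set[str]) -> list[str]:
    """Return <loc> URLs not in already_known, in document order, without repeats."""
    root = ET.fromstring(xml_text)
    found: list[str] = []
    for loc in root.findall(".//sm:loc", SITEMAP_NS):
        url = (loc.text or "").strip()
        if url and url not in already_known and url not in found:
            found.append(url)
    return found


if __name__ == "__main__":
    sample = """<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
      <url><loc>https://example.com/</loc></url>
      <url><loc>https://example.com/about</loc></url>
    </urlset>"""
    print(extract_sitemap_urls(sample, already_known={"https://example.com/"}))
```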
Extension B - robots.txt Compliance
Before fetching any URL, check the domain's robots.txt and skip URLs that are disallowed for your user agent:
import urllib.robotparser
from urllib.parse import urlparse

def is_allowed(url: str, user_agent: str = "MyCrawler/1.0") -> bool:
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(robots_url)
    rp.read()
    return rp.can_fetch(user_agent, url)
Cache the parsed robots.txt per domain so you do not re-fetch it for every URL on the same domain.
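A per-domain cache could be as simple as a dict of parsers keyed by scheme and host. In the sketch below, RobotsCache and its fetch_lines hook are hypothetical: the hook lets the example run without network access, and without it the class falls back to RobotFileParser.read(). The class is not thread-safe as written, so the threaded scraper would need a lock around it.

```python
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


class RobotsCache:
    """Parse each origin's robots.txt once and reuse the parser afterwards."""

    def __init__(self, user_agent: str = "MyCrawler/1.0", fetch_lines=None):
        self.user_agent = user_agent
        self._fetch_lines = fetch_lines  # optional hook: robots_url -> list[str]
        self._parsers: dict[str, RobotFileParser] = {}
        self.fetch_count = 0  # how many robots.txt files were loaded

    def _parser_for(self, url: str) -> RobotFileParser:
        parsed = urlparse(url)
        origin = f"{parsed.scheme}://{parsed.netloc}"
        if origin not in self._parsers:
            robots_url = origin + "/robots.txt"
            rp = RobotFileParser()
            rp.set_url(robots_url)
            if self._fetch_lines is not None:
                rp.parse(self._fetch_lines(robots_url))
            else:
                rp.read()  # real network fetch
            self.fetch_count += 1
            self._parsers[origin] = rp
        return self._parsers[origin]

    def is_allowed(self, url: str) -> bool:
        return self._parser_for(url).can_fetch(self.user_agent, url)


if __name__ == "__main__":
    rules = ["User-agent: *", "Disallow: /private/"]
    cache = RobotsCache(fetch_lines=lambda robots_url: rules)
    print(cache.is_allowed("https://example.com/public"))
    print(cache.is_allowed("https://example.com/private/page"))
    print("robots.txt loads:", cache.fetch_count)
```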
Extension C - Performance Benchmark
Add a benchmark.py script that runs both the threaded and async implementations against the same list of 100 URLs and produces a comparison report:
=== Scraper Benchmark (100 URLs) ===
ThreadPoolExecutor (max_workers=10):
Total time: 18.2s
p50 latency: 412ms
p95 latency: 1240ms
Succeeded: 87 / 100
asyncio + aiohttp (concurrency=10):
Total time: 14.7s
p50 latency: 383ms
p95 latency: 1108ms
Succeeded: 87 / 100
Winner: asyncio (19% faster, same success rate)
Use statistics.quantiles for percentile calculations. Run each implementation three times and report the median total time to reduce noise.
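A short sketch of the two calculations follows. The helper names are illustrative, and method="inclusive" is an assumption (the default is "exclusive"); either can be justified as long as both implementations are measured the same way.

```python
import statistics


def latency_percentiles(durations_ms: list[float]) -> dict[str, float]:
    """p50 and p95 of per-URL durations; needs at least two data points."""
    # n=100 returns 99 cut points: index 49 is the 50th percentile,
    # index 94 the 95th.
    cuts = statistics.quantiles(durations_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94]}


def median_total_time(run_times_s: list[float]) -> float:
    """Median wall-clock time across repeated benchmark runs."""
    return statistics.median(run_times_s)


if __name__ == "__main__":
    durations = list(range(101))  # 0, 1, ..., 100 ms
    print(latency_percentiles(durations))
    print(median_total_time([18.2, 17.9, 19.0]))
```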
