Project 01 - Concurrent Web Scraper

Objective

Build a concurrent web scraper that fetches and parses a list of URLs in parallel, producing structured output including each URL's HTTP status, page title, word count, and outbound links. You will implement the scraper twice - once using concurrent.futures.ThreadPoolExecutor and once using asyncio with aiohttp - and compare the two approaches on real-world data.

This project tests your ability to design a concurrent system that is polite (rate-limiting, respect for domains), resilient (retry with backoff, timeout handling), and correct (no data races, clean error reporting on partial failures).

What to Build

Core Requirements

Your scraper must accept a list of URLs (from a file or command-line arguments) and produce a structured result for each URL:

from dataclasses import dataclass

@dataclass
class ScrapeResult:
    url: str
    status: int | None        # HTTP status code, None on connection error
    title: str | None         # <title> tag content, None if not found
    word_count: int           # word count of visible body text
    links_found: list[str]    # all <a href="..."> absolute URLs on the page
    error: str | None         # error message if fetch failed, else None
    fetched_at: str           # ISO-8601 timestamp when fetch completed
    duration_ms: int          # time from request start to response complete

Implementation A - ThreadPoolExecutor

File: scraper_threads.py

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from typing import Callable

def scrape_url(url: str, timeout: float = 10.0) -> ScrapeResult:
    """Fetch and parse a single URL synchronously. Called from a thread."""
    ...

def scrape_all_threaded(
    urls: list[str],
    max_workers: int = 10,
    timeout: float = 10.0,
    on_result: Callable[[ScrapeResult], None] | None = None,
) -> list[ScrapeResult]:
    """
    Scrape all URLs using a ThreadPoolExecutor.

    Args:
        urls: URLs to scrape.
        max_workers: Maximum concurrent threads.
        timeout: Per-request timeout in seconds.
        on_result: Optional callback invoked as each result completes.

    Returns:
        List of ScrapeResult, one per URL, in completion order.
    """
    ...

Implementation B - asyncio + aiohttp

File: scraper_async.py

import asyncio
import aiohttp
from typing import Callable, Awaitable

async def scrape_url_async(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
    timeout: float = 10.0,
) -> ScrapeResult:
    """Fetch and parse a single URL asynchronously."""
    ...

async def scrape_all_async(
    urls: list[str],
    concurrency: int = 10,
    timeout: float = 10.0,
    on_result: Callable[[ScrapeResult], Awaitable[None]] | None = None,
) -> list[ScrapeResult]:
    """
    Scrape all URLs using asyncio and aiohttp.

    Args:
        urls: URLs to scrape.
        concurrency: Maximum concurrent in-flight requests (semaphore limit).
        timeout: Per-request timeout in seconds.
        on_result: Optional async callback invoked as each result completes.

    Returns:
        List of ScrapeResult, one per URL, in completion order.
    """
    ...

Technical Requirements

Requirement 1 - Configurable Concurrency Limit

Both implementations must accept a max_workers (threads) or concurrency (async) parameter that caps how many requests are in flight simultaneously.

  • For the threaded version: ThreadPoolExecutor(max_workers=max_workers)
  • For the async version: asyncio.Semaphore(concurrency) - acquire before each request, release after

Enforce the limit strictly. A scraper launched with concurrency=5 must never have more than 5 simultaneous HTTP connections open.
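
A minimal sketch of how the async limit could be enforced and observed at the same time. The names run_limited and fake_fetch are hypothetical, and fake_fetch sleeps instead of making an HTTP request so the example runs offline:

```python
import asyncio

async def run_limited(n_tasks: int, concurrency: int) -> int:
    """Run n_tasks fake fetches and return the peak number in flight."""
    semaphore = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def fake_fetch() -> None:
        nonlocal in_flight, peak
        async with semaphore:          # acquire before the request, release after
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the HTTP round trip
            in_flight -= 1

    await asyncio.gather(*(fake_fetch() for _ in range(n_tasks)))
    return peak

peak = asyncio.run(run_limited(n_tasks=20, concurrency=5))
print(f"Peak in-flight requests: {peak}")
```

Because every coroutine is started up front and only the semaphore holds them back, the recorded peak is a direct check that the cap is respected.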

Requirement 2 - Retry with Exponential Backoff

Both implementations must retry failed requests up to a configurable number of times. A failure is any of:

  • Connection error (requests.ConnectionError, aiohttp.ClientConnectionError)
  • Timeout (requests.Timeout, asyncio.TimeoutError)
  • HTTP 5xx response (server errors - worth retrying)

Do not retry HTTP 4xx responses (404, 403, etc.) - these are permanent client errors.

def with_retry(fn, max_retries: int = 3, base_delay: float = 1.0):
    """
    Call fn(), retrying up to max_retries times with exponential backoff
    (so fn() runs at most max_retries + 1 times in total).
    Backoff: base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)
    Returns the first successful result, or raises the last exception.
    """
    ...

The async version must use asyncio.sleep() for delays, not time.sleep().
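
The retry rules above can be sketched as two small helpers. The names is_retryable_status and backoff_delay are hypothetical, not part of the required interface, and the formula is the one given in the with_retry docstring:

```python
import random

def is_retryable_status(status: int) -> bool:
    """5xx responses are worth retrying; 4xx and everything else are not."""
    return 500 <= status <= 599

def backoff_delay(attempt: int, base_delay: float = 1.0) -> float:
    """Delay before retry number `attempt` (0-based), with +/-50% jitter."""
    return base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)

# Show the range each retry's delay falls into for base_delay = 1.0.
for attempt in range(3):
    low, high = 0.5 * 2 ** attempt, 1.5 * 2 ** attempt
    print(f"retry {attempt + 1}: sleep between {low:.1f}s and {high:.1f}s")
```

The jitter spreads retries from different workers over time so they do not all hit a recovering server at the same instant.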

Requirement 3 - Per-Request Timeout

Every HTTP request must have a timeout. A request that takes longer than timeout seconds must be cancelled and recorded as a timeout error - it must not block the entire scrape.

# Threaded version:
response = requests.get(url, timeout=timeout)  # applies connect + read timeout

# Async version:
async with asyncio.timeout(timeout):  # Python 3.11+, or asyncio.wait_for for 3.10
    async with session.get(url) as response:
        content = await response.text()
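
For Python 3.10, the asyncio.wait_for fallback mentioned in the comment might look like the sketch below. slow_operation is a hypothetical stand-in for the HTTP call, and the error string follows the "Timeout after <N>s" format used later in this brief:

```python
import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(1.0)  # stand-in for a slow HTTP response
    return "done"

async def fetch_with_timeout(timeout: float) -> str:
    try:
        # wait_for cancels the inner coroutine once the timeout expires
        return await asyncio.wait_for(slow_operation(), timeout=timeout)
    except asyncio.TimeoutError:
        return f"Timeout after {timeout:g}s"

outcome = asyncio.run(fetch_with_timeout(0.05))
print(outcome)
```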

Requirement 4 - Domain-Level Rate Limiting

To avoid hammering any single server, your scraper must enforce a minimum delay between successive requests to the same domain. The delay is configurable (default: 1 second).

from urllib.parse import urlparse
import time
import threading

class DomainRateLimiter:
    """Thread-safe per-domain rate limiter."""

    def __init__(self, delay_seconds: float = 1.0):
        self._last_request: dict[str, float] = {}
        self._lock = threading.Lock()
        self.delay = delay_seconds

    def wait(self, url: str) -> None:
        """Block until it is polite to fetch this URL's domain."""
        domain = urlparse(url).netloc
        with self._lock:
            last = self._last_request.get(domain, 0.0)
            elapsed = time.monotonic() - last
            wait_time = max(0.0, self.delay - elapsed)
            self._last_request[domain] = time.monotonic() + wait_time
        if wait_time > 0:
            time.sleep(wait_time)  # OK in threaded version - blocks only this thread

For the async version, replace time.sleep with await asyncio.sleep and use asyncio.Lock instead of threading.Lock.

Requirement 5 - HTML Parsing

Extract the following from each successfully fetched page. HTML parsing must be isolated in parser.py and shared between both implementations:

| Field | How to extract |
| --- | --- |
| title | First <title> tag text, stripped of whitespace. None if absent. |
| word_count | Count of whitespace-separated tokens in all visible text (exclude <script>, <style>, <head> content). |
| links_found | All <a href="..."> URLs, resolved to absolute URLs using urllib.parse.urljoin. Deduplicated. Maximum 100 per page. |

Use Python's built-in html.parser (HTMLParser) or BeautifulSoup with html.parser as the backend. Do not use lxml (requires a C extension that may not be available in all environments).

# parser.py
from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse

def parse_page(html: str, base_url: str) -> dict:
    """
    Parse an HTML page and return extracted fields.

    Returns:
        {
            "title": str | None,
            "word_count": int,
            "links": list[str],  # absolute URLs, max 100
        }
    """
    ...

Requirement 6 - Structured Output

Both implementations must support writing results to:

  1. JSON - a single array of ScrapeResult objects as dicts
  2. CSV - one row per result, columns matching ScrapeResult fields

# output.py
import json
import csv
from dataclasses import asdict

def write_json(results: list[ScrapeResult], path: str) -> None:
    # Explicit UTF-8 so non-ASCII titles are written the same way on every platform.
    with open(path, "w", encoding="utf-8") as f:
        json.dump([asdict(r) for r in results], f, indent=2)

def write_csv(results: list[ScrapeResult], path: str) -> None:
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=[
            "url", "status", "title", "word_count",
            "links_found", "error", "fetched_at", "duration_ms",
        ])
        writer.writeheader()
        for r in results:
            row = asdict(r)
            row["links_found"] = len(r.links_found)  # store count, not the list
            writer.writerow(row)

Requirement 7 - Graceful Error Handling

The scraper must never crash because one URL fails. All errors must be captured in ScrapeResult.error and reported in the final output. Categories to handle:

| Error type | status | error message |
| --- | --- | --- |
| Connection refused / DNS failure | None | "ConnectionError: <detail>" |
| Request timeout | None | "Timeout after <N>s" |
| HTTP 4xx | actual status code | "HTTP 404: Not Found" |
| HTTP 5xx (after max retries) | actual status code | "HTTP 503: Service Unavailable (after 3 retries)" |
| HTML parse error | actual status code | "ParseError: <detail>" |

word_count and links_found are 0 and [] respectively when an error occurs.

Acceptance Criteria

Your implementation passes when all of the following are true:

  • Scraping 50 URLs with max_workers=10 (threaded) or concurrency=10 (async) completes faster than scraping them serially (measure with time.perf_counter).
  • At most max_workers (or concurrency) HTTP connections are open simultaneously at any point during the scrape. (Verify by logging connection open/close events.)
  • A URL that times out records status=None and error starting with "Timeout" - the scraper does not hang or crash.
  • A URL returning HTTP 500 is retried up to max_retries times, with delay between retries. After max retries, error contains the final HTTP status.
  • A URL returning HTTP 404 is not retried - it records status=404 immediately.
  • Two requests to the same domain are separated by at least domain_delay seconds (within ±100 ms tolerance). Verify with request timestamps in the output.
  • title is extracted correctly from a page with a <title> tag.
  • word_count excludes text inside <script> and <style> tags.
  • links_found contains only absolute URLs (starting with http:// or https://).
  • links_found contains at most 100 URLs per page.
  • All results are written to a valid JSON file. json.load() on the output file succeeds without error.
  • All results are written to a valid CSV file. csv.DictReader on the output file yields one row per input URL.
  • The scraper completes when every URL has either succeeded or exhausted its retries - it does not hang indefinitely.
  • A mix of successful, errored, and timed-out URLs in the input produces a complete result list with one entry per URL.
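
The domain-delay criterion can be checked mechanically from logged request start times. The sketch below assumes you record (url, start time in seconds) pairs yourself; min_gap_per_domain and the sample log are hypothetical:

```python
from collections import defaultdict
from urllib.parse import urlparse

def min_gap_per_domain(requests_log: list[tuple[str, float]]) -> dict[str, float]:
    """Smallest gap in seconds between consecutive requests to each domain."""
    by_domain: dict[str, list[float]] = defaultdict(list)
    for url, started_at in requests_log:
        by_domain[urlparse(url).netloc].append(started_at)
    gaps = {}
    for domain, times in by_domain.items():
        times.sort()
        if len(times) > 1:  # a single request has no gap to measure
            gaps[domain] = min(b - a for a, b in zip(times, times[1:]))
    return gaps

log = [
    ("https://example.com/a", 0.00),
    ("https://example.org/x", 0.10),
    ("https://example.com/b", 1.02),
    ("https://example.com/c", 2.05),
]
gaps = min_gap_per_domain(log)
domain_delay, tolerance = 1.0, 0.1
assert all(gap >= domain_delay - tolerance for gap in gaps.values())
```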

CLI Interface

Both implementations must be runnable from the command line with the same interface:

# Threaded
python scraper_threads.py urls.txt \
    --max-workers 10 \
    --timeout 8 \
    --max-retries 3 \
    --domain-delay 1.0 \
    --output-json results.json \
    --output-csv results.csv

# Async
python scraper_async.py urls.txt \
    --concurrency 10 \
    --timeout 8 \
    --max-retries 3 \
    --domain-delay 1.0 \
    --output-json results.json \
    --output-csv results.csv

Both must print a summary when complete:

Scrape complete in 12.4s
Total URLs: 50
Succeeded: 43
Failed: 5 (HTTP errors)
Timed out: 2
Avg duration: 847ms
Fastest: 112ms (https://example.com)
Slowest: 7823ms (https://slow-site.example)
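
The numbers in this summary can be derived from the result list in one pass. The sketch below works on ScrapeResult dicts (as produced by asdict) and assumes a non-empty list; summarize and the way it separates timeouts from other failures by the "Timeout" prefix are assumptions of this example:

```python
def summarize(results: list[dict]) -> dict:
    """Aggregate counts and duration stats from ScrapeResult dicts."""
    succeeded = [r for r in results if r["error"] is None]
    timed_out = [r for r in results if (r["error"] or "").startswith("Timeout")]
    durations = [r["duration_ms"] for r in results]
    fastest = min(results, key=lambda r: r["duration_ms"])
    slowest = max(results, key=lambda r: r["duration_ms"])
    return {
        "total": len(results),
        "succeeded": len(succeeded),
        "timed_out": len(timed_out),
        "failed": len(results) - len(succeeded) - len(timed_out),
        "avg_ms": round(sum(durations) / len(durations)),
        "fastest": (fastest["duration_ms"], fastest["url"]),
        "slowest": (slowest["duration_ms"], slowest["url"]),
    }

sample = [
    {"url": "https://example.com", "error": None, "duration_ms": 112},
    {"url": "https://example.org/missing", "error": "HTTP 404: Not Found", "duration_ms": 300},
    {"url": "https://slow-site.example", "error": "Timeout after 8s", "duration_ms": 8000},
]
summary = summarize(sample)
```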

Hints

How do I extract visible text while excluding script and style content?

Subclass HTMLParser and track whether you are inside a tag you want to skip:

from html.parser import HTMLParser

class VisibleTextExtractor(HTMLParser):
    # Only tags that have a closing tag belong here. Void elements such as
    # <meta> and <link> never trigger handle_endtag, so listing them would
    # leave _skip_depth above zero and silently drop all later text.
    SKIP_TAGS = {"script", "style", "head", "noscript"}

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self._text_parts: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag.lower() in self.SKIP_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag.lower() in self.SKIP_TAGS and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0:
            stripped = data.strip()
            if stripped:
                self._text_parts.append(stripped)

    def get_word_count(self) -> int:
        full_text = " ".join(self._text_parts)
        return len(full_text.split())

def count_words(html: str) -> int:
    extractor = VisibleTextExtractor()
    extractor.feed(html)
    return extractor.get_word_count()

How do I resolve relative links to absolute URLs?

Use urllib.parse.urljoin. It correctly handles relative paths, protocol-relative URLs, and absolute URLs:

from urllib.parse import urljoin, urlparse

def resolve_links(hrefs: list[str], base_url: str) -> list[str]:
    absolute = []
    for href in hrefs:
        if not href or href.startswith("#") or href.startswith("mailto:"):
            continue
        resolved = urljoin(base_url, href)
        parsed = urlparse(resolved)
        if parsed.scheme in ("http", "https"):
            absolute.append(resolved)
    return list(dict.fromkeys(absolute))[:100]  # deduplicate, cap at 100

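For example, urljoin("https://example.com/news/", "../about") returns "https://example.com/about". It also leaves already-absolute hrefs unchanged and resolves protocol-relative ones (such as //cdn.example.com/x) against the base URL's scheme.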

How do I implement exponential backoff correctly for both threaded and async versions?

Factor the retry logic into a function that accepts a callable, so the same interface works for both sync and async:

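# retry.py

import time
import random
import asyncio

def with_retry_sync(fn, max_retries: int = 3, base_delay: float = 1.0):
    last_exc = None
    for attempt in range(max_retries + 1):  # first call plus max_retries retries
        try:
            return fn()
        except Exception as exc:  # in the scraper, catch only the retryable types
            last_exc = exc
            if attempt == max_retries:
                break
            delay = base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)
            time.sleep(delay)
    raise last_exc

async def with_retry_async(coro_fn, max_retries: int = 3, base_delay: float = 1.0):
    """coro_fn must be a zero-argument callable that returns a coroutine."""
    last_exc = None
    for attempt in range(max_retries + 1):
        try:
            return await coro_fn()
        except Exception as exc:  # in the scraper, catch only the retryable types
            last_exc = exc
            if attempt == max_retries:
                break
            delay = base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)
            await asyncio.sleep(delay)
    raise last_exc

# Usage in threaded version. requests.get does not raise on HTTP 5xx, so the
# wrapped callable raises for those statuses to trigger a retry:
def fetch():
    response = requests.get(url, timeout=timeout)
    if response.status_code >= 500:
        response.raise_for_status()
    return response

result = with_retry_sync(fetch, max_retries=3)

# Usage in async version. The body is read before the connection is released:
async def fetch_async():
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
        if response.status >= 500:
            response.raise_for_status()
        return response.status, await response.text()

status, html = await with_retry_async(fetch_async, max_retries=3)

How do I implement the async domain rate limiter correctly?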

The async version needs asyncio.Lock (not threading.Lock) and asyncio.sleep (not time.sleep):

import asyncio
import time
from urllib.parse import urlparse

class AsyncDomainRateLimiter:
    def __init__(self, delay_seconds: float = 1.0):
        self._last_request: dict[str, float] = {}
        self._locks: dict[str, asyncio.Lock] = {}
        self.delay = delay_seconds

    def _get_lock(self, domain: str) -> asyncio.Lock:
        if domain not in self._locks:
            self._locks[domain] = asyncio.Lock()
        return self._locks[domain]

    async def wait(self, url: str) -> None:
        domain = urlparse(url).netloc
        lock = self._get_lock(domain)
        async with lock:
            last = self._last_request.get(domain, 0.0)
            elapsed = time.monotonic() - last
            wait_time = max(0.0, self.delay - elapsed)
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            self._last_request[domain] = time.monotonic()

The async with lock ensures only one coroutine at a time reads and updates _last_request[domain], preventing race conditions where two coroutines targeting the same domain both read the same last value and both decide they don't need to wait.

How do I use as_completed to stream results as they finish?

concurrent.futures.as_completed yields futures as they complete, not in submission order:

from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone

def scrape_all_threaded(urls, max_workers=10, timeout=10.0, on_result=None):
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(scrape_url, url, timeout): url
            for url in urls
        }
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
            except Exception as exc:
                # Safety net: scrape_url should normally return an error result itself.
                result = ScrapeResult(
                    url=url, status=None, title=None, word_count=0,
                    links_found=[], error=str(exc),
                    fetched_at=datetime.now(timezone.utc).isoformat(),
                    duration_ms=0,
                )
            results.append(result)
            if on_result:
                on_result(result)
    return results

For the async version, use asyncio.as_completed, which yields awaitables in completion order. If you don't need streaming, wrap each coroutine in a task and use asyncio.gather(..., return_exceptions=True) instead.
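
A minimal sketch of the streaming pattern with asyncio.as_completed. fake_scrape and scrape_streaming are hypothetical names, and the fetch is simulated with a sleep so the example runs without network access:

```python
import asyncio

async def fake_scrape(url: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for fetch + parse
    return url

async def scrape_streaming(jobs: dict[str, float]) -> list[str]:
    """Return URLs in the order their (fake) scrapes finish."""
    tasks = [asyncio.create_task(fake_scrape(url, delay)) for url, delay in jobs.items()]
    finished = []
    for next_done in asyncio.as_completed(tasks):
        url = await next_done   # resolves to whichever task finishes next
        finished.append(url)    # an on_result callback would be awaited here
    return finished

order = asyncio.run(scrape_streaming({
    "https://example.com/slow": 0.10,
    "https://example.com/fast": 0.01,
}))
print(order)
```

The slow URL is submitted first but appears last in the output, which is the completion-order behaviour the return value of scrape_all_async is expected to have.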

How do I measure concurrent connections to verify the semaphore is working?

Wrap the fetch function with a counter guarded by a threading.Lock. The counter only observes how many fetches are in flight; it does not limit them:

import threading

_active_connections = 0
_max_observed = 0
_counter_lock = threading.Lock()

def tracked_scrape_url(url, timeout):
    global _active_connections, _max_observed
    with _counter_lock:
        _active_connections += 1
        _max_observed = max(_max_observed, _active_connections)
    try:
        return scrape_url(url, timeout)
    finally:
        with _counter_lock:
            _active_connections -= 1

# After scraping:
print(f"Peak concurrent connections: {_max_observed}")
assert _max_observed <= max_workers

Extension Challenges

These are optional. Attempt them only after all acceptance criteria pass.

Extension A - Politeness: sitemap.xml Parsing

Before scraping a domain, fetch its sitemap.xml (if it exists) and use it to discover URLs rather than requiring them to be provided manually:

  • Fetch https://{domain}/sitemap.xml
  • Parse the XML and extract all <loc> URLs
  • Deduplicate against the input URL list
  • Respect <changefreq> hints - skip URLs with changefreq=never that were already scraped recently (store results in a SQLite cache keyed by URL + last-scraped date)
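
The <loc> extraction and deduplication steps could be sketched with the standard-library XML parser as follows. extract_sitemap_urls is a hypothetical name, the sample document is made up, and sitemap index files (which point at further sitemaps) are not handled:

```python
import xml.etree.ElementTree as ET

# Namespace used by the sitemaps.org protocol for <urlset>, <url> and <loc>.
SITEMAP_NS = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}

def extract_sitemap_urls(xml_text: str, already_known: set[str]) -> list[str]:
    """Return <loc> URLs from a sitemap, skipping ones already in the input list."""
    root = ET.fromstring(xml_text)
    found: list[str] = []
    for loc in root.findall(".//sm:loc", SITEMAP_NS):
        url = (loc.text or "").strip()
        if url and url not in already_known and url not in found:
            found.append(url)
    return found

sample_sitemap = """<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/</loc></url>
  <url><loc>https://example.com/about</loc></url>
  <url><loc>https://example.com/about</loc></url>
</urlset>"""
new_urls = extract_sitemap_urls(sample_sitemap, already_known={"https://example.com/"})
print(new_urls)
```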

Extension B - robots.txt Compliance

Before fetching any URL, check the domain's robots.txt and skip URLs that are disallowed for your user agent:

import urllib.robotparser
from urllib.parse import urlparse

def is_allowed(url: str, user_agent: str = "MyCrawler/1.0") -> bool:
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(robots_url)
    rp.read()
    return rp.can_fetch(user_agent, url)

Cache the parsed robots.txt per domain so you do not re-fetch it for every URL on the same domain.
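
A per-domain cache might look like the sketch below. RobotsCache is a hypothetical name; the class is not thread-safe as written, so the threaded scraper would need to guard it with a lock, and the async scraper would need to run the blocking read() call off the event loop:

```python
import urllib.robotparser
from urllib.parse import urlparse

class RobotsCache:
    """Keeps one parsed robots.txt per scheme + host."""

    def __init__(self, user_agent: str = "MyCrawler/1.0"):
        self.user_agent = user_agent
        self._parsers: dict[str, urllib.robotparser.RobotFileParser] = {}

    def _load(self, robots_url: str) -> urllib.robotparser.RobotFileParser:
        rp = urllib.robotparser.RobotFileParser()
        rp.set_url(robots_url)
        rp.read()  # blocking network fetch; happens once per origin
        return rp

    def is_allowed(self, url: str) -> bool:
        parsed = urlparse(url)
        origin = f"{parsed.scheme}://{parsed.netloc}"
        if origin not in self._parsers:
            self._parsers[origin] = self._load(f"{origin}/robots.txt")
        return self._parsers[origin].can_fetch(self.user_agent, url)
```

Keeping the fetch in its own _load method also makes the cache easy to test, since a subclass can supply robots.txt rules without touching the network.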

Extension C - Performance Benchmark

Add a benchmark.py script that runs both the threaded and async implementations against the same list of 100 URLs and produces a comparison report:

=== Scraper Benchmark (100 URLs) ===

ThreadPoolExecutor (max_workers=10):
Total time: 18.2s
p50 latency: 412ms
p95 latency: 1240ms
Succeeded: 87 / 100

asyncio + aiohttp (concurrency=10):
Total time: 14.7s
p50 latency: 383ms
p95 latency: 1108ms
Succeeded: 87 / 100

Winner: asyncio (19% faster, same success rate)

Use statistics.quantiles for percentile calculations. Run each implementation three times and report the median total time to reduce noise.
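
A sketch of the percentile and median calculations. latency_percentiles and median_total_time are hypothetical helper names, and the choice of method="inclusive" is an assumption; the default "exclusive" method gives slightly different values:

```python
import statistics

def latency_percentiles(durations_ms: list[int]) -> tuple[float, float]:
    """Return (p50, p95) from per-URL durations; needs at least two values."""
    # n=100 yields 99 cut points; cuts[i] is the (i + 1)th percentile.
    cuts = statistics.quantiles(durations_ms, n=100, method="inclusive")
    return cuts[49], cuts[94]

def median_total_time(run_times_s: list[float]) -> float:
    """Median wall-clock time across repeated runs of one implementation."""
    return statistics.median(run_times_s)

p50, p95 = latency_percentiles(list(range(1, 101)))
print(f"p50={p50:.1f}ms p95={p95:.1f}ms")
print(f"median total: {median_total_time([18.2, 17.9, 19.0])}s")
```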

© 2026 EngineersOfAI. All rights reserved.