Project 2 - Frequency Analyzer
Real-world systems constantly analyze frequency:
- Word frequency in search engines
- Log level frequency in monitoring systems
- Click frequency in recommendation engines
- Event frequency in analytics dashboards
We will build a scalable Frequency Analyzer.
System Requirements
Our analyzer should support:
- Batch frequency analysis
- Streaming updates
- Top-K frequent elements
- Category-based grouping
- Sliding window frequency
- Performance benchmarking
Step 1 - Batch Text Frequency Analyzer
We start with basic text analytics.
from collections import Counter
import re

class FrequencyAnalyzer:
    def __init__(self):
        self.counter = Counter()

    def process_text(self, text):
        words = re.findall(r'\w+', text.lower())
        self.counter.update(words)

    def most_common(self, n=10):
        return self.counter.most_common(n)

    def total_unique_words(self):
        return len(self.counter)

    def total_word_count(self):
        return sum(self.counter.values())
Usage Example
text = """
Python is powerful. Python is scalable.
Data structures power scalable systems.
"""
analyzer = FrequencyAnalyzer()
analyzer.process_text(text)
print("Top words:", analyzer.most_common(3))
print("Unique words:", analyzer.total_unique_words())
print("Total words:", analyzer.total_word_count())
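A minimal sketch of how the same counting pattern behaves across several batches, plus one tokenization caveat: the pattern \w+ splits contractions at the apostrophe. The sample strings and the alternative pattern are illustrative assumptions, not part of the analyzer above.

```python
import re
from collections import Counter

counter = Counter()

# Counter.update adds to existing counts, so repeated calls accumulate.
batches = ["Python is powerful.", "Python is scalable."]
for batch in batches:
    counter.update(re.findall(r"\w+", batch.lower()))

print(counter.most_common(2))  # [('python', 2), ('is', 2)]

# Caveat: \w+ treats the apostrophe as a separator.
plain_tokens = re.findall(r"\w+", "don't stop")
print(plain_tokens)  # ['don', 't', 'stop']

# One possible alternative (an assumption): allow a single inner apostrophe.
joined_tokens = re.findall(r"\w+(?:'\w+)?", "don't stop")
print(joined_tokens)  # ["don't", 'stop']
```

Whether contractions should stay whole depends on the text being analyzed, so treat the second pattern as a starting point.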
Step 2 - Streaming Event Frequency
Simulate real-time event processing.
from collections import Counter
import random

class StreamingFrequency:
    def __init__(self):
        self.counter = Counter()

    def process_event(self, event):
        self.counter[event] += 1

    def top_k(self, k=5):
        return self.counter.most_common(k)
Streaming Simulation
events = ["click", "view", "purchase", "scroll"]
stream = StreamingFrequency()
for _ in range(1_000_000):
    event = random.choice(events)
    stream.process_event(event)
print("Top events:", stream.top_k())
This simulates an event analytics pipeline: each event updates the counts incrementally, and the stream itself is never stored.
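A hedged sketch of the same idea with a generator as the event source, so events are produced and consumed one at a time. The event_stream function, its seed parameter, and the snapshot interval are hypothetical names chosen for illustration.

```python
import random
from collections import Counter

def event_stream(n, seed=0):
    """Hypothetical event source: yields one event at a time."""
    rng = random.Random(seed)
    for _ in range(n):
        yield rng.choice(["click", "view", "purchase", "scroll"])

counter = Counter()
for i, event in enumerate(event_stream(10_000), start=1):
    counter[event] += 1
    if i % 5_000 == 0:
        # Periodic snapshot; only the counts are kept, never the events.
        print(i, counter.most_common(2))
```

Memory use depends on the number of distinct event types, not on how many events pass through.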
Step 3 - Top-K Using Heap (Efficient Large Dataset)
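Instead of sorting every item to pick the top K, keep only the K largest in a heap:
import heapq

def top_k_heap(counter, k):
    return heapq.nlargest(k, counter.items(), key=lambda x: x[1])
Complexity:
O(n log k) instead of O(n log n), where n is the number of unique keys.
This matters most when n is very large and k is small. In CPython, Counter.most_common(k) already uses heapq.nlargest internally when k is given, so the explicit version is mainly useful for plain dicts or custom ranking keys.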
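A quick check on a small input that heap-based selection returns the same items as sorting everything and slicing. The sample string is an illustrative assumption.

```python
import heapq
from collections import Counter

counts = Counter("abracadabra")  # a:5, b:2, r:2, c:1, d:1

# Heap-based selection: tracks only the k best candidates.
by_heap = heapq.nlargest(2, counts.items(), key=lambda item: item[1])

# Full sort, then slice: orders every item before discarding most of them.
by_sort = sorted(counts.items(), key=lambda item: item[1], reverse=True)[:2]

print(by_heap)  # [('a', 5), ('b', 2)]
print(by_heap == by_sort == counts.most_common(2))  # True
```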
Step 4 - Log Aggregation System
Simulate log levels:
from collections import Counter, defaultdict
import random

log_levels = ["INFO", "WARNING", "ERROR", "DEBUG"]
logs = [
    {"service": f"service_{random.randint(1, 10)}",
     "level": random.choice(log_levels)}
    for _ in range(500_000)
]

# Nested grouping: service -> Counter of log levels
log_aggregation = defaultdict(Counter)
for log in logs:
    log_aggregation[log["service"]][log["level"]] += 1

for service, counts in log_aggregation.items():
    print(service, counts)
Observability systems aggregate logs along the same lines: group by service, then count by level.
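Once counts are grouped per service, derived metrics follow directly. The sketch below computes an error rate per service; the sample records and the error_rate name are hypothetical and only mirror the record shape used above.

```python
from collections import Counter, defaultdict

# Hypothetical sample records in the same shape as the logs above.
logs = [
    {"service": "service_1", "level": "ERROR"},
    {"service": "service_1", "level": "INFO"},
    {"service": "service_2", "level": "INFO"},
    {"service": "service_1", "level": "ERROR"},
]

per_service = defaultdict(Counter)
for log in logs:
    per_service[log["service"]][log["level"]] += 1

# Share of ERROR entries per service; a missing level counts as 0.
error_rate = {
    service: counts["ERROR"] / sum(counts.values())
    for service, counts in per_service.items()
}
print(error_rate)
```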
Step 5 - Sliding Window Frequency
Maintain frequencies for the last N events only.
from collections import deque, Counter

class SlidingWindowFrequency:
    def __init__(self, window_size):
        self.window = deque(maxlen=window_size)
        self.counter = Counter()

    def add_event(self, event):
        if len(self.window) == self.window.maxlen:
            removed = self.window[0]
            self.counter[removed] -= 1
            if self.counter[removed] == 0:
                del self.counter[removed]
        self.window.append(event)
        self.counter[event] += 1

    def current_top(self, k=3):
        return self.counter.most_common(k)
Sliding Window Simulation
import random

events = ["click", "view", "purchase"]
sw = SlidingWindowFrequency(window_size=1000)
for _ in range(5000):
    sw.add_event(random.choice(events))
print("Top in window:", sw.current_top())
Used in:
- Real-time dashboards
- Fraud detection systems
- Trend detection
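To see the eviction behavior concretely, here is a small deterministic trace of the same deque-plus-Counter technique with a window of 3. The event sequence is an illustrative assumption.

```python
from collections import Counter, deque

window = deque(maxlen=3)
counts = Counter()

for event in ["click", "click", "view", "purchase", "view"]:
    if len(window) == window.maxlen:
        oldest = window[0]       # append() below will evict this item
        counts[oldest] -= 1
        if counts[oldest] == 0:
            del counts[oldest]   # drop zero entries so keys stay bounded
    window.append(event)
    counts[event] += 1

print(list(window))  # ['view', 'purchase', 'view']
print(dict(counts))  # {'view': 2, 'purchase': 1}
```

Both "click" events have left the window, so their key is gone from the counter as well.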
Step 6 - Performance Benchmark
import time
import random
from collections import Counter

data = [random.randint(1, 1000) for _ in range(2_000_000)]

# perf_counter is a monotonic, high-resolution clock intended for timing
start = time.perf_counter()
manual = {}
for x in data:
    manual[x] = manual.get(x, 0) + 1
print("Manual time:", time.perf_counter() - start)

start = time.perf_counter()
counter = Counter(data)
print("Counter time:", time.perf_counter() - start)
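In CPython, Counter(data) often outperforms the manual loop because its counting loop is implemented in C. Results vary by Python version and data, so measure on your own workload.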
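A single timing run is noisy. As a hedged sketch, timeit.repeat can run each approach several times and report the best result. The data size is reduced here to keep the run short, and manual_count is a hypothetical helper name.

```python
import random
import timeit
from collections import Counter

random.seed(0)  # fixed seed so both approaches see the same data
data = [random.randint(1, 1000) for _ in range(100_000)]

def manual_count(items):
    counts = {}
    for x in items:
        counts[x] = counts.get(x, 0) + 1
    return counts

# Both approaches must agree before the timings mean anything.
results_match = manual_count(data) == dict(Counter(data))

# Best of 5 runs each; the minimum is the least disturbed measurement.
manual_best = min(timeit.repeat(lambda: manual_count(data), number=1, repeat=5))
counter_best = min(timeit.repeat(lambda: Counter(data), number=1, repeat=5))

print("Results match:", results_match)
print(f"manual:  {manual_best:.4f} s")
print(f"Counter: {counter_best:.4f} s")
```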
Step 7 - Memory Considerations
Counter stores:
- Unique keys only
- Frequency values
Memory complexity: O(unique elements)
Memory grows with the number of unique keys, not with the total number of events, so high-cardinality data is what becomes expensive.
For extremely large data:
- Use streaming aggregation
- Use approximate algorithms (Count-Min Sketch)
- Use database aggregation
Engineering requires memory awareness.
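As a rough illustration of the approximate option, here is a minimal Count-Min Sketch. It is a sketch under stated assumptions rather than a production implementation: the class name, the width and depth defaults, and the use of salted BLAKE2b hashes are all choices made for this example.

```python
import hashlib

class CountMinSketch:
    """Approximate counter with fixed memory; estimates never undercount."""

    def __init__(self, width=1000, depth=5):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _indexes(self, item):
        # One salted hash per row maps the item to a column in that row.
        data = str(item).encode("utf-8")
        for row in range(self.depth):
            digest = hashlib.blake2b(
                data, digest_size=8, salt=row.to_bytes(8, "little")
            ).digest()
            yield row, int.from_bytes(digest, "little") % self.width

    def add(self, item, count=1):
        for row, col in self._indexes(item):
            self.table[row][col] += count

    def estimate(self, item):
        # Collisions only inflate cells, so the minimum is the tightest bound.
        return min(self.table[row][col] for row, col in self._indexes(item))

sketch = CountMinSketch()
for word in ["click"] * 50 + ["view"] * 20 + ["purchase"] * 5:
    sketch.add(word)

print(sketch.estimate("click"))  # at least 50
```

Memory is fixed at width x depth cells regardless of how many distinct keys arrive. The price is that colliding keys can push an estimate above the true count.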
Step 8 - Engineering Extensions
Enhance the system to:
- Track time-based decay
- Export frequency report
- Detect anomalies
- Build trending topics detector
- Add threshold alerts
- Implement distributed counting
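As one possible starting point for the first extension, the sketch below applies exponential time decay so that older events contribute less to a score. DecayingCounter, half_life, and the explicit now timestamps are hypothetical names and design choices, not part of the system built above.

```python
import math

class DecayingCounter:
    """Hypothetical sketch: each score halves every `half_life` seconds."""

    def __init__(self, half_life):
        self.rate = math.log(2) / half_life
        self.scores = {}
        self.last_seen = {}

    def _decayed(self, key, now):
        # Decay the stored score by the time elapsed since its last update.
        elapsed = now - self.last_seen.get(key, now)
        return self.scores.get(key, 0.0) * math.exp(-self.rate * elapsed)

    def add(self, key, now):
        self.scores[key] = self._decayed(key, now) + 1.0
        self.last_seen[key] = now

    def score(self, key, now):
        return self._decayed(key, now)

trending = DecayingCounter(half_life=60)
trending.add("python", now=0)
trending.add("python", now=0)
trending.add("rust", now=60)

print(round(trending.score("python", now=60), 3))  # 1.0
print(round(trending.score("rust", now=60), 3))    # 1.0
```

After one half-life, two old "python" events weigh the same as one fresh "rust" event. Passing timestamps explicitly keeps the example deterministic; a real system would likely read a clock.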
What You Learned
This project required:
- Counter for fast counting
- defaultdict for grouping
- heap for top-K
- deque for sliding window
- Understanding complexity tradeoffs
Frequency analysis is core to:
- Search engines
- Monitoring systems
- Recommendation engines
- Analytics dashboards
Final Engineering Takeaway
Frequency analysis at scale requires:
- Efficient data structures
- Memory awareness
- Incremental processing
- Top-K optimization
- Windowed analytics
Naive counting breaks down at scale.
Deliberate data structure design holds up.
This is analytics engineering in practice.
