Project 3 - Priority Task Scheduler
Modern systems constantly schedule tasks:
- Background jobs
- Payment retries
- Email notifications
- Log processing
- Data aggregation
- Cron-based jobs
A scheduler must:
- Execute the highest-priority task first
- Support delayed execution
- Scale efficiently
- Avoid re-sorting the whole queue on every operation
We will build a simplified priority scheduler.
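As a minimal sketch of what "avoid full sorting" means in practice, Python's heapq keeps the smallest item at index 0, so the next task can be read without sorting the list. The task names and priorities below are illustrative, and lower numbers are assumed to mean higher priority.

```python
import heapq

# Illustrative tasks as (priority, name); lower number = higher priority (assumption).
tasks = [(5, "send newsletter"), (1, "retry payment"), (3, "aggregate logs")]

heap = []
for task in tasks:
    heapq.heappush(heap, task)   # O(log n) per insert, no full sort

next_task = heap[0]              # peek at the most urgent task in O(1)
popped = heapq.heappop(heap)     # remove it in O(log n)
print(next_task, popped)
```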
System Requirements
Our scheduler must support:
- Add task with priority
- Add delayed task (execute at timestamp)
- Execute next available task
- Retry failed tasks
- View pending tasks
- Support multiple workers
Step 1 - Core Scheduler Using Heap
import heapq
import time
import itertools
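class PriorityTaskScheduler:
    def __init__(self):
        self._delayed = []  # min-heap of (execution_time, task_id, priority, task_name)
        self._ready = []    # min-heap of (priority, task_id, task_name)
        self._counter = itertools.count()  # unique tie-breaker: FIFO among equal keys

    def add_task(self, priority, task_name, delay=0):
        # Lower priority numbers run first (1 is more urgent than 5).
        task_id = next(self._counter)
        if delay > 0:
            execution_time = time.time() + delay
            heapq.heappush(self._delayed, (execution_time, task_id, priority, task_name))
        else:
            heapq.heappush(self._ready, (priority, task_id, task_name))

    def _promote_due_tasks(self):
        # Move every delayed task whose time has come into the ready heap.
        # Ordering one heap by execution time first would let arrival order override priority.
        now = time.time()
        while self._delayed and self._delayed[0][0] <= now:
            _, task_id, priority, task_name = heapq.heappop(self._delayed)
            heapq.heappush(self._ready, (priority, task_id, task_name))

    def get_next_task(self):
        self._promote_due_tasks()
        if not self._ready:
            return None  # nothing is ready yet
        _, _, task_name = heapq.heappop(self._ready)
        return task_name

    def pending_tasks(self):
        return len(self._delayed) + len(self._ready)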
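To see why the counter matters, here is a small standalone sketch. It assumes task payloads that cannot be compared with each other (dicts are used for illustration). When two heap entries tie on priority, Python compares the next tuple element, and without a unique tie-breaker that comparison reaches the payload and fails.

```python
import heapq
import itertools

# Two tasks with the same priority whose payloads (dicts) cannot be ordered.
heap = []
heapq.heappush(heap, (1, {"name": "first"}))
try:
    heapq.heappush(heap, (1, {"name": "second"}))  # tie on priority -> compares dicts
    tie_failed = False
except TypeError:
    tie_failed = True

# With a unique counter in the tuple, ties are settled before the payload is reached.
counter = itertools.count()
safe_heap = []
for payload in ({"name": "first"}, {"name": "second"}):
    heapq.heappush(safe_heap, (1, next(counter), payload))

order = [heapq.heappop(safe_heap)[2]["name"] for _ in range(2)]
print(tie_failed, order)
```

Because the counter only increases, tasks with equal priority also come out in the order they were added.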
Basic Usage
scheduler = PriorityTaskScheduler()
scheduler.add_task(priority=1, task_name="Critical Fix")
scheduler.add_task(priority=5, task_name="Low Priority Task")
scheduler.add_task(priority=2, task_name="Medium Task")
while scheduler.pending_tasks():
    task = scheduler.get_next_task()
    if task:
        print("Executing:", task)
Step 2 - Simulating Delayed Tasks
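scheduler.add_task(priority=1, task_name="Run after 3 seconds", delay=3)
while scheduler.pending_tasks():  # stop once the queue is empty instead of looping forever
    task = scheduler.get_next_task()
    if task:
        print("Executing:", task)
    else:
        time.sleep(1)  # nothing is due yet; wait before polling again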
This simulates time-based scheduling: the loop polls once per second, so the task runs about 3 seconds after it was added.
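Polling at a fixed interval can run a task up to one interval late. One alternative, sketched here on a standalone heap rather than the scheduler class, is to sleep until the earliest task is due. The task names and the short delays are illustrative.

```python
import heapq
import time

# Standalone heap of (execution_time, name); names and delays are illustrative.
now = time.time()
delayed = []
heapq.heappush(delayed, (now + 0.2, "second"))
heapq.heappush(delayed, (now + 0.1, "first"))

executed = []
while delayed:
    wait = delayed[0][0] - time.time()   # time until the earliest task is due
    if wait > 0:
        time.sleep(wait)                 # sleep until it is due, then re-check
        continue
    _, name = heapq.heappop(delayed)
    executed.append(name)
    print("Executing:", name)
```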
Step 3 - Worker Simulation
def worker(name, scheduler):
    task = scheduler.get_next_task()
    if task:
        print(f"{name} executing {task}")
        return True
    return False

scheduler = PriorityTaskScheduler()
for i in range(10):
    scheduler.add_task(priority=i % 3, task_name=f"Task_{i}")
while scheduler.pending_tasks():
    worker("Worker1", scheduler)
    worker("Worker2", scheduler)
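This simulates two consumers taking turns on a shared queue. The calls run sequentially in one thread, so nothing executes in parallel yet.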
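For consumers that actually run concurrently, the "is the queue empty?" check and the pop must happen as one atomic step. The following is a minimal sketch using threads and a lock around a standalone heap. The worker function and the `done` list are hypothetical names for this example. The standard library's queue.PriorityQueue is a ready-made thread-safe alternative.

```python
import heapq
import threading

# Entries are (priority, tie-breaker, name), mirroring the ten tasks above.
queue = [(i % 3, i, f"Task_{i}") for i in range(10)]
heapq.heapify(queue)
lock = threading.Lock()   # guards every access to the shared heap
done = []                 # (worker_name, task_name) pairs, in execution order

def worker(name):
    while True:
        with lock:
            if not queue:
                return                     # nothing left to do
            _, _, task = heapq.heappop(queue)
            done.append((name, task))      # recorded while still holding the lock

threads = [threading.Thread(target=worker, args=(f"Worker{n}",)) for n in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(done), "tasks executed")
```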
Step 4 - Retry Failed Tasks
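import random

def execute_task(scheduler, task_name):
    # Simulate a task that fails about half the time.
    success = random.choice([True, False])
    if not success:
        print("Task failed. Retrying...")
        # get_next_task() returns only the name, so the retry uses a fixed priority of 1.
        scheduler.add_task(priority=1, task_name=task_name, delay=2)
    else:
        print("Task completed:", task_name)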
Modify the worker to call it:
def worker(name, scheduler):
    task = scheduler.get_next_task()
    if task:
        print(f"{name} executing {task}")
        execute_task(scheduler, task)
        return True
    return False
Failed tasks now re-enter the queue and become eligible again after a 2-second delay.
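Retrying forever with a fixed delay can keep a permanently broken task in the queue. A common refinement is to cap the number of attempts and lengthen the delay after each failure. The sketch below assumes a hypothetical `run` function that fails twice before succeeding, and uses a simulated clock so that it finishes instantly. `MAX_ATTEMPTS` and `BASE_DELAY` are illustrative values.

```python
import heapq
import itertools

MAX_ATTEMPTS = 4      # hypothetical cap on attempts
BASE_DELAY = 2.0      # seconds; doubles after each failure

counter = itertools.count()
retry_heap = []       # entries: (run_at, tie-breaker, task_name, attempt)
clock = 0.0           # simulated clock so the example runs instantly

def run(task_name, attempt):
    # Stand-in for real work: fails on the first two attempts, then succeeds.
    return attempt >= 3

heapq.heappush(retry_heap, (clock, next(counter), "send_email", 1))
log = []
while retry_heap:
    clock, _, name, attempt = heapq.heappop(retry_heap)  # jump to the task's run time
    if run(name, attempt):
        log.append((clock, name, "ok"))
    elif attempt < MAX_ATTEMPTS:
        delay = BASE_DELAY * 2 ** (attempt - 1)          # 2s, 4s, 8s, ...
        heapq.heappush(retry_heap, (clock + delay, next(counter), name, attempt + 1))
        log.append((clock, name, f"retry in {delay:.0f}s"))
    else:
        log.append((clock, name, "gave up"))
print(log)
```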
Step 5 - Performance Analysis
Heap operations:
| Operation | Complexity |
|---|---|
| Add task | O(log n) |
| Pop task | O(log n) |
| Peek task | O(1) |
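If we re-sorted the full list on every scheduling cycle, each cycle would cost O(n log n) in the worst case.
A heap only restores order along a single root-to-leaf path, so each add or pop costs O(log n). This is what makes incremental scheduling efficient.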
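One way to make the difference tangible is to count comparisons. The sketch below wraps each value in a hypothetical `Counted` class, then runs ten scheduling cycles two ways: re-sorting a list before each pop, and popping from a heap built once. Exact counts depend on the data. CPython's sort also detects already-sorted input, so the gap here is smaller than the worst case suggests.

```python
import heapq
import random

class Counted:
    """Wraps a value and counts how many < comparisons are made on it."""
    comparisons = 0

    def __init__(self, value):
        self.value = value

    def __lt__(self, other):
        Counted.comparisons += 1
        return self.value < other.value

random.seed(0)
values = [random.random() for _ in range(1000)]

# Strategy 1: re-sort the whole list before each of 10 scheduling cycles.
Counted.comparisons = 0
items = [Counted(v) for v in values]
for _ in range(10):
    items.sort()
    items.pop(0)          # take the smallest item
sort_cost = Counted.comparisons

# Strategy 2: build a heap once, then pop 10 times.
Counted.comparisons = 0
heap = [Counted(v) for v in values]
heapq.heapify(heap)
for _ in range(10):
    heapq.heappop(heap)
heap_cost = Counted.comparisons

print("comparisons with re-sorting:", sort_cost)
print("comparisons with a heap:", heap_cost)
```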
Step 6 - Scaling Simulation
import random

scheduler = PriorityTaskScheduler()
start = time.perf_counter()  # monotonic clock, better suited to timing than time.time()
for i in range(1_000_000):
    scheduler.add_task(
        priority=random.randint(1, 10),
        task_name=f"Task_{i}"
    )
print("Added 1M tasks in:", time.perf_counter() - start, "seconds")
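Because each insert is O(log n), even the millionth push needs at most about 20 comparisons, so the heap handles a large queue efficiently.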
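When all tasks are known up front, heapq.heapify can build the heap in one O(n) pass instead of n separate pushes. This is a sketch on plain tuples rather than the scheduler class. The entry layout (priority, tie-breaker, name) is an assumption for the example.

```python
import heapq
import random

random.seed(42)
# (priority, tie-breaker, name) entries built up front; values are illustrative.
entries = [(random.randint(1, 10), i, f"Task_{i}") for i in range(100_000)]

bulk = list(entries)
heapq.heapify(bulk)                      # O(n): rearranges the list into a heap in place

incremental = []
for entry in entries:
    heapq.heappush(incremental, entry)   # O(log n) each, O(n log n) overall

# Both heaps expose the same most-urgent task at index 0.
print(bulk[0], incremental[0])
```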
Step 7 - Memory Consideration
Memory complexity is O(n): the heap holds one entry per pending task.
For extremely large queues:
- Use persistent storage
- Use distributed task brokers
- Use sharding strategy
Production schedulers often:
- Store metadata in database
- Keep in-memory heap for active tasks only
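The last pattern can be sketched as follows: all tasks live in a database, and only those due soon are loaded into an in-memory heap. An in-memory SQLite database stands in for real persistent storage here. The table schema, the `load_active` helper, and the sample rows are all hypothetical.

```python
import heapq
import sqlite3

db = sqlite3.connect(":memory:")   # stand-in for a real persistent database
db.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT, priority INTEGER, run_at REAL)"
)
db.executemany(
    "INSERT INTO tasks (name, priority, run_at) VALUES (?, ?, ?)",
    [("report", 2, 10.0), ("cleanup", 5, 500.0), ("billing", 1, 30.0)],
)

def load_active(now, window):
    """Load only tasks due within `window` seconds of `now` into a heap."""
    rows = db.execute(
        "SELECT priority, id, name FROM tasks WHERE run_at <= ?", (now + window,)
    ).fetchall()
    heapq.heapify(rows)            # rows are (priority, id, name) tuples
    return rows

active = load_active(now=0.0, window=60.0)
print(len(active), "of 3 tasks held in memory; next:", active[0][2])
```

A real system would reload periodically as the window advances and would mark rows as claimed so that two schedulers do not pick up the same task.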
Step 8 - Engineering Improvements
Enhance the system to:
- Track task status
- Add cancellation feature
- Add timeout handling
- Add worker pool management
- Persist queue to disk
- Support multiple priority levels cleanly
- Add monitoring metrics
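As one possible starting point for the first two items, here is a sketch of status tracking with cancellation by lazy deletion: a cancelled entry stays in the heap but is marked and skipped when popped, which avoids an O(n) search and removal. It follows the approach described in the heapq documentation's implementation notes. The function names and status strings are hypothetical.

```python
import heapq
import itertools

heap = []                       # entries: [priority, tie-breaker, task_name]
entries = {}                    # task_name -> live heap entry
status = {}                     # task_name -> "pending" | "cancelled" | "done"
counter = itertools.count()
CANCELLED = None                # marker stored in place of the task name

def add(priority, name):
    entry = [priority, next(counter), name]
    entries[name] = entry
    status[name] = "pending"
    heapq.heappush(heap, entry)

def cancel(name):
    entry = entries.pop(name)   # raises KeyError for unknown or finished tasks
    entry[2] = CANCELLED        # ordering uses only the first two fields, so the heap stays valid
    status[name] = "cancelled"

def pop():
    while heap:
        _, _, name = heapq.heappop(heap)
        if name is not CANCELLED:
            del entries[name]
            status[name] = "done"
            return name
    return None                 # only cancelled entries (or nothing) remained

add(2, "resize images")
add(1, "send invoice")
cancel("send invoice")
first = pop()
second = pop()
print(first, second, status)
```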
Real-World Mapping
This simplified scheduler resembles:
- Celery worker queue
- Event-driven microservices
- Cron-like systems
- Message brokers
- Job orchestration tools
All of these decide what to run next from a queue of pending work.
What You Learned
This project required:
- heapq for priority management
- itertools counter for stable ordering
- understanding O(log n) operations
- delayed scheduling logic
- retry and failure handling
- worker simulation
This is systems thinking in practice.
Final Engineering Takeaway
Schedulers are core to backend systems.
Using:
- Full sorting repeatedly → inefficient
- List-based queue → slow under scale
- Heap-based scheduling → scalable and efficient
Choosing the right data structure makes the difference between a toy implementation and a production-ready architecture.
You have now built:
- Inventory system
- Analytics engine
- Priority scheduler
You are thinking like a systems engineer.
