Project 3 - Priority Task Scheduler

Modern systems constantly schedule tasks:

  • Background jobs
  • Payment retries
  • Email notifications
  • Log processing
  • Data aggregation
  • Cron-based jobs

A scheduler must:

  • Execute highest priority task first
  • Support delayed execution
  • Scale efficiently
  • Avoid full sorting
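Python's `heapq` module provides the operations these requirements call for. It implements a min-heap on a plain list, so the smallest item is always at index 0. That is why this project treats a lower priority number as more urgent. Here is a quick illustration with three made-up tasks:

```python
import heapq

heap = []
for priority, name in [(5, "Low Priority Task"), (1, "Critical Fix"), (2, "Medium Task")]:
    heapq.heappush(heap, (priority, name))  # O(log n) per push, no full sort

print(heap[0])              # peek at the most urgent task in O(1): (1, 'Critical Fix')
print(heapq.heappop(heap))  # (1, 'Critical Fix')
print(heapq.heappop(heap))  # (2, 'Medium Task')
```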

We will build a simplified priority scheduler.

System Requirements

Our scheduler must support:

  1. Add task with priority
  2. Add delayed task (execute at timestamp)
  3. Execute next available task
  4. Retry failed tasks
  5. View pending tasks
  6. Support multiple workers

Step 1 - Core Scheduler Using Heap

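Priorities follow heapq's min-heap convention: a lower number means a more urgent task. Tasks that can run now live in a ready heap ordered by (priority, task_id). Delayed tasks wait in a separate heap ordered by execution time and move into the ready heap once they are due. This way, priority rather than arrival time decides which ready task runs first.

```python
import heapq
import itertools
import time


class PriorityTaskScheduler:
    """Lower priority number = more urgent (heapq is a min-heap)."""

    def __init__(self):
        self._ready = []    # (priority, task_id, task_name): tasks that may run now
        self._delayed = []  # (execution_time, priority, task_id, task_name): future tasks
        # Unique tie-breaker: equal priorities run FIFO and task names are never compared
        self._counter = itertools.count()

    def add_task(self, priority, task_name, delay=0):
        task_id = next(self._counter)
        if delay > 0:
            execution_time = time.time() + delay
            heapq.heappush(self._delayed, (execution_time, priority, task_id, task_name))
        else:
            heapq.heappush(self._ready, (priority, task_id, task_name))

    def _promote_due_tasks(self):
        # Move every delayed task whose execution time has arrived into the ready heap
        now = time.time()
        while self._delayed and self._delayed[0][0] <= now:
            _, priority, task_id, task_name = heapq.heappop(self._delayed)
            heapq.heappush(self._ready, (priority, task_id, task_name))

    def get_next_task(self):
        self._promote_due_tasks()
        if not self._ready:
            return None  # queue empty, or only not-yet-due tasks remain
        _, _, task_name = heapq.heappop(self._ready)
        return task_name

    def pending_tasks(self):
        return len(self._ready) + len(self._delayed)
```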
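Python compares tuples field by field, so the first field of a heap entry decides the order and later fields only break ties. The sketch below uses two made-up entries with fixed timestamps to show the consequence. When the time field comes first, a task queued a tenth of a second earlier beats a far more urgent one.

```python
import heapq

# Field order (execution_time, priority, task_id, task_name), with made-up timestamps
time_first = []
heapq.heappush(time_first, (100.0, 5, 0, "Low Priority Task"))
heapq.heappush(time_first, (100.1, 1, 1, "Critical Fix"))

# Field order (priority, task_id, task_name)
priority_first = []
heapq.heappush(priority_first, (5, 0, "Low Priority Task"))
heapq.heappush(priority_first, (1, 1, "Critical Fix"))

print(time_first[0][-1])      # Low Priority Task: the earlier timestamp wins
print(priority_first[0][-1])  # Critical Fix: the lower priority number wins
```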

Basic Usage

```python
scheduler = PriorityTaskScheduler()

scheduler.add_task(priority=1, task_name="Critical Fix")
scheduler.add_task(priority=5, task_name="Low Priority Task")
scheduler.add_task(priority=2, task_name="Medium Task")

while scheduler.pending_tasks():
    task = scheduler.get_next_task()
    if task:
        print("Executing:", task)
```
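`pending_tasks()` only returns a count, but requirement 5 asks to view the pending tasks. One way to do that is `heapq.nsmallest`, which returns the k smallest entries in sorted order without modifying the heap. The sketch below applies it to a standalone heap of `(priority, task_id, task_name)` tuples, and the helper name `peek_pending` is hypothetical. `nsmallest` scans the whole heap, so it suits an admin or debug view better than the hot dispatch path.

```python
import heapq


def peek_pending(heap, k=3):
    """Return up to k of the most urgent (priority, task_id, task_name) entries, without popping."""
    return heapq.nsmallest(k, heap)


heap = []
for task_id, (priority, name) in enumerate(
    [(1, "Critical Fix"), (5, "Low Priority Task"), (2, "Medium Task")]
):
    heapq.heappush(heap, (priority, task_id, name))

preview = peek_pending(heap, k=2)
for priority, _, name in preview:
    print(priority, name)
```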

Step 2 - Simulating Delayed Tasks

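```python
scheduler.add_task(priority=1, task_name="Run after 3 seconds", delay=3)

while scheduler.pending_tasks():
    task = scheduler.get_next_task()
    if task:
        print("Executing:", task)
    else:
        time.sleep(1)  # nothing is due yet; check again in a second
```

This simulates time-based scheduling. The loop polls once per second until the delayed task is due, runs it, and exits once the queue is empty.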
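Polling with `time.sleep` makes delayed tasks slow to test. A common alternative is to inject the clock. This is an assumption, not part of the scheduler above. The hypothetical `DelayQueue` below takes any zero-argument callable that returns the current time, so a test can move time forward instantly.

```python
import heapq
import itertools


class DelayQueue:
    """Delayed tasks ordered by due time, with an injectable clock (hypothetical design)."""

    def __init__(self, clock):
        self._clock = clock            # zero-argument callable returning seconds
        self._heap = []                # entries: (due_time, task_id, task_name)
        self._ids = itertools.count()

    def add(self, task_name, delay):
        heapq.heappush(self._heap, (self._clock() + delay, next(self._ids), task_name))

    def pop_due(self):
        """Return every task whose due time has passed, earliest first."""
        now, due = self._clock(), []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due


fake_now = [0.0]                       # mutable cell so the lambda sees updates
q = DelayQueue(clock=lambda: fake_now[0])
q.add("Run after 3 seconds", delay=3)
q.add("Run after 1 second", delay=1)

fake_now[0] = 2.0
first = q.pop_due()
print(first)   # ['Run after 1 second']
fake_now[0] = 3.0
second = q.pop_due()
print(second)  # ['Run after 3 seconds']
```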

Step 3 - Worker Simulation

```python
def worker(name, scheduler):
    task = scheduler.get_next_task()
    if task:
        print(f"{name} executing {task}")
        return True
    return False


scheduler = PriorityTaskScheduler()

for i in range(10):
    scheduler.add_task(priority=i % 3, task_name=f"Task_{i}")

while scheduler.pending_tasks():
    worker("Worker1", scheduler)
    worker("Worker2", scheduler)
```

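This simulates two consumers taking turns on one queue. The calls run sequentially in a single thread, so there is no real parallelism yet. Running workers in separate threads would require the queue to be protected by a lock.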
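For real concurrency, the standard library's `queue.PriorityQueue` is a thread-safe, heap-backed queue. The sketch below uses it as an alternative to the custom class, which means it has no delayed-task support. It assumes every task is enqueued before the workers start, so a worker can exit as soon as the queue is empty.

```python
import itertools
import queue
import threading

tasks = queue.PriorityQueue()        # thread-safe; entries ordered like heapq tuples
counter = itertools.count()
for i in range(10):
    tasks.put((i % 3, next(counter), f"Task_{i}"))

executed = []                        # (worker name, task name) records
executed_lock = threading.Lock()


def worker(name):
    while True:
        try:
            _, _, task_name = tasks.get_nowait()
        except queue.Empty:
            return                   # all tasks were preloaded, so empty means done
        with executed_lock:
            executed.append((name, task_name))


threads = [threading.Thread(target=worker, args=(f"Worker{n}",)) for n in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(executed), "tasks executed")
```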

Step 4 - Retry Failed Tasks

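```python
import random


def execute_task(scheduler, task_name):
    success = random.choice([True, False])  # simulate a 50% failure rate

    if not success:
        print("Task failed. Retrying...")
        # get_next_task() returns only the name, so the retry uses a fixed priority of 1
        scheduler.add_task(priority=1, task_name=task_name, delay=2)
    else:
        print("Task completed:", task_name)
```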

Then modify the worker so it actually runs each task:

```python
def worker(name, scheduler):
    task = scheduler.get_next_task()
    if task:
        print(f"{name} executing {task}")
        execute_task(scheduler, task)
        return True
    return False
```

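Failed tasks now re-enter the queue with a 2-second delay. Each attempt fails with 50% probability and there is no retry limit, so a task can in principle be retried many times.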
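Production retry logic usually caps the number of attempts and grows the delay between them. The sketch below adds both. `MAX_ATTEMPTS`, `handle_failure`, the task-dict layout, and the "dead letters" list are hypothetical. The sketch keeps each task's original priority instead of forcing priority 1, and it uses a simulated clock so it runs instantly.

```python
import heapq
import itertools

MAX_ATTEMPTS = 3   # hypothetical cap on total attempts per task
BASE_DELAY = 2.0   # seconds; matches the delay=2 used above

_ids = itertools.count()  # tie-breaker so task dicts are never compared


def retry_delay(attempt):
    """Exponential backoff: 2 s, 4 s, 8 s, ... for attempt 1, 2, 3, ..."""
    return BASE_DELAY * 2 ** (attempt - 1)


def handle_failure(delayed, dead_letters, task, now):
    """Reschedule a failed task with backoff, or move it to dead_letters after MAX_ATTEMPTS."""
    task["attempts"] += 1
    if task["attempts"] >= MAX_ATTEMPTS:
        dead_letters.append(task)  # stop retrying; keep the task for inspection
        return
    run_at = now + retry_delay(task["attempts"])
    # Reuse the task's own priority instead of hard-coding priority=1
    heapq.heappush(delayed, (run_at, task["priority"], next(_ids), task))


# Simulate a task that fails every time, jumping the clock to each retry time
delayed, dead_letters = [], []
task = {"name": "payment_retry", "priority": 2, "attempts": 0}
now = 0.0
while True:
    handle_failure(delayed, dead_letters, task, now)
    if not delayed:
        break
    now, _, _, task = heapq.heappop(delayed)

print(task["attempts"], now, len(dead_letters))  # 3 6.0 1
```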

Step 5 - Performance Analysis

Heap operations:

| Operation | Complexity |
|---|---|
| Add task (`heappush`) | O(log n) |
| Pop task (`heappop`) | O(log n) |
| Peek task (`heap[0]`) | O(1) |

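If we instead re-sorted the whole task list on every scheduling cycle, each cycle would cost O(n log n) in general. Python's Timsort runs faster on nearly sorted input, but it still has to examine all n tasks.

A heap keeps only a partial order, so each insert or removal costs O(log n). That is what makes incremental scheduling efficient.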
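The sketch below compares the two approaches on made-up `(priority, task_id)` tuples. Both functions produce the same dispatch order. The sort-based version touches every pending item on each dispatch (the sort, plus `pop(0)` shifting the list), so draining the queue is O(n²) overall. The heap version is O(n log n). Timings vary by machine, so none are claimed here.

```python
import heapq
import random
import time


def drain_with_sort(items):
    """Re-sort the pending list before every dispatch, then take the first item."""
    pending, order = list(items), []
    while pending:
        pending.sort()                # examines all remaining items each cycle
        order.append(pending.pop(0))  # pop(0) also shifts the remaining items
    return order


def drain_with_heap(items):
    """Push every item onto a heap, then pop: O(log n) per push and per pop."""
    heap, order = [], []
    for item in items:
        heapq.heappush(heap, item)
    while heap:
        order.append(heapq.heappop(heap))
    return order


random.seed(42)
items = [(random.randint(1, 10), task_id) for task_id in range(5_000)]  # (priority, task_id)

for drain in (drain_with_sort, drain_with_heap):
    start = time.perf_counter()
    drain(items)
    print(f"{drain.__name__}: {time.perf_counter() - start:.4f}s")
```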

Step 6 - Scaling Simulation

```python
import random
import time

scheduler = PriorityTaskScheduler()
start = time.time()

for i in range(1_000_000):
    scheduler.add_task(
        priority=random.randint(1, 10),
        task_name=f"Task_{i}",
    )

print("Added 1M tasks in:", time.time() - start)
```

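Each insertion is O(log n), so loading n tasks costs O(n log n) overall and the heap stays fast even with a million entries. The absolute time depends on your hardware and Python version.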
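When many tasks are known up front, `heapq.heapify` can build the heap in a single O(n) pass instead of n separate O(log n) pushes. The sketch below compares the two on plain `(priority, task_id, task_name)` tuples rather than on the scheduler class. Timings will vary by machine.

```python
import heapq
import random
import time

random.seed(0)
N = 1_000_000
entries = [(random.randint(1, 10), task_id, f"Task_{task_id}") for task_id in range(N)]

start = time.perf_counter()
pushed = []
for entry in entries:
    heapq.heappush(pushed, entry)  # N pushes: O(N log N) total
push_time = time.perf_counter() - start

start = time.perf_counter()
bulk = list(entries)
heapq.heapify(bulk)                # one pass: O(N)
heapify_time = time.perf_counter() - start

print(f"heappush x{N}: {push_time:.2f}s, heapify: {heapify_time:.2f}s")
```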

Step 7 - Memory Consideration

Memory complexity is O(n): the heap holds one tuple per pending task.

For extremely large queues:

  • Use persistent storage
  • Use distributed task brokers
  • Use a sharding strategy

Production schedulers often:

  • Store task metadata in a database
  • Keep an in-memory heap for active tasks only
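The "metadata in a database, active tasks in memory" pattern can be sketched with the standard-library `sqlite3` module. The table layout, the `load_active` helper, and the 60-second window are assumptions for illustration. The heap holds only `(priority, id)` pairs. Everything else stays in the database and is looked up by id when a task runs.

```python
import heapq
import sqlite3

db = sqlite3.connect(":memory:")  # stand-in for a persistent database file or server
db.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT, priority INTEGER, run_at REAL)")
db.executemany(
    "INSERT INTO tasks (name, priority, run_at) VALUES (?, ?, ?)",
    [("send_email", 3, 5.0), ("charge_card", 1, 20.0), ("rollup_logs", 5, 500.0)],
)


def load_active(db, now, window):
    """Build an in-memory heap of (priority, id) for tasks due within `window` seconds."""
    rows = db.execute(
        "SELECT priority, id FROM tasks WHERE run_at <= ?", (now + window,)
    ).fetchall()          # a list of tuples
    heapq.heapify(rows)   # order it in place as a min-heap
    return rows


active = load_active(db, now=0.0, window=60.0)  # rollup_logs (due at 500 s) stays in the database
_, task_id = heapq.heappop(active)
name = db.execute("SELECT name FROM tasks WHERE id = ?", (task_id,)).fetchone()[0]
print(name)
```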

Step 8 - Engineering Improvements

Enhance the system to:

  • Track task status
  • Add cancellation feature
  • Add timeout handling
  • Add worker pool management
  • Persist queue to disk
  • Support multiple priority levels cleanly
  • Add monitoring metrics
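Cancellation and status tracking can be sketched with the lazy-deletion approach described in the Python `heapq` documentation. Instead of removing an entry from the middle of the heap, the queue marks it as cancelled and skips it when it reaches the top. The class name `CancellableQueue` and its status strings are hypothetical, and task names are assumed to be unique.

```python
import heapq
import itertools

_CANCELLED = object()  # sentinel stored in place of a cancelled task's name


class CancellableQueue:
    """Min-heap of [priority, task_id, task_name] with O(1) lazy cancellation."""

    def __init__(self):
        self._heap = []
        self._entries = {}  # task_name -> its heap entry, so cancel() can find it
        self.status = {}    # task_name -> "pending" | "cancelled" | "done"
        self._counter = itertools.count()

    def add(self, task_name, priority):
        entry = [priority, next(self._counter), task_name]  # a list, so it can be mutated
        self._entries[task_name] = entry
        self.status[task_name] = "pending"
        heapq.heappush(self._heap, entry)

    def cancel(self, task_name):
        entry = self._entries.pop(task_name)  # KeyError if unknown or already popped
        entry[-1] = _CANCELLED                # unique task_id means order is unaffected
        self.status[task_name] = "cancelled"

    def pop(self):
        """Return the most urgent non-cancelled task name, or None if none remain."""
        while self._heap:
            _, _, task_name = heapq.heappop(self._heap)
            if task_name is not _CANCELLED:
                del self._entries[task_name]
                self.status[task_name] = "done"
                return task_name
        return None


q = CancellableQueue()
q.add("Critical Fix", 1)
q.add("Low Priority Task", 5)
q.add("Medium Task", 2)
q.cancel("Medium Task")
print(q.pop(), q.pop(), q.pop())  # Critical Fix Low Priority Task None
```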

Real-World Mapping

This simplified scheduler resembles:

  • Celery worker queue
  • Event-driven microservices
  • Cron-like systems
  • Message brokers
  • Job orchestration tools

Each of them relies on priority or time-based ordering to decide what runs next.

What You Learned

This project required:

  • heapq for priority management
  • itertools counter for stable ordering
  • understanding O(log n) operations
  • delayed scheduling logic
  • retry and failure handling
  • worker simulation

This is systems thinking in practice.

Final Engineering Takeaway

Schedulers are core to backend systems.

Using:

  • Full sorting repeatedly → inefficient
  • Plain list queue → O(n) scan or insert per task, slow under scale
  • Heap-based scheduling → scalable and efficient

Choosing the right data structure makes the difference between a toy implementation and a production-ready architecture.

You have now built:

  • Inventory system
  • Analytics engine
  • Priority scheduler

You are thinking like a systems engineer.

© 2026 EngineersOfAI. All rights reserved.