Random Executor Task Scheduler with APScheduler

Code introduction


The code defines a function named random_task that builds an APScheduler BackgroundScheduler with two executors: a ThreadPoolExecutor and a ProcessPoolExecutor. It randomly selects one of the two and schedules a task on it that runs every 10 seconds.


Technology Stack : APScheduler, ThreadPoolExecutor, ProcessPoolExecutor, BackgroundScheduler

Code Type : Timed task scheduling

Code Difficulty : Intermediate


import logging
import os
import random
import time

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler


def task(executor_alias):
    # Defined at module level so the job can be handed to the process pool.
    print("Task is running on executor", repr(executor_alias), "in process", os.getpid())


def random_task():
    # Randomly pick the thread pool or the process pool, then start a
    # BackgroundScheduler that runs task() on that executor every 10 seconds.
    # APScheduler logs through the standard logging module.
    logging.basicConfig(level=logging.INFO)

    executors = {
        'default': ThreadPoolExecutor(10),
        'processpool': ProcessPoolExecutor(3)
    }

    scheduler = BackgroundScheduler(executors=executors)

    # Randomly choose an executor alias; add_job() refers to executors by alias
    chosen_executor = random.choice(list(executors))

    # Schedule the task to run every 10 seconds on the chosen executor
    scheduler.add_job(task, 'interval', seconds=10,
                      executor=chosen_executor, args=[chosen_executor])
    scheduler.start()

    try:
        # Keep the main thread alive; the scheduler runs in a background thread
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()


if __name__ == '__main__':
    random_task()
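
One caveat when the process pool is the randomly chosen executor: the job has to be serialized to reach a worker process, so a job function defined inside another function generally cannot be sent there, while a function importable by name can. The sketch below illustrates the difference using only the standard library. It does not call APScheduler, and pickle is used here as a stand-in for the serialization step; is_picklable and make_nested_task are hypothetical helper names introduced for this illustration.

```python
import os
import pickle


def is_picklable(obj):
    """Return True if obj can be serialized with pickle.dumps."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def make_nested_task():
    # Hypothetical helper: returns a function defined inside another
    # function, the way a job defined inside random_task would be.
    def nested_task():
        print("running in process", os.getpid())
    return nested_task


# A function that can be looked up by module and name
print("os.getpid picklable:", is_picklable(os.getpid))

# A function local to another function
print("nested task picklable:", is_picklable(make_nested_task()))
```

If the second check reports False on your interpreter, moving the job function to module level is the usual way to make it usable with either executor.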