Creating a Background Scheduler with APScheduler

Code introduction


This function uses the APScheduler library to run a task periodically in the background. It accepts two arguments: the interval between runs, in minutes, and a single argument that is passed to the task. The function first creates a BackgroundScheduler and configures two executors: a thread pool with 10 workers (the default) and a process pool with 3 workers. It then adds an interval job that runs random_task with the given interval and argument; because the job names no executor, it runs on the default thread pool. Finally, it starts the scheduler and keeps the main thread alive in a sleep loop until a KeyboardInterrupt or SystemExit is raised, at which point the scheduler is shut down.


Technology Stack : APScheduler, ThreadPoolExecutor, ProcessPoolExecutor, BackgroundScheduler, random, time

Code Type : Timed task execution function

Code Difficulty : Intermediate


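from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
import random
import time

def random_task(task_param):
    # The job is added with args=[arg2], so the task must accept one argument.
    value = random.randint(1, 100)
    print("Random task executed at", time.strftime("%Y-%m-%d %H:%M:%S"),
          "with parameter", task_param, "and random value", value)

def xxx(arg1, arg2):
    # arg1: interval between runs, in minutes; arg2: argument passed to random_task.
    scheduler = BackgroundScheduler(executors={
        'default': ThreadPoolExecutor(10),      # used by jobs that name no executor
        'processpool': ProcessPoolExecutor(3)   # selectable with executor='processpool'
    })
    scheduler.add_job(random_task, 'interval', minutes=arg1, args=[arg2])
    scheduler.start()
    try:
        # BackgroundScheduler runs in its own thread, so keep the main thread alive.
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # Stop the scheduler cleanly on Ctrl+C or interpreter exit.
        scheduler.shutdown()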
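
The sleep loop above can only be ended by an interrupt. As a possible alternative, the sketch below waits on a threading.Event so that other code can also request a stop. It uses only the standard library; wait_until_stopped, stop_event and on_stop are hypothetical names chosen for illustration and are not part of APScheduler. The timer merely stands in for whatever would request the stop in a real program.

```python
import threading


def wait_until_stopped(stop_event, on_stop, poll_seconds=0.5):
    """Block the calling thread until stop_event is set, then call on_stop once.

    Hypothetical helper for illustration; not an APScheduler API.
    """
    try:
        # Event.wait returns True once the event is set and False on timeout,
        # so the loop wakes up every poll_seconds to stay responsive to Ctrl+C.
        while not stop_event.wait(timeout=poll_seconds):
            pass
    except (KeyboardInterrupt, SystemExit):
        # An interrupt ends the wait; cleanup happens in the finally block.
        pass
    finally:
        on_stop()


# Demonstration: a timer stands in for an external stop request.
calls = []
stop_event = threading.Event()
timer = threading.Timer(0.2, stop_event.set)
timer.start()
wait_until_stopped(stop_event, on_stop=lambda: calls.append("shutdown"), poll_seconds=0.05)
timer.join()
```

In the function above, the try/except block could in principle be replaced by a call such as wait_until_stopped(stop_event, scheduler.shutdown), with stop_event set from wherever the program decides to stop.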