Using APScheduler with ThreadPoolExecutor for Scheduled Tasks

  • Share this:

Code introduction


This code schedules a simple recurring task with APScheduler, runs it on a ThreadPoolExecutor, and uses an event listener to react each time the task finishes executing.


Technology Stack : APScheduler, ThreadPoolExecutor

Code Type : Timed task execution

Code Difficulty : Intermediate


                
                    
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED
from apscheduler.jobstores.base import JobStore

def random_task(interval_seconds: int = 10, run_seconds: float = 30) -> None:
    """Run a demo interval job on an APScheduler thread pool, then clean up.

    A ``BackgroundScheduler`` is configured with a two-worker
    ``ThreadPoolExecutor`` as its default executor. A small task that prints
    a message is scheduled with the ``'interval'`` trigger, and a listener
    prints ``"Job executed"`` for every ``EVENT_JOB_EXECUTED`` event. The
    calling thread blocks for ``run_seconds``; afterwards the job is removed
    and the scheduler is shut down.

    Args:
        interval_seconds: Seconds between runs of the task.
        run_seconds: How long to keep the scheduler running before stopping.
    """
    # Function-scope imports keep this block self-contained; the scheduler
    # class is not among the names imported at the top of the file.
    import time

    from apscheduler.schedulers.background import BackgroundScheduler

    def task():
        print("Executing a task with ThreadPoolExecutor")

    def job_listener(event):
        # The mask passed to add_listener() already restricts delivery to
        # EVENT_JOB_EXECUTED, so no further check of the event code is needed.
        print("Job executed")

    # Executors are owned and started by a scheduler; they are not meant to
    # be started or given jobs directly.
    scheduler = BackgroundScheduler(
        executors={"default": ThreadPoolExecutor(max_workers=2)},
    )

    # Register the listener before starting so no execution event is missed.
    scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED)

    # Jobs are added through the scheduler, which places them in its job store.
    job = scheduler.add_job(task, "interval", seconds=interval_seconds)

    scheduler.start()
    try:
        # BackgroundScheduler runs in its own thread, so this thread has to
        # wait for the demo window to elapse.
        time.sleep(run_seconds)
    finally:
        # Remove the job while the scheduler is still running, then let any
        # in-flight run finish before the worker threads are released.
        scheduler.remove_job(job.id)
        scheduler.shutdown(wait=True)