# You can download this code by clicking the button below.
# This code is now available for download.
# This code schedules a simple recurring task with APScheduler, runs it on a ThreadPoolExecutor, and reacts to each completed run through an event listener.
# Technology Stack: APScheduler, ThreadPoolExecutor
# Code Type: Timed task execution
# Code Difficulty: Intermediate
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED
from apscheduler.jobstores.base import JobStore
def random_task(interval_seconds: int = 10, run_seconds: float = 30) -> None:
    """Run a demo interval job on an APScheduler thread pool, then clean up.

    A ``BackgroundScheduler`` is configured with a two-worker
    ``ThreadPoolExecutor`` as its default executor.  A small job that prints
    a message is scheduled at a fixed interval, and a listener prints a
    confirmation each time the job finishes successfully.  The calling
    thread blocks for ``run_seconds``; afterwards the job is removed and the
    scheduler is shut down, waiting for any in-flight run to complete.

    Args:
        interval_seconds: Seconds between consecutive runs of the job.
        run_seconds: How long to keep the scheduler alive before stopping.
    """
    # Imported locally because only this function needs them.
    import time

    from apscheduler.schedulers.background import BackgroundScheduler

    def task():
        print("Executing a task with ThreadPoolExecutor")

    def job_listener(event):
        # APScheduler events expose their type as ``code``.
        if event.code == EVENT_JOB_EXECUTED:
            print("Job executed")

    # Executors and job stores are driven by a scheduler; they are not meant
    # to be started or fed jobs directly.  The default in-memory job store
    # is used, so the nested ``task`` function never needs to be serialized.
    scheduler = BackgroundScheduler(
        executors={"default": ThreadPoolExecutor(max_workers=2)}
    )

    # Register the listener before starting so no execution event is missed.
    scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED)
    job = scheduler.add_job(task, "interval", seconds=interval_seconds)
    scheduler.start()

    try:
        # The scheduler runs in a background thread; keep the caller alive
        # for the requested window so the job actually gets to fire.
        time.sleep(run_seconds)
        # Remove the job while the scheduler (and its job store) is still up.
        scheduler.remove_job(job.id)
    finally:
        # Always stop the scheduler, even if the wait is interrupted.
        scheduler.shutdown(wait=True)