This code defines a Celery task that sleeps for a given number of seconds, registers it on a randomly named Celery app, and schedules it to run every 60 seconds (with a 5-second sleep) under a randomly named periodic-task entry.
Technology Stack : Celery for the task definition and periodic scheduling, plus the standard-library random and time modules.
from celery import Celery
from celery.schedules import crontab
import random
import time


def random_task_name():
    # Build an 8-character name from random lowercase letters
    return ''.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=8))


def random_function():
    # Create a new Celery instance with a random name
    app = Celery(random_task_name())

    # Define a task that sleeps for the given number of seconds
    @app.task
    def sleep_for_a_while(duration):
        time.sleep(duration)
        return f"Task completed after {duration} seconds."

    # Schedule a periodic task to run every minute
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls sleep_for_a_while(5) every 60 seconds under a random entry name
        sender.add_periodic_task(60.0, sleep_for_a_while.s(5), name=random_task_name())

    # Return the task and the function that registers the periodic task
    return sleep_for_a_while, setup_periodic_tasks


# Usage example
task, periodic_task = random_function()
# Expose the app at module level so the celery CLI can find it
app = task.app

# Start a worker with an embedded beat scheduler to run the periodic task
# (a broker must be reachable for the app):
# celery -A your_module_name worker --beat --loglevel=info
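
The random naming helper can be exercised on its own, without Celery or a broker. The sketch below is a variant written for illustration only: the `length` and `rng` parameters are hypothetical additions (the original helper takes no arguments and always produces 8 characters), included so that a seeded generator can make the output reproducible.

```python
import random
import string


def random_task_name(length=8, rng=random):
    """Return `length` random lowercase ASCII letters.

    `length` and `rng` are hypothetical parameters added for this sketch;
    the original helper takes no arguments and always uses k=8.
    """
    return ''.join(rng.choices(string.ascii_lowercase, k=length))


if __name__ == "__main__":
    # Unseeded call: a different name on (almost) every run
    name = random_task_name()
    print(name, len(name))

    # Two generators seeded identically produce the same name
    seeded_a = random_task_name(rng=random.Random(42))
    seeded_b = random_task_name(rng=random.Random(42))
    print(seeded_a == seeded_b)
```

Because the app name and the periodic-task entry name are drawn this way at import time, they will generally differ each time the module is loaded, which is worth keeping in mind if anything else refers to those names.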