Random Task Executor in Celery

  • Share this:

Code introduction


This code defines a custom Celery task that randomly selects another Celery task to execute and returns the result. This involves Celery task scheduling and random selection features.


Technology Stack : The code uses the Celery package and technology stack, which is a distributed task queue system.

Code Type : The type of code

Code Difficulty : Advanced


                
                    
from celery import Celery
from celery.utils.log import get_task_logger
import random

app = Celery('tasks', broker='pyamqp://guest@localhost//')
logger = get_task_logger(__name__)

def random_task():
    # This function generates a random number and logs it
    num = random.randint(1, 100)
    logger.info(f'Generated random number: {num}')
    return num

@app.task
def add(x, y):
    return x + y

@app.task
def multiply(x, y):
    return x * y

@app.task
def divide(x, y):
    if y == 0:
        raise ValueError("Cannot divide by zero")
    return x / y

@app.task
def pow(x, y):
    return x ** y

@app.task
def random_task_with_choice():
    # This function randomly selects another task to execute
    tasks = [add, multiply, divide, pow]
    chosen_task = random.choice(tasks)
    return chosen_task(random.randint(1, 100), random.randint(1, 100))

def xxx(arg1, arg2):
    return random_task_with_choice()