Random Task Execution with Apache Airflow PythonOperator

  • Share this:

Code introduction


This function uses Apache Airflow's PythonOperator to execute a randomly selected custom task. The task can be a pause, printing a message, generating a random number, or selecting an element from a list.


Technology Stack : Apache Airflow, Python, random, time

Code Type : The type of code

Code Difficulty : Intermediate


                
                    
def random_choice_dict(arg1, arg2):
    import random
    import airflow.operators.python_operator
    from airflow.models import DAG
    from airflow.utils.dates import days_ago

    def generate_random_choice():
        choices = [
            {"name": "sleep", "args": {"duration": 5}},
            {"name": "print", "args": {"message": "Hello, Airflow!"}},
            {"name": "random.randint", "args": {"low": 1, "high": 100}},
            {"name": "random.choice", "args": {"choices": ["Apple", "Banana", "Cherry"]}}
        ]
        return random.choice(choices)

    def execute_task(task_dict):
        task_name = task_dict["name"]
        task_args = task_dict["args"]

        if task_name == "sleep":
            import time
            time.sleep(task_args["duration"])
        elif task_name == "print":
            print(task_args["message"])
        elif task_name == "random.randint":
            return random.randint(task_args["low"], task_args["high"])
        elif task_name == "random.choice":
            return random.choice(task_args["choices"])

    with DAG('random_choice_example', start_date=days_ago(1), catchup=False) as dag:
        random_choice = generate_random_choice()
        task = airflow.operators.python_operator.PythonOperator(
            task_id='random_choice_task',
            python_callable=execute_task,
            op_kwargs=random_choice
        )
        task