Apache Airflow DAG for CSV Data Processing with PythonOperator

Code introduction


This code block defines an Apache Airflow DAG containing a PythonOperator task that reads a CSV file with Pandas, filters it (keeping only rows whose value column exceeds 100), and saves the result to a new CSV file. The example demonstrates Airflow's DAG and task definition features together with Pandas for the data processing itself.


Technology Stack : Apache Airflow, Pandas, Python

Code Type : Airflow DAG definition (Python script)

Code Difficulty : Advanced


import pandas as pd
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def process_data(file_path, output_path):
    # Read the CSV file into a DataFrame
    df = pd.read_csv(file_path)
    # Process the data: keep only rows whose 'value' column exceeds 100
    filtered_df = df[df['value'] > 100]
    # Save the processed data to a new CSV file
    filtered_df.to_csv(output_path, index=False)


# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

# Define the DAG at module level so the Airflow scheduler can discover it
dag = DAG(
    'random_data_processing',
    default_args=default_args,
    schedule_interval='@daily',
)

# Define the CSV-processing task; the placeholder paths should point at real files
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    op_kwargs={
        'file_path': '/path/to/input.csv',
        'output_path': '/path/to/output.csv',
    },
    dag=dag,
)

# Define the start and end marker tasks
start_task = EmptyOperator(task_id='start', dag=dag)
end_task = EmptyOperator(task_id='end', dag=dag)

# Set the task dependencies
start_task >> process_task >> end_task
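
Because the transformation itself is plain Pandas, the filtering logic can be sanity-checked outside of Airflow before the DAG is deployed. The snippet below is a minimal local check; the sample file names and sample data are illustrative and are not part of the DAG above.

import pandas as pd

# Build a tiny sample input CSV (illustrative data only)
pd.DataFrame({'value': [50, 150, 99, 300]}).to_csv('sample_input.csv', index=False)

# Apply the same transformation the DAG task performs
df = pd.read_csv('sample_input.csv')
df[df['value'] > 100].to_csv('sample_output.csv', index=False)

# Expect only the rows with values 150 and 300 to remain
print(pd.read_csv('sample_output.csv'))

Once the DAG file sits in the scheduler's dags folder, the task can also be exercised in isolation with Airflow's CLI, for example: airflow tasks test random_data_processing process_data 2024-01-01. This runs the task once without recording state in the metadata database.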