Luigi Tasks for Random Data Generation and Aggregation

  • Share this:

Code introduction


This code defines a helper function, random_date, and two Luigi tasks: GenerateData and AggregateData. The GenerateData task builds a DataFrame of 100 rows, each holding the date passed in as a task parameter and a random integer, while the AggregateData task reads the data produced by GenerateData and calculates the sum of all the numbers.


Technology Stack : Luigi, pandas, numpy, datetime

Code Type : Python classes (Luigi tasks) with a helper function

Code Difficulty : Intermediate


                
                    
import random
from datetime import datetime, timedelta

import luigi
import numpy as np
import pandas as pd
from luigi import LocalTarget, Parameter, Task
from luigi.util import requires

def random_date(start, end):
    """
    Generate a random datetime in the half-open interval [start, end).

    The offset is drawn at whole-second resolution; any sub-second part of
    the span between ``start`` and ``end`` is ignored.

    Args:
        start: Lower bound (inclusive) as a ``datetime``.
        end: Upper bound (exclusive) as a ``datetime``.

    Returns:
        ``start`` plus a random whole number of seconds smaller than the
        span. When the span is shorter than one second, ``start`` itself
        is returned.

    Raises:
        ValueError: If ``end`` is earlier than ``start``.
    """
    delta = end - start
    span_seconds = delta.days * 24 * 60 * 60 + delta.seconds
    if span_seconds < 0:
        raise ValueError("end must not be earlier than start")
    if span_seconds == 0:
        # random.randrange(0) would raise an opaque "empty range" error;
        # a zero-length span has exactly one valid answer.
        return start
    return start + timedelta(seconds=random.randrange(span_seconds))

class GenerateData(Task):
    """
    A Luigi task that writes a CSV file of random numbers tagged with a date.

    The file has ``ROW_COUNT`` rows and two columns: ``date`` (the task's
    ``date`` parameter repeated on every row) and ``number`` (a random
    integer from 1 to 99 inclusive).
    """
    date = Parameter()

    # Number of rows written to the generated CSV file.
    ROW_COUNT = 100

    def output(self):
        """Return the CSV target for this date.

        Luigi expects ``output()`` to return a Target, not the data itself.
        The date is part of the file name so that each parameter value gets
        its own output file.
        """
        return LocalTarget('generated_data_{}.csv'.format(self.date))

    def run(self):
        """Build the random DataFrame and write it to the output target."""
        frame = pd.DataFrame({
            'date': [self.date] * self.ROW_COUNT,
            # The upper bound of randint is exclusive, so values are 1..99.
            'number': np.random.randint(1, 100, size=self.ROW_COUNT),
        })
        with self.output().open('w') as handle:
            frame.to_csv(handle, index=False)

# ``requires`` is a class decorator: it copies GenerateData's parameters onto
# this task and defines ``requires()`` so that Luigi schedules GenerateData
# first. Applied to a method it declares no dependency at all.
@requires(GenerateData)
class AggregateData(Task):
    """
    A Luigi task that sums the ``number`` column of the CSV produced by
    GenerateData and writes the total to a text file.
    """

    def output(self):
        """Return the text-file target that receives the computed sum.

        NOTE(review): the path does not include the task parameters, so a
        result file left by an earlier run makes this task look complete
        for every date. Consider adding the date to the file name.
        """
        return LocalTarget('aggregated_result.txt')

    def run(self):
        """Read the upstream CSV, sum ``number``, and write the result."""
        df = pd.read_csv(self.input().path)
        total = df['number'].sum()
        # Luigi targets are written through open('w'); they do not offer
        # pathlib's write_text().
        with self.output().open('w') as sink:
            sink.write(str(total))

# Example usage
if __name__ == '__main__':
    # luigi.build() takes task instances, whereas luigi.run() expects
    # command-line style string arguments. local_scheduler=True runs the
    # tasks in-process without a central scheduler daemon.
    # The date is formatted as a string because GenerateData declares
    # ``date`` as a plain (string) Parameter.
    luigi.build(
        [AggregateData(date=datetime.now().strftime('%Y-%m-%d'))],
        local_scheduler=True,
    )