You can download this code by clicking the button below.
This code is now available for download.
This code defines two Luigi tasks: GenerateData and AggregateData. The GenerateData task builds a 100-row DataFrame whose rows hold the task's date parameter and a random integer between 1 and 99, while the AggregateData task reads the data produced by GenerateData and calculates the sum of all the numbers.
Technology Stack : Luigi, pandas, numpy, datetime
Code Type : Class definitions (Luigi tasks) with a helper function
Code Difficulty : Intermediate
import random
from datetime import datetime, timedelta

import luigi
import numpy as np
import pandas as pd
from luigi import Parameter, Task
from luigi.util import requires
def random_date(start, end):
    """
    Generate a random datetime between start and end dates.

    The result is ``start`` plus a whole number of seconds, chosen uniformly
    from the half-open range ``[start, end)``; sub-second parts of the span
    are ignored.

    Args:
        start: Lower bound (inclusive) as a ``datetime``.
        end: Upper bound (exclusive) as a ``datetime``.

    Returns:
        A ``datetime`` with ``start <= result < end``, or ``start`` itself
        when the span is shorter than one second.

    Raises:
        ValueError: If ``end`` is earlier than ``start``.
    """
    delta = end - start
    # Whole seconds in the span (microseconds are deliberately dropped).
    int_delta = (delta.days * 24 * 60 * 60) + delta.seconds
    if int_delta < 0:
        raise ValueError("end must not be earlier than start")
    if int_delta == 0:
        # random.randrange(0) would raise "empty range"; a zero-width
        # span has exactly one sensible answer.
        return start
    random_second = random.randrange(int_delta)
    return start + timedelta(seconds=random_second)
class GenerateData(Task):
    """
    A Luigi task that generates a CSV file of random numbers for a given date.

    The file has 100 rows and two columns: ``date`` (the task's ``date``
    parameter repeated on every row) and ``number`` (a random integer in
    the range 1..99).
    """

    # Identifies the run; also embedded in the output file name so that
    # different dates produce different targets.
    date = Parameter()

    def output(self):
        """Return the local CSV file target this task produces."""
        # Luigi decides task completeness by checking that this target
        # exists, so output() must return a Target, not the data itself.
        return luigi.LocalTarget(f'generated_data_{self.date}.csv')

    def run(self):
        """Build the random DataFrame and write it to the output target."""
        frame = pd.DataFrame({
            'date': [self.date] * 100,
            # randint's upper bound is exclusive: values are 1..99.
            'number': np.random.randint(1, 100, size=100),
        })
        # LocalTarget.open('w') writes to a temporary file and renames it
        # on close, so a half-written file is never seen as complete.
        with self.output().open('w') as handle:
            frame.to_csv(handle, index=False)
@requires(GenerateData)
class AggregateData(Task):
    """
    A Luigi task that aggregates data from GenerateData task and calculates the sum of numbers.

    ``@requires`` is a class decorator: it makes this task inherit the
    parameters of GenerateData and depend on it. The upstream output is
    expected to be a file target containing CSV data with a ``number``
    column.
    """

    def output(self):
        """Return the local text file target that receives the sum."""
        # NOTE(review): the path does not include the date parameter, so
        # once this file exists the task is considered complete for every
        # date. Kept unchanged to preserve the existing output location.
        return luigi.LocalTarget('aggregated_result.txt')

    def run(self):
        """Read the upstream CSV, sum the ``number`` column, write the result."""
        df = pd.read_csv(self.input().path)
        result = df['number'].sum()
        # LocalTarget has no write_text(); write through its file handle.
        with self.output().open('w') as handle:
            handle.write(str(result))
# Example usage: run the pipeline in-process with the local scheduler.
if __name__ == '__main__':
    # luigi.run() parses command-line style string arguments; task
    # instances are scheduled with luigi.build() instead.
    # The date is passed as a string because Parameter holds string values.
    luigi.build(
        [AggregateData(date=datetime.now().strftime('%Y-%m-%d'))],
        local_scheduler=True,
    )