This code is available for download by clicking the button below.
This code defines Luigi tasks that build SQL statements for creating, populating, and querying a data table, and that write those statements to local output files.
Technology Stack : Luigi, PostgreSQL
Code Type : Luigi Task
Code Difficulty : Intermediate
import random
import luigi
from luigi.contrib.postgres import PostgresTarget
def randomize_query(rng=None):
    """Build a random single-table ``SELECT`` statement.

    One column, one table, and a ``WHERE`` expression made of two columns
    joined by an operator are each picked at random from fixed lists.

    NOTE(review): the ``WHERE`` expression joins two columns with an
    arithmetic operator rather than a comparison -- confirm this is intended
    for the target database.

    Args:
        rng: Optional object exposing ``choice(sequence)`` (for example a
            ``random.Random`` instance). When omitted, the module-level
            ``random`` generator is used, as before.

    Returns:
        The generated SQL text, without a trailing semicolon.
    """
    chooser = random if rng is None else rng
    operators = ["+", "-", "*", "/"]
    tables = ["table1", "table2", "table3"]
    columns = ["column1", "column2", "column3"]
    # Draw in the same left-to-right order as the statement text so that a
    # seeded generator yields the same query as a single inline f-string.
    selected = chooser.choice(columns)
    table = chooser.choice(tables)
    left = chooser.choice(columns)
    operator = chooser.choice(operators)
    right = chooser.choice(columns)
    return f"SELECT {selected} FROM {table} WHERE {left} {operator} {right}"
def generate_data_table(table_name=None):
    """Return a ``CREATE TABLE IF NOT EXISTS`` statement for a data table.

    The table has two columns: ``id INT`` and ``value VARCHAR(255)``.

    Args:
        table_name: Name of the table to create. When omitted, one of
            ``data_table1``, ``data_table2`` or ``data_table3`` is picked at
            random, as before.

    Returns:
        The SQL statement as a string, terminated by a semicolon.
    """
    if table_name is None:
        table_name = random.choice(["data_table1", "data_table2", "data_table3"])
    return f"CREATE TABLE IF NOT EXISTS {table_name} (id INT, value VARCHAR(255));"
def populate_data_table(table_name):
    """Return an ``INSERT`` statement that seeds a table with three rows.

    Args:
        table_name: Name of the table to insert into. It is interpolated
            into the SQL text as-is, so it must be a trusted identifier.

    Returns:
        The SQL statement as a string, terminated by a semicolon.
    """
    # The statement must be returned: callers write the result to a target.
    return (
        f"INSERT INTO {table_name} (id, value) "
        "VALUES (1, 'value1'), (2, 'value2'), (3, 'value3');"
    )
def fetch_data(table_name):
    """Return a ``SELECT *`` statement for the given table.

    Args:
        table_name: Name of the table to read. It is interpolated into the
            SQL text as-is, so it must be a trusted identifier.

    Returns:
        The SQL statement as a string, terminated by a semicolon.
    """
    return f"SELECT * FROM {table_name};"
class CreateData(luigi.Task):
    """Luigi task that records the SQL for creating and seeding a data table.

    The task does not execute SQL; it writes the generated statements to a
    local text file, one statement per line.
    """

    def output(self):
        """Return the local file target that receives the generated SQL."""
        return luigi.LocalTarget(f"{self.__class__.__name__}_output.txt")

    def run(self):
        """Generate the CREATE and INSERT statements and write them out."""
        create_statement = generate_data_table()
        # generate_data_table() returns a whole CREATE TABLE statement, not a
        # bare name; the identifier is the last token before the column list.
        table_name = create_statement.split("(", 1)[0].split()[-1]
        insert_statement = populate_data_table(table_name)
        # Only write statements that were actually produced.
        statements = [s for s in (create_statement, insert_statement) if s]
        # LocalTarget is written through open("w"); it has no write() method.
        with self.output().open("w") as handle:
            handle.write("\n".join(statements) + "\n")
class PopulateDataTask(CreateData):
    """Variant of ``CreateData`` that writes to its own class-named file."""

    def output(self):
        """Return the local file target, named after this class."""
        return luigi.LocalTarget(f"{self.__class__.__name__}_output.txt")
class FetchDataTask(luigi.Task):
    """Luigi task that records the ``SELECT`` statement for one table.

    The task does not execute SQL; it writes the statement returned by
    ``fetch_data`` to a local text file.
    """

    # Name of the table to query; supplied as a Luigi parameter.
    table_name = luigi.Parameter()

    def requires(self):
        """Depend on ``CreateData`` so it is scheduled before this task."""
        return CreateData()

    def run(self):
        """Build the SELECT statement and write it to the output file."""
        query = fetch_data(self.table_name)
        # LocalTarget is written through open("w"); it has no write() method.
        with self.output().open("w") as handle:
            handle.write(query)

    def output(self):
        """Return the local file target, named after the class and table."""
        return luigi.LocalTarget(f"{self.__class__.__name__}_{self.table_name}.txt")
# JSON representation