Luigi Task for Database Table Creation, Population, and Querying

  • Share this:

Code introduction


This code defines Luigi tasks that build SQL statements for creating a database table, populating it with sample rows, and querying it, and that write those statements to local output files.


Technology Stack : Luigi, PostgreSQL

Code Type : Luigi Task

Code Difficulty : Intermediate


                
                    
import random
import luigi
from luigi.contrib.postgres import PostgresTarget

def randomize_query():
    """Return a randomly assembled ``SELECT`` statement as a string.

    The selected column, the table, and both ``WHERE`` operands are drawn
    independently from fixed pools of three names each; the operator between
    the operands is one of ``+``, ``-``, ``*`` or ``/``.

    NOTE(review): the operators are arithmetic, so the ``WHERE`` clause is an
    arithmetic expression rather than a comparison -- confirm that this is
    the intended shape of the generated query.
    """
    column_pool = ["column1", "column2", "column3"]
    table_pool = ["table1", "table2", "table3"]
    operator_pool = ["+", "-", "*", "/"]

    # Draw in the order the pieces appear in the statement so a seeded RNG
    # produces the same text as a single left-to-right f-string would.
    selected = random.choice(column_pool)
    source = random.choice(table_pool)
    left = random.choice(column_pool)
    op = random.choice(operator_pool)
    right = random.choice(column_pool)

    return f"SELECT {selected} FROM {source} WHERE {left} {op} {right}"

def generate_data_table():
    """Return a ``CREATE TABLE IF NOT EXISTS`` statement for a random table.

    The table name is picked from ``data_table1`` .. ``data_table3``; the
    schema is always ``(id INT, value VARCHAR(255))``. The return value is
    the full SQL statement, not just the table name.
    """
    candidates = ['data_table1', 'data_table2', 'data_table3']
    chosen = random.choice(candidates)
    return f"CREATE TABLE IF NOT EXISTS {chosen} (id INT, value VARCHAR(255));"

def populate_data_table(table_name):
    """Return an ``INSERT`` statement that adds three sample rows.

    Args:
        table_name: Name of the target table. It is interpolated into the
            statement verbatim, so it must be a trusted identifier.

    Returns:
        The SQL text inserting ``(1, 'value1')``, ``(2, 'value2')`` and
        ``(3, 'value3')`` into ``table_name``.
    """
    query = f"INSERT INTO {table_name} (id, value) VALUES (1, 'value1'), (2, 'value2'), (3, 'value3');"
    # The statement was previously built and then dropped, so callers
    # received None; hand it back like fetch_data() does.
    return query

def fetch_data(table_name):
    """Return a ``SELECT *`` statement for ``table_name``.

    The name is placed into the statement as-is; no quoting or validation
    is performed here.
    """
    return f"SELECT * FROM {table_name};"

class CreateData(luigi.Task):
    """Luigi task that records the SQL for creating and filling a data table.

    ``run`` obtains a ``CREATE TABLE`` statement from ``generate_data_table``
    and an ``INSERT`` statement from ``populate_data_table`` and writes both,
    one per line, to the task's output target. Nothing is executed against a
    database here; only the SQL text is written.
    """

    def output(self):
        """Return the local file that receives the generated SQL.

        The path is derived from the concrete class name, so subclasses get
        their own file unless they override this method.
        """
        return luigi.LocalTarget(f"{self.__class__.__name__}_output.txt")

    @staticmethod
    def _table_name_from(create_sql):
        """Extract the table name from a ``CREATE TABLE IF NOT EXISTS`` statement."""
        # The statement has the form
        # "CREATE TABLE IF NOT EXISTS <name> (<columns>);", so the name is
        # the text between "EXISTS" and the opening parenthesis.
        _, _, tail = create_sql.partition("EXISTS")
        return tail.partition("(")[0].strip()

    def run(self):
        """Generate the CREATE and INSERT statements and write them out."""
        create_sql = generate_data_table()
        # generate_data_table() returns the whole statement, not a bare
        # name; the INSERT must target the table name only.
        table_name = self._table_name_from(create_sql)
        insert_sql = populate_data_table(table_name)

        # Skip any helper result that is None so a missing statement does
        # not turn into a TypeError on write.
        statements = [sql for sql in (create_sql, insert_sql) if sql is not None]

        # Targets are written through the file object returned by open();
        # LocalTarget itself has no write() method.
        with self.output().open("w") as handle:
            handle.write("\n".join(statements) + "\n")

class PopulateDataTask(CreateData):
    """Variant of ``CreateData`` that writes to a class-named local file."""

    def output(self):
        """Return the local target ``<ClassName>_output.txt``."""
        task_name = self.__class__.__name__
        return luigi.LocalTarget(f"{task_name}_output.txt")

class FetchDataTask(luigi.Task):
    """Luigi task that writes a ``SELECT *`` statement for one table to a file.

    The statement comes from ``fetch_data``; it is written as text to the
    output target and is not executed against a database by this task.
    """

    # Name of the table to query; also embedded in the output file name.
    table_name = luigi.Parameter()

    def requires(self):
        """Declare ``CreateData`` as the upstream dependency."""
        return CreateData()

    def run(self):
        """Build the SELECT statement and write it to the output target."""
        query = fetch_data(self.table_name)
        # LocalTarget has no write() method; content goes through the file
        # object returned by open("w").
        with self.output().open("w") as handle:
            handle.write(query)

    def output(self):
        """Return the local target ``<ClassName>_<table_name>.txt``."""
        return luigi.LocalTarget(f"{self.__class__.__name__}_{self.table_name}.txt")

# JSON representation