Randomly Generated Luigi Target Processing with Postgres and HDFS

Code introduction


This code defines a function named random_target that randomly picks either a Postgres or an HDFS Luigi target and returns it together with a query string (None for HDFS). A second function, process_data, handles the target according to its type. For a Postgres target, it runs a SELECT * query against the randomly named table and prints each row. For an HDFS target, it opens the file and prints each line with surrounding whitespace stripped.


Technology Stack : Luigi, Postgres, HDFS

Code Type : Function

Code Difficulty : Intermediate


                
                    
import random
import luigi
from luigi.contrib.postgres import PostgresTarget
from luigi.contrib import hdfs

def random_target():
    """
    Return a randomly chosen Luigi target (Postgres or HDFS) and its query (None for HDFS).
    """
    # Randomly select between a Postgres or HDFS target
    target_type = random.choice(['postgres', 'hdfs'])
    
    if target_type == 'postgres':
        # Generate a random table name and query
        table_name = f'table_{random.randint(1, 1000)}'
        query = f'SELECT * FROM {table_name}'
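        # PostgresTarget needs connection details and an update_id;
        # the values below are placeholders, not real settings
        target = PostgresTarget(
            host='localhost',
            database='example_db',
            user='example_user',
            password='example_password',
            table=table_name,
            update_id=f'random_target_{table_name}',
        )
        return target, query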
    else:
        # Generate a random file path for HDFS
        file_path = f'/user/hdfs/random_file_{random.randint(1, 1000)}.txt'
        return hdfs.HdfsTarget(path=file_path), None

def process_data(target, query=None):
    """
    This function processes data based on the type of target (Postgres or HDFS).
    """
    if isinstance(target, PostgresTarget):
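        # Execute the query and fetch the results.
        # PostgresTarget does not support open(); connect() returns a
        # DB-API connection that must be closed explicitly
        connection = target.connect()
        try:
            cursor = connection.cursor()
            cursor.execute(query)
            for row in cursor.fetchall():
                print(row)
        finally:
            connection.close()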
    else:
        # Read the file from HDFS
        with target.open('r') as file:
            for line in file:
                print(line.strip())

# Usage example (query is None for an HDFS target, which process_data accepts)
target, query = random_target()
process_data(target, query)
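
The listing above needs a reachable Postgres server and an HDFS cluster to run. As a rough way to try the same pick-then-dispatch flow locally, the sketch below swaps in stand-ins from the standard library: an in-memory SQLite table takes the place of the Postgres target, and a file in a temporary directory takes the place of the HDFS target. The names make_random_source, read_source, and run_once are hypothetical helpers introduced only for this illustration, and the sample rows and lines are made up. A seeded random.Random is assumed so that a given seed always takes the same branch.

```python
import os
import random
import sqlite3
import tempfile


def make_random_source(rng, workdir):
    """Return (kind, handle, query) for a randomly chosen stand-in source."""
    kind = rng.choice(["sql", "file"])
    if kind == "sql":
        # Stand-in for the Postgres branch: an in-memory SQLite table
        table_name = f"table_{rng.randint(1, 1000)}"
        conn = sqlite3.connect(":memory:")
        conn.execute(f'CREATE TABLE "{table_name}" (id INTEGER, name TEXT)')
        conn.executemany(
            f'INSERT INTO "{table_name}" VALUES (?, ?)',
            [(1, "a"), (2, "b")],
        )
        return kind, conn, f'SELECT * FROM "{table_name}" ORDER BY id'
    # Stand-in for the HDFS branch: a local text file
    path = os.path.join(workdir, f"random_file_{rng.randint(1, 1000)}.txt")
    with open(path, "w") as fh:
        fh.write("first line\nsecond line\n")
    return kind, path, None


def read_source(kind, handle, query=None):
    """Return rows (SQL) or stripped lines (file) as a list."""
    if kind == "sql":
        try:
            return handle.execute(query).fetchall()
        finally:
            handle.close()
    with open(handle) as fh:
        return [line.strip() for line in fh]


def run_once(seed):
    """Pick a source with a seeded generator and read it back."""
    rng = random.Random(seed)
    with tempfile.TemporaryDirectory() as workdir:
        kind, handle, query = make_random_source(rng, workdir)
        return kind, read_source(kind, handle, query)
```

Calling run_once with any integer seed returns the chosen branch name and the data read from it. Returning the data instead of printing it makes each branch easy to check in a test.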