Random SQL Query Executor

  • Share this:

Code introduction


This function connects to a PostgreSQL database, picks one query type at random from the list it is given, builds a matching SQL statement against a small set of hard-coded tables and columns, and executes it. Supported query types are SELECT, INSERT, UPDATE, and DELETE; the rows returned by a SELECT are printed.


Technology Stack : psycopg2

Code Type : Function

Code Difficulty : Intermediate


                
                    
import psycopg2
import random

def _build_random_query(query_type):
    """Build a random SQL statement of the given type.

    Table and column names are drawn from fixed lists inside this function,
    so they are interpolated into the SQL text directly. Data values are
    returned separately so the driver can bind them as query parameters.

    Args:
        query_type: One of 'SELECT', 'INSERT', 'UPDATE' or 'DELETE'.

    Returns:
        A ``(sql, params)`` pair. ``params`` is ``None`` when the statement
        has no placeholders, otherwise a tuple matching the ``%s`` markers.

    Raises:
        ValueError: If ``query_type`` is not one of the supported types.
    """
    if query_type == 'SELECT':
        table_name = random.choice(['users', 'products', 'orders'])
        columns = random.sample(['id', 'name', 'price', 'quantity', 'order_date'], k=3)
        return f"SELECT {', '.join(columns)} FROM {table_name}", None

    if query_type == 'INSERT':
        table_name = random.choice(['products', 'orders'])
        columns = ['name', 'price', 'quantity']
        params = (
            random.choice(['Product A', 'Product B', 'Product C']),
            round(random.uniform(10.0, 1000.0), 2),
            random.randint(1, 100),
        )
        placeholders = ', '.join(['%s'] * len(columns))
        sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
        return sql, params

    if query_type == 'UPDATE':
        table_name = random.choice(['products', 'orders'])
        column = random.choice(['price', 'quantity'])
        condition_column = random.choice(['price', 'quantity'])
        condition_value = round(random.uniform(10.0, 1000.0), 2)
        new_value = round(random.uniform(10.0, 1000.0), 2)
        sql = f"UPDATE {table_name} SET {column} = %s WHERE {condition_column} = %s"
        return sql, (new_value, condition_value)

    if query_type == 'DELETE':
        table_name = random.choice(['products', 'orders'])
        condition_column = random.choice(['price', 'quantity'])
        condition_value = round(random.uniform(10.0, 1000.0), 2)
        sql = f"DELETE FROM {table_name} WHERE {condition_column} = %s"
        return sql, (condition_value,)

    raise ValueError(f"Unsupported query type: {query_type!r}")


def execute_random_query(connection_string, query_types):
    """Execute one randomly generated SQL statement against PostgreSQL.

    A query type is picked at random from ``query_types``, a matching
    statement is generated, and it is executed on a new connection. Rows
    returned by a SELECT are printed. Database errors raised while executing
    or committing are printed rather than propagated.

    Args:
        connection_string: DSN passed unchanged to ``psycopg2.connect``.
        query_types: Non-empty sequence of query type names to choose from;
            supported names are 'SELECT', 'INSERT', 'UPDATE' and 'DELETE'.

    Raises:
        IndexError: If ``query_types`` is empty.
        ValueError: If the chosen query type is not supported.

    Errors raised by ``psycopg2.connect`` itself are not caught here.
    """
    # Build the statement first so invalid input fails before a connection
    # is opened (and therefore cannot leak one).
    query_type = random.choice(query_types)
    query, params = _build_random_query(query_type)

    conn = psycopg2.connect(connection_string)
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, params)
            # Fetch and print the results if it's a SELECT query
            if query_type == 'SELECT':
                print(cursor.fetchall())
        # psycopg2 runs statements inside an implicit transaction; without an
        # explicit commit, closing the connection discards the write.
        conn.commit()
    except psycopg2.Error as e:
        print(f"An error occurred: {e}")
    finally:
        # Closing also abandons any transaction left uncommitted by an error.
        conn.close()

# Usage example
# The values below are placeholders; replace them with real credentials.
connection_string = "dbname='your_db' user='your_user' password='your_password' host='localhost'"
query_types = ['SELECT', 'INSERT', 'UPDATE', 'DELETE']

# Only hit the database when run as a script, so that importing this module
# does not attempt a connection.
if __name__ == "__main__":
    execute_random_query(connection_string, query_types)
              
Tags: