RabbitMQ Message Consumption with RMQ Library

  • Share this:

Code introduction


This function uses the RMQ library to connect to a RabbitMQ server, declare an exchange, bind the queue to the exchange with a specified routing key, and then consume messages from the queue. The function uses operations such as connection, exchange declaration, queue declaration, queue binding, message consumption, and message acknowledgment.


Technology Stack : RMQ library, RabbitMQ server, exchange, queue, message consumption

Code Type : Function

Code Difficulty : Intermediate


                
                    
def consume_messages(queue_name, exchange_name, routing_key, durable=False):
    """
    This function consumes messages from a RabbitMQ queue and processes them.
    It uses the RMQ library to connect to a RabbitMQ server, declare an exchange,
    bind the queue to the exchange with a specified routing key, and then consume
    messages from the queue.
    """
    import pika
    
    # Connect to the RabbitMQ server
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # Declare the exchange
    channel.exchange_declare(exchange=exchange_name, exchange_type='direct', durable=durable)
    
    # Declare the queue
    channel.queue_declare(queue=queue_name, durable=durable)
    
    # Bind the queue to the exchange with the specified routing key
    channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key)
    
    # Define a callback function for processing messages
    def callback(ch, method, properties, body):
        print(f"Received message: {body}")
        # Acknowledge the message
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    # Start consuming messages
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
    
    print('Waiting for messages. To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()