This function uses the pika library to connect to a RabbitMQ server on localhost, declare a direct exchange and a queue, bind the queue to the exchange with a specified routing key, and consume messages from the queue. Each message is acknowledged manually after it has been processed.
Technology Stack : pika (Python client for RabbitMQ), RabbitMQ server, direct exchange, queue, message consumption with manual acknowledgment
Code Type : Function
Code Difficulty : Intermediate
def consume_messages(queue_name, exchange_name, routing_key, durable=False):
    """
    Consume messages from a RabbitMQ queue and print each one.

    Uses the pika library to connect to a RabbitMQ server on localhost,
    declare a direct exchange, declare the queue, bind the queue to the
    exchange with the given routing key, and consume until interrupted.
    """
    import pika

    # Connect to the RabbitMQ server
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare the exchange (idempotent if it already exists with the same settings)
    channel.exchange_declare(exchange=exchange_name, exchange_type='direct', durable=durable)

    # Declare the queue
    channel.queue_declare(queue=queue_name, durable=durable)

    # Bind the queue to the exchange with the specified routing key
    channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key)

    # Define a callback function for processing messages
    def callback(ch, method, properties, body):
        # body arrives as bytes; decode it for readable output
        print(f"Received message: {body.decode('utf-8', errors='replace')}")
        # Acknowledge the message so the broker can remove it from the queue
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Start consuming messages with manual acknowledgment
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
    print('Waiting for messages. To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    finally:
        # Release the connection whether consumption ended normally or not
        if connection.is_open:
            connection.close()
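
The callback in the function above acknowledges every message unconditionally. As a rough sketch of one possible extension, the handler below acknowledges a message only after it has been parsed and processed, and rejects messages it cannot decode. It assumes JSON payloads, which the original does not specify. The names make_callback, process, StubChannel, and StubMethod are hypothetical and introduced only for illustration; basic_ack and basic_nack are the pika channel methods the callback calls.

```python
import json


def make_callback(process):
    """Build a pika-style callback that acks on success and nacks bad payloads.

    `process` is a hypothetical user-supplied function that receives the
    decoded payload. The JSON format is an assumption, not part of the original.
    """
    def callback(ch, method, properties, body):
        try:
            payload = json.loads(body.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Malformed message: reject without requeueing so it is not
            # redelivered indefinitely
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            return
        # If process() raises, the exception propagates and no ack is sent,
        # so the broker keeps the message as unacknowledged
        process(payload)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback


class StubChannel:
    """Hypothetical stand-in for a pika channel that records ack/nack calls."""

    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag, requeue=True):
        self.calls.append(("nack", delivery_tag, requeue))


class StubMethod:
    """Hypothetical stand-in for the delivery metadata pika passes in."""

    def __init__(self, delivery_tag):
        self.delivery_tag = delivery_tag


if __name__ == "__main__":
    seen = []
    demo_callback = make_callback(seen.append)
    demo_channel = StubChannel()
    demo_callback(demo_channel, StubMethod(1), None, b'{"id": 7}')
    demo_callback(demo_channel, StubMethod(2), None, b"not json")
    print(seen)
    print(demo_channel.calls)
```

To use this with the function above, one option is to pass make_callback(your_handler) as on_message_callback to channel.basic_consume, keeping auto_ack=False so the broker waits for the explicit acknowledgment.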