Using RabbitMQ With Python
RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). Clients connect to the broker and exchange messages through it instead of talking to each other directly, so different parts of a system can communicate through a common platform. RabbitMQ is used for building distributed systems and decoupling the components that produce and consume messages, making it easier to manage and scale applications.
Here are some common use cases where you should consider using RabbitMQ:
- Task Queue: RabbitMQ can be used to distribute time-consuming tasks among multiple workers.
- Publish/Subscribe: RabbitMQ can be used as a message broker to handle the publish/subscribe pattern, allowing different parts of a system to subscribe to specific messages.
- Remote Procedure Call (RPC): RabbitMQ can be used to make synchronous and asynchronous remote procedure calls between different systems.
- Event-Driven Architecture: RabbitMQ can be used to build an event-driven architecture, where different parts of a system can send and receive messages about specific events.
- Microservices: RabbitMQ can be used to decouple microservices and enable communication between them.
- Internet of Things (IoT): RabbitMQ can be used to handle the communication between IoT devices and a central system.
- E-commerce systems: RabbitMQ can be used to handle the flow of messages between the various components of an e-commerce system, such as order management, inventory management, and payment processing.
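To make the publish/subscribe use case above more concrete, here is a minimal sketch built on a fanout exchange, written against pika, the Python client used in the rest of this article. The function names publish_event and subscribe and the exchange name passed in are hypothetical; the channel argument is assumed to be a channel obtained from pika.BlockingConnection(...).channel().

```python
def publish_event(channel, exchange, body):
    """Broadcast body to every queue bound to a fanout exchange."""
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    # A fanout exchange ignores the routing key, so it is left empty.
    channel.basic_publish(exchange=exchange, routing_key="", body=body)


def subscribe(channel, exchange, on_message):
    """Bind a private, broker-named queue to the exchange and register a consumer."""
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    # queue="" asks the broker to generate a queue name; exclusive=True
    # deletes the queue when the declaring connection closes.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange, queue=queue_name)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=on_message,
                          auto_ack=True)
    return queue_name
```

Each subscriber gets its own queue, so every subscriber receives a copy of every published message. After calling subscribe, the consumer still has to call channel.start_consuming() to begin receiving.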
Using RabbitMQ with Python
In this example, the server sends a message for the client to process. This kind of architecture can be used in an IoT setting, for video processing, or for any other long-running task.
Server Code
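import pika

# Connect to a RabbitMQ broker running on localhost.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Make sure the queue exists before publishing to it.
channel.queue_declare(queue='hello')

# The default exchange ('') routes by queue name, given as the routing key.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()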
Client Code
import pika

def callback(ch, method, properties, body):
    # body arrives as bytes.
    print(" [x] Received %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declaring the queue again is idempotent, so the client can start first.
channel.queue_declare(queue='hello')

# auto_ack=True: a message counts as acknowledged as soon as it is delivered.
channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Let's Take Another Example Using Flask
A very common scenario is a Flask API that has to kick off a long-running task. A similar approach can be used with FastAPI too.
Server (Flask)
from flask import Flask, request
import pika

app = Flask(__name__)

def publish_task(task):
    # Open a connection per request: pika's BlockingConnection is not
    # thread-safe, so it should not be shared across request threads.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=task,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
    connection.close()

@app.route('/task', methods=['POST'])
def add_task():
    task = request.form['task']
    publish_task(task)
    print(" [x] Sent task: %r" % task)
    return 'Task added'

if __name__ == '__main__':
    app.run()
Client (Receiving Script)
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received task: %r" % body)
    # Simulate work: sleep one second per '.' in the message body.
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Acknowledge only after the task has finished.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Deliver at most one unacknowledged message to this worker at a time.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
In this example, the Flask server accepts a POST request at /task, extracts the task parameter from the request, and publishes it to RabbitMQ with the routing key task_queue. The client declares the queue task_queue, sets up a basic consumer that listens to the queue, and starts consuming messages. Whenever a message is received, the callback function is executed and the message body is printed. The time.sleep function simulates a time-consuming task (one second per dot in the message body), and the basic_ack function acknowledges the message once the task is complete.
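To try the flow end to end, something has to send the POST request. The sketch below does that with the standard library only. The helper names build_task_request and submit_task are hypothetical, and the base URL assumes the Flask development server is running on its default address, 127.0.0.1:5000; adjust it if your server listens elsewhere.

```python
from urllib import parse, request


def build_task_request(task, base_url="http://127.0.0.1:5000"):
    """Build a form-encoded POST for the /task route without sending it."""
    # Flask reads this body through request.form['task'].
    data = parse.urlencode({"task": task}).encode("utf-8")
    return request.Request(base_url + "/task", data=data, method="POST")


def submit_task(task, base_url="http://127.0.0.1:5000"):
    """Send the request and return the response body as text."""
    req = build_task_request(task, base_url)
    with request.urlopen(req, timeout=5) as resp:
        return resp.read().decode("utf-8")
```

With the Flask server and the receiving script both running, submit_task("resize video...") should return the text sent back by the /task route, and the worker should spend about three seconds on the task, one per dot.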
Best Practices When Using RabbitMQ
- Durable Queues: Declare your queues as durable so that the queue definitions survive a RabbitMQ restart.
- Acknowledgments: Use acknowledgments so that messages are redelivered, not lost, if a consumer crashes or becomes unavailable.
- Message Persistence: Make sure that important messages are persisted to disk by setting the delivery_mode property to 2.
- Queue Declarations: Declare queues before using them, and set the durable property to True. Messages survive a broker restart only when the queue is durable and the messages themselves are persistent.
- Error Handling: Implement proper error handling in your code to catch exceptions and log errors.
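The acknowledgment and error-handling points can be combined in the consumer callback. The following is a minimal sketch, not the only way to do it: make_callback and process are hypothetical names, and rejecting failed messages with requeue=False is an assumption that suits tasks which would fail again if retried unchanged.

```python
import logging

logger = logging.getLogger("worker")


def make_callback(process):
    """Wrap process(body) in a pika-style callback that acks or nacks."""

    def callback(ch, method, properties, body):
        try:
            process(body)
        except Exception:
            # Log the traceback, then reject without requeueing so a message
            # that always fails does not loop forever. The broker drops it,
            # or dead-letters it if a dead-letter exchange is configured.
            logger.exception("Task failed: %r", body)
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            # Acknowledge only after the work finished successfully.
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

The result can be passed as on_message_callback to channel.basic_consume in place of a hand-written callback, with auto_ack left at its default of False so that the explicit acknowledgments take effect.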
In conclusion, RabbitMQ is a powerful messaging middleware that is well suited to a wide range of use cases. Following these best practices goes a long way toward keeping a RabbitMQ-based system reliable and scalable.
Happy Hacking!