Message queues have become essential components of modern distributed systems, providing asynchronous communication between services while ensuring reliable message delivery. In Python, several libraries and frameworks make implementing message queue systems efficient and straightforward. I’ll explore six powerful techniques for processing message queues in Python applications and provide practical code examples for each.
RabbitMQ and Pika: The Reliable Message Broker
RabbitMQ remains one of the most popular message brokers due to its reliability and flexibility. The Pika library provides a Python interface to RabbitMQ, making it easy to implement producers and consumers.
When working with RabbitMQ, I prefer implementing consumers with explicit acknowledgments to ensure messages aren’t lost when processing fails:
```python
import json
import time

import pika


def connect_to_rabbitmq():
    # Connect with simple retry logic
    retry_count = 0
    while retry_count < 5:
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host='localhost', heartbeat=600)
            )
            return connection
        except pika.exceptions.AMQPConnectionError:
            retry_count += 1
            time.sleep(2)
    raise Exception("Failed to connect to RabbitMQ after multiple attempts")


def process_message(channel, method, properties, body):
    try:
        message = json.loads(body)
        print(f"Processing message: {message}")
        # Simulate processing work
        time.sleep(1)
        # Message successfully processed, send acknowledgment
        channel.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Reject the message and don't requeue
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)


def start_consumer():
    connection = connect_to_rabbitmq()
    channel = connection.channel()

    # Declare queue with durability for persistence
    channel.queue_declare(queue='task_queue', durable=True)

    # Prefetch limit to avoid overwhelming the consumer
    channel.basic_qos(prefetch_count=10)

    # Register consumer
    channel.basic_consume(queue='task_queue', on_message_callback=process_message)

    print("Consumer started. Press CTRL+C to exit")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
        connection.close()


if __name__ == "__main__":
    start_consumer()
```
The key features in this implementation include connection retries, prefetch limits to control throughput, and proper message acknowledgment. For production systems, I’ve found that implementing a circuit breaker pattern around the consumer helps manage service dependencies effectively.
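As a sketch of what that circuit breaker might look like: the class below is a minimal, illustrative implementation (the class name, parameters, and thresholds are my own, not part of any library). After a configurable number of consecutive failures it "opens" and rejects calls fast; after a cooldown it allows a trial call through. In the consumer above, you could wrap the downstream service call inside `process_message` with it and `basic_nack` with `requeue=True` while the circuit is open.

```python
import time


class CircuitBreaker:
    """Opens after max_failures consecutive errors; allows a trial call
    after reset_timeout seconds (the classic closed/open/half-open cycle)."""

    def __init__(self, max_failures=5, reset_timeout=30):
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                # Fail fast instead of hammering a struggling dependency
                raise RuntimeError("Circuit open; skipping call")
            # Cooldown elapsed: half-open, permit one trial call
            self.opened_at = None
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # success resets the failure count
        return result
```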
Apache Kafka and kafka-python: High-throughput Stream Processing
When working with high-volume data streams, Kafka provides excellent throughput and scalability. The kafka-python library offers a straightforward way to interact with Kafka clusters:
```python
import json
from concurrent.futures import ThreadPoolExecutor

from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import OffsetAndMetadata, TopicPartition


class KafkaHandler:
    def __init__(self, bootstrap_servers=None):
        self.bootstrap_servers = bootstrap_servers or ['localhost:9092']
        self.producer = KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            acks='all'
        )

    def produce_message(self, topic, message):
        future = self.producer.send(topic, message)
        # Block until the broker confirms the write
        result = future.get(timeout=60)
        return result

    def consume_messages(self, topic, group_id, callback):
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        with ThreadPoolExecutor(max_workers=10) as executor:
            for message in consumer:
                executor.submit(self._process_message, consumer, message, callback)

    def _process_message(self, consumer, message, callback):
        try:
            callback(message.value)
            # Commit the offset of the next message to read for this partition
            tp = TopicPartition(message.topic, message.partition)
            consumer.commit({tp: OffsetAndMetadata(message.offset + 1, None)})
        except Exception as e:
            print(f"Error processing message: {e}")
            # Implement retry or dead-letter logic here


# Example usage
def message_processor(message):
    print(f"Processing: {message}")
    # Business logic here


if __name__ == "__main__":
    kafka_handler = KafkaHandler()
    kafka_handler.consume_messages("data-stream", "processing-group", message_processor)
```
This implementation uses a thread pool for parallel processing while managing offsets manually, which gives at-least-once delivery semantics: a message may be reprocessed if the consumer crashes after processing it but before its offset is committed. In my experience, using thread pools with Kafka consumers significantly improves throughput for I/O-bound processing tasks.
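One caveat with committing from worker threads: a fast worker can commit offset N+1 while a slower worker is still processing offset N-1 of the same partition, and a crash then skips that message. A common remedy is to track completed offsets per partition and only commit the end of the contiguous completed prefix. The helper below is a minimal, broker-free sketch of that idea (the class name and interface are my own); workers would call `mark_done(message.offset)` and a periodic committer would commit `committable()` for each partition.

```python
import threading


class OffsetTracker:
    """Tracks completed offsets for one partition and exposes the highest
    offset that is safe to commit (end of the contiguous completed prefix)."""

    def __init__(self, start_offset=0):
        self.lock = threading.Lock()
        self.next_expected = start_offset  # first offset not yet safe to commit past
        self.done = set()                  # completed offsets beyond the prefix

    def mark_done(self, offset):
        with self.lock:
            self.done.add(offset)
            # Advance past every contiguously completed offset
            while self.next_expected in self.done:
                self.done.remove(self.next_expected)
                self.next_expected += 1

    def committable(self):
        # Value to commit: the next offset the group should read
        with self.lock:
            return self.next_expected
```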
Redis Streams: Lightweight Queue Implementation
Redis Streams provides a lightweight alternative to full-featured message brokers, especially suitable for scenarios where simplicity and performance are priorities:
```python
import json
import time
import uuid

import redis


class RedisStreamProcessor:
    def __init__(self, redis_url='redis://localhost:6379/0'):
        self.redis_client = redis.from_url(redis_url)
        self.consumer_name = f"consumer-{uuid.uuid4()}"

    def add_message(self, stream_name, message):
        message_id = self.redis_client.xadd(
            stream_name,
            {b'data': json.dumps(message).encode()}
        )
        return message_id

    def create_consumer_group(self, stream_name, group_name):
        try:
            self.redis_client.xgroup_create(
                stream_name, group_name, id='0', mkstream=True
            )
        except redis.exceptions.ResponseError as e:
            # Group already exists
            if 'already exists' not in str(e):
                raise

    def process_stream(self, stream_name, group_name, batch_size=10, processor_func=None):
        self.create_consumer_group(stream_name, group_name)

        while True:
            try:
                # Read new messages
                streams = {stream_name: '>'}
                messages = self.redis_client.xreadgroup(
                    group_name, self.consumer_name,
                    streams, count=batch_size, block=2000
                )

                if not messages:
                    # Claim pending messages that weren't acknowledged
                    pending = self.redis_client.xpending_range(
                        stream_name, group_name, '-', '+', count=batch_size
                    )
                    if pending:
                        message_ids = [item['message_id'] for item in pending]
                        claimed = self.redis_client.xclaim(
                            stream_name, group_name, self.consumer_name,
                            min_idle_time=60000, message_ids=message_ids
                        )
                        self._process_messages(stream_name, group_name, claimed, processor_func)
                    time.sleep(0.1)
                    continue

                self._process_messages(stream_name, group_name, messages[0][1], processor_func)
            except Exception as e:
                print(f"Error in stream processing: {e}")
                time.sleep(1)

    def _process_messages(self, stream_name, group_name, messages, processor_func):
        for message_id, message_data in messages:
            try:
                data = json.loads(message_data[b'data'].decode())
                if processor_func:
                    processor_func(data)
                # Acknowledge the message
                self.redis_client.xack(stream_name, group_name, message_id)
            except Exception as e:
                print(f"Error processing message {message_id}: {e}")
                # Message stays pending and will be reprocessed later


# Example usage
def process_data(data):
    print(f"Processing: {data}")
    # Business logic here


if __name__ == "__main__":
    processor = RedisStreamProcessor()
    processor.process_stream("data-stream", "processing-group", processor_func=process_data)
```
<span>_process_messages</span><span>(</span><span>self</span><span>,</span> <span>stream_name</span><span>,</span> <span>group_name</span><span>,</span> <span>messages</span><span>,</span> <span>processor_func</span><span>):</span> <span>for</span> <span>message_id</span><span>,</span> <span>message_data</span> <span>in</span> <span>messages</span><span>:</span> <span>try</span><span>:</span> <span>data</span> <span>=</span> <span>json</span><span>.</span><span>loads</span><span>(</span><span>message_data</span><span>[</span><span>b</span><span>'</span><span>data</span><span>'</span><span>].</span><span>decode</span><span>())</span> <span>if</span> <span>processor_func</span><span>:</span> <span>processor_func</span><span>(</span><span>data</span><span>)</span> <span># Acknowledge the message </span> <span>self</span><span>.</span><span>redis_client</span><span>.</span><span>xack</span><span>(</span><span>stream_name</span><span>,</span> <span>group_name</span><span>,</span> <span>message_id</span><span>)</span> <span>except</span> <span>Exception</span> <span>as</span> <span>e</span><span>:</span> <span>print</span><span>(</span><span>f</span><span>"</span><span>Error processing message </span><span>{</span><span>message_id</span><span>}</span><span>: </span><span>{</span><span>e</span><span>}</span><span>"</span><span>)</span> <span># Message will be reprocessed later </span> <span># Example usage </span><span>def</span> <span>process_data</span><span>(</span><span>data</span><span>):</span> <span>print</span><span>(</span><span>f</span><span>"</span><span>Processing: </span><span>{</span><span>data</span><span>}</span><span>"</span><span>)</span> <span># Business logic here </span> <span>if</span> <span>__name__</span> <span>==</span> <span>"</span><span>__main__</span><span>"</span><span>:</span> <span>processor</span> <span>=</span> <span>RedisStreamProcessor</span><span>()</span> 
<span>processor</span><span>.</span><span>process_stream</span><span>(</span><span>"</span><span>data-stream</span><span>"</span><span>,</span> <span>"</span><span>processing-group</span><span>"</span><span>,</span> <span>processor_func</span><span>=</span><span>process_data</span><span>)</span>import redis import json import time import uuid class RedisStreamProcessor: def __init__(self, redis_url='redis://localhost:6379/0'): self.redis_client = redis.from_url(redis_url) self.consumer_name = f"consumer-{uuid.uuid4()}" def add_message(self, stream_name, message): message_id = self.redis_client.xadd( stream_name, {b'data': json.dumps(message).encode()} ) return message_id def create_consumer_group(self, stream_name, group_name): try: self.redis_client.xgroup_create( stream_name, group_name, id='0', mkstream=True ) except redis.exceptions.ResponseError as e: # Group already exists if 'already exists' not in str(e): raise def process_stream(self, stream_name, group_name, batch_size=10, processor_func=None): self.create_consumer_group(stream_name, group_name) while True: try: # Read new messages streams = {stream_name: '>'} messages = self.redis_client.xreadgroup( group_name, self.consumer_name, streams, count=batch_size, block=2000 ) if not messages: # Process pending messages that weren't acknowledged pending = self.redis_client.xpending_range( stream_name, group_name, '-', '+', count=batch_size ) if pending: message_ids = [item['message_id'] for item in pending] claimed = self.redis_client.xclaim( stream_name, group_name, self.consumer_name, min_idle_time=60000, message_ids=message_ids ) self._process_messages(stream_name, group_name, claimed, processor_func) time.sleep(0.1) continue self._process_messages(stream_name, group_name, messages[0][1], processor_func) except Exception as e: print(f"Error in stream processing: {e}") time.sleep(1) def _process_messages(self, stream_name, group_name, messages, processor_func): for message_id, message_data in messages: try: 
data = json.loads(message_data[b'data'].decode()) if processor_func: processor_func(data) # Acknowledge the message self.redis_client.xack(stream_name, group_name, message_id) except Exception as e: print(f"Error processing message {message_id}: {e}") # Message will be reprocessed later # Example usage def process_data(data): print(f"Processing: {data}") # Business logic here if __name__ == "__main__": processor = RedisStreamProcessor() processor.process_stream("data-stream", "processing-group", processor_func=process_data)
This implementation leverages Redis Streams’ consumer groups for distributed processing with automatic handling of pending messages. Redis Streams excels in scenarios requiring high throughput with minimal latency, especially when Redis is already part of the architecture.
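The payload convention used above (a single `data` field holding JSON-encoded bytes) can be factored out and unit-tested without a running Redis server. A minimal sketch; the helper names are my own, not part of redis-py:

```python
import json

def encode_stream_fields(message):
    # Build the field mapping passed to XADD, mirroring add_message above
    return {b'data': json.dumps(message).encode()}

def decode_stream_fields(fields):
    # Recover the original dict, mirroring _process_messages above
    return json.loads(fields[b'data'].decode())

# Round-trip check
original = {"id": 7, "action": "created"}
assert decode_stream_fields(encode_stream_fields(original)) == original
```

Keeping serialization in one place like this makes it easy to swap JSON for a more compact format later without touching the stream-processing logic.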
Celery: Distributed Task Processing
Celery provides a complete solution for distributed task processing, with built-in support for various message brokers:
```python
# tasks.py
import time
from celery import Celery, Task
from celery.signals import task_failure
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Celery with RabbitMQ as broker and Redis as result backend
app = Celery('tasks',
             broker='pyamqp://guest:guest@localhost//',
             backend='redis://localhost')

# Configure Celery
app.conf.update(
    task_acks_late=True,              # Acknowledge after task completes
    task_reject_on_worker_lost=True,  # Requeue tasks if worker dies
    worker_prefetch_multiplier=1,     # Process one task at a time
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

# Custom task base class with retry logic
class RetryableTask(Task):
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3, 'countdown': 5}

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f"Task {task_id} failed: {exc}")
        super().on_failure(exc, task_id, args, kwargs, einfo)

@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    logger.error(f"Task {task_id} failed with exception: {exception}")
    # Could implement notification or dead-letter queue here

@app.task(base=RetryableTask)
def process_order(order_data):
    logger.info(f"Processing order: {order_data}")
    # Simulate processing work
    time.sleep(2)
    # Simulate occasional failures
    if order_data.get('id', 0) % 5 == 0:
        raise ValueError("Simulated processing error")
    logger.info(f"Order {order_data.get('id')} processed successfully")
    return {"status": "processed", "order_id": order_data.get('id')}

@app.task(base=RetryableTask)
def send_notification(user_id, message):
    logger.info(f"Sending notification to user {user_id}: {message}")
    # Notification logic here
    return {"status": "sent", "user_id": user_id}
```
To run a worker and send tasks:
```python
# worker.py
from tasks import app

if __name__ == '__main__':
    app.worker_main(['worker', '--loglevel=info', '-c', '4'])
```

```python
# client.py
from tasks import process_order, send_notification

if __name__ == '__main__':
    # Link a notification task to run after each order is processed.
    # Use .si() (an immutable signature) so the callback is not passed
    # the parent task's return value as an extra positional argument.
    for i in range(10):
        order_data = {"id": i, "product": f"Product-{i}", "quantity": i + 1}
        result = process_order.apply_async(
            args=[order_data],
            link=send_notification.si(42, f"Order {i} processed")
        )
        print(f"Task scheduled: {result.id}")
```
Celery’s strength lies in its comprehensive feature set, including task chaining, scheduling, and monitoring. I’ve found it particularly useful for background processing in web applications, especially when tasks have complex dependencies.
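As one example of the scheduling support, periodic runs can be declared through a beat schedule. A configuration sketch only (the schedule name and arguments are illustrative, and it assumes the `tasks` module above):

```python
# beat_config.py -- periodic scheduling sketch for Celery beat
from celery.schedules import crontab
from tasks import app

app.conf.beat_schedule = {
    'nightly-order-sweep': {
        'task': 'tasks.process_order',
        'schedule': crontab(hour=2, minute=0),  # every day at 02:00 UTC
        'args': ({'id': 0, 'product': 'sweep'},),
    },
}
# Start the scheduler alongside the workers with: celery -A tasks beat
```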
Asyncio-based Queue Processing: High Performance
For high-performance, single-process message handling, asyncio provides excellent throughput:
```python
import asyncio
import json
import aiohttp
import signal
import functools
import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class AsyncMessageProcessor:
    def __init__(self, max_queue_size=1000, worker_count=10):
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.worker_count = worker_count
        self.workers = []
        self.running = False
        self.processed_count = 0
        self.start_time = None

    async def enqueue_message(self, message):
        await self.queue.put(message)

    async def _process_message(self, message):
        try:
            # Example: Process message and send to an API
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    'https://example.com/api/process',
                    json=message,
                    timeout=5
                ) as response:
                    if response.status >= 400:
                        text = await response.text()
                        logger.error(f"API error: {response.status} - {text}")
                        return False
            return True
        except Exception as e:
            logger.exception(f"Error processing message: {e}")
            return False

    async def worker(self, worker_id):
        logger.info(f"Worker {worker_id} started")
        while self.running:
            try:
                message = await self.queue.get()
                success = await self._process_message(message)
                if not success:
                    # Implement retry or dead-letter logic
                    logger.warning("Message processing failed, retrying later")
                    # Could use a separate queue for retries with delay
                self.queue.task_done()
                self.processed_count += 1
                # Log stats periodically
                if self.processed_count % 100 == 0:
                    self._log_stats()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"Worker {worker_id} encountered an error: {e}")
        logger.info(f"Worker {worker_id} stopped")

    def _log_stats(self):
        now = datetime.now()
        elapsed = (now - self.start_time).total_seconds()
        rate = self.processed_count / elapsed if elapsed > 0 else 0
        logger.info(f"Processed {self.processed_count} messages at {rate:.2f} msg/sec")

    async def start(self):
        logger.info("Starting message processor")
        self.running = True
        self.start_time = datetime.now()
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.worker_count)
        ]
```
<span>i</span> <span>in</span> <span>range</span><span>(</span><span>self</span><span>.</span><span>worker_count</span><span>)</span><span>]</span><span>async</span> <span>def</span> <span>stop</span><span>(</span><span>self</span><span>):</span><span>logger</span><span>.</span><span>info</span><span>(</span><span>"</span><span>Stopping message processor</span><span>"</span><span>)</span><span>self</span><span>.</span><span>running</span> <span>=</span> <span>False</span><span># Cancel all workers </span> <span>for</span> <span>worker</span> <span>in</span> <span>self</span><span>.</span><span>workers</span><span>:</span><span>worker</span><span>.</span><span>cancel</span><span>()</span><span># Wait for workers to finish </span> <span>await</span> <span>asyncio</span><span>.</span><span>gather</span><span>(</span><span>*</span><span>self</span><span>.</span><span>workers</span><span>,</span> <span>return_exceptions</span><span>=</span><span>True</span><span>)</span><span># Wait for queue to be empty </span> <span>if</span> <span>not</span> <span>self</span><span>.</span><span>queue</span><span>.</span><span>empty</span><span>():</span><span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Waiting for queue to drain (</span><span>{</span><span>self</span><span>.</span><span>queue</span><span>.</span><span>qsize</span><span>()</span><span>}</span><span> items remaining)</span><span>"</span><span>)</span><span>await</span> <span>self</span><span>.</span><span>queue</span><span>.</span><span>join</span><span>()</span><span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Message processor stopped. 
Processed </span><span>{</span><span>self</span><span>.</span><span>processed_count</span><span>}</span><span> messages total</span><span>"</span><span>)</span><span>async</span> <span>def</span> <span>main</span><span>():</span><span># Create processor </span> <span>processor</span> <span>=</span> <span>AsyncMessageProcessor</span><span>(</span><span>worker_count</span><span>=</span><span>20</span><span>)</span><span># Setup signal handlers </span> <span>loop</span> <span>=</span> <span>asyncio</span><span>.</span><span>get_running_loop</span><span>()</span><span>for</span> <span>signame</span> <span>in</span> <span>(</span><span>'</span><span>SIGINT</span><span>'</span><span>,</span> <span>'</span><span>SIGTERM</span><span>'</span><span>):</span><span>loop</span><span>.</span><span>add_signal_handler</span><span>(</span><span>getattr</span><span>(</span><span>signal</span><span>,</span> <span>signame</span><span>),</span><span>lambda</span><span>:</span> <span>asyncio</span><span>.</span><span>create_task</span><span>(</span><span>processor</span><span>.</span><span>stop</span><span>())</span><span>)</span><span># Start processor </span> <span>await</span> <span>processor</span><span>.</span><span>start</span><span>()</span><span># Simulate message production </span> <span>try</span><span>:</span><span>for</span> <span>i</span> <span>in</span> <span>range</span><span>(</span><span>1000</span><span>):</span><span>message</span> <span>=</span> <span>{</span><span>"</span><span>id</span><span>"</span><span>:</span> <span>i</span><span>,</span> <span>"</span><span>timestamp</span><span>"</span><span>:</span> <span>datetime</span><span>.</span><span>now</span><span>().</span><span>isoformat</span><span>(),</span> <span>"</span><span>data</span><span>"</span><span>:</span> <span>f</span><span>"</span><span>Message </span><span>{</span><span>i</span><span>}</span><span>"</span><span>}</span><span>await</span> 
<span>processor</span><span>.</span><span>enqueue_message</span><span>(</span><span>message</span><span>)</span><span>if</span> <span>i</span> <span>%</span> <span>100</span> <span>==</span> <span>0</span><span>:</span><span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Enqueued </span><span>{</span><span>i</span><span>}</span><span> messages</span><span>"</span><span>)</span><span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>0.01</span><span>)</span> <span># Simulate message arrival rate </span> <span>except</span> <span>Exception</span> <span>as</span> <span>e</span><span>:</span><span>logger</span><span>.</span><span>exception</span><span>(</span><span>f</span><span>"</span><span>Error producing messages: </span><span>{</span><span>e</span><span>}</span><span>"</span><span>)</span><span>finally</span><span>:</span><span># Wait for all messages to be processed </span> <span>await</span> <span>processor</span><span>.</span><span>stop</span><span>()</span><span>if</span> <span>__name__</span> <span>==</span> <span>"</span><span>__main__</span><span>"</span><span>:</span><span>asyncio</span><span>.</span><span>run</span><span>(</span><span>main</span><span>())</span><span>import</span> <span>asyncio</span> <span>import</span> <span>json</span> <span>import</span> <span>aiohttp</span> <span>import</span> <span>signal</span> <span>import</span> <span>functools</span> <span>import</span> <span>logging</span> <span>from</span> <span>datetime</span> <span>import</span> <span>datetime</span> <span># Configure logging </span><span>logging</span><span>.</span><span>basicConfig</span><span>(</span> <span>level</span><span>=</span><span>logging</span><span>.</span><span>INFO</span><span>,</span> <span>format</span><span>=</span><span>'</span><span>%(asctime)s - %(name)s - %(levelname)s - %(message)s</span><span>'</span> <span>)</span> <span>logger</span> <span>=</span> 
<span>logging</span><span>.</span><span>getLogger</span><span>(</span><span>__name__</span><span>)</span> <span>class</span> <span>AsyncMessageProcessor</span><span>:</span> <span>def</span> <span>__init__</span><span>(</span><span>self</span><span>,</span> <span>max_queue_size</span><span>=</span><span>1000</span><span>,</span> <span>worker_count</span><span>=</span><span>10</span><span>):</span> <span>self</span><span>.</span><span>queue</span> <span>=</span> <span>asyncio</span><span>.</span><span>Queue</span><span>(</span><span>maxsize</span><span>=</span><span>max_queue_size</span><span>)</span> <span>self</span><span>.</span><span>worker_count</span> <span>=</span> <span>worker_count</span> <span>self</span><span>.</span><span>workers</span> <span>=</span> <span>[]</span> <span>self</span><span>.</span><span>running</span> <span>=</span> <span>False</span> <span>self</span><span>.</span><span>processed_count</span> <span>=</span> <span>0</span> <span>self</span><span>.</span><span>start_time</span> <span>=</span> <span>None</span> <span>async</span> <span>def</span> <span>enqueue_message</span><span>(</span><span>self</span><span>,</span> <span>message</span><span>):</span> <span>await</span> <span>self</span><span>.</span><span>queue</span><span>.</span><span>put</span><span>(</span><span>message</span><span>)</span> <span>async</span> <span>def</span> <span>_process_message</span><span>(</span><span>self</span><span>,</span> <span>message</span><span>):</span> <span>try</span><span>:</span> <span># Example: Process message and send to an API </span> <span>async</span> <span>with</span> <span>aiohttp</span><span>.</span><span>ClientSession</span><span>()</span> <span>as</span> <span>session</span><span>:</span> <span>async</span> <span>with</span> <span>session</span><span>.</span><span>post</span><span>(</span> <span>'</span><span>https://example.com/api/process</span><span>'</span><span>,</span> 
<span>json</span><span>=</span><span>message</span><span>,</span> <span>timeout</span><span>=</span><span>5</span> <span>)</span> <span>as</span> <span>response</span><span>:</span> <span>if</span> <span>response</span><span>.</span><span>status</span> <span>>=</span> <span>400</span><span>:</span> <span>text</span> <span>=</span> <span>await</span> <span>response</span><span>.</span><span>text</span><span>()</span> <span>logger</span><span>.</span><span>error</span><span>(</span><span>f</span><span>"</span><span>API error: </span><span>{</span><span>response</span><span>.</span><span>status</span><span>}</span><span> - </span><span>{</span><span>text</span><span>}</span><span>"</span><span>)</span> <span>return</span> <span>False</span> <span>return</span> <span>True</span> <span>except</span> <span>Exception</span> <span>as</span> <span>e</span><span>:</span> <span>logger</span><span>.</span><span>exception</span><span>(</span><span>f</span><span>"</span><span>Error processing message: </span><span>{</span><span>e</span><span>}</span><span>"</span><span>)</span> <span>return</span> <span>False</span> <span>async</span> <span>def</span> <span>worker</span><span>(</span><span>self</span><span>,</span> <span>worker_id</span><span>):</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Worker </span><span>{</span><span>worker_id</span><span>}</span><span> started</span><span>"</span><span>)</span> <span>while</span> <span>self</span><span>.</span><span>running</span><span>:</span> <span>try</span><span>:</span> <span>message</span> <span>=</span> <span>await</span> <span>self</span><span>.</span><span>queue</span><span>.</span><span>get</span><span>()</span> <span>success</span> <span>=</span> <span>await</span> <span>self</span><span>.</span><span>_process_message</span><span>(</span><span>message</span><span>)</span> <span>if</span> <span>not</span> <span>success</span><span>:</span> <span># Implement retry or 
dead-letter logic </span> <span>logger</span><span>.</span><span>warning</span><span>(</span><span>f</span><span>"</span><span>Message processing failed, retrying later</span><span>"</span><span>)</span> <span># Could use a separate queue for retries with delay </span> <span>self</span><span>.</span><span>queue</span><span>.</span><span>task_done</span><span>()</span> <span>self</span><span>.</span><span>processed_count</span> <span>+=</span> <span>1</span> <span># Log stats periodically </span> <span>if</span> <span>self</span><span>.</span><span>processed_count</span> <span>%</span> <span>100</span> <span>==</span> <span>0</span><span>:</span> <span>self</span><span>.</span><span>_log_stats</span><span>()</span> <span>except</span> <span>asyncio</span><span>.</span><span>CancelledError</span><span>:</span> <span>break</span> <span>except</span> <span>Exception</span> <span>as</span> <span>e</span><span>:</span> <span>logger</span><span>.</span><span>exception</span><span>(</span><span>f</span><span>"</span><span>Worker </span><span>{</span><span>worker_id</span><span>}</span><span> encountered an error: </span><span>{</span><span>e</span><span>}</span><span>"</span><span>)</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Worker </span><span>{</span><span>worker_id</span><span>}</span><span> stopped</span><span>"</span><span>)</span> <span>def</span> <span>_log_stats</span><span>(</span><span>self</span><span>):</span> <span>now</span> <span>=</span> <span>datetime</span><span>.</span><span>now</span><span>()</span> <span>elapsed</span> <span>=</span> <span>(</span><span>now</span> <span>-</span> <span>self</span><span>.</span><span>start_time</span><span>).</span><span>total_seconds</span><span>()</span> <span>rate</span> <span>=</span> <span>self</span><span>.</span><span>processed_count</span> <span>/</span> <span>elapsed</span> <span>if</span> <span>elapsed</span> <span>></span> <span>0</span> 
<span>else</span> <span>0</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Processed </span><span>{</span><span>self</span><span>.</span><span>processed_count</span><span>}</span><span> messages at </span><span>{</span><span>rate</span><span>:</span><span>.</span><span>2</span><span>f</span><span>}</span><span> msg/sec</span><span>"</span><span>)</span> <span>async</span> <span>def</span> <span>start</span><span>(</span><span>self</span><span>):</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>"</span><span>Starting message processor</span><span>"</span><span>)</span> <span>self</span><span>.</span><span>running</span> <span>=</span> <span>True</span> <span>self</span><span>.</span><span>start_time</span> <span>=</span> <span>datetime</span><span>.</span><span>now</span><span>()</span> <span>self</span><span>.</span><span>workers</span> <span>=</span> <span>[</span> <span>asyncio</span><span>.</span><span>create_task</span><span>(</span><span>self</span><span>.</span><span>worker</span><span>(</span><span>i</span><span>))</span> <span>for</span> <span>i</span> <span>in</span> <span>range</span><span>(</span><span>self</span><span>.</span><span>worker_count</span><span>)</span> <span>]</span> <span>async</span> <span>def</span> <span>stop</span><span>(</span><span>self</span><span>):</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>"</span><span>Stopping message processor</span><span>"</span><span>)</span> <span>self</span><span>.</span><span>running</span> <span>=</span> <span>False</span> <span># Cancel all workers </span> <span>for</span> <span>worker</span> <span>in</span> <span>self</span><span>.</span><span>workers</span><span>:</span> <span>worker</span><span>.</span><span>cancel</span><span>()</span> <span># Wait for workers to finish </span> <span>await</span> 
<span>asyncio</span><span>.</span><span>gather</span><span>(</span><span>*</span><span>self</span><span>.</span><span>workers</span><span>,</span> <span>return_exceptions</span><span>=</span><span>True</span><span>)</span> <span># Wait for queue to be empty </span> <span>if</span> <span>not</span> <span>self</span><span>.</span><span>queue</span><span>.</span><span>empty</span><span>():</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Waiting for queue to drain (</span><span>{</span><span>self</span><span>.</span><span>queue</span><span>.</span><span>qsize</span><span>()</span><span>}</span><span> items remaining)</span><span>"</span><span>)</span> <span>await</span> <span>self</span><span>.</span><span>queue</span><span>.</span><span>join</span><span>()</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Message processor stopped. Processed </span><span>{</span><span>self</span><span>.</span><span>processed_count</span><span>}</span><span> messages total</span><span>"</span><span>)</span> <span>async</span> <span>def</span> <span>main</span><span>():</span> <span># Create processor </span> <span>processor</span> <span>=</span> <span>AsyncMessageProcessor</span><span>(</span><span>worker_count</span><span>=</span><span>20</span><span>)</span> <span># Setup signal handlers </span> <span>loop</span> <span>=</span> <span>asyncio</span><span>.</span><span>get_running_loop</span><span>()</span> <span>for</span> <span>signame</span> <span>in</span> <span>(</span><span>'</span><span>SIGINT</span><span>'</span><span>,</span> <span>'</span><span>SIGTERM</span><span>'</span><span>):</span> <span>loop</span><span>.</span><span>add_signal_handler</span><span>(</span> <span>getattr</span><span>(</span><span>signal</span><span>,</span> <span>signame</span><span>),</span> <span>lambda</span><span>:</span> 
<span>asyncio</span><span>.</span><span>create_task</span><span>(</span><span>processor</span><span>.</span><span>stop</span><span>())</span> <span>)</span> <span># Start processor </span> <span>await</span> <span>processor</span><span>.</span><span>start</span><span>()</span> <span># Simulate message production </span> <span>try</span><span>:</span> <span>for</span> <span>i</span> <span>in</span> <span>range</span><span>(</span><span>1000</span><span>):</span> <span>message</span> <span>=</span> <span>{</span><span>"</span><span>id</span><span>"</span><span>:</span> <span>i</span><span>,</span> <span>"</span><span>timestamp</span><span>"</span><span>:</span> <span>datetime</span><span>.</span><span>now</span><span>().</span><span>isoformat</span><span>(),</span> <span>"</span><span>data</span><span>"</span><span>:</span> <span>f</span><span>"</span><span>Message </span><span>{</span><span>i</span><span>}</span><span>"</span><span>}</span> <span>await</span> <span>processor</span><span>.</span><span>enqueue_message</span><span>(</span><span>message</span><span>)</span> <span>if</span> <span>i</span> <span>%</span> <span>100</span> <span>==</span> <span>0</span><span>:</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Enqueued </span><span>{</span><span>i</span><span>}</span><span> messages</span><span>"</span><span>)</span> <span>await</span> <span>asyncio</span><span>.</span><span>sleep</span><span>(</span><span>0.01</span><span>)</span> <span># Simulate message arrival rate </span> <span>except</span> <span>Exception</span> <span>as</span> <span>e</span><span>:</span> <span>logger</span><span>.</span><span>exception</span><span>(</span><span>f</span><span>"</span><span>Error producing messages: </span><span>{</span><span>e</span><span>}</span><span>"</span><span>)</span> <span>finally</span><span>:</span> <span># Wait for all messages to be processed </span> <span>await</span> 
<span>processor</span><span>.</span><span>stop</span><span>()</span> <span>if</span> <span>__name__</span> <span>==</span> <span>"</span><span>__main__</span><span>"</span><span>:</span> <span>asyncio</span><span>.</span><span>run</span><span>(</span><span>main</span><span>())</span>import asyncio import json import aiohttp import signal import functools import logging from datetime import datetime # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class AsyncMessageProcessor: def __init__(self, max_queue_size=1000, worker_count=10): self.queue = asyncio.Queue(maxsize=max_queue_size) self.worker_count = worker_count self.workers = [] self.running = False self.processed_count = 0 self.start_time = None async def enqueue_message(self, message): await self.queue.put(message) async def _process_message(self, message): try: # Example: Process message and send to an API async with aiohttp.ClientSession() as session: async with session.post( 'https://example.com/api/process', json=message, timeout=5 ) as response: if response.status >= 400: text = await response.text() logger.error(f"API error: {response.status} - {text}") return False return True except Exception as e: logger.exception(f"Error processing message: {e}") return False async def worker(self, worker_id): logger.info(f"Worker {worker_id} started") while self.running: try: message = await self.queue.get() success = await self._process_message(message) if not success: # Implement retry or dead-letter logic logger.warning(f"Message processing failed, retrying later") # Could use a separate queue for retries with delay self.queue.task_done() self.processed_count += 1 # Log stats periodically if self.processed_count % 100 == 0: self._log_stats() except asyncio.CancelledError: break except Exception as e: logger.exception(f"Worker {worker_id} encountered an error: {e}") logger.info(f"Worker {worker_id} 
stopped") def _log_stats(self): now = datetime.now() elapsed = (now - self.start_time).total_seconds() rate = self.processed_count / elapsed if elapsed > 0 else 0 logger.info(f"Processed {self.processed_count} messages at {rate:.2f} msg/sec") async def start(self): logger.info("Starting message processor") self.running = True self.start_time = datetime.now() self.workers = [ asyncio.create_task(self.worker(i)) for i in range(self.worker_count) ] async def stop(self): logger.info("Stopping message processor") self.running = False # Cancel all workers for worker in self.workers: worker.cancel() # Wait for workers to finish await asyncio.gather(*self.workers, return_exceptions=True) # Wait for queue to be empty if not self.queue.empty(): logger.info(f"Waiting for queue to drain ({self.queue.qsize()} items remaining)") await self.queue.join() logger.info(f"Message processor stopped. Processed {self.processed_count} messages total") async def main(): # Create processor processor = AsyncMessageProcessor(worker_count=20) # Setup signal handlers loop = asyncio.get_running_loop() for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler( getattr(signal, signame), lambda: asyncio.create_task(processor.stop()) ) # Start processor await processor.start() # Simulate message production try: for i in range(1000): message = {"id": i, "timestamp": datetime.now().isoformat(), "data": f"Message {i}"} await processor.enqueue_message(message) if i % 100 == 0: logger.info(f"Enqueued {i} messages") await asyncio.sleep(0.01) # Simulate message arrival rate except Exception as e: logger.exception(f"Error producing messages: {e}") finally: # Wait for all messages to be processed await processor.stop() if __name__ == "__main__": asyncio.run(main())
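One subtlety in the shutdown path above is the pairing of `queue.task_done()` with `queue.join()`: `join()` only returns once `task_done()` has been called for every item that was put on the queue, so a worker that skips the call (or a shutdown that cancels workers before draining) leaves `join()` waiting forever. A minimal sketch of that contract:

```python
import asyncio

async def demo():
    queue = asyncio.Queue()
    for item in range(3):
        queue.put_nowait(item)

    results = []

    async def worker():
        while True:
            item = await queue.get()
            results.append(item * 2)
            queue.task_done()  # without this, queue.join() below never returns

    task = asyncio.create_task(worker())
    await queue.join()  # returns once all three items are marked done
    task.cancel()
    return results

print(asyncio.run(demo()))  # [0, 2, 4]
```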
This approach works exceptionally well for I/O-bound tasks, as it achieves high concurrency without the overhead of multiple processes or threads. I’ve successfully used this pattern for processing webhooks and API notifications at scale.
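Part of what makes the pattern hold up at scale is the bounded queue: because `asyncio.Queue` is created with a `maxsize`, the `await queue.put(...)` inside `enqueue_message` suspends the producer whenever workers fall behind, giving backpressure for free. A small sketch of that behavior (the `maxsize=2` value is just for illustration):

```python
import asyncio

async def demo():
    queue = asyncio.Queue(maxsize=2)
    await queue.put("a")
    await queue.put("b")

    # The queue is now full: a non-blocking put fails immediately,
    # and an awaited put would suspend until a consumer calls get()
    try:
        queue.put_nowait("c")
        overflowed = False
    except asyncio.QueueFull:
        overflowed = True

    await queue.get()     # consumer frees a slot...
    await queue.put("c")  # ...so the producer can continue
    return overflowed, queue.qsize()

print(asyncio.run(demo()))  # (True, 2)
```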
Implementing Retry Mechanisms and Dead-Letter Queues
Robust message queue processing requires proper handling of failures through retry mechanisms and dead-letter queues:
```python
import pika
import json
import time
import logging
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RetryHandler:
    def __init__(self, host='localhost', retry_delays=None, max_retries=3):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self.channel = self.connection.channel()
        # Default retry delays (exponential backoff)
        self.retry_delays = retry_delays or [5, 15, 30, 60, 120]
        self.max_retries = max_retries
        # Declare queues
        self.channel.queue_declare(queue='main_queue', durable=True)
        self.channel.queue_declare(queue='retry_queue', durable=True)
        self.channel.queue_declare(queue='dead_letter_queue', durable=True)

    def publish_message(self, queue, message, headers=None):
        properties = pika.BasicProperties(
            delivery_mode=2,  # Make message persistent
            headers=headers or {}
        )
        self.channel.basic_publish(
            exchange='',
            routing_key=queue,
            body=json.dumps(message),
            properties=properties
        )

    def process_main_queue(self):
        def callback(ch, method, properties, body):
            try:
                message = json.loads(body)
                logger.info(f"Processing message: {message}")
                # Simulate processing that sometimes fails
                if 'id' in message and message['id'] % 3 == 0:
                    raise ValueError("Simulated processing failure")
                # Successfully processed
                logger.info(f"Successfully processed message: {message}")
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                # Get retry count from headers or default to 0
                headers = properties.headers or {}
                retry_count = headers.get('x-retry-count', 0)
                if retry_count < self.max_retries:
                    # Schedule for retry with appropriate delay
                    delay_index = min(retry_count, len(self.retry_delays) - 1)
                    delay = self.retry_delays[delay_index]
                    new_headers = headers.copy()
                    new_headers['x-retry-count'] = retry_count + 1
                    new_headers['x-original-queue'] = 'main_queue'
                    new_headers['x-error'] = str(e)
                    new_headers['x-failed-at'] = datetime.now().isoformat()
                    logger.info(f"Scheduling retry #{retry_count + 1} after {delay}s")
                    # In a real implementation, we'd use a delay queue mechanism
                    # For simplicity, we're just sending to a retry queue immediately
                    self.publish_message('retry_queue', json.loads(body), new_headers)
                else:
                    # Move to dead letter queue
                    new_headers = headers.copy()
                    new_headers['x-error'] = str(e)
                    new_headers['x-failed-at'] = datetime.now().isoformat()
                    new_headers['x-original-queue'] = 'main_queue'
                    logger.warning(f"Moving message to dead letter queue after {retry_count} retries")
                    self.publish_message('dead_letter_queue', json.loads(body), new_headers)
                # Acknowledge the original message
                ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue='main_queue', on_message_callback=callback)
        logger.info("Waiting for messages. To exit press CTRL+C")
        self.channel.start_consuming()

    def process_retry_queue(self):
        # In a real implementation, this would handle scheduled retries
        # For now, it just moves messages back to the main queue
        def callback(ch, method, properties, body):
            try:
                message = json.loads(body)
                logger.info(f"Retrying message: {message}")
                # Get original queue from headers
                headers = properties.headers or {}
                original_queue = headers.get('x-original-queue', 'main_queue')
```
<span>headers</span><span>.</span><span>get</span><span>(</span><span>'</span><span>x-original-queue</span><span>'</span><span>,</span> <span>'</span><span>main_queue</span><span>'</span><span>)</span><span># In a real implementation, we'd check if the delay period has passed </span> <span># and only then re-publish the message </span><span>self</span><span>.</span><span>publish_message</span><span>(</span><span>original_queue</span><span>,</span> <span>message</span><span>,</span> <span>headers</span><span>)</span><span>ch</span><span>.</span><span>basic_ack</span><span>(</span><span>delivery_tag</span><span>=</span><span>method</span><span>.</span><span>delivery_tag</span><span>)</span><span>except</span> <span>Exception</span> <span>as</span> <span>e</span><span>:</span><span>logger</span><span>.</span><span>error</span><span>(</span><span>f</span><span>"</span><span>Error in retry handler: </span><span>{</span><span>e</span><span>}</span><span>"</span><span>)</span><span>ch</span><span>.</span><span>basic_nack</span><span>(</span><span>delivery_tag</span><span>=</span><span>method</span><span>.</span><span>delivery_tag</span><span>,</span> <span>requeue</span><span>=</span><span>True</span><span>)</span><span>self</span><span>.</span><span>channel</span><span>.</span><span>basic_qos</span><span>(</span><span>prefetch_count</span><span>=</span><span>1</span><span>)</span><span>self</span><span>.</span><span>channel</span><span>.</span><span>basic_consume</span><span>(</span><span>queue</span><span>=</span><span>'</span><span>retry_queue</span><span>'</span><span>,</span> <span>on_message_callback</span><span>=</span><span>callback</span><span>)</span><span>logger</span><span>.</span><span>info</span><span>(</span><span>"</span><span>Retry handler started. 
To exit press CTRL+C</span><span>"</span><span>)</span><span>self</span><span>.</span><span>channel</span><span>.</span><span>start_consuming</span><span>()</span><span>def</span> <span>close</span><span>(</span><span>self</span><span>):</span><span>self</span><span>.</span><span>connection</span><span>.</span><span>close</span><span>()</span><span># Example usage </span><span>if</span> <span>__name__</span> <span>==</span> <span>"</span><span>__main__</span><span>"</span><span>:</span><span># In a real application, you'd run these in separate processes </span><span># Publish some test messages </span> <span>handler</span> <span>=</span> <span>RetryHandler</span><span>()</span><span>for</span> <span>i</span> <span>in</span> <span>range</span><span>(</span><span>10</span><span>):</span><span>handler</span><span>.</span><span>publish_message</span><span>(</span><span>'</span><span>main_queue</span><span>'</span><span>,</span> <span>{</span><span>"</span><span>id</span><span>"</span><span>:</span> <span>i</span><span>,</span> <span>"</span><span>data</span><span>"</span><span>:</span> <span>f</span><span>"</span><span>Test message </span><span>{</span><span>i</span><span>}</span><span>"</span><span>})</span><span>handler</span><span>.</span><span>close</span><span>()</span><span># Process messages </span> <span>handler</span> <span>=</span> <span>RetryHandler</span><span>()</span><span>handler</span><span>.</span><span>process_main_queue</span><span>()</span><span>import</span> <span>pika</span> <span>import</span> <span>json</span> <span>import</span> <span>time</span> <span>import</span> <span>logging</span> <span>from</span> <span>datetime</span> <span>import</span> <span>datetime</span> <span># Configure logging </span><span>logging</span><span>.</span><span>basicConfig</span><span>(</span><span>level</span><span>=</span><span>logging</span><span>.</span><span>INFO</span><span>)</span> <span>logger</span> <span>=</span> 
<span>logging</span><span>.</span><span>getLogger</span><span>(</span><span>__name__</span><span>)</span> <span>class</span> <span>RetryHandler</span><span>:</span> <span>def</span> <span>__init__</span><span>(</span><span>self</span><span>,</span> <span>host</span><span>=</span><span>'</span><span>localhost</span><span>'</span><span>,</span> <span>retry_delays</span><span>=</span><span>None</span><span>,</span> <span>max_retries</span><span>=</span><span>3</span><span>):</span> <span>self</span><span>.</span><span>connection</span> <span>=</span> <span>pika</span><span>.</span><span>BlockingConnection</span><span>(</span><span>pika</span><span>.</span><span>ConnectionParameters</span><span>(</span><span>host</span><span>=</span><span>host</span><span>))</span> <span>self</span><span>.</span><span>channel</span> <span>=</span> <span>self</span><span>.</span><span>connection</span><span>.</span><span>channel</span><span>()</span> <span># Default retry delays (exponential backoff) </span> <span>self</span><span>.</span><span>retry_delays</span> <span>=</span> <span>retry_delays</span> <span>or</span> <span>[</span><span>5</span><span>,</span> <span>15</span><span>,</span> <span>30</span><span>,</span> <span>60</span><span>,</span> <span>120</span><span>]</span> <span>self</span><span>.</span><span>max_retries</span> <span>=</span> <span>max_retries</span> <span># Declare queues </span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>queue_declare</span><span>(</span><span>queue</span><span>=</span><span>'</span><span>main_queue</span><span>'</span><span>,</span> <span>durable</span><span>=</span><span>True</span><span>)</span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>queue_declare</span><span>(</span><span>queue</span><span>=</span><span>'</span><span>retry_queue</span><span>'</span><span>,</span> <span>durable</span><span>=</span><span>True</span><span>)</span> 
<span>self</span><span>.</span><span>channel</span><span>.</span><span>queue_declare</span><span>(</span><span>queue</span><span>=</span><span>'</span><span>dead_letter_queue</span><span>'</span><span>,</span> <span>durable</span><span>=</span><span>True</span><span>)</span> <span>def</span> <span>publish_message</span><span>(</span><span>self</span><span>,</span> <span>queue</span><span>,</span> <span>message</span><span>,</span> <span>headers</span><span>=</span><span>None</span><span>):</span> <span>properties</span> <span>=</span> <span>pika</span><span>.</span><span>BasicProperties</span><span>(</span> <span>delivery_mode</span><span>=</span><span>2</span><span>,</span> <span># Make message persistent </span> <span>headers</span><span>=</span><span>headers</span> <span>or</span> <span>{}</span> <span>)</span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>basic_publish</span><span>(</span> <span>exchange</span><span>=</span><span>''</span><span>,</span> <span>routing_key</span><span>=</span><span>queue</span><span>,</span> <span>body</span><span>=</span><span>json</span><span>.</span><span>dumps</span><span>(</span><span>message</span><span>),</span> <span>properties</span><span>=</span><span>properties</span> <span>)</span> <span>def</span> <span>process_main_queue</span><span>(</span><span>self</span><span>):</span> <span>def</span> <span>callback</span><span>(</span><span>ch</span><span>,</span> <span>method</span><span>,</span> <span>properties</span><span>,</span> <span>body</span><span>):</span> <span>try</span><span>:</span> <span>message</span> <span>=</span> <span>json</span><span>.</span><span>loads</span><span>(</span><span>body</span><span>)</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Processing message: </span><span>{</span><span>message</span><span>}</span><span>"</span><span>)</span> <span># Simulate processing that sometimes fails </span> <span>if</span> 
<span>'</span><span>id</span><span>'</span> <span>in</span> <span>message</span> <span>and</span> <span>message</span><span>[</span><span>'</span><span>id</span><span>'</span><span>]</span> <span>%</span> <span>3</span> <span>==</span> <span>0</span><span>:</span> <span>raise</span> <span>ValueError</span><span>(</span><span>"</span><span>Simulated processing failure</span><span>"</span><span>)</span> <span># Successfully processed </span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Successfully processed message: </span><span>{</span><span>message</span><span>}</span><span>"</span><span>)</span> <span>ch</span><span>.</span><span>basic_ack</span><span>(</span><span>delivery_tag</span><span>=</span><span>method</span><span>.</span><span>delivery_tag</span><span>)</span> <span>except</span> <span>Exception</span> <span>as</span> <span>e</span><span>:</span> <span>logger</span><span>.</span><span>error</span><span>(</span><span>f</span><span>"</span><span>Error processing message: </span><span>{</span><span>e</span><span>}</span><span>"</span><span>)</span> <span># Get retry count from headers or default to 0 </span> <span>headers</span> <span>=</span> <span>properties</span><span>.</span><span>headers</span> <span>or</span> <span>{}</span> <span>retry_count</span> <span>=</span> <span>headers</span><span>.</span><span>get</span><span>(</span><span>'</span><span>x-retry-count</span><span>'</span><span>,</span> <span>0</span><span>)</span> <span>if</span> <span>retry_count</span> <span><</span> <span>self</span><span>.</span><span>max_retries</span><span>:</span> <span># Schedule for retry with appropriate delay </span> <span>delay_index</span> <span>=</span> <span>min</span><span>(</span><span>retry_count</span><span>,</span> <span>len</span><span>(</span><span>self</span><span>.</span><span>retry_delays</span><span>)</span> <span>-</span> <span>1</span><span>)</span> <span>delay</span> <span>=</span> 
<span>self</span><span>.</span><span>retry_delays</span><span>[</span><span>delay_index</span><span>]</span> <span>new_headers</span> <span>=</span> <span>headers</span><span>.</span><span>copy</span><span>()</span> <span>new_headers</span><span>[</span><span>'</span><span>x-retry-count</span><span>'</span><span>]</span> <span>=</span> <span>retry_count</span> <span>+</span> <span>1</span> <span>new_headers</span><span>[</span><span>'</span><span>x-original-queue</span><span>'</span><span>]</span> <span>=</span> <span>'</span><span>main_queue</span><span>'</span> <span>new_headers</span><span>[</span><span>'</span><span>x-error</span><span>'</span><span>]</span> <span>=</span> <span>str</span><span>(</span><span>e</span><span>)</span> <span>new_headers</span><span>[</span><span>'</span><span>x-failed-at</span><span>'</span><span>]</span> <span>=</span> <span>datetime</span><span>.</span><span>now</span><span>().</span><span>isoformat</span><span>()</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Scheduling retry #</span><span>{</span><span>retry_count</span> <span>+</span> <span>1</span><span>}</span><span> after </span><span>{</span><span>delay</span><span>}</span><span>s</span><span>"</span><span>)</span> <span># In a real implementation, we'd use a delay queue mechanism </span> <span># For simplicity, we're just sending to a retry queue immediately </span> <span>self</span><span>.</span><span>publish_message</span><span>(</span><span>'</span><span>retry_queue</span><span>'</span><span>,</span> <span>json</span><span>.</span><span>loads</span><span>(</span><span>body</span><span>),</span> <span>new_headers</span><span>)</span> <span>else</span><span>:</span> <span># Move to dead letter queue </span> <span>new_headers</span> <span>=</span> <span>headers</span><span>.</span><span>copy</span><span>()</span> <span>new_headers</span><span>[</span><span>'</span><span>x-error</span><span>'</span><span>]</span> 
<span>=</span> <span>str</span><span>(</span><span>e</span><span>)</span> <span>new_headers</span><span>[</span><span>'</span><span>x-failed-at</span><span>'</span><span>]</span> <span>=</span> <span>datetime</span><span>.</span><span>now</span><span>().</span><span>isoformat</span><span>()</span> <span>new_headers</span><span>[</span><span>'</span><span>x-original-queue</span><span>'</span><span>]</span> <span>=</span> <span>'</span><span>main_queue</span><span>'</span> <span>logger</span><span>.</span><span>warning</span><span>(</span><span>f</span><span>"</span><span>Moving message to dead letter queue after </span><span>{</span><span>retry_count</span><span>}</span><span> retries</span><span>"</span><span>)</span> <span>self</span><span>.</span><span>publish_message</span><span>(</span><span>'</span><span>dead_letter_queue</span><span>'</span><span>,</span> <span>json</span><span>.</span><span>loads</span><span>(</span><span>body</span><span>),</span> <span>new_headers</span><span>)</span> <span># Acknowledge the original message </span> <span>ch</span><span>.</span><span>basic_ack</span><span>(</span><span>delivery_tag</span><span>=</span><span>method</span><span>.</span><span>delivery_tag</span><span>)</span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>basic_qos</span><span>(</span><span>prefetch_count</span><span>=</span><span>1</span><span>)</span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>basic_consume</span><span>(</span><span>queue</span><span>=</span><span>'</span><span>main_queue</span><span>'</span><span>,</span> <span>on_message_callback</span><span>=</span><span>callback</span><span>)</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>"</span><span>Waiting for messages. 
To exit press CTRL+C</span><span>"</span><span>)</span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>start_consuming</span><span>()</span> <span>def</span> <span>process_retry_queue</span><span>(</span><span>self</span><span>):</span> <span># In a real implementation, this would handle scheduled retries </span> <span># For now, it just moves messages back to the main queue </span> <span>def</span> <span>callback</span><span>(</span><span>ch</span><span>,</span> <span>method</span><span>,</span> <span>properties</span><span>,</span> <span>body</span><span>):</span> <span>try</span><span>:</span> <span>message</span> <span>=</span> <span>json</span><span>.</span><span>loads</span><span>(</span><span>body</span><span>)</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>f</span><span>"</span><span>Retrying message: </span><span>{</span><span>message</span><span>}</span><span>"</span><span>)</span> <span># Get original queue from headers </span> <span>headers</span> <span>=</span> <span>properties</span><span>.</span><span>headers</span> <span>or</span> <span>{}</span> <span>original_queue</span> <span>=</span> <span>headers</span><span>.</span><span>get</span><span>(</span><span>'</span><span>x-original-queue</span><span>'</span><span>,</span> <span>'</span><span>main_queue</span><span>'</span><span>)</span> <span># In a real implementation, we'd check if the delay period has passed </span> <span># and only then re-publish the message </span> <span>self</span><span>.</span><span>publish_message</span><span>(</span><span>original_queue</span><span>,</span> <span>message</span><span>,</span> <span>headers</span><span>)</span> <span>ch</span><span>.</span><span>basic_ack</span><span>(</span><span>delivery_tag</span><span>=</span><span>method</span><span>.</span><span>delivery_tag</span><span>)</span> <span>except</span> <span>Exception</span> <span>as</span> <span>e</span><span>:</span> 
<span>logger</span><span>.</span><span>error</span><span>(</span><span>f</span><span>"</span><span>Error in retry handler: </span><span>{</span><span>e</span><span>}</span><span>"</span><span>)</span> <span>ch</span><span>.</span><span>basic_nack</span><span>(</span><span>delivery_tag</span><span>=</span><span>method</span><span>.</span><span>delivery_tag</span><span>,</span> <span>requeue</span><span>=</span><span>True</span><span>)</span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>basic_qos</span><span>(</span><span>prefetch_count</span><span>=</span><span>1</span><span>)</span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>basic_consume</span><span>(</span><span>queue</span><span>=</span><span>'</span><span>retry_queue</span><span>'</span><span>,</span> <span>on_message_callback</span><span>=</span><span>callback</span><span>)</span> <span>logger</span><span>.</span><span>info</span><span>(</span><span>"</span><span>Retry handler started. 
To exit press CTRL+C</span><span>"</span><span>)</span> <span>self</span><span>.</span><span>channel</span><span>.</span><span>start_consuming</span><span>()</span> <span>def</span> <span>close</span><span>(</span><span>self</span><span>):</span> <span>self</span><span>.</span><span>connection</span><span>.</span><span>close</span><span>()</span> <span># Example usage </span><span>if</span> <span>__name__</span> <span>==</span> <span>"</span><span>__main__</span><span>"</span><span>:</span> <span># In a real application, you'd run these in separate processes </span> <span># Publish some test messages </span> <span>handler</span> <span>=</span> <span>RetryHandler</span><span>()</span> <span>for</span> <span>i</span> <span>in</span> <span>range</span><span>(</span><span>10</span><span>):</span> <span>handler</span><span>.</span><span>publish_message</span><span>(</span><span>'</span><span>main_queue</span><span>'</span><span>,</span> <span>{</span><span>"</span><span>id</span><span>"</span><span>:</span> <span>i</span><span>,</span> <span>"</span><span>data</span><span>"</span><span>:</span> <span>f</span><span>"</span><span>Test message </span><span>{</span><span>i</span><span>}</span><span>"</span><span>})</span> <span>handler</span><span>.</span><span>close</span><span>()</span> <span># Process messages </span> <span>handler</span> <span>=</span> <span>RetryHandler</span><span>()</span> <span>handler</span><span>.</span><span>process_main_queue</span><span>()</span>import pika import json import time import logging from datetime import datetime # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class RetryHandler: def __init__(self, host='localhost', retry_delays=None, max_retries=3): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host)) self.channel = self.connection.channel() # Default retry delays (exponential backoff) self.retry_delays = retry_delays or [5, 15, 30, 60, 120] 
self.max_retries = max_retries # Declare queues self.channel.queue_declare(queue='main_queue', durable=True) self.channel.queue_declare(queue='retry_queue', durable=True) self.channel.queue_declare(queue='dead_letter_queue', durable=True) def publish_message(self, queue, message, headers=None): properties = pika.BasicProperties( delivery_mode=2, # Make message persistent headers=headers or {} ) self.channel.basic_publish( exchange='', routing_key=queue, body=json.dumps(message), properties=properties ) def process_main_queue(self): def callback(ch, method, properties, body): try: message = json.loads(body) logger.info(f"Processing message: {message}") # Simulate processing that sometimes fails if 'id' in message and message['id'] % 3 == 0: raise ValueError("Simulated processing failure") # Successfully processed logger.info(f"Successfully processed message: {message}") ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"Error processing message: {e}") # Get retry count from headers or default to 0 headers = properties.headers or {} retry_count = headers.get('x-retry-count', 0) if retry_count < self.max_retries: # Schedule for retry with appropriate delay delay_index = min(retry_count, len(self.retry_delays) - 1) delay = self.retry_delays[delay_index] new_headers = headers.copy() new_headers['x-retry-count'] = retry_count + 1 new_headers['x-original-queue'] = 'main_queue' new_headers['x-error'] = str(e) new_headers['x-failed-at'] = datetime.now().isoformat() logger.info(f"Scheduling retry #{retry_count + 1} after {delay}s") # In a real implementation, we'd use a delay queue mechanism # For simplicity, we're just sending to a retry queue immediately self.publish_message('retry_queue', json.loads(body), new_headers) else: # Move to dead letter queue new_headers = headers.copy() new_headers['x-error'] = str(e) new_headers['x-failed-at'] = datetime.now().isoformat() new_headers['x-original-queue'] = 'main_queue' logger.warning(f"Moving 
message to dead letter queue after {retry_count} retries") self.publish_message('dead_letter_queue', json.loads(body), new_headers) # Acknowledge the original message ch.basic_ack(delivery_tag=method.delivery_tag) self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume(queue='main_queue', on_message_callback=callback) logger.info("Waiting for messages. To exit press CTRL+C") self.channel.start_consuming() def process_retry_queue(self): # In a real implementation, this would handle scheduled retries # For now, it just moves messages back to the main queue def callback(ch, method, properties, body): try: message = json.loads(body) logger.info(f"Retrying message: {message}") # Get original queue from headers headers = properties.headers or {} original_queue = headers.get('x-original-queue', 'main_queue') # In a real implementation, we'd check if the delay period has passed # and only then re-publish the message self.publish_message(original_queue, message, headers) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"Error in retry handler: {e}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume(queue='retry_queue', on_message_callback=callback) logger.info("Retry handler started. To exit press CTRL+C") self.channel.start_consuming() def close(self): self.connection.close() # Example usage if __name__ == "__main__": # In a real application, you'd run these in separate processes # Publish some test messages handler = RetryHandler() for i in range(10): handler.publish_message('main_queue', {"id": i, "data": f"Test message {i}"}) handler.close() # Process messages handler = RetryHandler() handler.process_main_queue()
This implementation demonstrates a complete retry system with dead-letter queue capabilities. For production systems, I typically use message TTLs and a queue-per-delay pattern for more precise retry scheduling.
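The queue-per-delay pattern mentioned above can be sketched as follows: each retry delay gets its own "wait" queue with a message TTL and a dead-letter routing back to the main queue, so RabbitMQ itself re-delivers the message after the delay with no polling consumer needed. The queue names, the delay values, and the routing back to `main_queue` via the default exchange are illustrative assumptions, not part of the original implementation.

```python
def delay_queue_arguments(delay_seconds, target_queue='main_queue'):
    """Build the x-arguments for a wait queue with the given TTL.

    Messages sitting in the wait queue expire after delay_seconds and are
    dead-lettered back to target_queue via the default ('') exchange.
    """
    return {
        'x-message-ttl': delay_seconds * 1000,      # TTL in milliseconds
        'x-dead-letter-exchange': '',               # default exchange
        'x-dead-letter-routing-key': target_queue,  # where expired messages go
    }

def declare_delay_queues(channel, delays=(5, 15, 30, 60, 120)):
    """Declare one durable wait queue per retry delay; returns their names."""
    names = []
    for delay in delays:
        name = f'retry_wait_{delay}s'  # hypothetical naming scheme
        channel.queue_declare(queue=name, durable=True,
                              arguments=delay_queue_arguments(delay))
        names.append(name)
    return names
```

To retry a failed message with, say, a 15-second delay, the consumer would publish it to `retry_wait_15s` instead of the immediate retry queue; the broker takes care of the rest.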
Practical Applications and Best Practices
In real-world applications, message queues serve many purposes. For microservices communication, I recommend combining RabbitMQ for request-reply and task distribution with Kafka for event sourcing. When building event-driven architectures, a consistent event schema and message format across the system is crucial.
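One lightweight way to enforce a consistent event format is a shared envelope with fixed metadata fields around a free-form payload. The field names below are an illustrative convention, not a standard the article prescribes:

```python
import json
import uuid
from datetime import datetime, timezone

def make_event(event_type, payload, source, version=1):
    """Wrap a payload in a uniform event envelope shared by all services."""
    return {
        'event_id': str(uuid.uuid4()),       # unique ID, enables deduplication
        'event_type': event_type,            # e.g. 'order.created'
        'schema_version': version,           # lets consumers handle evolution
        'source': source,                    # emitting service
        'occurred_at': datetime.now(timezone.utc).isoformat(),
        'payload': payload,                  # event-specific data
    }

event = make_event('order.created', {'order_id': 42}, source='orders-service')
body = json.dumps(event)  # ready to publish to RabbitMQ or Kafka
```

Because every message carries `event_id` and `schema_version`, consumers can deduplicate redeliveries and route old schema versions to compatibility handlers.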
Some key best practices I’ve learned from experience:
- Always implement idempotent consumers to handle duplicate message delivery gracefully.
- Use consumer acknowledgments to ensure reliable message processing.
- Implement circuit breakers to handle downstream service failures.
- Consider message ordering requirements carefully—sometimes you need strict ordering, but often you don’t.
- Monitor queue depths and processing rates to detect processing bottlenecks.
- Design messages to be self-contained, avoiding dependencies on external state when possible.
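The first practice above, idempotent consumers, can be sketched minimally: the consumer remembers which message IDs it has already handled, so a redelivered duplicate becomes a no-op. An in-memory set is used here for illustration only; a production system would track processed IDs in Redis or rely on a database unique constraint.

```python
class IdempotentConsumer:
    """Skips messages whose ID has already been processed."""

    def __init__(self):
        self._seen = set()  # processed message IDs (in-memory for the sketch)
        self.results = []   # stands in for real side effects

    def handle(self, message):
        msg_id = message['id']
        if msg_id in self._seen:
            return False            # duplicate delivery: skip reprocessing
        self.results.append(message['data'])  # do the actual work
        self._seen.add(msg_id)      # mark done only after the work succeeds
        return True

consumer = IdempotentConsumer()
consumer.handle({'id': 1, 'data': 'first'})
consumer.handle({'id': 1, 'data': 'first'})  # redelivery: ignored
```

Marking the ID as seen only after the work succeeds means a crash mid-processing leads to a retry rather than a silently dropped message, which is the at-least-once behavior the acknowledgment-based consumers above also provide.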
By applying these techniques and best practices, you can build robust, scalable systems that effectively utilize message queues for asynchronous processing. Each approach has its strengths, and choosing the right one depends on your specific requirements for throughput, reliability, and complexity.
101 Books
101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.
Check out our book Golang Clean Code available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!
Our Creations
Be sure to check out our creations:
Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
Original article: 6 Powerful Python Techniques for Processing Message Queues