首页 > 代码库 > RabbitMQ--work queues(二)

RabbitMQ--work queues(二)

封装一个task到一个message,并发送到queue。consumer会去除task并执行这个task。

这里我们简化了操作,发送消息到队列中,consumer取出消息计算里面‘.‘号有几个就sleep几秒。

task.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=localhost))
channel = connection.channel()

channel.queue_declare(queue=task_queue, durable=True)

message =  .join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
                      routing_key=task_queue,
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

work.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=localhost))
channel = connection.channel()

channel.queue_declare(queue=task_queue, durable=True)
print( [*] Waiting for messages. To exit press CTRL+C)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b.))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=task_queue)

channel.start_consuming()

代码解释

channel.queue_declare(queue=task_queue, durable=True)
告诉rabbitmq永不丢失queue,即使rabbitmq server挂掉。
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count(.) )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue=hello)

在先前例子中,如果consumer突然死掉,回丢失掉正在处理的信息。如何避免呢?如果consumer死掉,怎么将这条信息发送的其他consumer呢?就是使用上面代码
channel.basic_qos(prefetch_count=1)

This tells RabbitMQ not to give more than one message to a worker at a time. 
Or, in other words, dont dispatch a new message to a worker until it has processed and acknowledged the previous one.
Instead, it will dispatch it to the next worker that is not still busy.
直到消费完这个消息再给consumer派送新的消息,如果没消费完,将消息发送给另一个consumer.

 

RabbitMQ--work queues(二)