首页 > 代码库 > RabbitMQ官网教程---工作队列
RabbitMQ官网教程---工作队列
(使用python的pika 0.9.8客户端)
在第一个教程中,我们写了一个从命名的队列中发送和接收消息的程序。在这一个里面,我们将创建一个Work Queue来用于在多个工作者之间分类耗时任务。
Work Queues后面的主要思想是避免理解做一些资源密集的任务并且需要等待它完成。我们用计划任务在后面完成它。我们把一个任务封装为一个消息发送给队列。一个在后台执行的工作者队列将弹出任务并且完全的执行这个工作。当你运行许多工作者的时候,这个任务将在它们之间共享。
这种概念在web应用程序中是特别有用的,因为在那里它不可能在一个短的HTTP请求窗口中处理一个复杂的任务。
准备
在这个教程的前面我们发送了一个包含"Hello World!"的消息。现在我们将发送字符串来代表复杂的任务。我们没有像重置一个图片大小或者渲染pdf文件的工作任务,因此我们就通过伪装假装我们很忙-通过使用time.sleep()方法。我们将在字符串中取许多远点作为复杂性;每个远点将对"work"占用两秒。例如,一个捏造的任务通过Hello...被描述的将花费3秒。
我们将从我们前面的例子里稍微修改send.py代码,允许任意的消息从命令行发送。这个程序将给我们的工作队列计划任务,因此我们把它命名为new_task.py:
import sys message = ‘ ‘.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=message) print "[x] sent %r"%(message,)
我们的就receive.py脚本也需要一些改变;它需要给在消息体重的每一个圆点捏造一个2秒的工作。它将从队列中弹出消息并且执行任务,因此我们就叫它worker.py:
import time def callback(ch, method, properties, body): print "[x] Received %r"%(body,) time.sleep(body.count(‘.‘)) print "[x] one"
循环分发
使用任务队列的其中一个高级功能是容易的并行任务。如果我们生成工作后台日志,我们可以添加更多的工作者并且用那种方式收放自如。
首先,我们尝试在相同时间运行两个worker.py脚本。它们将都从队列中获取消息,但是正确的是如何的呢?我们来看看。
你需要打开三个控制台。两个将运行worker.py脚本。这连个控制台僵尸我们的消费者-C1和C2。
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中我们将发布新任务。一旦你已经开始了消费者你可以发送一些消息了:
shell3$ python new_task.py First message. shell3$ python new_task.py Second message.. shell3$ python new_task.py Third message... shell3$ python new_task.py Fourth message.... shell3$ python new_task.py Fifth message.....
我们来看看传递给我们的工作者的是什么:
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received ‘First message.‘ [x] Received ‘Third message...‘ [x] Received ‘Fifth message.....‘
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received ‘Second message..‘ [x] Received ‘Fourth message....‘
默认的,RabbitMQ将给下面的消费者按照顺序发送每一个消息。平均每个消费者将得到相同数量的消息。这种分发消息的方式被叫做round-robin。尝试用三个或者更多的工作者试试。
消息确认
做一些能消耗几秒的任务。你也许想知道如果其中一个消费者开始了一个长任务并且完成了部分之后死掉了会发生什么事情。就当前我们的代码一旦RabbitMQ给消费者传递了消息它会立即从内存中移除。在这个例子中,如果你杀死了一个工作者我们将丢掉它正在处理的消息。我们也将丢掉所有的被分发到这个特别的工作者的任然不会被处理的消息。
但是我们不想丢掉任何任务。如果一个工作者死掉了,我们想让这个任务传递给另一个工作者。
为了确保一个消息永远不会丢失,RabbitMQ支持消息确认。一个ack是从消费者中发送反馈给RabbitMQ告诉它一个特别的消息已经被接收,处理并且RabbitMQ可以自由的删除它。
如果消费者在没有发送一个ack的时候就死掉了,RabbitMQ将认为一个消息没有完全的被处理并且将给另一个消费者重新发送。用这种方式你可以确保没有消息丢失,甚至如果工作者偶尔死掉。
没有任何消息超市;RabbitMQ将重新传递消息仅仅是当工作者连接断掉的时候。甚至如果处理一个消息任务花费了很长时间那也没事。
消息确认默认是打开的。在前面的例子中我们显式的通过no_ack=True标记将它关掉了。现在该移除它并且一旦我们用这个任务完成了,就从一个工作者中发送一个合适的确认了。
def callback(ch, method, properties, body): print "[x] received %x"%(body,) time.sleep(body.count(‘.‘)) print "[x] Done" ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue=‘hello‘)
用这个代码我们可以确保甚至如果当工作者正在处理一个消息的时候你使用CTRL+C杀死了工作者,那也不会丢失任何消息。在工作者死掉之后所有被确认的消息很快将被重新传递。
忘了确认 通常容易犯的错误就是丢掉了basic_ack。这是一个容易的错误,但是结果是严重的。当你的客户端退出的时候消息将被重新传递,但是RabbitMQ由于不能释放未被确认的消息,将吃掉越来越多的内存。 为了debug这种错误你可以使用rabbitmqctl打印messages_unacknowledged域: $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues... hello 0 0 ...done.
消息持久性
我们已经了解了如何确保甚至当消费者死掉的时候任务不被丢失。但是如果RabbitMQ服务器停止了,我们的任务任然将被丢失。
当RabbitMQ退出或者奔溃那将丢掉队列和消息,除非你告诉它不要这样做。有两件事情被要求确保消息不会被丢失:我们需要标记队列和消息为持久的。
首先,我们需要确保RabbitMQ将永远不会丢掉我们的队列。为了这样做,我们需要把它定义为持久的:
channel.queue_declare(queue=‘hello‘,durable=True)
尽管这个命令设置是正确的,但是它不会再我们的设置中生效。那是因为我们还没有定义一个叫做hello的队列。RabbitMQ不允许你用不同的参数重新定义已经存在的队列并且对于尝试这样做的程序会返回一个错误。但是有一种快捷的变通方案-我们用不同的名字定义队列,例如task_queue:
channel.queue_declare(queue=‘task_queue‘,durable=True)
这个queue_declare改变需要被应用到生产者和消费者代码中。
在那时我们确保了甚至如果RabbitMQ重新开始task_queue队列也不会被丢掉。现在我们需要标记我们的消息为持久化-通过应用一个使用值2的delivery_mode属性。
channel.basic_publish(exchange=‘‘,routing_key=‘task_queue‘,body=message,properties=pika.BasicProperties(delivery_mode=2,))
在消息持久化上的注意点 把消息标记为持久化也不能完全确保消息将不会被丢掉。尽管它告诉RabbitMQ把消息保存到磁盘上,当RabbitMQ已经接收了一个消息并且还没有保存的时候任然后一个短时间的空隙。还有RabbitMQ给没有消息不是fsync(2)--它也许被缓存保存并且还没有真正写到磁盘中。这种持久确保不是健壮的,但是对我们的简单任务队列也足够了。如果你需要一个更健壮的担保那么你可以使用publisher confirms。
公平的分发
你也许已经注意到分发任然不是我们完全想要的那种。例如在一种有两个工作者的情况下,当所有的偶数消息是巨大的并且单数消息是轻量的,一个工作者将
不断的忙碌并且另一个将努力的做一些工作。RabbitMQ不知道任何关于这个情况的事情并且任然会均匀的分发消息。
这种意外是因为当消息进入队列的时候RabbitMQ刚刚分发了一个消息。它不会考虑消费者的许多确认消息。它仅仅是给n-th消费者绑定分发n-th消息。
为了避免这种情况我们可以使用basic.qos方法设置prefetch_count=1.这回告诉RabbitMQ不要在同一时间给一个工作者给超过一个消息。或者,换句话说,不要给一个工作者分发一个新消息知道它已经处理并且确认了前面的一个完成了。代替,它将分发给接下来的不忙碌的工作者。
channel.basic_qos(prefetch_count=1)
关于队列大小的注意点 如果所有的工作者都是忙碌的,你的队列可能填满。你就想对这个保持监控,并且也许添加更多工作者或者做一些其它策略。
把它们放一起
new_task.py脚本最后的代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘task_queue‘, durable=True) message = ‘ ‘.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange=‘‘, routing_key=‘task_queue‘, body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
我们的工作者:
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘task_queue‘, durable=True) print ‘ [*] Waiting for messages. To exit press CTRL+C‘ def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count(‘.‘) ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=‘task_queue‘) channel.start_consuming()
RabbitMQ官网教程---工作队列