首页 > 代码库 > rabbitmq 学习记录 -- ACK和数据持久化
rabbitmq 学习记录 -- ACK和数据持久化
派猴子来的救兵
为了数据不丢失, 需要在两个层面上做一些配置. 一个是ACK, 一个是数据持久化.
ACK
如果没有启用的话, 消费者拿走消息的时候, queue就把它删除了.
消费者拿走一条消息之后, 还没有处理完就crash了. 那么这条消息就丢失了. 为了保证消息一定被处理完了才从queue中被删掉, 就要启用Message acknowledgment .
启用之后, queue会在收到ack之后把消息删掉.
在这里没有timeout的概念, 哪怕这个任务执行很久, 不管多久, 会一直等ack. 或者是tcp链接断了, 才会把消息再给另外一个消费者.
ack默认是开启的, 也可以显示显示地关闭
channel.basic_consume(callback, queue=queue_name, no_ack=True)
callbak里面要记得发送ack,否则消息要被一次又一次的处理,然后再次回到队列 ... ...
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( 10 ) raise SystemExit(1) # message will put back to the original queue ch.basic_ack(delivery_tag = method.delivery_tag) print " [x] Done"
来跑几个例子测试一下
生产者:
#!/usr/bin/env python# -*- coding: utf-8 -*-import pikaimport sysdef main(): body = ‘ ‘.join(sys.argv[1:]) or ‘Hello World‘ connection = pika.BlockingConnection( pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=body, ) connection.close()if __name__ == ‘__main__‘: main()
消费者:
#!/usr/bin/env python# -*- coding: utf-8 -*-import pikaimport timedef callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(10) raise SystemExit(1) ch.basic_ack(delivery_tag = method.delivery_tag) print " [x] Done"def main(): connection = pika.BlockingConnection( pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘hello‘) channel.basic_consume(callback, queue=‘hello‘, ) channel.start_consuming()if __name__ == ‘__main__‘: main()
发送一条消息到队列 , 然后消费. 观察一下状态
# rabbitmqctl list_queues name messages_ready messages_unacknowledgedListing queues ...hello 0 1
等10秒, 再看, 消息没有被消费成功, 再次回到队列中.
# rabbitmqctl list_queues name messages_ready messages_unacknowledgedListing queues ...hello 1 0
REJECT
可以用ACK告诉rabbitmq任务处理完了, 但是如果没有成功的话, 也可以再把消息塞回队列. 就是Negative Acknowledgement. pika对应的方法是basic_reject
但可得注意, 不要搞成死循环了
数据持久化
启用ack之后, 消费者死掉不会丢失数据, 但rabbitmq进程死掉的话, 消息就丢掉了. 为保证数据不丢失, 还需要启动数据持久化. 需要在两个层面上做持久化: 1. 队列的持久化 2. 消息的持久化
channel.queue_declare(queue=‘hello‘, durable=True)
这样就申明了一个持久化的队列, durable的属性是不会变的, 如果之前hello队列已经申明过且不是持久化的, 这个再次申明会失败. 这个队列不会因为rabbitmq重启而丢失, 接下来还要继续做消息的持久化.
channel.basic_publish(exchange=‘‘, routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
Q: 如果在一个非持久化的队列上发送数据时, 指明要持久化, 为发生什么情况?
A: 可以正常发送, 但重启rabbitmq之后, 队列丢失, 当然消息找不到了.
rabbitmq 学习记录 -- ACK和数据持久化