首页 > 代码库 > rabbitmq 学习记录 -- ACK和数据持久化

rabbitmq 学习记录 -- ACK和数据持久化

派猴子来的救兵

 

为了数据不丢失, 需要在两个层面上做一些配置. 一个是ACK, 一个是数据持久化.

ACK

如果没有启用的话, 消费者拿走消息的时候, queue就把它删除了.

消费者拿走一条消息之后, 还没有处理完就crash了. 那么这条消息就丢失了. 为了保证消息一定被处理完了才从queue中被删掉, 就要启用Message acknowledgment .

启用之后, queue会在收到ack之后把消息删掉.

在这里没有timeout的概念, 哪怕这个任务执行很久, 不管多久, 会一直等ack. 或者是tcp链接断了, 才会把消息再给另外一个消费者.

ack默认是开启的, 也可以显示显示地关闭

channel.basic_consume(callback, queue=queue_name, no_ack=True) 

callbak里面要记得发送ack,否则消息要被一次又一次的处理,然后再次回到队列 ... ...

def callback(ch, method, properties, body):    print " [x] Received %r" % (body,)    time.sleep( 10 )    raise SystemExit(1) # message will put back to the original queue    ch.basic_ack(delivery_tag = method.delivery_tag)    print " [x] Done"

来跑几个例子测试一下

生产者:

#!/usr/bin/env python# -*- coding: utf-8 -*-import pikaimport sysdef main():    body = ‘ ‘.join(sys.argv[1:]) or ‘Hello World‘    connection = pika.BlockingConnection(        pika.ConnectionParameters(            host=‘localhost‘))    channel = connection.channel()    channel.queue_declare(queue=‘hello‘)    channel.basic_publish(exchange=‘‘,                          routing_key=‘hello‘,                          body=body,                          )    connection.close()if __name__ == ‘__main__‘:    main()

消费者:

#!/usr/bin/env python# -*- coding: utf-8 -*-import pikaimport timedef callback(ch, method, properties, body):    print " [x] Received %r" % (body,)    time.sleep(10)    raise SystemExit(1)    ch.basic_ack(delivery_tag = method.delivery_tag)    print " [x] Done"def main():    connection = pika.BlockingConnection(        pika.ConnectionParameters(            host=‘localhost‘))    channel = connection.channel()    channel.queue_declare(queue=‘hello‘)    channel.basic_consume(callback,                          queue=‘hello‘,                          )    channel.start_consuming()if __name__ == ‘__main__‘:    main()

发送一条消息到队列 , 然后消费. 观察一下状态

# rabbitmqctl list_queues name messages_ready messages_unacknowledgedListing queues ...hello   0   1

等10秒, 再看, 消息没有被消费成功, 再次回到队列中.

# rabbitmqctl list_queues name messages_ready messages_unacknowledgedListing queues ...hello   1   0

REJECT

可以用ACK告诉rabbitmq任务处理完了, 但是如果没有成功的话, 也可以再把消息塞回队列. 就是Negative Acknowledgement. pika对应的方法是basic_reject

但可得注意, 不要搞成死循环了

数据持久化

启用ack之后, 消费者死掉不会丢失数据, 但rabbitmq进程死掉的话, 消息就丢掉了. 为保证数据不丢失, 还需要启动数据持久化. 需要在两个层面上做持久化: 1. 队列的持久化 2. 消息的持久化

channel.queue_declare(queue=‘hello‘, durable=True)

这样就申明了一个持久化的队列, durable的属性是不会变的, 如果之前hello队列已经申明过且不是持久化的, 这个再次申明会失败. 这个队列不会因为rabbitmq重启而丢失, 接下来还要继续做消息的持久化.

channel.basic_publish(exchange=‘‘,                     routing_key="task_queue",                     body=message,                     properties=pika.BasicProperties(                        delivery_mode = 2, # make message persistent                     ))

Q: 如果在一个非持久化的队列上发送数据时, 指明要持久化, 为发生什么情况?

A: 可以正常发送, 但重启rabbitmq之后, 队列丢失, 当然消息找不到了.

rabbitmq 学习记录 -- ACK和数据持久化