首页 > 代码库 > python操作RabbitMQ
python操作RabbitMQ
RabbitMQ介绍
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接受者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message
- 内部架构:
说明
- Message (消息):RabbitMQ 转发的二进制对象,包括Headers(头)、Properties (属性)和 Data (数据),其中数据部分不是必要的。Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机
Exhange的应用。
- Consumer (消费者):使用队列 Queue 从 Exchange 中获取消息的应用。
- Exchange (交换机):负责接收生产者的消息并把它转到到合适的队列
Queue (队列):一个存储Exchange 发来的消息的缓冲,并将消息主动发送给Consumer,或者 Consumer 主动来获取消息。见 1.4 部分的描述。
Binding (绑定):队列 和 交换机 之间的关系。Exchange 根据消息的属性和 Binding 的属性来转发消息。绑定的一个重要属性是 binding_key。
Connection (连接)和 Channel (通道):生产者和消费者需要和 RabbitMQ 建立 TCP 连接。一些应用需要多个connection,为了节省TCP 连接,可以使用 Channel,它可以被认为是一种轻型的共享 TCP 连接的连接。连接需要用户认证,并且支持 TLS (SSL)。连接需要显式关闭。
Python操作RabbitMQ
1.实现简单消息队列
一个Product向queue发送一个message,一个Client从该queue接收message并打印
- 发消息 product
import pikacredentials = pika.PlainCredentials(‘alex‘,‘alex3714‘)# 凭证connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.152.134‘,port=5672,credentials=credentials))# 定义连接池channel = connection.channel()# 生成连接通道channel.queue_declare(queue=‘test‘)# 声明队列以向其发送消息channel.basic_publish(exchange=‘‘,routing_key=‘test‘,body=‘Hello World!‘)# 注意当未定义exchange时,routing_key需和queue的值保持一致print(‘send success msg to rabbitmq‘)connection.close()# 关闭连接
- 收消息,client
import pikacredentials = pika.PlainCredentials(‘alex‘,‘alex3714‘)# 凭证connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.152.134‘,port=5672,credentials=credentials))# 连接参数channel = connection.channel()# 生成连接通道channel.queue_declare(queue=‘test‘)# 声明队列。之所以消费者也需要声明队列,是为了防止生产者未声明队列,导致运行报错。def callback(ch, method, properties, body): """ 回调函数,处理从rabbitmq中取出的消息 :param ch: 通道 :param method: 方法 :param properties: 属性 :param body: 内容 :return: 接收到得信息 """ print("[x] Received %r" % body) # print(ch,method,properties,body) """ <pika.adapters.blocking_connection.BlockingChannel object at 0x0000000002F1DB70> <Basic.Deliver([‘consumer_tag=ctag1.3c1d688587c447e5ac3a72ea99e98cac‘, ‘delivery_tag=1‘, ‘exchange=‘, ‘redelivered=False‘, ‘routing_key=test‘])> <BasicProperties> b‘Hello World!‘ """channel.basic_consume(callback, queue=‘test‘, no_ack=True)# no_ack 表示不需要发送ack。默认是False,表示开启状态。print(‘[*] Waiting for messages. To exit press CTRL+C‘)channel.start_consuming()# 开始监听,接收消息
执行效果:
#product端:send success msg to rabbitmq#client端: [*] Waiting for messages. To exit press CTRL+C [x] Received b‘Hello World!‘
python操作RabbitMQ
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。