首页 > 代码库 > python操作RabbitMQ

python操作RabbitMQ

RabbitMQ介绍

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接受者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message

  • 内部架构:

     技术分享

  • 说明

    • Message (消息):RabbitMQ 转发的二进制对象,包括Headers(头)、Properties (属性)和 Data (数据),其中数据部分不是必要的。Producer(生产者): 消息的生产者,负责产生消息并把消息发到交换机
    • Exhange的应用。

      • Consumer (消费者):使用队列 Queue 从 Exchange 中获取消息的应用。
      • Exchange (交换机):负责接收生产者的消息并把它转到到合适的队列
      • Queue (队列):一个存储Exchange 发来的消息的缓冲,并将消息主动发送给Consumer,或者 Consumer 主动来获取消息。见 1.4 部分的描述。

      • Binding (绑定):队列 和 交换机 之间的关系。Exchange 根据消息的属性和 Binding 的属性来转发消息。绑定的一个重要属性是 binding_key。

      • Connection (连接)和 Channel (通道):生产者和消费者需要和 RabbitMQ 建立 TCP 连接。一些应用需要多个connection,为了节省TCP 连接,可以使用 Channel,它可以被认为是一种轻型的共享 TCP 连接的连接。连接需要用户认证,并且支持 TLS (SSL)。连接需要显式关闭。

 

Python操作RabbitMQ

1.实现简单消息队列

一个Product向queue发送一个message,一个Client从该queue接收message并打印

技术分享

  • 发消息 product 
    import pikacredentials = pika.PlainCredentials(alex,alex3714)# 凭证connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.152.134,port=5672,credentials=credentials))# 定义连接池channel = connection.channel()# 生成连接通道channel.queue_declare(queue=test)# 声明队列以向其发送消息channel.basic_publish(exchange=‘‘,routing_key=test,body=Hello World!)# 注意当未定义exchange时,routing_key需和queue的值保持一致print(send success msg to rabbitmq)connection.close()# 关闭连接
  • 收消息,client 
    import pikacredentials = pika.PlainCredentials(alex,alex3714)# 凭证connection = pika.BlockingConnection(pika.ConnectionParameters(host=192.168.152.134,port=5672,credentials=credentials))# 连接参数channel = connection.channel()# 生成连接通道channel.queue_declare(queue=test)# 声明队列。之所以消费者也需要声明队列,是为了防止生产者未声明队列,导致运行报错。def callback(ch, method, properties, body):    """    回调函数,处理从rabbitmq中取出的消息    :param ch: 通道    :param method: 方法    :param properties: 属性    :param body: 内容    :return: 接收到得信息    """    print("[x] Received %r" % body)    # print(ch,method,properties,body)    """    <pika.adapters.blocking_connection.BlockingChannel object at 0x0000000002F1DB70>    <Basic.Deliver([consumer_tag=ctag1.3c1d688587c447e5ac3a72ea99e98cac, delivery_tag=1, exchange=, redelivered=False, routing_key=test])>     <BasicProperties> bHello World!    """channel.basic_consume(callback, queue=test, no_ack=True)# no_ack 表示不需要发送ack。默认是False,表示开启状态。print([*] Waiting for messages. To exit press CTRL+C)channel.start_consuming()# 开始监听,接收消息

     

执行效果:

#product端:send success msg to rabbitmq#client端: [*] Waiting for messages. To exit press CTRL+C [x] Received bHello World!

 

python操作RabbitMQ