首页 > 代码库 > Python之RabbitMQ

Python之RabbitMQ

RabbitMQ

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。


MQ特点

MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。


Send and Receive

技术分享

send:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Jerry Shi


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))
channel = connection.channel()

channel.queue_declare(queue=hello)

channel.basic_publish(exchange=‘‘,
                      routing_key=hello,
                      body=Hello World!)
print("已发送 Hello World!")
connection.close()

receive:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Jerry Shi

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))
channel = connection.channel()

channel.queue_declare(queue=hello)

print
 [*] Waiting for messages. To exit press CTRL+C


def callback(ch, method, properties, body):
    print(" 已收到 %r" %body)


channel.basic_consume(callback,
                      queue=hello,
                      no_ack=True)

channel.start_consuming()

消息分发轮询实例

send:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Jerry Shi

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    localhost))   ##相当建立一个socket
channel = connection.channel()   ##声明一个管道

channel.queue_declare(queue=hello)    #声明queue

channel.basic_publish(exchange=‘‘,
                      routing_key=hello,   #发给一个queue名字
                      body=Hello World!)   #发送的内容
print(" 已发送 ‘Hello World!‘")   #发送完成后打印消息出来
connection.close()

receive:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Jerry Shi

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    localhost))
channel = connection.channel()

channel.queue_declare(queue=hello)   #声明从哪个队列收消息

def callback(ch, method, properties, body):
    print("-->", ch, method, properties)
    print(" [x] Received %r" % body)

#开始消费消息
channel.basic_consume(callback,   #如果收到消息就调用callback函数来处理消息
                      queue=hello,
                      no_ack=True)

print( [*] Waiting for messages. To exit press CTRL+C)
channel.start_consuming()    #运行后一直等待收取消息

 RabbitMQ RPC

技术分享

rpc_client:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Jerry Shi

import pika,uuid,time

class RpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=localhost))   #建立远程连接

        self.channel = self.connection.channel()   #建立声明隧道

        result = self.channel.queue_declare(exclusive=True)   #生成一个随机queue
        self.callback_queue = result.method.queue   #随机queue

        self.channel.basic_consume(self.on_response,   #只要收到消息就调用op_response
                                   no_ack=True,
                                   queue=self.callback_queue)   #声明要收callback_queue

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:   #判断是否是上次我发送出去的数据
            self.response = body   #body是队列的返回

    def call(self, n):
        self.response = None   #开始设置response为none
        self.corr_id = str(uuid.uuid4())   #生成一个随机的字符串
        self.channel.basic_publish(exchange=‘‘,
                                   routing_key=rpc_queue,   #发消息到rpc_queue里
                                   properties=pika.BasicProperties(   #消息持久化pika.BasicProperties
                                       reply_to=self.callback_queue,   #生成完随机queue后发给我,我告诉服务端返回信息发送的这个queue
                                       correlation_id=self.corr_id,   #发送uuid字符串给服务端
                                   ),
                                   body=str(n))   #信息必须是字符串
        while self.response is None:   #respnse如果是空就执行本循环
            self.connection.process_data_events()   #非阻塞版的start_consuming(),有消息返回,没有消息也返回。
            print("")
            time.sleep(0.5)
        return int(self.response)


rpc = RpcClient()   #实例化

print(" 您要做的是:")
response = rpc.call(6)   #调用call方法,传参一个30
print(" 您的结果是: %r" % response)

rpc_server:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Jerry Shi

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=localhost))

channel = connection.channel()

channel.queue_declare(queue=rpc_queue)   #声明一个rpe_queue


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)   #收到信息

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,   #客户单随机生成的那个随机queue
                     properties=pika.BasicProperties(correlation_id=                                                          props.correlation_id),   #收到客户端生成的uuid后,自己再生成一个uuid并返回给客户端
                     body=str(response))   #发送消息给客户端
    ch.basic_ack(delivery_tag=method.delivery_tag)   #确认消息被消费


channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=rpc_queue)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

Python之RabbitMQ