首页 > 代码库 > Python 第五阶段 学习记录之----rabbmit

Python 第五阶段 学习记录之----rabbmit

消息服务器rabbmit

 

 

RabbitMQ 消息队列

     

python里有两个Q, threading queue、不同线程间数据交互

进程Queue: 不同进程间交互这个说法是错误的。 这个是用于父进程与子进程间交互、或者同属于同一父进程下多个子进程进行交互。

两个python程序的进程间是无法通信的。

各个独立进程间通信:

QQ 要发送消息给 world。1、通过socket,这个需要自己去写很多东西(沾包、收到的是什么需要返回的又是什么……) 。2、消息队列,

那两个不同的程序要通信、两台机器要通信?

 

消息队列: 可以跟各种进程通信、且不用再去写什么沾包啊之类的

 

zeromq\activemq 这些都是消息队列

很多事件需要自己去体验一下。

Rabbitmq:  erlang开发的

 

发消息:轮询

 

消息发出去了、需要确认吗? 还是发出去就完了,管你的。

大部份情况都需要回发一下处理完的消息。。万一处理过程中、挂了怎么办?发送端没收到回复是吧

挂了,socket断了,发送端自然知道、这消息立马就重新发给另一个在线的接收端。

no_ack=False# 需要收到了回应才放弃这条消息

回应保证的是:客户端、接收消息端挂掉   channel.basic_ack()

持久化: 保证的是,服务器挂掉之后

 

Rabbitmq安装:

需要安装好环境:erlang

brew install rabbitmq

 看例子:

import time
import pika
import uuid

class MyRabbmit(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            "localhost"
        ))
        self.channel = self.connection.channel()
        # self.channel.queue_declare(queue="rpc_queue")
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response,
                                   queue=self.callback_queue,
                                   # no_ack=True
                                   )

    def on_response(self, ch, method, property, body):
        print(property.reply_to)
        print(body)

        self.response = body.decode()

    def call(self, cmd):
        self.response = None
        self.uuid = str(uuid.uuid4())
        print(self.uuid)
        self.channel.basic_publish(exchange="",
                                   routing_key="rpc_queue",
                                   body = cmd,
                                   properties = pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.uuid
                                   )
                                   )
        while self.response is None:
            time.sleep(0.5)
            self.connection.process_data_events()
            # print("wait...")
        return self.response


my_rpc = MyRabbmit()
cmd = input(">>").strip()
if cmd:
    result = my_rpc.call(cmd)

    print("result: ", result)

 

import os
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))

channel = connection.channel()
channel.queue_declare("rpc_queue")

def func(cmd):
    result = os.popen(cmd).read()
    print(result)
    return result

def on_response(ch, method, property, body):
    print(body)
    print(property.reply_to, property.correlation_id)
    body = func(body.decode())
    channel.basic_publish(exchange="",
                          routing_key=property.reply_to,
                          body=body
                          )
    ch.basic_ack(delivery_tag=method.delivery_tag) # 回复一下结果

channel.basic_consume(on_response,
                      queue="rpc_queue",
                      no_ack=False,
                      )

print("等待收消息...")
channel.start_consuming()

 

Python 第五阶段 学习记录之----rabbmit