首页 > 代码库 > python学习-day11

python学习-day11

一、Rabbitmq

       RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。在 RabbitMQ 中,如下图结构:

技术分享

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。生产者需要完成的任务:
  • 1 创建RabbitMQ连接
    2 获取信道
    3 声明交换器
    4 创建消息
    5 发布消息
    6 关闭信道
    7 关闭RabbitMQ连接 
  • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。
  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。消费者需要完成的任务:
    1 创建RabbitMQ连接
    2 获取信道
    3 声明交换器
    4 声明队列
    5 队列和交换器绑定
    6 消费信息
    7 关闭信道
    8 关闭RabbitMQ连接
  • Exchange: 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct, fanout, topic三种。
  • Binding: 连接Exchange和Queue,包含路由规则。
  • Queue: 消息队列,存储还未被消费的消息。
  • Message: Header+Body
  • Channel: 通道,执行AMQP的命令;一个连接可创建多个通道以节省资源

1. dircted exchange

     路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。rabbitmq内部默认有一个特殊的dircted exchange,该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。

 生产者:

技术分享
 1 import pika
 2 credentials = pika.PlainCredentials(admin, 123456)
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     192.168.170.134,5672,/,credentials))
 5 channel = connection.channel()
 6 
 7 # 声明queue
 8 channel.queue_declare(queue=hello)
 9 
10 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
11 channel.basic_publish(exchange=‘‘,
12                       routing_key=hello,
13                       body=Hello World!)
14 print(" [x] Sent ‘Hello World!‘")
15 connection.close()
View Code

消费者:

技术分享
 1 import pika
 2 credentials = pika.PlainCredentials(admin, 123456)
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     192.168.170.134,5672,/,credentials))
 5 channel = connection.channel()
 6 def callback(ch, method, properties, body):
 7     print(" [x] Received %r" % body)
 8 
 9 
10 channel.basic_consume(callback,
11                       queue=hello,
12                       no_ack=True)
13 
14 print( [*] Waiting for messages. To exit press CTRL+C)
15 channel.start_consuming()
View Code

 队列绑定关键字,发送者将数据关键字发送到消息Exchange,Exchange根据关键字判定应该将数据发送至指定队列。

生产者:

技术分享
 1 import pika,sys
 2 
 3 credentials = pika.PlainCredentials(admin, 123456)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     192.168.170.134,5672,/,credentials))
 6 channel = connection.channel()
 7 channel.exchange_declare(exchange=direct_logs,
 8                          type=direct)
 9 
10 severity = sys.argv[1] if len(sys.argv) > 1 else info
11 message =  .join(sys.argv[2:]) or Hello World!
12 channel.basic_publish(exchange=direct_logs,
13                       routing_key=severity,
14                       body=message)
15 print(" [x] Sent %r:%r" % (severity, message))
16 connection.close()
View Code

消费者:

技术分享
 1 import pika,sys
 2 
 3 credentials = pika.PlainCredentials(admin, 123456)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     192.168.170.134,5672,/,credentials))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange=direct_logs,
 9                          type=direct)
10 
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13 
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
17     sys.exit(1)
18 
19 for severity in severities:
20     channel.queue_bind(exchange=direct_logs,
21                        queue=queue_name,
22                        routing_key=severity)
23 
24 print( [*] Waiting for logs. To exit press CTRL+C)
25 
26 
27 def callback(ch, method, properties, body):
28     print(" [x] %r:%r" % (method.routing_key, body))
29 
30 
31 channel.basic_consume(callback,
32                       queue=queue_name,
33                       no_ack=True)
34 
35 channel.start_consuming()
View Code

 运行结果:技术分享

2. fanout exchange

      发布/订阅exchange ,发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

生产者:

技术分享
 1 import pika,sys
 2 
 3 credentials = pika.PlainCredentials(admin, 123456)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     192.168.170.134,5672,/,credentials))
 6 channel = connection.channel()
 7 # 声明queue
 8 channel.exchange_declare(exchange=logs,
 9                          type=fanout)
10 
11 message =  .join(sys.argv[1:]) or "info: Hello World!"
12 channel.basic_publish(exchange=logs,
13                       routing_key=‘‘,
14                       body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()
View Code

消费者:

技术分享
 1 import pika
 2 
 3 credentials = pika.PlainCredentials(admin, 123456)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     192.168.170.134,5672,/,credentials))
 6 channel = connection.channel()
 7 
 8 result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange=logs,
12                    queue=queue_name)
13 
14 print( [*] Waiting for logs. To exit press CTRL+C)
15 
16 
17 def callback(ch, method, properties, body):
18     print(" [x] %r" % body)
19 
20 
21 channel.basic_consume(callback,
22                       queue=queue_name,
23                       no_ack=True)
24 
25 channel.start_consuming()
View Code

 3. topic exchange

      在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入"路由值"和"关键字"进行匹配,匹配成功,则将数据发送到指定队列。

  • # :表示可以匹配0个或多个单词;

  • * :表示只能匹配一个单词。

生产者:

技术分享
 1 import pika,sys
 2 
 3 credentials = pika.PlainCredentials(admin, 123456)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     192.168.170.134,5672,/,credentials))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange=topic_logs,
 9                          type=topic)
10 
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else anonymous.info
12 message =  .join(sys.argv[2:]) or Hello World!
13 channel.basic_publish(exchange=topic_logs,
14                       routing_key=routing_key,
15                       body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()
View Code

消费者:

技术分享
 1 import pika,sys
 2 
 3 credentials = pika.PlainCredentials(admin, 123456)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     192.168.170.134,5672,/,credentials))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange=topic_logs,
 9                          type=topic)
10 
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13 
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18 
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange=topic_logs,
21                        queue=queue_name,
22                        routing_key=binding_key)
23 
24 print( [*] Waiting for logs. To exit press CTRL+C)
25 
26 
27 def callback(ch, method, properties, body):
28     print(" [x] %r:%r" % (method.routing_key, body))
29 
30 
31 channel.basic_consume(callback,
32                       queue=queue_name,
33                       no_ack=True)
34 
35 channel.start_consuming()
View Code

 二、基于rabbitmq的RPC

      基于rabbitmq的rpc实现流程:

    (1)首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;

    (2)服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致

    (3)客户端从回调Queue中得到先前correlation_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。

  对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类。

服务端:

技术分享
 1 import pika
 2 
 3 cre_publiser = pika.PlainCredentials(admin, 123456)
 4 conn_para = pika.ConnectionParameters(192.168.170.134,5672,/,cre_publiser)
 5 connection = pika.BlockingConnection(conn_para)
 6 
 7 # 建立会话
 8 channel = connection.channel()
 9 
10 # 声明RPC请求队列
11 channel.queue_declare(queue=rpc_queue)
12 
13 # 数据处理方法
14 def fib(n):
15     if n == 0:
16         return 0
17     elif n == 1:
18         return 1
19     else:
20         return fib(n-1) + fib(n-2)
21 
22 # 对RPC请求队列中的请求进行处理
23 def on_request(ch, method, props, body):
24     n = int(body)
25 
26     print(" [.] fib(%s)" % n)
27 
28     # 调用数据处理方法
29     response = fib(n)
30 
31     # 将处理结果(响应)发送到回调队列
32     ch.basic_publish(exchange=‘‘,
33                      routing_key=props.reply_to,
34                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
35                      body=str(response))
36     ch.basic_ack(delivery_tag = method.delivery_tag)
37 
38 # 负载均衡,同一时刻发送给该服务器的请求不超过一个
39 channel.basic_qos(prefetch_count=1)
40 
41 channel.basic_consume(on_request,
42                       queue=rpc_queue)
43 
44 print(" [x] Awaiting RPC requests")
45 channel.start_consuming()
View Code

客户端:

技术分享
 1 import pika
 2 import uuid
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.cre_publiser = pika.PlainCredentials(admin, 123456)
 6         self.conn_para = pika.ConnectionParameters(192.168.170.134,5672,/,self.cre_publiser)
 7         self.connection = pika.BlockingConnection(self.conn_para)
 8 
 9         self.channel = self.connection.channel()
10 
11         result = self.channel.queue_declare(exclusive=True)
12         self.callback_queue = result.method.queue
13 
14         self.channel.basic_consume(self.on_response,
15                                    no_ack=True,
16                                    queue=self.callback_queue)
17 
18     def on_response(self, ch, method, props, body):
19         if self.corr_id == props.correlation_id:
20             self.response = body
21 
22     def call(self, n):
23         self.response = None
24         self.corr_id = str(uuid.uuid4())
25         self.channel.basic_publish(exchange=‘‘,
26                                    routing_key=rpc_queue,
27                                    properties=pika.BasicProperties(
28                                    reply_to=self.callback_queue,
29                                    correlation_id=self.corr_id,
30                                    ),
31                                    body=str(n))
32         while self.response is None:
33             self.connection.process_data_events()
34         return int(self.response)
35 
36 
37 fibonacci_rpc = FibonacciRpcClient()
38 
39 print(" [x] Requesting fib(6)")
40 response = fibonacci_rpc.call(6)
41 print(" [.] Got %r" % response)
View Code

 

python学习-day11