首页 > 代码库 > python第六十天-----RabbitMQ

python第六十天-----RabbitMQ

RabbitMQ消息队列:默认为消息轮循模式,按client端启动是顺序接收

server端

技术分享
 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3                localhost))
 4 channel = connection.channel()#管道
 5 
 6 #声明queue
 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello
 8 channel.queue_declare(queue=hello,durable=True)#队列名 hello,持久化队列
 9 
10 for i in range(10):
11 
12     channel.basic_publish(exchange=‘‘,
13                           routing_key=hello,
14                           body=Hello World!%s%i,
15                           properties=pika.BasicProperties(delivery_mode=2))
16     print(" [x] Sent ‘Hello World!‘",i)
17 connection.close()
View Code

client端

技术分享
 1 import pika,time
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                localhost))
 5 channel = connection.channel()
 6 #channel.queue_declare(queue=‘hello‘)
 7 channel.queue_declare(queue=hello,durable=True)#队列名 hello,持久化队列
 8 
 9 
10 def callback(ch, method, properties, body):#回调函数
11     print(接收消息中…………)
12     time.sleep(1)
13     print(" [x] Received %r" % body)
14     ch.basic_ack(delivery_tag=method.delivery_tag)
15 
16 
17 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
18 channel.basic_consume(callback,#接收到消息调用回调函数 callback
19                       queue=hello,
20                       #no_ack=True
21                        )
22 
23 print( [*] 接收消息中. To exit press CTRL+C)
24 channel.start_consuming()#启动消息接收
View Code

消息持久化

server端

技术分享
 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3                localhost))
 4 channel = connection.channel()#管道
 5 
 6 #声明queue
 7 channel.queue_declare(queue=hello2,durable=True)#队列名 hello
 8 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列
 9 
10 
11 channel.basic_publish(exchange=‘‘,
12                         routing_key=hello2,
13                         body=Hello World!%s---->)
14 print(" [x] Sent ‘Hello World!‘")
15 connection.close()
View Code

client端

技术分享
 1 import pika,time
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                localhost))
 5 channel = connection.channel()
 6 channel.queue_declare(queue=hello2)#服务端与客户端的设置需一致,不然会报错
 7 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列
 8 
 9 
10 def callback(ch, method, properties, body):#回调函数
11     print(接收消息中…………)
12     time.sleep(5)
13     print(" [x] Received %r" % body)
14     ch.basic_ack(delivery_tag=method.delivery_tag)
15 
16 
17 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
18 channel.basic_consume(callback,#接收到消息调用回调函数 callback
19                       queue=hello2,
20                       #no_ack=True
21                        )
22 
23 print( [*] 接收消息中. To exit press CTRL+C)
24 channel.start_consuming()#启动消息接收
View Code

fanout广播模式:实时发送,发送时,如果client端没启动将无法收到消息

server端

技术分享
 1 import pika,sys,time
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3                localhost))
 4 channel = connection.channel()#管道
 5 
 6 #声明queue 广播模式不用声明队列
 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello
 8 #channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列
 9 
10 argv=input(输入消息)
11 msg=‘‘.join(sys.argv[1:]) or  info:消息默认发送………
12 for i in range(10):
13     time.sleep(1)
14     channel.basic_publish(exchange=logs,#绑定频道
15                           #routing_key=‘hello‘,
16                           routing_key=‘‘,
17                           body=msg+str(i),
18                           #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用
19                            )
20     print(msg,i)
21 #connection.close()
View Code

client端

技术分享
 1 import pika,time
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                localhost))
 5 channel = connection.channel()
 6 #channel.queue_declare(queue=‘hello2‘)#服务端与客户端的设置需一致,不然会报错
 7 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列
 8 channel.exchange_declare(exchange=logs,#绑定频道
 9                          type=fanout)#接收类型
10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除
11 queuename=reult.method.queue#队列名 自动生成
12 channel.queue_bind(exchange=logs,#先要绑定频道
13                    queue=queuename
14                    )
15 
16 
17 def callback(ch, method, properties, body):#回调函数
18     print(接收消息中…………)
19     #time.sleep(5)
20     print(" [x] Received %r" % body.decode())
21     ch.basic_ack(delivery_tag=method.delivery_tag)
22 
23 
24 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
25 channel.basic_consume(callback,#接收到消息调用回调函数 callback
26                       queue=queuename,
27                       #no_ack=True
28                        )
29 
30 print( [*] 接收消息中. To exit press CTRL+C)
31 
32 channel.start_consuming()#启动消息接收
View Code

 

direct广播模式:分级别发送消——客户端可以按级别接收

server端

技术分享
 1 import pika,sys,time
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3                localhost))
 4 channel = connection.channel()#管道
 5 
 6 
 7 
 8 severity = sys.argv[1] if len(sys.argv) > 1 else info#启动参数 默认无参数为 info 级别
 9 msg=‘‘.join(sys.argv[2:]) or  info:消息默认发送………#启动参数 为空,发默认消息
10 for i in range(10):
11     time.sleep(1)
12     channel.basic_publish(exchange=direct_logs,#绑定频道
13                           routing_key=severity,#默认的消息队列级别
14                           body=msg+str(i),
15                           #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用
16                            )
17     print(msg,severity)
18 connection.close()
View Code

client端

技术分享
 1 import pika,time,sys
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                localhost))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange=direct_logs,#定义一个接收的频道
 8                          type=direct)
 9 
10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除
11 queuename=reult.method.queue#队列名 自动生成
12 
13 
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])#启动接收的消息级别
17     sys.exit(1)
18 
19 for severity in severities:#循环接收各级别的消息
20     channel.queue_bind(exchange=direct_logs,
21                        queue=queuename,
22                        routing_key=severity)
23 
24 def callback(ch, method, properties, body):#回调函数
25     print(接收消息中…………)
26     #time.sleep(5)
27     print(" [x] Received %r" % body.decode())
28     ch.basic_ack(delivery_tag=method.delivery_tag)
29 
30 
31 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
32 channel.basic_consume(callback,#接收到消息调用回调函数 callback
33                       queue=queuename,
34                       #no_ack=True
35                        )
36 
37 print( [*] 接收消息中. To exit press CTRL+C)
38 
39 channel.start_consuming()#启动消息接收
View Code

 

topic细致消息过滤

server端

技术分享
 1 import pika,sys,time
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(
 3                localhost))
 4 channel = connection.channel()#管道
 5 
 6 
 7 
 8 #severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘#启动参数 默认无参数为 info 级别
 9 routing_key= sys.argv[1] if len(sys.argv) > 1 else anorrymous.info#启动参数 默认无参数为 info 级别
10 msg=‘‘.join(sys.argv[2:]) or  info:消息默认发送………#启动参数 为空,发默认消息
11 for i in range(10):
12     time.sleep(1)
13     channel.basic_publish(exchange=direct_logs,#绑定频道
14                           #routing_key=severity,#默认的消息队列级别
15                           routing_key=routing_key,#默认的消息队列级别
16                           body=msg+str(i),
17                           #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用
18                            )
19     #print(msg,severity)
20     print(msg,routing_key)
21 connection.close()
View Code

client端

技术分享
 1 import pika,time,sys
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                localhost))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange=direct_logs,#定义一个接收的频道
 8                          type=topic)
 9 
10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除
11 queuename=reult.method.queue#队列名 自动生成
12 
13 
14 #severities = sys.argv[1:]
15 binding_key = sys.argv[1:]
16 #if not severities:
17 if not binding_key:
18     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])#启动接收的消息级别
19     sys.exit(1)
20 
21 #for severity in severities:#循环接收各级别的消息
22 for severity in binding_key:#循环接收各级别的消息
23     channel.queue_bind(exchange=direct_logs,
24                        queue=queuename,
25                        routing_key=severity)
26 
27 def callback(ch, method, properties, body):#回调函数
28     print(接收消息中…………)
29     #time.sleep(5)
30     print(" [x] Received %r" % body.decode())
31     ch.basic_ack(delivery_tag=method.delivery_tag)
32 
33 
34 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
35 channel.basic_consume(callback,#接收到消息调用回调函数 callback
36                       queue=queuename,
37                       #no_ack=True
38                        )
39 
40 print( [*] 接收消息中. To exit press CTRL+C)
41 
42 channel.start_consuming()#启动消息接收
View Code

 

 

RPC模式:结果返回

server端

技术分享
 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4         host=localhost))#生成消息对队
 5 
 6 channel = connection.channel()#生成管道
 7 
 8 channel.queue_declare(queue=rpc_queue)#消息收接的模式
 9 
10 def fib(n):#计算函数——非波那
11     if n == 0:
12         return 0
13     elif n == 1:
14         return 1
15     else:
16         return fib(n-1) + fib(n-2)
17 
18 def on_request(ch, method, props, body):#回调函数
19     n = int(body)
20 
21     print(" [.] fib(%s)" % n)
22     response = fib(n)#调用计算函数
23 
24     ch.basic_publish(exchange=‘‘,
25                      routing_key=props.reply_to,#收消息的队列
26                      properties=pika.BasicProperties(correlation_id =props.correlation_id),#返回消息的队列
27                      body=str(response))#返回结果数据
28     ch.basic_ack(delivery_tag = method.delivery_tag)##确保消息被 客户端接收
29 
30 channel.basic_qos(prefetch_count=1)#同时只处理一个消息
31 channel.basic_consume(on_request, queue=rpc_queue)#接收消息,自动调用回调函数
32 
33 print(" [x] Awaiting RPC requests")
34 channel.start_consuming()#开始接收
View Code

client端

技术分享
 1 import pika
 2 import uuid
 3 
 4 class FibonacciRpcClient(object):
 5     def __init__(self):
 6         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
 7                 host=localhost))#生成连接的服务端 ip
 8 
 9         self.channel = self.connection.channel()#创建一个管道
10 
11         result = self.channel.queue_declare(exclusive=True)#随机生成一个队列,收消息后自动删除
12         self.callback_queue = result.method.queue#赋于管道 变量
13 
14         self.channel.basic_consume(self.on_response,#回调函数
15                                    no_ack=True,#不用服务端确认本条消息
16                                    queue=self.callback_queue)#随机生的队列
17 
18     def on_response(self, ch, method, props, body):#回调函数
19         if self.corr_id == props.correlation_id:#判断服务端返回的队列名是否与当前所生成的队列名一致
20             self.response = body#  将服务端的结果赋于返回来的结果变量
21 
22     def call(self, n):#发送消息的函数
23         self.response = None#初始返回结果为空
24         self.corr_id = str(uuid.uuid4())#生成一个服务端返回消息的队列名
25         self.channel.basic_publish(exchange=‘‘,
26                                    routing_key=rpc_queue,#确认为rpc模式 实时发送
27                                    properties=pika.BasicProperties(
28                                          reply_to = self.callback_queue,#发送的管道队列名
29                                          correlation_id = self.corr_id,#发送给服务端,用于返回消息的队列名
30                                          ),
31                                    body=str(n))#发送的内容数据
32         while self.response is None:
33             self.connection.process_data_events()#非阻塞模式接收消息
34             print(没有返回消息……)
35         return int(self.response)#返回结果
36 
37 fibonacci_rpc = FibonacciRpcClient()#生成一个实例
38 
39 print(" [x] Requesting fib(10)")
40 response = fibonacci_rpc.call(10)#调用发送消息的函数
41 print(" [.] Got %r" % response)#打印结果
View Code

 

python第六十天-----RabbitMQ