首页 > 代码库 > python第六十天-----RabbitMQ
python第六十天-----RabbitMQ
RabbitMQ消息队列:默认为消息轮循模式,按client端启动是顺序接收
server端
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel = connection.channel()#管道 5 6 #声明queue 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello 8 channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列 9 10 for i in range(10): 11 12 channel.basic_publish(exchange=‘‘, 13 routing_key=‘hello‘, 14 body=‘Hello World!%s‘%i, 15 properties=pika.BasicProperties(delivery_mode=2)) 16 print(" [x] Sent ‘Hello World!‘",i) 17 connection.close()
client端
1 import pika,time 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 ‘localhost‘)) 5 channel = connection.channel() 6 #channel.queue_declare(queue=‘hello‘) 7 channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列 8 9 10 def callback(ch, method, properties, body):#回调函数 11 print(‘接收消息中…………‘) 12 time.sleep(1) 13 print(" [x] Received %r" % body) 14 ch.basic_ack(delivery_tag=method.delivery_tag) 15 16 17 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 18 channel.basic_consume(callback,#接收到消息调用回调函数 callback 19 queue=‘hello‘, 20 #no_ack=True 21 ) 22 23 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 24 channel.start_consuming()#启动消息接收
消息持久化
server端
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel = connection.channel()#管道 5 6 #声明queue 7 channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello 8 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列 9 10 11 channel.basic_publish(exchange=‘‘, 12 routing_key=‘hello2‘, 13 body=‘Hello World!%s---->‘) 14 print(" [x] Sent ‘Hello World!‘") 15 connection.close()
client端
1 import pika,time 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 ‘localhost‘)) 5 channel = connection.channel() 6 channel.queue_declare(queue=‘hello2‘)#服务端与客户端的设置需一致,不然会报错 7 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列 8 9 10 def callback(ch, method, properties, body):#回调函数 11 print(‘接收消息中…………‘) 12 time.sleep(5) 13 print(" [x] Received %r" % body) 14 ch.basic_ack(delivery_tag=method.delivery_tag) 15 16 17 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 18 channel.basic_consume(callback,#接收到消息调用回调函数 callback 19 queue=‘hello2‘, 20 #no_ack=True 21 ) 22 23 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 24 channel.start_consuming()#启动消息接收
fanout广播模式:实时发送,发送时,如果client端没启动将无法收到消息
server端
1 import pika,sys,time 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel = connection.channel()#管道 5 6 #声明queue 广播模式不用声明队列 7 #channel.queue_declare(queue=‘hello‘)#队列名 hello 8 #channel.queue_declare(queue=‘hello‘,durable=True)#队列名 hello,持久化队列 9 10 argv=input(‘输入消息‘) 11 msg=‘‘.join(sys.argv[1:]) or ‘info:消息默认发送………‘ 12 for i in range(10): 13 time.sleep(1) 14 channel.basic_publish(exchange=‘logs‘,#绑定频道 15 #routing_key=‘hello‘, 16 routing_key=‘‘, 17 body=msg+str(i), 18 #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用 19 ) 20 print(msg,i) 21 #connection.close()
client端
1 import pika,time 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 ‘localhost‘)) 5 channel = connection.channel() 6 #channel.queue_declare(queue=‘hello2‘)#服务端与客户端的设置需一致,不然会报错 7 #channel.queue_declare(queue=‘hello2‘,durable=True)#队列名 hello,持久化队列 8 channel.exchange_declare(exchange=‘logs‘,#绑定频道 9 type=‘fanout‘)#接收类型 10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除 11 queuename=reult.method.queue#队列名 自动生成 12 channel.queue_bind(exchange=‘logs‘,#先要绑定频道 13 queue=queuename 14 ) 15 16 17 def callback(ch, method, properties, body):#回调函数 18 print(‘接收消息中…………‘) 19 #time.sleep(5) 20 print(" [x] Received %r" % body.decode()) 21 ch.basic_ack(delivery_tag=method.delivery_tag) 22 23 24 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 25 channel.basic_consume(callback,#接收到消息调用回调函数 callback 26 queue=queuename, 27 #no_ack=True 28 ) 29 30 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 31 32 channel.start_consuming()#启动消息接收
direct广播模式:分级别发送消——客户端可以按级别接收
server端
1 import pika,sys,time 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel = connection.channel()#管道 5 6 7 8 severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘#启动参数 默认无参数为 info 级别 9 msg=‘‘.join(sys.argv[2:]) or ‘info:消息默认发送………‘#启动参数 为空,发默认消息 10 for i in range(10): 11 time.sleep(1) 12 channel.basic_publish(exchange=‘direct_logs‘,#绑定频道 13 routing_key=severity,#默认的消息队列级别 14 body=msg+str(i), 15 #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用 16 ) 17 print(msg,severity) 18 connection.close()
client端
1 import pika,time,sys 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 ‘localhost‘)) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange=‘direct_logs‘,#定义一个接收的频道 8 type=‘direct‘) 9 10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除 11 queuename=reult.method.queue#队列名 自动生成 12 13 14 severities = sys.argv[1:] 15 if not severities: 16 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])#启动接收的消息级别 17 sys.exit(1) 18 19 for severity in severities:#循环接收各级别的消息 20 channel.queue_bind(exchange=‘direct_logs‘, 21 queue=queuename, 22 routing_key=severity) 23 24 def callback(ch, method, properties, body):#回调函数 25 print(‘接收消息中…………‘) 26 #time.sleep(5) 27 print(" [x] Received %r" % body.decode()) 28 ch.basic_ack(delivery_tag=method.delivery_tag) 29 30 31 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 32 channel.basic_consume(callback,#接收到消息调用回调函数 callback 33 queue=queuename, 34 #no_ack=True 35 ) 36 37 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 38 39 channel.start_consuming()#启动消息接收
topic细致消息过滤
server端
1 import pika,sys,time 2 connection = pika.BlockingConnection(pika.ConnectionParameters( 3 ‘localhost‘)) 4 channel = connection.channel()#管道 5 6 7 8 #severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘#启动参数 默认无参数为 info 级别 9 routing_key= sys.argv[1] if len(sys.argv) > 1 else ‘anorrymous.info‘#启动参数 默认无参数为 info 级别 10 msg=‘‘.join(sys.argv[2:]) or ‘info:消息默认发送………‘#启动参数 为空,发默认消息 11 for i in range(10): 12 time.sleep(1) 13 channel.basic_publish(exchange=‘direct_logs‘,#绑定频道 14 #routing_key=severity,#默认的消息队列级别 15 routing_key=routing_key,#默认的消息队列级别 16 body=msg+str(i), 17 #properties=pika.BasicProperties(delivery_mode=2)#持久化 广播不能使用 18 ) 19 #print(msg,severity) 20 print(msg,routing_key) 21 connection.close()
client端
1 import pika,time,sys 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 ‘localhost‘)) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange=‘direct_logs‘,#定义一个接收的频道 8 type=‘topic‘) 9 10 reult=channel.queue_declare(exclusive=True)#随机生成唯一的队列名,会在消息接收后自动删除 11 queuename=reult.method.queue#队列名 自动生成 12 13 14 #severities = sys.argv[1:] 15 binding_key = sys.argv[1:] 16 #if not severities: 17 if not binding_key: 18 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])#启动接收的消息级别 19 sys.exit(1) 20 21 #for severity in severities:#循环接收各级别的消息 22 for severity in binding_key:#循环接收各级别的消息 23 channel.queue_bind(exchange=‘direct_logs‘, 24 queue=queuename, 25 routing_key=severity) 26 27 def callback(ch, method, properties, body):#回调函数 28 print(‘接收消息中…………‘) 29 #time.sleep(5) 30 print(" [x] Received %r" % body.decode()) 31 ch.basic_ack(delivery_tag=method.delivery_tag) 32 33 34 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 35 channel.basic_consume(callback,#接收到消息调用回调函数 callback 36 queue=queuename, 37 #no_ack=True 38 ) 39 40 print(‘ [*] 接收消息中. To exit press CTRL+C‘) 41 42 channel.start_consuming()#启动消息接收
RPC模式:结果返回
server端
1 import pika 2 import time 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host=‘localhost‘))#生成消息对队 5 6 channel = connection.channel()#生成管道 7 8 channel.queue_declare(queue=‘rpc_queue‘)#消息收接的模式 9 10 def fib(n):#计算函数——非波那 11 if n == 0: 12 return 0 13 elif n == 1: 14 return 1 15 else: 16 return fib(n-1) + fib(n-2) 17 18 def on_request(ch, method, props, body):#回调函数 19 n = int(body) 20 21 print(" [.] fib(%s)" % n) 22 response = fib(n)#调用计算函数 23 24 ch.basic_publish(exchange=‘‘, 25 routing_key=props.reply_to,#收消息的队列 26 properties=pika.BasicProperties(correlation_id =props.correlation_id),#返回消息的队列 27 body=str(response))#返回结果数据 28 ch.basic_ack(delivery_tag = method.delivery_tag)##确保消息被 客户端接收 29 30 channel.basic_qos(prefetch_count=1)#同时只处理一个消息 31 channel.basic_consume(on_request, queue=‘rpc_queue‘)#接收消息,自动调用回调函数 32 33 print(" [x] Awaiting RPC requests") 34 channel.start_consuming()#开始接收
client端
1 import pika 2 import uuid 3 4 class FibonacciRpcClient(object): 5 def __init__(self): 6 self.connection = pika.BlockingConnection(pika.ConnectionParameters( 7 host=‘localhost‘))#生成连接的服务端 ip 8 9 self.channel = self.connection.channel()#创建一个管道 10 11 result = self.channel.queue_declare(exclusive=True)#随机生成一个队列,收消息后自动删除 12 self.callback_queue = result.method.queue#赋于管道 变量 13 14 self.channel.basic_consume(self.on_response,#回调函数 15 no_ack=True,#不用服务端确认本条消息 16 queue=self.callback_queue)#随机生的队列 17 18 def on_response(self, ch, method, props, body):#回调函数 19 if self.corr_id == props.correlation_id:#判断服务端返回的队列名是否与当前所生成的队列名一致 20 self.response = body# 将服务端的结果赋于返回来的结果变量 21 22 def call(self, n):#发送消息的函数 23 self.response = None#初始返回结果为空 24 self.corr_id = str(uuid.uuid4())#生成一个服务端返回消息的队列名 25 self.channel.basic_publish(exchange=‘‘, 26 routing_key=‘rpc_queue‘,#确认为rpc模式 实时发送 27 properties=pika.BasicProperties( 28 reply_to = self.callback_queue,#发送的管道队列名 29 correlation_id = self.corr_id,#发送给服务端,用于返回消息的队列名 30 ), 31 body=str(n))#发送的内容数据 32 while self.response is None: 33 self.connection.process_data_events()#非阻塞模式接收消息 34 print(‘没有返回消息……‘) 35 return int(self.response)#返回结果 36 37 fibonacci_rpc = FibonacciRpcClient()#生成一个实例 38 39 print(" [x] Requesting fib(10)") 40 response = fibonacci_rpc.call(10)#调用发送消息的函数 41 print(" [.] Got %r" % response)#打印结果
python第六十天-----RabbitMQ
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。