首页 > 代码库 > python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)
python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)
1.协程
1 #协程 又称微线程 是一种用户的轻量级线程 程序级别代码控制 就不用加机器 2 #不同函数 = 不同任务 A函数切到B函数没有进行cpu级别的切换,而是程序级别的切换就是协程 yelied 3 4 #单线程下多个任务流用协程,比如打电话可以切换,nginx 5 #爽妹给你打电话的时候,她不说话,刘征电话过来时候你可以切过去,这时候要是爽妹说话,就会bibi响 6 ‘‘‘ 7 8 协程的好处: 9 无需线程上下文切换的开销 10 无需原子操作锁定及同步的开销 11 "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 12 context 13 switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。 14 方便切换控制流,简化编程模型 15 高并发 + 高扩展性 + 低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。 16 17 缺点: 18 无法利用多核资源:协程的本质是个单线程, 它不能同时将 19 单个CPU 20 的多个核用上, 协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。 21 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
2.Greenlet and Gevent
1 #greenlet 模块 2 #greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator 3 from greenlet import greenlet 4 5 from greenlet import greenlet 6 def test1(): 7 print(12) 8 #time.sleep(1) #但是遇到IO就会阻塞了,这里延迟了一秒,如果自动切换的话,应该立马执行gr2 9 gr2.switch() 10 print(34) 11 gr2.switch() 12 13 def test2(): 14 print(56) 15 gr1.switch() 16 print(78) 17 18 gr1 = greenlet(test1) #生成协程 19 gr2 = greenlet(test2) #生成协程 20 gr1.switch() #启动协程 21 #但是遇到IO会不会自动切换呢?上面是手动切换的 引出 Gevent
1 #Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 2 #(接着上面一条)它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。 3 import gevent 4 5 def func1(): 6 print(‘\033[31;1m李闯在跟海涛搞...\033[0m‘) #1 1 7 gevent.sleep(2) #相当于io time.sleep 卡住,看看会不会自动切换,还是等待? 8 print(‘\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m‘) #4 6 9 10 def func2(): 11 print(‘\033[32;1m李闯切换到了跟海龙搞...\033[0m‘) #2 2 12 gevent.sleep(1) #这里自动切换的时候上面还在阻塞呢,所以又回来了,所以按 1234标识了走向 13 print(‘\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m‘) #3 4 14 15 def func3(): 16 print(‘3333‘) #0 3 17 gevent.sleep(1) 18 print(‘4444‘) #0 5 19 20 gevent.joinall([ #joinall等待所有协程结束 这是一个列表 21 gevent.spawn(func1), #产生协程 22 gevent.spawn(func2), 23 gevent.spawn(func3), 24 ])
3.同步与异步的性能区别
1 #同步与异步的性能区别 2 import gevent 3 4 def task(pid): 5 """ 6 Some non-deterministic task 7 """ 8 gevent.sleep(0.5) 9 print(‘Task %s done‘ % pid) 10 11 def synchronous(): #同步就是串行的效果 12 for i in range(1, 10): 13 task(i) 14 15 def asynchronous(): #异步就是并发的效果 16 threads = [gevent.spawn(task, i) for i in range(10)] 17 gevent.joinall(threads) 18 19 print(‘Synchronous:‘) #同步 20 synchronous() 21 print(‘Asynchronous:‘) #异步 22 asynchronous()
4.爬网页
1 #url爬网页 2 import gevent 3 from urllib.request import urlopen #现在还是阻塞的模式,因为urllib遇到Io不知道这是Io操作,所以需要导入一个gevevt插件, 4 #相当于打个补丁,就会把urllib 里面涉及IO操作的都改成异步的模式,不阻塞的模式 5 from gevent import monkey #补丁 6 monkey.patch_all() #补丁 注意顺序 7 import time 8 9 def pa_web_page(url): 10 print(‘get url‘,url) 11 req = urlopen(url) #抓取url 12 data = http://www.mamicode.com/req.read() #读取结果 13 print(data) 14 print(‘%d bytes received from %s.‘ % (len(data), url)) 15 16 t1_start = time.time() #开始时间 17 pa_web_page("https://www.baidu.com") 18 pa_web_page("http://www.xiaohuar.com") 19 print(‘time close t1‘,time.time()-t1_start) #做减法 20 21 22 #下面是协程gevent写法,遇到阻塞就会自动切换,节省了时间 23 t2_start = time.time() #开始时间 24 gevent.joinall([ 25 gevent.spawn(pa_web_page, ‘https://www.baidu.com‘), #pa_web_page,函数名 https://www.baidu.com url 26 gevent.spawn(pa_web_page, ‘http://www.xiaohuar.com‘), 27 ]) 28 print(‘time close t2‘,time.time()-t2_start) #做减法
5.通过gevent实现单线程下的多socket并发
server code
1 import sys 2 import socket 3 import time 4 import gevent 5 6 from gevent import socket, monkey 7 monkey.patch_all() 8 9 def server(port): 10 s = socket.socket() 11 s.bind((‘0.0.0.0‘, port)) 12 s.listen(500) 13 while True: 14 cli, addr = s.accept() 15 gevent.spawn(handle_request, cli) #之前写线程sockserver的时候是起一个线程,这里是起协程 16 # handle_request自己写的方法 所有请求到这个函数区处理 17 18 def handle_request(conn): 19 try: 20 while True: 21 data = http://www.mamicode.com/conn.recv(1024) 22 print("recv:", data) 23 conn.send(data) 24 if not data: 25 conn.shutdown(socket.SHUT_WR) #相当于断开连接,清空了 26 27 except Exception as ex: 28 print(ex) 29 finally: 30 conn.close() 31 32 if __name__ == ‘__main__‘: 33 server(8001)
client code
1 #并发100个链接 如果连接报错,就说明开不起线程了,确实支持大并发了 2 import socket 3 import threading 4 5 def sock_conn(): 6 client = socket.socket() 7 client.connect(("localhost",8001)) 8 count = 0 9 while True: 10 #msg = input(">>:").strip() 11 #if len(msg) == 0:continue 12 client.send( ("hello %s" %count).encode("utf-8")) 13 14 data = http://www.mamicode.com/client.recv(1024) 15 16 print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果 17 count +=1 18 client.close() 19 20 for i in range(100): 21 t = threading.Thread(target=sock_conn) 22 t.start()
6.Select\Poll\Epoll IO多路复用
select
1 import socket 2 import select 3 import queue 4 server = socket.socket() 5 server.bind(("localhost",8001)) 6 server.listen(5) 7 server.setblocking(0) #设置为非堵塞 8 inputs = [server] #select 维护的列表,也是是传过来的链接 首先是监听自己 9 msg_queues = {} #字典,为了收取数据作用,理论上应该有2个,一个是收,一个是取 10 outputs = [] 11 12 while True: 13 r_list,w_list,exception_list = select.select(inputs,outputs,inputs) #inputs检测所有socket有没有消息古来 outputs不知道 inputs检测哪些socket有没有错(错误) 14 #针对 inputs 来返回哪些就绪的列表,所以r_list里面的就已经是就绪的 相当于链接 15 # print("r_list",r_list) 16 # print("w_list",w_list) 17 # print("e_list",exception_list) 18 for s in r_list: #数据流 19 if s is server: #这是一个新链接 20 conn,addr = s.accept() #接收请求 同时可以监听多个请求了 21 print("got a new conn",conn,addr) 22 inputs.append(conn) #让select去监测客户端是否有数据过来 23 msg_queues[conn] = queue.Queue() #为了给客户端返回数据,先创建的数据字典 24 else: 25 try: 26 data = http://www.mamicode.com/s.recv(1024) 27 print("recv data from [%s]:[%s]" % (s.getpeername(),data.decode())) 28 msg_queues[s].put(data) 29 if s not in outputs: 30 outputs.append(s) #等下次select的时候,确保w_list的数据能返回给客户端 31 except ConnectionResetError as e: 32 print("conn closed.",s.getpeername(),e) 33 34 inputs.remove(s) #链接出问题,或意外终止 35 if s in outputs: 36 outputs.remove(s) 37 del msg_queues[s] 38 39 for s in w_list: #给客户端返回追备好的数据 40 try: 41 data =http://www.mamicode.com/ msg_queues[s].get_nowait() 42 s.send(data.upper()) 43 except queue.Empty as e: 44 outputs.remove(s)
selectors select的升级版,自动适应版本执行epool效率更高
1 #selectors模块 这是自适应的,你系统默认支持的话,就会epool 》pool 》select 相比select更方便 写这个代码默认epool 2 3 import selectors 4 import socket 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print(‘accepted‘, conn, ‘from‘, addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 #监听数据流,如果消息事件来了,调用read方法 注册conn用于监控流了 12 13 def read(conn, mask): 14 data = http://www.mamicode.com/conn.recv(1000) # Should be ready 15 if data: 16 print(‘echoing‘, repr(data), ‘to‘, conn) 17 conn.send(data) # Hope it won‘t block 18 else: 19 print(‘closing‘, conn) 20 sel.unregister(conn) #删除链接清空 跟select remove一样 21 conn.close() 22 23 sock = socket.socket() 24 sock.bind((‘localhost‘, 8001)) #端口是0-65535 1024系统保留 25 sock.listen(100) 26 sock.setblocking(False) 27 28 sel = selectors.DefaultSelector() 29 sel.register(sock, selectors.EVENT_READ, accept) #sock相当于注册,注册一个什么呢,注册一个EVENT_READ读事件 这只是注册呢没有实际监听 30 #相当于 select.select(inputs,outputs.... EVENT_READ 监听,如果有请求就会调用accept) 31 32 while True: 33 events = sel.select() #如果没有事件就会卡这里,select监听, 34 for key, mask in events: 35 callback = key.data #相当于accept内存对象 36 print(key,mask) 37 callback(key.fileobj, mask)
7.RabbitMq 进程队列
server
1 # !/usr/bin/env python 2 import pika 3 4 # credentials = pika.PlainCredentials(‘alex‘,‘alex3714‘) 假如需要验证的时候用这2条就可以连接 5 # connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.10.140‘,credentials=credentials)) 6 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.10.140‘)) 8 channel = connection.channel() 9 10 # 声明queue 11 channel.queue_declare(queue=‘hello1‘,durable=True)#durable=True queue队列持久化,rabbitmq重启不会丢失,但是消息会丢 12 #如果之前这里生命过durable,在recv端也要这样声明 13 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 14 channel.basic_publish(exchange=‘‘, 15 routing_key=‘hello1‘, 16 body=‘Hello World!1‘, 17 properties=pika.BasicProperties( #消息持久化 rabbit重启消息不会丢 18 delivery_mode=2, # make message persistent #消息持久化abbit重启消息不会丢 19 )) 20 print(" [x] Sent ‘Hello World1!‘") 21 connection.close()
client
1 # _*_coding:utf-8_*_ 2 __author__ = ‘Alex Li‘ 3 import pika 4 import time 5 6 # credentials = pika.PlainCredentials(‘alex‘,‘alex3714‘) 假如需要验证的时候用这2条就可以连接 7 # connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.10.140‘,credentials=credentials)) 8 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘10.10.10.140‘)) 10 channel = connection.channel() 11 12 # You may ask why we declare the queue again ? we have already declared it in our previous code. 13 # We could avoid that if we were sure that the queue already exists. For example if send.py program 14 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 15 # practice to repeat declaring the queue in both programs. 16 channel.queue_declare(queue=‘hello1‘,durable=True) #如果确定这个queue声明过了,可以不用写,但是写上最好,因为不确定send端还是recv先启动 17 18 def callback(ch, method, properties, body): #body消息 19 #ch channel对象 method 声明的一推参数,消息里面的一些属性信息 properties跟随消息传一些参数会在这个里面 20 print(" [x] Received %r" % body) 21 # time.sleep(10) #用于测试work queue 22 23 channel.basic_qos(prefetch_count=1) #消息公平化,如果有一个消息没有处理完就别给我发新的 24 channel.basic_consume(callback, #在hello queue里面收取消息执行callback函数 25 queue=‘hello1‘, 26 #no_ack=True 27 ) #true 默认开启 work queue 这样可以确保即使消息发送的时候中断,也会受到信息no_ack=True这个是关闭了 28 29 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 30 channel.start_consuming() #有消息就收,没有消息就会卡住 监听
python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。