首页 > 代码库 > python -- 异步编程

python -- 异步编程

我们在生产中,常用的处理任务模型有三种:
  单线程
  多线程
  异步(单线程内,串行,特点是遇到阻塞(或IO之类的)就切换到其他任务)

其中一般如果都符合要求,那么异步是最好的选择。
  单线程:遇到阻塞整个程序都等待
  多线程:以空间换取时间,且有时候伴随着数据安全问题(通常加锁来处理)
  异步:在单个线程内,且是串行执行,但是一旦遇到阻塞(IO之类的),就会切换到线程内的其他任务(把IO操作交给操作系统处理)

当我们面对如下的环境时,事件驱动模型(异步模型)通常是一个好的选择(and):
  1、程序中有许多任务
  2、任务之间高度独立(因此它们不需要互相通信,或者等待彼此)
  3、在等待事件到来时,某些任务会阻塞。

常用的异步IO模型:select、poll、Epoll    (windows下只支持select)
nginx就是Epoll模型实现的。单线程,多进程(为了利用多核,单线程只能跑在单个cup核心上)

前面我们说了多线程与多进程,总结其特点就是:

  单线程串行执行,遇到IO就阻塞,效率低。

  多线程并发执行,遇到IO就切换(用空间换取执行时间),效率上去了,但是耗费资源,操作复杂。

针对以上问题,出现了一种新的替代品,协程。

协程  

  协程又名微线程,纤程。协程是一种用户态的轻量级线程(协程由用户切换,线程由cpu时间片控制切换)协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。跟yield过程差不多


协程的定义标准:
  1、必须在一个单线程里面实现并发
  2、修改共享数据不需要加锁(因为协程是串行的)
  3、用户程序里自己保存多个控制流的上下文栈
  4、一个协程遇到IO操作(阻塞也一样)就自动切换到其他协程

协程的好处:
  1、无需线程上下文切换的开销
  2、无需原子操作锁定及同步的开销
       "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。

  3、方便切换控制流,简化编程模型
  4、高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:
  1、无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  2、进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

 

协程例子

  1、用yield实现协程操作的例子:

 1 import time
 2 
 3 def consumer(name):
 4     print("\033[32;1m ---> starting eating baozi... \033[0m")
 5     while True:
 6         new_baozi = yield
 7         print("[%s] is eating baozi %s " %(name,new_baozi))
 8         # time.sleep(1)  #在yield里面并没有实现阻塞切换
 9 
10 def producer():
11     c = con1.__next__()
12     c = con2.__next__()
13     n = 0
14     while n < 5:
15         n += 1
16         print("\033[32;1m [producer]\033[0m is making baozi %s " % n)
17         con1.send(n)
18         con2.send(n)     #用send可以想yield发送数据
19 
20 if __name__ == __main__:
21     con1 = consumer(c1)
22     con2 = consumer(c2)
23     p = producer()

  执行结果

 ---> starting eating baozi... 
 ---> starting eating baozi... 
 [producer] is making baozi 1 
[c1] is eating baozi 1 
[c2] is eating baozi 1 
 [producer] is making baozi 2 
[c1] is eating baozi 2 
[c2] is eating baozi 2 
 [producer] is making baozi 3 
[c1] is eating baozi 3 
[c2] is eating baozi 3 
 [producer] is making baozi 4 
[c1] is eating baozi 4 
[c2] is eating baozi 4 
 [producer] is making baozi 5 
[c1] is eating baozi 5 
[c2] is eating baozi 5 

 

2、使用gevent实现协程

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import gevent
 5 import time
 6 
 7 def func1():
 8     print("\033[32;1m func1函数的第1部分... \033[0m")
 9     gevent.sleep(2)
10     # time.sleep(2)   #用time.sleep(2)是不行的
11     print("\033[32;1m func1函数的第2部分... \033[0m")
12 
13 def func2():
14     print("\033[31;1m func2函数的第1部分... \033[0m")
15     gevent.sleep(1)
16     # time.sleep(1)
17     print("\033[31;1m func2函数的第2部分... \033[0m")
18 
19 def func3():
20     print("\033[34;1m func3函数的第1部分... \033[0m")
21     gevent.sleep(3)
22     # time.sleep(3)
23     print("\033[34;1m func3函数的第2部分... \033[0m")
24 
25 
26 if __name__ == __main__:
27     gevent.joinall([
28         gevent.spawn(func1),
29         gevent.spawn(func2),
30         gevent.spawn(func3),
31     ])

  执行结果

 func1函数的第1部分... 
 func2函数的第1部分... 
 func3函数的第1部分... 
 func2函数的第2部分... 
 func1函数的第2部分... 
 func3函数的第2部分... 

 

3、协程结合urllib模块爬网站

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 from gevent import monkey
 5 monkey.patch_all()  #遇到阻塞就切换全靠它(作用:把要用到的接口全部变为非阻塞模式)
 6 
 7 import gevent
 8 from urllib.request import urlopen
 9 
10 def f(url):
11     print("GET: %s " %url)
12     resp = urlopen(url)
13     data =http://www.mamicode.com/ resp.read()
14     print("%s bytes received from %s " %(len(data),url))
15 
16 if __name__ == __main__:
17     gevent.joinall([
18         gevent.spawn(f, http://www.cdu.edu.cn/),
19         gevent.spawn(f,https://www.python.org/),
20         gevent.spawn(f,https://www.jd.com/),
21         gevent.spawn(f,https://www.vip.com/)
22     ])

  执行结果

GET: http://www.cdu.edu.cn/ 
GET: https://www.python.org/ 
GET: https://www.jd.com/ 
GET: https://www.vip.com/ 
137584 bytes received from https://www.jd.com/ 
69500 bytes received from https://www.vip.com/ 
47984 bytes received from http://www.cdu.edu.cn/ 
47695 bytes received from https://www.python.org/ 

Process finished with exit code 0

 

4、利用协程实现高并发服务器

技术分享
 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import gevent
 5 import socket
 6 from gevent import monkey; monkey.patch_all()
 7 
 8 def server(port):
 9     s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
10     s.bind((0.0.0.0,port))
11     s.listen(5000)
12 
13     while True:
14         # print("\033[32;1m server is waiting... \033[0m")
15         print(" server is waiting... ")
16         conn, addr = s.accept()
17         gevent.spawn(handle_request,conn)
18 
19 def handle_request(conn):
20     try:
21         while True:
22             data = http://www.mamicode.com/conn.recv(1024)
23             print("recvied:",data.decode(utf8))
24             conn.send(data)
25             if not data:
26                 conn.shutdown(socket.SHUT_WR)
27 
28     except Exception as ex:
29         print(ex)
30 
31     finally:
32         conn.close()
33 
34 
35 if __name__ == __main__:
36     server(9999)
server
技术分享
 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import socket
 5 import threading
 6 
 7 def run(n):
 8     ‘‘‘这里是启动多线程,然后每个线程死循环发包给服务器,测试协程服务器的高并发和稳定性‘‘‘
 9     while True:
10         # msg = input(">>:").strip()
11         # if len(msg) == 0:continue
12         # if msg == ‘q‘:break
13         msg = hello %s %n
14         sk.send(bytes(msg,utf8))
15         data = http://www.mamicode.com/sk.recv(1024)
16         print("Received:",data.decode(utf8))
17 
18     sk.close()
19 
20 if __name__ == __main__:
21     IP_Port = (127.0.0.1, 9999)
22     sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
23     sk.connect(IP_Port)
24 
25     thread_list = []
26     for i in range(2000):
27         t = threading.Thread(target=run,args=[i,])
28         t.start()
29         thread_list.append(t)
30 
31     for thread in thread_list:
32         thread.join()
client

 

python -- 异步编程