首页 > 代码库 > python第五十三天--进程,协程.select.异步I/O...

python第五十三天--进程,协程.select.异步I/O...

进程:

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 import multiprocessing,threading,time
 5 
 6 def run(name):
 7     t=threading.Thread(target=run2)#创建新线程
 8     t.start()
 9     print(进程[%s],打印中...%name)
10     time.sleep(1)
11 
12 def run2():
13     print(threading.get_ident())#打印线程ID
14     time.sleep(2)
15 
16 
17 if __name__ == __main__:
18     for i in range(10):
19         p=multiprocessing.Process(target=run,args=(第[%s]个进程%i,))#创建新进程
20         p.start()
View Code

进程号:

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 
 5 import  multiprocessing,threading,time,os
 6 from  multiprocessing import Process#从 multprocessing 打开 Process
 7 def info_l(file):
 8     print(当前模块名:,__name__)
 9     print(父进程ID:,os.getppid())
10     print(进程ID:,os.getpid())
11     print(\n\n)
12 
13 def f(name):
14     print(查看:,name)
15     info_l(相关列表)
16 
17 
18 if __name__ == __main__:
19     info_l(主进程相关列表)
20     p=Process(target=f,args=(当前进程,))
21     p.start()
22     p.join()
View Code

进程锁:

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 
 5 from multiprocessing import Process,Lock # Lock 进程锁通讯中间件
 6 
 7 def f(lock,i):
 8     lock.acquire()#进程锁
 9     print(第[%s]个进程%i)
10     lock.release()#解锁
11 if __name__ ==__main__:
12     lock=Lock()#进程锁对象
13     for i in range(10):
14         p=Process(target=f,args=(lock,i)).start()
View Code

进程之间通讯:

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 
 5 import multiprocessing,queue,threading
 6 from multiprocessing import Process,Queue # Queue 进程通讯中间件
 7 
 8 
 9 ‘‘‘  线程 之间共享队列
10 def f():
11     q.put([1,None,‘加入数据‘])#队列
12 if __name__ ==‘__main__‘:
13     q=queue.Queue()#线程队列
14     #q=Queue()#进程队列
15     p=threading.Thread(target=f,)#创建线程
16     p.start()
17     print(q.get())#输出,取出的
18     p.join()
19 ‘‘‘
20 def f(q):#存入q对象
21     q.put([1,None,加入数据])#队列
22 if __name__ ==__main__:
23     q=Queue()#进程队列
24     p=Process(target=f,args=(q,))#创建进程
25     p.start()
26     print(q.get())#输出,取出的
27     p.join()
View Code
技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 import os
 5 from multiprocessing import Process,Pipe,Manager # Pipe 管道 进程通讯中间件
 6 
 7 
 8 def f(d,l):
 9     d[os.getpid()]=os.getpid()#修改字典
10     l.append(os.getpid())#添加列表内容
11     print(d)
12     print(l)
13 
14 
15 if __name__ ==__main__:
16     with Manager() as manager:
17         d=manager.dict()#创建一个进程之间可修改的字典
18         l=manager.list(range(5))#创建一个进程之间可修改的列表
19         p_list=[]#join使用
20         for i in range(10):
21             p=Process(target=f,args=(d,l))#创建进程传入数据,
22             p.start()
23             p_list.append(p)
24         for r in p_list:#等待进程完成
25             r.join()
26         print(d)
27         print(l)
View Code
技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 
 5 from multiprocessing import Process,Pipe # Pipe 管道 进程通讯中间件
 6 
 7 def f(conn):#存入conn对象
 8     conn.send([子进程发送信息,....])
 9     print(收到父进程的信息:,conn.recv())
10     conn.close()
11 if __name__ ==__main__:
12     parent_conn,child_conn=Pipe()#生成一个管道,返回两个值,管理双端
13     p=Process(target=f,args=(child_conn,))#创建进程
14     p.start()
15     print(收到子进程的信息:,parent_conn.recv())
16     parent_conn.send([父进程发送信息])
17     p.join()
View Code

进程池:

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #__author__=2017/6/23
 5 import time,os
 6 from multiprocessing import Process,Lock,Pool # Pool 进程池通讯中间件
 7 
 8 def Foo(i):
 9     print(第[%s]个进程,ID:%i,os.getpid())
10     time.sleep(3)
11     return i+100
12 def Bar(arg):
13     print(回调>>>>:,arg,os.getpid())
14 if __name__ ==__main__:
15     #pool=Pool(processes=5)#定义一个进程池 表示允许进程池同时放入5个进程
16     pool=Pool(5)#定义一个进程池 表示允许进程池同时放入5个进程
17     for i in range(10):
18         #pool.apply(func=Foo,args=(i,))#使用进程池创建进程  串行
19         #pool.apply_async(func=Foo,args=(i,))#使用进程池创建进程 并行
20         pool.apply_async(func=Foo,args=(i,),callback=Bar)#回调
21 
22     print(结束)
23     #pool.join()
24     pool.close()#一定要先关闭进程池
25     pool.join()#后进行join
View Code

 

 

协程:

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #python 
 5 #2017/6/24    10:10
 6 #__author__=‘Administrator‘
 7 
 8 import time
 9 import queue
10 def consumer(name):#消费者函数
11     print([%s]消费产品中.......%name)
12     while True:
13         new_b=yield #跳转点
14         print([%s] 消费 [%s]%(name,new_b))
15 def producer():#生产者函数
16     r=con.__next__()
17     r2=con2.__next__()
18     n=0
19     while n<10:
20         n+=1
21         con.send(n)#发送给消费者
22         con2.send(n)
23         print(\033[32;1m[生产者]\033[0m生产产品[%s]%n)
24 
25 if __name__==__main__:
26     con=consumer(消费者A)
27     con2=consumer(消费者B)
28     p=producer()
View Code
技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #python 
 5 #2017/6/24    10:31
 6 #__author__=‘Administrator‘
 7 #import greenlet
 8 from greenlet import greenlet
 9 
10 def test1():
11     print(函数一: 12)
12     ger2.switch()#进行协程切换
13     print(函数一: 34)
14     ger2.switch()
15 
16 def test2():
17     print(函数二: 56)
18     ger1.switch()
19     print(函数二: 78)
20     ger1.switch()
21 
22 ger1=greenlet(test1)#创建协程
23 ger2=greenlet(test2)
24 ger1.switch()
View Code

 

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #python 
 5 #2017/6/24    10:47
 6 #__author__=‘Administrator‘
 7 
 8 import gevent
 9 def func1():
10     print(func1 reading....)
11     gevent.sleep(2)
12     print(func1 reading two.. end)
13 
14 def func2():
15     print(func2 reading....)
16     gevent.sleep(0)
17     print(func2 reading two.. end)
18 def func3():
19     print(func3 reading....)
20     gevent.sleep(1)
21     print(func3 reading two.. end)
22 
23 gevent.joinall([
24     gevent.spawn(func1),
25     gevent.spawn(func2),
26     gevent.spawn(func3)
27 ])
View Code

技术分享

 

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #python 
 5 #2017/6/24    14:03
 6 #__author__=‘Administrator‘
 7 from urllib import request
 8 import gevent,time
 9 from gevent import monkey
10 monkey.patch_all()#对所有的I/O操作进行标记
11 
12 def f(url):#爬取网页
13     print(网址: ,url)
14     resp=request.urlopen(url)#打开网页
15     data=http://www.mamicode.com/resp.read()#读取网页
16     print(网址:[%s]的网页数据大小:[%s]%(url,len(data)))
17 
18 urls=[https://www.python.org/,
19       https://hao.360.cn/,
20       https://www.yahoo.com/]
21 
22 time_start=time.time()#同步 串行开始时间
23 for url in urls:
24     f(url)
25 print(同步时长:,time.time()-time_start)
26 
27 time_start_asy=time.time()#异步 并行开始时间
28 gevent.joinall([
29     gevent.spawn(f,https://www.python.org/),
30     gevent.spawn(f,https://hao.360.cn/),
31     gevent.spawn(f,https://www.yahoo.com/)
32 ])
33 print(异域步时长:,time.time()-time_start_asy)
View Code

 

 

 

协程socket_server 实现并发

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #python 
 5 #2017/6/24    14:42
 6 #__author__=‘Administrator‘
 7 
 8 import sys
 9 import socket
10 import time
11 import gevent
12 
13 from gevent import socket,monkey
14 monkey.patch_all()
15 
16 
17 def server(port):
18     s = socket.socket()#socket 对象
19     s.bind((0.0.0.0, port))#服务端,bind IP 端口
20     s.listen(500)
21     print(监听中....)
22     while True:
23         cli, addr = s.accept()
24         gevent.spawn(handle_request, cli)#创建一个新协程来
25 
26 
27 
28 def handle_request(conn):
29     try:
30         while True:
31             data = http://www.mamicode.com/conn.recv(1024)
32             print("recv:", data)
33             conn.send(data)
34             if not data:
35                 conn.shutdown(socket.SHUT_WR)
36 
37     except Exception as  ex:
38         print(ex)
39     finally:
40         conn.close()
41 if __name__ == __main__:
42     server(8001)
View Code

 

select :

socket_server

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #python 
 5 #2017/6/24    19:34
 6 #__author__=‘Administrator‘
 7 
 8 import select,socket,sys ,queue
 9 
10 s=socket.socket()#实例化一个连接对象
11 s.setblocking(0)#设置成非阻塞
12 server_addr=(localhost,9500)#设置绑定的 IP 端口
13 s.bind(server_addr)#连接对象绑定IP 端口
14 s.listen(100)#队列  可连接数量
15 inputs=[s,]#首先要监测本身
16 
17 outputs=[]#发送列表
18 
19 meg_queues={} #发送 连接对象的队列集合  字典
20 
21 while True:
22     print(监听中......)
23     readable,writeable,exeptional=select.select(inputs,outputs,inputs)#生成select 对象,返回三个列表 连接,发关,错误
24 
25     for i in readable: #i为一个socket
26         if i is s:#如果i 是s 表示有新 连接 进来
27             conn,client_addr=i.accept()#建立一个新连接
28             print(接入一个新连接...,client_addr)
29             conn.setblocking(0)#也设成非阻塞
30             inputs.append(conn)#加入select,的连接列表,避免出现阻塞
31             meg_queues[conn]=queue.Queue()#创建一个队列  添加到字典
32         else:
33             try:
34                 data=http://www.mamicode.com/i.recv(1024)#如果不是新连接就收数据
35             except Exception as e:
36                 print(e)
37             if data: #如果数据不为空
38                 print([%s] 发来的数据 [%s]%(i.getpeername,data))
39                 meg_queues[i].put(data)#当前连接的消息队列加入数据
40                 if i not in outputs:#如果当前连接没有在发送列表内,就加入发送列表
41                     outputs.append(i)
42             else:
43                 print(客户端已经断开了....)#开始清理工作
44                 if i in outputs:#在发送列表
45                     outputs.remove(i)#在发送列表内删除
46                 inputs.remove(i)#在连接列表内删除
47                 del meg_queues[i]#在队列字典内删除
48 
49     for w in writeable:#循环发送列表
50         try:
51             msg=meg_queues[w].get_nowait()#取出队列中的数据,判断
52         except queue.Empty:#如果数据为空
53             outputs.remove(w)##从发送列表内删除
54         else:
55             w.send(msg)#发送
56 
57     for e in exeptional:#循环错误列表
58         print(连接[%s]出错!%e.getpeername)
59         inputs.remove(e)##从发送列表内删除
60         if e in outputs:#在发送列表
61             outputs.remove(e)#在发送列表内删除
62         e.close()
63         del meg_queues[e]#在队列字典内删除
View Code

 

 

 

socket_client

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #python 
 5 #2017/6/24    19:23
 6 #__author__=‘Administrator‘
 7 
 8 import socket
 9 
10 server_addr=(localhost,9500)#设置绑定的 IP 端口
11 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
12 s.connect(server_addr)
13 while True:
14     msg = bytes(input(">>:"),encoding="utf8")#定义一个数据 消息
15     if msg:
16         s.sendall(msg)#发送数据
17     else:
18         print(不能为空)
19         continue
20     data = http://www.mamicode.com/s.recv(1024)#收数据(读数据)
21     #print(data)
22 
23     print(Received, repr(data.decode()))
24 s.close()
View Code

 

selectors :

技术分享
 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan 
 4 #python 
 5 #2017/6/24    21:58
 6 #__author__=‘Administrator‘
 7 import selectors
 8 import socket
 9 
10 sel = selectors.DefaultSelector()#生成一个创建一个selectors对象
11 
12 def accept(sock, mask):
13     conn, addr = sock.accept()  # 建立新连接
14     print(accepted, conn, from, addr)
15     conn.setblocking(False)#设成非阻塞
16     sel.register(conn, selectors.EVENT_READ, read)#注册 连接,回调函数 read
17 
18 def read(conn, mask):
19     data = http://www.mamicode.com/conn.recv(1024)  # 接收数据
20     if data:#不为空
21         print(echoing, repr(data), to, conn)
22         conn.send(data)  # 发送数据
23     else:#如果为空
24         print(closing, conn)
25         sel.unregister(conn)#取消注册
26         conn.close()#关闭连接
27 
28 server_addr=(localhost,9501)#设置绑定的 IP 端口
29 sock = socket.socket()#创建一个sock对象
30 sock.bind(server_addr)#绑定IP 端口
31 sock.listen(100)
32 print(监听中...)
33 sock.setblocking(False)#非阻塞
34 sel.register(sock, selectors.EVENT_READ, accept)#注册连接  返调函数为accept
35 
36 while True:
37     events = sel.select()#默认为阻塞模式
38     for key, mask in events:#如果有连接,接入
39         callback = key.data#新建连接句柄
40         callback(key.fileobj, mask)
View Code

 

 

事件驱动与异步IO

服务器处理模型的程序时,有以下几种模型:
(1)每收到一个请求,创建一个新的进程,来处理该请求;
(2)每收到一个请求,创建一个新的线程,来处理该请求;
(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求
 
第(1)中方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。
第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。
第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。
综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式
 
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

 

python第五十三天--进程,协程.select.异步I/O...