首页 > 代码库 > Python多进程(multiprocessing)
Python多进程(multiprocessing)
Unix/Linux操作系统提供了一个fork()
系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
子进程永远返回0
,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()
就可以拿到父进程的ID。
Python的os
模块封装了常见的系统调用,其中就包括fork
,可以在Python程序中轻松创建子进程
由于Windows没有fork
调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,
有了fork
调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。
在Windows下通过multiprocessing模块实现
multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
window系统下,需要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。
例子
直接调用
1 from multiprocessing import Process 2 import time 3 def f(name): 4 time.sleep(1) 5 print(‘hello‘, name,time.ctime()) 6 7 if __name__ == ‘__main__‘: 8 p_list=[] 9 for i in range(3): 10 p = Process(target=f, args=(‘alvin‘,)) 11 p_list.append(p) 12 p.start() 13 for i in p_list: 14 p.join() 15 print(‘end‘)
继承(类)式调用
1 from multiprocessing import Process 2 import time 3 4 class MyProcess(Process): 5 def __init__(self): 6 super(MyProcess, self).__init__() 7 #self.name = name 8 9 def run(self): 10 time.sleep(1) 11 print (‘hello‘, self.name,time.ctime()) 12 13 14 if __name__ == ‘__main__‘: 15 p_list=[] 16 for i in range(3): 17 p = MyProcess() 18 p.start() 19 p_list.append(p) 20 21 for p in p_list: 22 p.join() 23 24 print(‘end‘)
获取进程pid
1 from multiprocessing import Process 2 import time 3 import os 4 5 def info(title): 6 print(title) 7 print(‘module name:‘, __name__) 8 print(‘parent process:‘, os.getppid()) # 获取父进程pid 9 print(‘process id:‘, os.getpid()) # 获取子进程pid 10 11 12 def f(name): 13 info(‘\033[31;1mfunction f\033[0m‘) 14 print(‘hello‘, name) 15 16 if __name__ == ‘__main__‘: # windows下多进程必须有这句 17 info(‘\033[32;1mmain process line\033[0m‘) 18 time.sleep(5) 19 p = Process(target=info, args=(‘bob‘,)) 20 p.start() 21 p.join() 22 print(‘end‘, __name__)
输出为
1 main process line 2 module name: __main__ 3 parent process: 10704 4 process id: 216 5 end __mp_main__ 6 bob 7 module name: __mp_main__ 8 parent process: 216 9 process id: 7612 10 end __main__
Process类
Process类的__init__
class Process(object): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): self.name = ‘‘ self.daemon = False self.authkey = None self.exitcode = None self.ident = 0 self.pid = 0 self.sentinel = None
target:要执行的方法
name:进程名
args/kwargs:要传入的方法
实例方法
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性
daemon:和线程的setDeamon功能一样
exitcode(进程在运行时为None、如果为–N,表示被信号N结束)
name:进程名字。
pid:进程号。
进程间通信
不同进程间内存是不共享的,要实现两个进程间的数据交换
Queues
使用方法与threading里的queue类似
1 from multiprocessing import Process, Queue 2 3 def f(q,n): 4 q.put([42, n, ‘hello‘]) 5 6 if __name__ == ‘__main__‘: 7 q = Queue() 8 p_list=[] 9 for i in range(3): 10 p = Process(target=f, args=(q,i)) 11 p_list.append(p) 12 p.start() 13 print(q.get()) 14 print(q.get()) 15 print(q.get()) 16 for i in p_list: 17 i.join()
Pipe(管道)
multiprocessing.Pipe([duplex])
返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.默认duplex = True
可用于多个对象通信,即建立多个子进程
1 import os 2 3 from multiprocessing import Process, Pipe 4 5 6 def f(conn): 7 conn.send(‘约吗‘) 8 print(conn.recv(),‘in the %s‘ % os.getpid()) 9 conn.close() 10 11 if __name__ == ‘__main__‘: 12 parent_conn, child_conn = Pipe() 13 p = Process(target=f, args=(child_conn,)) 14 p2 = Process(target=f, args=(child_conn,)) 15 p.start() 16 p2.start() 17 print(parent_conn.recv()) # prints "[42, None, ‘hello‘]" 18 print(parent_conn.recv()) # prints "[42, None, ‘hello‘]" 19 parent_conn.send(‘hello‘) 20 parent_conn.send(‘hello2‘) 21 22 p.join()
Manager
用于数据共享
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。
Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
1 from multiprocessing import Process, Manager 2 3 def f(d,l,n): 4 d[n] = ‘1‘ 5 d[‘2‘] = 2 6 d[0.25] = None 7 l.append(n) 8 #print(l) 9 print(‘sub‘,id(d)) 10 11 if __name__ == ‘__main__‘: 12 with Manager() as manager:# with open() as f== f=open() manager=Manager() 13 d = manager.dict() 14 15 l = manager.list(range(5)) 16 p_list = [] 17 18 print(‘main‘,id(d)) 19 for i in range(10): 20 p = Process(target=f, args=(d,l,i)) 21 p.start() 22 p_list.append(p) 23 24 for res in p_list: 25 res.join() 26 27 print(d) 28 print(l)
进程池
进程池可用于一下子启动多个子进程
pool = Pool()
Pool()里面可以添加数字,默认是cpu核心数
1 from multiprocessing import Process, Pool 2 import time 3 import os 4 5 def Foo(i): 6 time.sleep(2) 7 print(‘sub %s‘%os.getpid()) 8 9 return i + 100 10 11 12 def Bar(arg): 13 print(‘Bar:‘,os.getpid()) 14 print(‘-->exec done:‘, arg) 15 16 if __name__==‘__main__‘: 17 pool = Pool() 18 print(‘main:‘,os.getpid()) 19 for i in range(10): 20 pool.apply_async(func=Foo, args=(i,),callback=Bar) 21 #pool.apply(func=Foo, args=(i,)) 22 print(‘end‘)
1 from multiprocessing import Pool 2 import os, time, random 3 4 def long_time_task(name): 5 print(‘Run task %s (%s)...‘ % (name, os.getpid())) 6 start = time.time() 7 time.sleep(random.random() * 3) 8 end = time.time() 9 print(‘Task %s runs %0.2f seconds.‘ % (name, (end - start))) 10 11 if __name__==‘__main__‘: 12 print(‘Parent process %s.‘ % os.getpid()) 13 p = Pool(4) 14 for i in range(5): 15 p.apply_async(long_time_task, args=(i,)) 16 print(‘Waiting for all subprocesses done...‘) 17 p.close() 18 p.join() 19 print(‘All subprocesses done.‘)
Python多进程(multiprocessing)