首页 > 代码库 > Python多进程(multiprocessing)

Python多进程(multiprocessing)

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

在Windows下通过multiprocessing模块实现

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

window系统下,需要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。

例子

直接调用

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4     time.sleep(1)
 5     print(hello, name,time.ctime())
 6 
 7 if __name__ == __main__:
 8     p_list=[]
 9     for i in range(3):
10         p = Process(target=f, args=(alvin,))
11         p_list.append(p)
12         p.start()
13     for i in p_list:
14         p.join()
15     print(end)

继承(类)式调用

 1 from multiprocessing import Process
 2 import time
 3 
 4 class MyProcess(Process):
 5     def __init__(self):
 6         super(MyProcess, self).__init__()
 7         #self.name = name
 8 
 9     def run(self):
10         time.sleep(1)
11         print (hello, self.name,time.ctime())
12 
13 
14 if __name__ == __main__:
15     p_list=[]
16     for i in range(3):
17         p = MyProcess()
18         p.start()
19         p_list.append(p)
20 
21     for p in p_list:
22         p.join()
23 
24     print(end)

获取进程pid

 1 from multiprocessing import Process
 2 import time
 3 import os
 4 
 5 def info(title):
 6     print(title)
 7     print(module name:, __name__)
 8     print(parent process:, os.getppid())  # 获取父进程pid
 9     print(process id:, os.getpid())     # 获取子进程pid
10 
11 
12 def f(name):
13     info(\033[31;1mfunction f\033[0m)
14     print(hello, name)
15 
16 if __name__ == __main__:    # windows下多进程必须有这句
17     info(\033[32;1mmain process line\033[0m)
18     time.sleep(5)
19     p = Process(target=info, args=(bob,))
20     p.start()
21     p.join()
22 print(end, __name__)

输出为

 1 main process line
 2 module name: __main__
 3 parent process: 10704
 4 process id: 216
 5 end __mp_main__
 6 bob
 7 module name: __mp_main__
 8 parent process: 216
 9 process id: 7612
10 end __main__

Process类

Process类的__init__

class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        self.name = ‘‘
        self.daemon = False
        self.authkey = None
        self.exitcode = None
        self.ident = 0
        self.pid = 0
        self.sentinel = None

  target:要执行的方法

  name:进程名

  args/kwargs:要传入的方法

实例方法

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性

  daemon:和线程的setDeamon功能一样

  exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

  name:进程名字。

  pid:进程号。

进程间通信

 

不同进程间内存是不共享的,要实现两个进程间的数据交换

Queues

使用方法与threading里的queue类似

 1 from multiprocessing import Process, Queue
 2 
 3 def f(q,n):
 4     q.put([42, n, hello])
 5 
 6 if __name__ == __main__:
 7     q = Queue()
 8     p_list=[]
 9     for i in range(3):
10         p = Process(target=f, args=(q,i))
11         p_list.append(p)
12         p.start()
13     print(q.get())
14     print(q.get())
15     print(q.get())
16     for i in p_list:
17             i.join()

Pipe(管道)

multiprocessing.Pipe([duplex])

返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.默认duplex = True

可用于多个对象通信,即建立多个子进程

 1 import os
 2 
 3 from multiprocessing import Process, Pipe
 4 
 5 
 6 def f(conn):
 7     conn.send(约吗)
 8     print(conn.recv(),in the %s % os.getpid())
 9     conn.close()
10 
11 if __name__ == __main__:
12     parent_conn, child_conn = Pipe()
13     p = Process(target=f, args=(child_conn,))
14     p2 = Process(target=f, args=(child_conn,))
15     p.start()
16     p2.start()
17     print(parent_conn.recv())  # prints "[42, None, ‘hello‘]"
18     print(parent_conn.recv())  # prints "[42, None, ‘hello‘]"
19     parent_conn.send(hello)
20     parent_conn.send(hello2)
21 
22     p.join()

Manager

用于数据共享

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。

Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。 

 1 from multiprocessing import Process, Manager
 2 
 3 def f(d,l,n):
 4     d[n] = 1
 5     d[2] = 2
 6     d[0.25] = None
 7     l.append(n)
 8     #print(l)
 9     print(sub,id(d))
10 
11 if __name__ == __main__:
12     with Manager() as manager:#  with open() as f==   f=open()       manager=Manager()
13         d = manager.dict()
14 
15         l = manager.list(range(5))
16         p_list = []
17 
18         print(main,id(d))
19         for i in range(10):
20             p = Process(target=f, args=(d,l,i))
21             p.start()
22             p_list.append(p)
23 
24         for res in p_list:
25             res.join()
26 
27         print(d)
28         print(l)

进程池

 进程池可用于一下子启动多个子进程

pool = Pool()

Pool()里面可以添加数字,默认是cpu核心数

 1 from  multiprocessing import Process, Pool
 2 import time
 3 import os
 4 
 5 def Foo(i):
 6     time.sleep(2)
 7     print(sub %s%os.getpid())
 8 
 9     return i + 100
10 
11 
12 def Bar(arg):
13     print(Bar:,os.getpid())
14     print(-->exec done:, arg)
15 
16 if __name__==__main__:
17     pool = Pool()
18     print(main:,os.getpid())
19     for i in range(10):
20         pool.apply_async(func=Foo, args=(i,),callback=Bar)
21         #pool.apply(func=Foo, args=(i,))
22     print(end)
 1 from multiprocessing import Pool
 2 import os, time, random
 3 
 4 def long_time_task(name):
 5     print(Run task %s (%s)... % (name, os.getpid()))
 6     start = time.time()
 7     time.sleep(random.random() * 3)
 8     end = time.time()
 9     print(Task %s runs %0.2f seconds. % (name, (end - start)))
10 
11 if __name__==__main__:
12     print(Parent process %s. % os.getpid())
13     p = Pool(4)
14     for i in range(5):
15         p.apply_async(long_time_task, args=(i,))
16     print(Waiting for all subprocesses done...)
17     p.close()
18     p.join()
19     print(All subprocesses done.)

 

Python多进程(multiprocessing)