首页 > 代码库 > Python:线程、进程与协程(4)——multiprocessing模块(1)

Python:线程、进程与协程(4)——multiprocessing模块(1)

    multiprocessing模块是Python提供的用于多进程开发的包,multiprocessing包提供本地和远程两种并发,通过使用子进程而非线程有效地回避了全局解释器锁。

(一)创建进程Process 类

        创建进程的类,其源码在multiprocessing包的process.py里,有兴趣的可以对照着源码边理解边学习。它的用法同threading.Thread差不多,从它的类定义上就可以看的出来,如下:

class Process(object):
    ‘‘‘
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    ‘‘‘
    _Popen = None

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        assert group is None, ‘group argument must be None for now‘
        count = _current_process._counter.next()
        self._identity = _current_process._identity + (count,)
        self._authkey = _current_process._authkey
        self._daemonic = _current_process._daemonic
        self._tempdir = _current_process._tempdir
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = name or type(self).__name__ + ‘-‘ +                      ‘:‘.join(str(i) for i in self._identity)

Process([group [, target [, name [, args [, kwargs]]]]])

group实质上不使用,是保留项,便于以后扩展。

target表示调用对象,

args表示调用对象的位置参数元组

kwargs表示调用对象的字典

name为别名,即进程的名字

它的方法/属性跟threading.Thread也有很多类似的地方,主要有:

start():开始进程活动。

run():表示进程的活动方法,可以在子类中覆盖它。

join([timeout]):是用来阻塞当前上下文,直至该进程运行结束,一个进程可以被join()多次,timeout单位是秒。

terminate():结束进程。在Unix上使用的是SIGTERM,在Windows平台上使用TerminateProcess

is_alive():判断进程是否还活着。

name:一个字符串,表示进程的名字,也可以通过赋值语句利用它来修改进程的名字

ident:进程的ID,如果进程没开始,结果是None

pid:同ident,大家可以看看ident和pid的实现,是利用了os模块的getpid()方法。

authkey:设置/获取进程的授权密码。当初始化多进程时,使用os.urandom()为主进程分配一个随机字符串。当创建一个Process对象时,它将继承其父进程的认证密钥, 但是可以通过将authkey设置为另一个字节字符串来改变。这里authkey为什么既可以设置授权密码又可以获取呢?那是因为它的定义使用了property装饰器,源码如下:

@property
def authkey(self):
   return self._authkey

@authkey.setter
def authkey(self, authkey):
   ‘‘‘
    Set authorization key of process
   ‘‘‘
   self._authkey = AuthenticationString(authkey)

这是property的一个高级用法,如果理解了其实也很简单,有兴趣的去查看其它资料。

daemon:一个布尔值,指示进程是(True)否(False)是一个守护进程。它必须在调用start()之前设置,否则会引发RuntimeError。它的初始值继承自创建它的进程;进程不是一个守护进程,所以在进程中创建的所有进程默认daemon = False。

exitcode:返回进程退出时的代码。进程运行时其值为None,如果为–N,表示被信号N结束。

(1)一个简单的单进程例子

#coding=utf-8
import multiprocessing
import datetime
import time

def worker(interval):
    n = 5
    while n > 0:
        print "The now is %s"% datetime.datetime.now()
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":

    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()#开始进程
    #p.terminate()#结束进程
    #p.join(9)#阻塞当前上下文
    print "p.authkey",p.authkey#获取进程的授权密码
    p.authkey = u"123"#设置进程的授权密码
    print "p.authkey", p.authkey#获取进程的授权密码
    print "p.pid:", p.pid,p.ident#进程ID
    p.name = ‘helloworld‘#修改进程名字
    print "p.name:", p.name#进程名字
    print "p.is_alive:", p.is_alive()#是否是活的

运行结果如下图:

技术分享

上面的代码有两行注释掉的,大家可以把注释去掉,体会、理解这两个方法的用处,在此不贴我的运行结果了。

(2)自定义进程类,并开启多个进程

import multiprocessing
import datetime
import time

class MyProcess(multiprocessing.Process):
    """
    自定义进程类
    """
    def __init__(self,interval,group=None,target=None,name=None,args=(),kwargs={}):
        multiprocessing.Process.__init__(self,group,target,name,args,kwargs=kwargs)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is %s"%datetime.datetime.now())
            time.sleep(self.interval)
            n -= 1


def worker_1(interval):
    print "worker_1"
    time.sleep(interval)
    print "end worker_1"

def worker_2(interval):
    print "worker_2"
    time.sleep(interval)
    print "end worker_2"

def worker_3(interval):
    print "worker_3"
    time.sleep(interval)
    print "end worker_3"


if __name__ == "__main__":
    p1 = MyProcess(interval=2,target = worker_1, args = (2,))
    p2 = MyProcess(interval=2,target = worker_2, args = (3,))
    p3 = MyProcess(interval=2,target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()
    print "current process",multiprocessing.current_process(),multiprocessing.active_children()
    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print "END!!!!!!!!!!!!!!!!!"

运行结果如下:

技术分享

看看打印出来的时间,三个进程应该是并行执行的。

(二)进程间通信

    multiprocessing模块支持两种进程间的通信方式:Queue(队列)和Pipe(管道)。

(1)Queue

    multiprocessing中的Queue类的定义在queues.py文件里。和Queue.Queue差不多,multiprocessing中的Queue类实现了Queue.Queue的大部分方法(可以参考上篇博文Python:线程、进程与协程(3)——Queue模块及源码分析),但task_done()和join()没有实现,主要方法和属性有

  qsize():返回Queue的大小

  empty():返回一个布尔值,表示Queue是否为空

  full():返回一个布尔值,表示Queue是否满

  put(item[, block[, timeout]]):向队列里添加元素item,block设置为False的时候,如果队列满了则抛出Full异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Full异常。     

  put_nowait(item):等价与put(item,False)。

  get([block[, timeout]]):从队列中删除元素并返回该元素的值,如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用的项目,则引发Empty异常。

  get_nowait():等价于get(False)

  close():表示该Queue不在加入新的元素

  join_thread():加入后台线程。这只能在调用close()之后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。默认情况下,如果进程不是队列的创建者,则退出, 它将尝试加入队列的后台线程。 该进程可以调用cancel_join_thread()来做

cancel_join_thread():在阻塞中阻止join_thread(),防止后台线程在进程退出时被自动连接 ,肯能会导致数据丢失。


(2)Pipe

Pipe不是类,是函数,该函数定义在 multiprocessing中的connection.py里,函数原型Pipe(duplex=True),

返回一对通过管道连接的连接对象conn1和conn2。

 如果duplex是True(默认值),则管道是双向的。

 如果duplex是False,则管道是单向的:conn1只能用于接收消息,conn2只能用于发送消息。

Pipe()返回的两个连接对象表示管道的两端,每个连接对象都有send()和recv()方法(还有其它方法),分别是发送和接受消息。下面举个简单的例子,一个发送数据,一个接受数据

#coding=utf-8
import multiprocessing
import time

def proc1(pipe):
    """
    发送数据
    """
    while True:
        for i in xrange(100):
            print "send: %s" %(i)
            pipe.send(i)#发送数据
            time.sleep(1)

def proc2(pipe):
    """
    接收数据
    """
    while True:
        print "proc2 rev:", pipe.recv()#接受数据
        time.sleep(1)
if __name__ == "__main__":
    pipe1,pipe2 = multiprocessing.Pipe()#返回两个连接对象
    p1 = multiprocessing.Process(target=proc1, args=(pipe1,))
    p2 = multiprocessing.Process(target=proc2, args=(pipe2,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

运行结果如下:

技术分享

(三)进程间的同步

        multiprocessing包含与threading中所有同步原语等同的原语,它也有Lock,RLock,Even,Condition,Semaphore,BoundedSemaphore。用法都差不多,它们的定义在 multiprocessing包的synchronize.py文件里,在此不过多介绍,有兴趣的可以参考Python:线程、进程与协程(2)——threading模块里相关的概念理解。如果理解了相关概念,在 multiprocessing模块中使用是一样的,看下面这个简单的例子吧,有两个进程要向某个文件中写入内容,为了避免访问冲突,可以使用锁。

#coding=utf-8
import multiprocessing
def worker_with(lock, f):
    with lock:#Lock等对象也是支持上下文管理器协议的。
        fs = open(f, ‘a+‘)
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, ‘a+‘)
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()#定义锁
    f = "/home/liulonghua/files.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"

   

    multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。

    多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。下篇博文再接着讲进程共享和进程池等。



Python:线程、进程与协程(4)——multiprocessing模块(1)