首页 > 代码库 > 关于multiprocessing,我也来聊几句

关于multiprocessing,我也来聊几句

起因:最近需要从hbase中向 ES中导一批数据,使用multiprocessing 启动多个程序同时向ES导数据,可以大大提高效率,因为导数的任务是可以按照时间分割的。

一段简单的代码如下:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.apply_async(f, (10,))    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

    print pool.map(f, range(10))


令我十分不解的,multiprocessing 是如何实现任务的分发,以及结果的回传的。我希望能够把它的实现机制与操作系统的进程机制对应起来。

经过阅读代码,得出的结论如下:

1. 父进程作为整个任务的分发器,每个worker是一个子进程

2. 子进程和父进程之间通过管道通讯,包括任务的分发和结果的回传(2个【管道】) ,管道通过【信号量】加锁


下面罗列部分核心代码,加入我自己的注释,方便大家阅读代码时参考:

1. 管道的创建

    def _setup_queues(self):
        from .queues import SimpleQueue
        self._inqueue = SimpleQueue()       # 管道1 用于分发任务
        self._outqueue = SimpleQueue()      # 管道2 用于推送结果
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

再查看 SimpleQueue

class SimpleQueue(object):

    def __init__(self):
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._make_methods()

在查看 Pipe

def Pipe(duplex=True):
    '''
    Returns two connection object connected by a pipe
    '''
    from multiprocessing.connection import Pipe
    return Pipe(duplex)
if sys.platform != 'win32':

    def Pipe(duplex=True):  # duplex 是否是全双工
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2

非常用意思的是,如果是全双工的话,直接用socket 来实现

SimpleQueue 是加过锁的,可以用于多进程间并发读写,来看看锁的实现

class Lock(SemLock):  # 到这里python 代码已经无法再跳入,SemLock 引自 _multiprocessing.so 

    def __init__(self):
        SemLock.__init__(self, SEMAPHORE, 1, 1

由SemLock 可以推断,锁就是通过信号量实现的。

2. worker的创建

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,           # 管道作为参数被传入
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    put = outqueue.put      # ***留意这里***
    get = inqueue.get       # ***留意这里***

# 省略部分代码
    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()   # ***留意这里***  任务是子进程自己从【管道】中取回的,这里的管道相当于消息队列了
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:  # 如果任务是None worker就退出了
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((job, i, result))    # ***留意这里*** 结果推回队列
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))
        completed += 1
    debug('worker exiting after %d tasks' % completed


明白了这些对于multiprocessing 的使用,我明显就有了底气





关于multiprocessing,我也来聊几句