首页 > 代码库 > Python多进程(multiprocessing)学习总结

Python多进程(multiprocessing)学习总结

简介

multiprocessing模块使用和threading包类似的API接口来产生多进程,multiprocessing包提供本地和远程的并发,通过使用subprocesses(子进程)代替threads(线程)有效的避开了GIL(Global Interpreter Lock)。由于这一点,multiprocessing模块允许程序充分的利用多处理器。可以跨平台使用,包括Unix和Windows!----https://docs.python.org/2/library/multiprocessing.html

提醒: Some of this package’s functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the multiprocessing.synchronize module will be disabled, and attempts to import it will result in an ImportError. See issue 3770 for additional information.

简单介绍完毕!开始学习Function。

FUNCTIONS

    Array
        (typecode_or_type, size_or_initializer, **kwds)
        Returns a synchronized shared array返回一个同步共享队列
    BoundedSemaphore(value=http://www.mamicode.com/1)
        Returns a bounded semaphore object返回一个有界信号对象   
    Condition(lock=None)
        Returns a condition object返回一个状态对象    
    Event()
        Returns an event object返回一个时间对象   
    JoinableQueue(maxsize=0)
        Returns a queue object返回一个队列对象  
    Lock()
        Returns a non-recursive lock object返回一个非递归锁对象 
    Manager()
        Returns a manager associated with a running server process返回一个与正在运行的服务器进程相关联的进程  The managers methods such as `Lock()`, `Condition()` and `Queue()` can be used to create shared objects.管理方法如lock() ,condition() 和queue() 可用于创建共享对象。  
    Pipe(duplex=True)
        Returns two connection object connected by a pipe返回通过管道连接的两个连接对象   
    Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
        Returns a process pool object返回一个进程池对象    
    Queue(maxsize=0)
        Returns a queue object返回一个队列对象  
    RLock()
        Returns a recursive lock object返回一个递归锁对象
    RawArray(typecode_or_type, size_or_initializer)
        Returns a shared array返回一个共享队列 
    RawValue(typecode_or_type, *args)
        Returns a shared object返回一个共享对象    
    Semaphore(value=http://www.mamicode.com/1)
        Returns a semaphore object返回一个信号量对象  
    Value(typecode_or_type, *args, **kwds)
        Returns a synchronized shared object返回一个同步共享对象
    active_children()
        Return list of process objects corresponding to live child processes返回进程对象相对应活跃的的子进程列表 
    allow_connection_pickling()
        Install support for sending connections and sockets between processes支持进程间通信 
    cpu_count()
        Returns the number of CPUs in the system返回CPU的核心数(实际是线程数,我的就是四核心八线程返回为8)
    current_process()
        Return process object representing the current process返回当前进程的进程对象  
    freeze_support()
        Check whether this is a fake forked process in a frozen executable. If so then run code specified by commandline and exit.
    
    get_logger()
        Return package logger -- if it does not already exist then it is created
    
    log_to_stderr(level=None)
        Turn on logging and add a handler which prints to stderr
    
    set_executable(executable)
        Sets the path to a python.exe or pythonw.exe binary used to run child processes on Windows instead of sys.executable. Useful for people embedding Python.

例子

multiprocessing用Process类来无数个创建进程对象,并用start()方法启动这个进程,一个简单的例子:
# -*- coding: utf-8 -*-

from multiprocessing import Process

def f(name, Blog):
    print "hello", name
    print "Blog:", Blog

if __name__ == '__main__':
    p = Process(target=f, args=("The_Third_Wave", "http://blog.csdn.net/zhanh1218"))
    p.start()
    p.join()
结果如下:
hello The_Third_Wave
Blog: http://blog.csdn.net/zhanh1218
注意事项:请不要再IDLE中运行这段代码,否则你什么也看不到。请在自己的IDE中运行,我用的是eclipse。具体原因我还没搞明白。待补充

那么要创建多个进程怎么办呢?使用线程池pool
# -*- coding: utf-8 -*-

from multiprocessing import Process
import os, time, datetime, random
from multiprocessing import Pool

def task_1(name):
    print 'Run task_1 %s (%s)...' % (name, os.getpid())
    time.sleep(random.randint(0, 3))
    print 'id = %s over at %s' % (name, datetime.datetime.now())

def task_2(x):
    time.sleep(1)
    print "x = %s is run at %s" %(x, datetime.datetime.now())
    return x*x

if __name__ == '__main__':
    print 'Parent process pid is %s. Start at %s' % (os.getpid(), datetime.datetime.now())
    p = Pool()
    for i in range(5):
        p.apply_async(task_1, args=(i,))  
    result = p.apply_async(task_2, [100])    
    print result.get(timeout=10)          
    print p.map(task_2, range(10))          
    print "HERE Time is %s"  %datetime.datetime.now()
    p.map(task_1, range(6, 10))  
    p.close()
    p.join()
    print 'All subprocesses done at %s' %datetime.datetime.now()
结果为:
Parent process pid is 4852. Start at 2014-06-12 16:27:13.824000
10000
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
HERE Time is 2014-06-12 16:27:17.105000
Run task_1 4 (9104)...
id = 4 over at 2014-06-12 16:27:16.047000
x = 6 is run at 2014-06-12 16:27:17.047000
x = 1 is run at 2014-06-12 16:27:16.105000
x = 8 is run at 2014-06-12 16:27:17.105000
Run task_1 0 (8604)...
id = 0 over at 2014-06-12 16:27:14.044000
Run task_1 2 (8604)...
id = 2 over at 2014-06-12 16:27:15.046000
x = 4 is run at 2014-06-12 16:27:16.105000
x = 7 is run at 2014-06-12 16:27:17.105000
x = 0 is run at 2014-06-12 16:27:16.105000
x = 9 is run at 2014-06-12 16:27:17.105000
x = 100 is run at 2014-06-12 16:27:15.097000
x = 5 is run at 2014-06-12 16:27:16.105000
Run task_1 7 (4152)...
id = 7 over at 2014-06-12 16:27:18.105000
x = 2 is run at 2014-06-12 16:27:16.105000
Run task_1 6 (7524)...
id = 6 over at 2014-06-12 16:27:18.105000
x = 3 is run at 2014-06-12 16:27:16.105000
Run task_1 8 (7500)...
id = 8 over at 2014-06-12 16:27:18.105000
Run task_1 1 (7392)...
id = 1 over at 2014-06-12 16:27:14.045000
Run task_1 3 (7392)...
id = 3 over at 2014-06-12 16:27:17.046000
Run task_1 9 (7392)...
id = 9 over at 2014-06-12 16:27:20.105000
All subprocesses done at 2014-06-12 16:27:20.149000
join([timeout])阻碍线程调用直到进程的join()方法被调用!调用join前必须前调用close方法。close方法阻止向进程池中提交任务,也就是不能继续添加新进程!

进程间通信

multiprocessing提供两种方式【Queues,Pipes】来进行进程间通信。

Queues

from multiprocessing import Process, Queue

def f(q):
    q.put("Hello The_Third_Wave")

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    
    p.join()
<pre name="code" class="python">结果为:Hello The_Third_Wave
Queues are thread and process safe.

Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). 
from multiprocessing import Process, Pipe

def f(conn):
    conn.send("Hello The_Third_Wave")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()  
    p.join()
The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time. 要注意同时读写数据可能会出错!
我们可以用锁来保持同步!改写上面一个例子立马看出差别
# -*- coding: utf-8 -*-

from multiprocessing import Process, Lock
import os, time, datetime, random

def task_1(lock, name):
    lock.acquire()
    print 'Run task_1 %s (%s)...' % (name, os.getpid())
    time.sleep(random.randint(1, 3))
    print 'id = %s over at %s' % (name, datetime.datetime.now())
    lock.release()
    
if __name__ == '__main__':
    print 'Parent process pid is %s. Start at %s' % (os.getpid(), datetime.datetime.now())
    lock = Lock()
    for i in range(5):
        Process(target=task_1, args=(lock, i)).start() 
    print 'Parent process done at %s' %datetime.datetime.now()
结果为:
Parent process pid is 7908. Start at 2014-06-12 17:09:12.726000
Parent process done at 2014-06-12 17:09:12.861000
Run task_1 1 (7104)...
id = 1 over at 2014-06-12 17:09:14.892000
Run task_1 0 (2440)...
id = 0 over at 2014-06-12 17:09:15.892000
Run task_1 2 (8812)...
id = 2 over at 2014-06-12 17:09:16.892000
Run task_1 3 (2552)...
id = 3 over at 2014-06-12 17:09:19.892000
Run task_1 4 (2172)...
id = 4 over at 2014-06-12 17:09:22.893000
可以看到主进程执行完毕,因为锁的存在,子进程还在逐步执行!

进程之间共享状态

进程间最好不要有共享的东西!

As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.

However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.

未完待续。。。

本文由@The_Third_Wave(Blog地址:http://blog.csdn.net/zhanh1218)原创。不定期更新,有错误请指正。

Sina微博关注:@The_Third_Wave 

如果这篇博文对您有帮助,为了好的网络环境,不建议转载,建议收藏!如果您一定要转载,请带上后缀和本文地址。