首页 > 代码库 > 多进程
多进程
python3.6
多进程
多进程
Table of Contents
- 1. 多进程
- 1.1. linux/unix/win 启动方式对比
- 1.1.1. spawn win的默认方法,支持unix/win
- 1.1.2. fork unix的默认方法,仅支持unix
- 1.1.3. forkserver 当平台支持unix的管道文件时该方法可用
- 1.1.4. 启动子进程
- 1.1.5. 选择启动的方法
- 1.1.6. 其他内容
- 1.2. 进程通信
- 1.2.1. 队列 Queue
- 1.2.2. TODO 管道 Pipes
- 1.2.3. 实例
- 1.3. 进程同步
- 1.3.1. 锁Lock
- 1.3.2. 实例
- 1.4. 进程池 Pool
- 1.4.1. 创建Pool([processes[,initalizer[,initargs[,maxtasksperchild[,context]]]])
- 1.4.2. AsyncResult applyasync,mapasync与starmapasync的返回对象
- 1.4.3. 实例
- 1.5. 资源共享
- 1.5.1. 使用Array,Value作为存储空间来保存需要共享的资源
- 1.5.2. 实例
- 1.5.3. 使用服务进程server process
- 1.1. linux/unix/win 启动方式对比
1 多进程
1.1 linux/unix/win 启动方式对比
在不同平台上系统使用的多进程机制是不一样的,所以在python实现中有三种不同的开启多进程的方式
1.1.1 spawn win的默认方法,支持unix/win
父进程开启一个新的python解释程序 子进程只获得足够运行run()方法的资源 父进程的文件描述符和句柄不被继承 此方式的速度在三种方式中最慢
- run() 该方法是target的参数,参数是一个可调用对象即可
经测试,当run调用的是function时,资源仍无法保留,只有调用一个可调用对象且初始化方法(init)除self外有其他参数才行
def hello(): pass class Hello(): def __init__(self,name): print(name) hello.__init__() # 不需要传参,所以资源无法保留 hello_cla = Hello(‘name‘) hello_cla.__init__(‘name‘) # 有传参,资源才能保留
实例:
from multiprocessing import Process, Queue import multiprocessing as mp import os def hello_procs(name): print("这是{}{}进程".format(name,os.getpid())) print("父进程ID%s"%os.getppid()) try: print(sourceA) print("调用成功,因为未使用spawn方式") except Exception as e: print("调用失败,因为该资源不是必须资源") print(e) class Sou(): def __init__(self,soua): self.soua = soua def show_soua(self): print(self.soua) def __call__(self): self.show_soua() if __name__ == ‘__main__‘: sourceA = "AA" sourceB = "BB" # 使用forkserver在资源继承方面与spawn表现一致 mp.set_start_method(‘spawn‘) print("当前进程的进程id是%s"%os.getpid()) #p = Process(target=hello_procs, args=(‘第一个进程‘,)) a = Sou(sourceA) p = Process(target=a) """ 上面的是不带参数的传递方式,如需要给可调用函数传递参数,那么需要修改为__call__(self,参数) 再接下来就是修改Process参数为(target=a,args=(‘参数‘,)) """ print(‘开始进入子进程‘) p.start() p.join() print("进程结束")
1.1.2 fork unix的默认方法,仅支持unix
使用unix的fork()[os.fork()]来创建一个当前解释器的子进程 子进程获得父进程全部的资源 此方式的安全问题不好控制
1.1.3 forkserver 当平台支持unix的管道文件时该方法可用
在使用这种方式时,开启多进程会开启一个额外的服务进程 当需要一个子进程时父进程去请求服务进程并得到一个子进程 由于服务进程是单线程的,所以该方式是线程安全的
1.1.4 启动子进程
- 创建进程对象 表示在单独进程中运行的活动
Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None) group一直为None即可,只是为了和threading.Thread兼容 target 是run()要调用的对象,需要是可执行的 name 无实意,名字 args 可调用对象的位置参数 kwargs 可调用对象的关键字参数 daemon True/False/None 与过程继承有关
- run() 表示进程活动的方法
- start() 启动进程的活动
- isalive() 检测子进程是否存活,只能检测子进程
- join([timeout]) 阻塞调用该方法的进程
None 阻塞,直到进程完毕 正数 阻塞timeout秒
- daemon 标识该进程是否为守护进程,True是,False不是,None从上层继承
主进程不是守护进程,所以只要不明确指定为True,那么创建的所有进程都不是守护进程
from multiprocessing import Process as ps import sys def hello(*,a=None): if a == None: pass else: print(a) def not_sh(): global pish,pfsh,ex print("非守护进程%s"%ex) pish = ps(target=is_sh, args=(1,), daemon=True) pfsh = ps(target=is_sh, args=(‘非守护进程‘,)) #daemon为None从上层继承属性 pish.start() pfsh.start() for i in range(1): #当该值为1时可以看到守护进程在打印出消息前就退出了 print("第二个主进程") ex="有变化" def is_sh(jc_type): if jc_type == 1: print("守护进程") for i in range(1000): print("守护进程") else: print() print("不是守护进程") for i in range(100): print("非守护进程") if __name__ == ‘__main__‘: p = ps(target=hello, name=‘hello‘, args=(), kwargs={‘a‘:‘A‘}) print("进程是否存活%s"%p.is_alive()) p.start() print("守护进程daemon? %s"%p.daemon) print("进程是否存活%s"%p.is_alive()) # 创建一个非守护进程 pish, pfsh =None,None ex = "原始" pnsh = ps(target=not_sh) pnsh.start() pnsh.join() print(ex)
1.1.5 选择启动的方法
import multiprocessing multiprocessing.set_start_method(‘spawn‘) #传入方式的名字 # 该方法在程序中至多使用一次
1.1.6 其他内容
- 使用global可以将数据发送到子进程,但是子进程对数据的修改不会反馈到父进程
1.2 进程通信
1.2.1 队列 Queue
- Queue([maxsize]) 创建并可以设置最大值
- qsize() 返回队列大致大小(不准确是因为并发) 在MAC上回引发异常NotImplementedError
- empty() 是否为空,不准确
- full() 队列是否已满,不准确
- put(obj[,block[,timeout]])
- putnowait(obj) 相当于put(obj,False)
- get()
- getnowite()
- close() 没有返回值,关闭队列,后台线程将数据一次性刷新到管道当关闭后仍去操作会抛出异常OSError: handle is closed
- jointhread() 只能在close()后调用,他阻塞直到后台线程退出,确保数据刷新到管道
- canceljointhread() 立即关闭队列,不等待后台线程将数据刷新到管道
1.2.2 TODO 管道 Pipes
1.2.3 实例
"""进程间通信""" from multiprocessing import Process, Queue, Pipe import os, random def write(m): print(‘进程:%s‘%os.getpid()) m.put(‘数据A‘) """ put(obj[,block[,timeout]]) 将值放入队列 当block为True(默认值),且timeout为None(默认值)时,不会抛出异常,会一直等到可以入队时将值入队 当timeout为正值时,等待timeout秒,超时则抛出queue.Full异常 当block为False时,一旦无法入队立即抛出异常 """ def read(m): print(‘进程:%s‘%os.getpid()) try: print(m.qsize()) except Exception as e: print("在MAC上会引发异常") finally: value = http://www.mamicode.com/m.get()""" get([block[,timeout]]) 获取一个值后删除 当block为True(默认值)且timeout为None(默认值),那么只有当队列中有内容时获取值 timeout为正数时,当队列中无值时阻塞timeout秒,而后仍无值则抛出queue.Empty异常 block为False时一旦无值立即抛出异常 """ print(value) if __name__ == ‘__main__‘: q = Queue() pw = Process(target=write, name=‘写进程‘, args=(q,)) print(‘开始写入数据 %s‘%pw.name, end=‘ : ‘) pw.start() pr = Process(target=read, name=‘读进程‘, args=(q,)) print(‘开始读取数据 %s‘%pr.name, end=‘ : ‘) pr.start() # TODO Pipe 通过管道传递消息
1.3 进程同步
1.3.1 锁Lock
一旦进程,线程获得了锁,那么随后的任何进程,线程在获取锁时将阻塞
- acquire(block=True, timeout=None)
当block为True,方法调用将阻塞,直到解锁 当block为True时,timeout为正数,那么最多只能被阻塞timeout秒,当timeout为负数,阻塞时长为0,当为None一直阻塞
- release() 释放锁
一把锁可以被任意对象释放,未必是上锁的对象来解锁
1.3.2 实例
from multiprocessing import Process, Lock def show_lock(l): #l.release() 在try_get_lock中上的锁可以在这里解开 l.acquire(True,-1) # 超时时长为负数,即使被锁定也会执行 print("函数正常执行") def try_get_lock(l): l.acquire() print("获得了锁") # l.release() if __name__ == ‘__main__‘: l = Lock() #l.acquire(True) pg = Process(target=try_get_lock,args=(l,)) pg.start() ps = Process(target=show_lock,args=(l,)) ps.start()
1.4 进程池 Pool
1.4.1 创建Pool([processes[,initalizer[,initargs[,maxtasksperchild[,context]]]])
processes 进程的数量 initializer 如果不为None,则在每个工作进程启动时调用initializer(*initargs) maxtasksperchild context 工作进程的上下文
该类实现了上下文管理
- apply(func[,args[,kwds]])
使用args,kwds调用func,直到结果完成
- applyasync(func[,args[,kwds[,callback[,errorcallback]]]])
返回一个结果对象 返回的对象是AsyncResult 当指定callback(一个接受单参数的可调用对象)时,完成时会调用callback,调用失败则调用errorcallback 回调应该立即完成,否则线程将会阻塞
- map(func,iterable[,chunksize])
与内置函数map()相同,它阻塞直到map完成
- mapasync(func,iterable[,chunksize[,callback[,errorcallback]]])
返回结果的map()
- imap(func,iterable[,chunkszie])
惰性map() chunkszie参数与map()方法的参数相同
- starmap(func,iterable[,chunksize]) iterable必须为可迭代对象
需要注意‘abc‘也是可迭代对象,一旦加上(),(‘abc‘)更不行 func,(‘abc‘,) 会给func传入三个参数,而不是一个整体 正确做法 传入((‘abc‘,),),同理,传入其他可迭代内容也可以这样做
- starmapasync(func,iterable[,chuunksize[,callback[,errorback]]])
将iterable拆分后调用func并返回一个结果对象
- close()
一旦任务完成,退出进程
- terminate()
立即停止进程并退出
- join()
等待进程结束,在此之前必须调用close或terminate
1.4.2 AsyncResult applyasync,mapasync与starmapasync的返回对象
- get([timeout]) 返回结果,并要求在timeout秒内到达,当timeout不为None时,N秒内未到达抛出异常TimeoutError
- wait([timeout]) 等待结果或直到N秒超时
- ready() 判断返回是否就绪
- successful() 返回调用是否完成且无异常
1.4.3 实例
from multiprocessing import Pool, TimeoutError, Process import time import os def proc_pool(name): print("asd") for i in range(5): print(str(i)+‘ : %s‘) #return "返回的结果值","有两个会怎样?" 不要返回一个以上的值,会导致map调用产生歧义(使用map(func,[1,2])时会返回[返回值1,返回值2]而不是[(返回值1,返回值2),(返回值1,返回值2)]) 当需要返回两个值要显式的返回一个元组 #return ("返回的结果值","第二个值") return "返回值" def proc_err(name): raise Exception def proc_mm(name): print(‘该函数被调用了%s%s‘%(name,type(name))) return name if __name__ == ‘__main__‘: print("开始启动线程池") p = Pool(4) for i in range(5): p.apply_async(proc_pool, args=(‘cc‘,)) #p.map(proc_pool,[‘cc‘,‘dd‘]) def callback(name): print("回调函数%s") def err_callback(err): try: print("yc") except Exception as e: print(‘发生异常‘) finally: print("ww") mapr = p.map_async(proc_err, ‘ee‘, 3, callback, err_callback) #mapr.get() 获得可调用对象的返回值 #print("返回的结果值%s"%mapr.get()) #mmap = p.starmap(proc_mm,[(‘abcd‘),(‘a‘)]) mmap = p.starmap(proc_mm,(((‘abc‘,),),)) mmaps = p.starmap_async(proc_mm,(((‘abc‘,),),)) list(mmap) print(mmaps.get()) p.close() p.join()
1.5 资源共享
1.5.1 使用Array,Value作为存储空间来保存需要共享的资源
Value(typecodeortype, *args, lock=True) Array(typecodeortype, sizeorinitializer, *, lock=True)
- Array,Value的共通之处
在创建存储空间时,在lock的参数选取上,默认情况是自己创建一个资源锁 但是也可以选择使用一个已经存在的锁,当lock被传入一个已存在的锁时受该锁影响 当设置为False时,资源不被锁保护,导致线程不安全
tyoecodeortype 都是array模块使用的类型代码 array 表示基本类型的数组,有:字符,整数,浮点数
- 区别
Array存储一个队列,Value存储一个值 Array的sizeofinitializer就是保存的数组,同时该数组的长度也是Array的长度
1.5.2 实例
""" 进程共享内容 使用Value,Array使内容共享 """ from multiprocessing import Process, Value, Array, Lock def f(n, a): n.value = http://www.mamicode.com/3.1415927>
1.5.3 使用服务进程server process
- 使用Manager()会返回一个管理对象
该管理对象支持的类型更广泛,有: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array
该类实现了上下文管理
- Manager的两个子类,Manager()返回的就是SyncManager
- BaseManager([adress[,authkey]])
adress是管理器进程侦听新链接的地址,None为随机选一个 authkey是认证密匙,None为使用currentprocess().authkey,否则使用authkey,必须为字符串 currentprocess() 返回当前Process对象 authkey 进程的认证密钥(字节字符串) 当初始化multiprocessing时,使用os.urandom()为主进程分配一个随机字符串 当创建Process对象时,它将继承其父进程的认证密钥 但可以通过将authkey设置为另一个字节字符串来更改。
- start([initializer[,initargs]])
启动子过程以启动管理器
- getserver()
返回Server对象,他表示在manger控制下的实际服务器
- connect()
本地管理器对象链接到远程管理器进程
- shutdown()
停止manager进程,仅当启动使用start()时可用
- register(typeid[, callable[, proxytype[, exposed[, methodtotypeid[, createmethod]]]]])
向Manager注册类型或可调用的类方法
typeid 用于标识特定类型的共享对象的类型标识符,必须是字符串 callable 用于typeid类型的可调用选项, proxytype 是BaseProxy的子类,用于创建typeid的共享对象代理,None,自动创建 exposed 用于指定代理类型所使用的方法 methodtotypeid 返回代理类型的公开方法 createmethod 确定是否使用typeid创建方法,默认为True
- start([initializer[,initargs]])
- SyncManager
BaseManager 主要用来创建自定义的Manager
- Queue([maxsize]) 创建queue.Queue对象返回其代理
在进程通信中展示了部分Queue队列的使用方法
- Array(typecode,sequence) 创建一个数组并返回其代理
在进程共享中展示了部分Array的用法
- Value(typecode,value) 创建一个值并返回其代理
在进程共享中展示了部分Value的用法
- dict([dict]) 创建一个dict,并返回其代理
- list([list]) 创建一个list,并返回其代理
- Lock() 创建一个threading.Lock对象并返回其代理
- Queue([maxsize]) 创建queue.Queue对象返回其代理
- 实例
from multiprocessing import Process, Manager def f(d, l, q, a, v, lo): d[1] = ‘1‘ d[‘2‘] = 2 d[0.25] = None q.put(100) lo.acquire(True,3) for i in range(len(a)): a[i]=1 v,value = http://www.mamicode.com/100"********") print(q.get()) print(a[:]) print(v.value)
- BaseManager([adress[,authkey]])
多进程