首页 > 代码库 > Python--线程队列(queue)、multiprocessing模块(进程对列Queue、管道(pipe)、进程池)、协程

Python--线程队列(queue)、multiprocessing模块(进程对列Queue、管道(pipe)、进程池)、协程

队列(queue)

队列只在多线程里有意义,是一种线程安全的数据结构。

get与put方法

 

‘‘‘

创建一个“队列”对象

import queue
q = queue.Queue(maxsize = 10)
queue.Queue类即是一个队列的同步实现。
队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 将一个值放入队列中: q.put()
调用队列对象的put()方法在队尾插入一个项目。
put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为True。如果队列当前为空且block为True,put()方法就使调用线程暂停,直到空出一个数据单元
如果block为False,put方法将引发Full异常。

import queue

q=queue.Queue(3)

q.put(11)
q.put(22)
q.put(33)
q.put(44,False)  #queue.Full      ==q.put_nowait()
 
将一个值从队列中取出
q.get()

调用队列对象的get()方法从队头删除并返回一个项目。
可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
import queue

q=queue.Queue(3)

q.put(11)
q.put(22)
q.put(33)

q.get()
q.get()
q.get()
q.get(False) # queue.Empty        == q.get_nowait() 
‘‘‘

 

  join与task_done方法

join() 阻塞进程,直到所有任务完成,需要配合另一个方法task_done。

task_done() 表示某个任务完成。每一条get语句后需要一条task_done。


import queue,threading

q=queue.Queue()

def foo():
    q.put(11)
    q.put(22)
    q.put(33)
    q.join()
    print(‘ok‘)

def bar():
    print(q.get())
    q.task_done()
    print(q.get())
    q.task_done()
    print(q.get())
    q.task_done()

t1=threading.Thread(target=foo)
t1.start()

t2=threading.Thread(target=bar)
t2.start()
运行效果:
11
22
33
ok

‘‘‘

此包中的常用方法(q = Queue.Queue()):

q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)非阻塞
q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

‘‘‘ 

其他模式

‘‘‘

Python Queue模块有三种队列及构造函数: 

1、Python Queue模块的FIFO队列先进先出。  class queue.Queue(maxsize) 
2、LIFO类似于堆,即先进后出。           class queue.LifoQueue(maxsize) 
3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) 


import queue

#先进后出

q=queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

#优先级
q=queue.PriorityQueue()
q.put([5,100])
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])

while 1:
  data=http://www.mamicode.com/q.get()>

 生产者消费者模型

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

生产者:生产数据的模型

消费者:获取数据的模型

 生产者消费者模型优点:

  1.解耦合

  2.实现并发

 

import threading,queue,random,time

def producer():
    count=1
    while count<10:
        baozi=random.randint(1,100)
        q.put(baozi)
        print(‘baozi %s 做好了‘%baozi)
        time.sleep(1)
        count+=1

def consumer(id):
    while 1:
        baozi=q.get()
        time.sleep(2)
        print(‘顾客%s吃了包子%s‘%(id,baozi))

if __name__ ==‘__main__‘:
    q=queue.Queue()
    t1=threading.Thread(target=producer)
    t1.start()

    for i in range(3):
        t=threading.Thread(target=consumer,args=(i,))
        t.start()
运行效果:(部分)
baozi 4 做好了
baozi 79 做好了
顾客0吃了包子4
baozi 58 做好了
顾客1吃了包子79
baozi 28 做好了

 

multiprocessing模块

由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

multiprocessing包是Python中的多进程管理包。

它可以利用multiprocessing.Process对象来创建一个进程。也有start(), run(), join()的方法。

多进程优缺点:

  优点:可以利用多核,实现并行运算

  缺点:

    1.开销太大

    2.通信困难

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

import multiprocessing 
import time

def foo():
    print(‘ok‘)
    time.sleep(2)

if __name__ ==‘__main__‘:
    p=multiprocessing.Process(target=foo)
    p.start()
    print(‘ending‘)
运行结果:
ending
ok

 

python的进程调用


# Process类调用

from multiprocessing import Process import os import time def info(name): print(‘name:‘,name) print(‘parent process:‘,os.getppid()) print(‘process id:‘,os.getpid()) print(‘-------------‘) time.sleep(1) def foo(name): info(name) if __name__ ==‘__main__‘: info(‘main process line‘) p1=Process(target= info,args=(‘alex‘,)) p2=Process(target=foo,args=(‘egon‘,)) p1.start() p2.start() p1.join() p2.join() print(‘ending‘) 运行效果: name: main process line parent process: 3012 process id: 6836 ------------- name: alex parent process: 6836 process id: 8028 ------------- name: egon parent process: 6836 process id: 1540 ------------- ending

# 继承Process类调用



from multiprocessing import Process
import time

class Myprocess(Process):
def __init__(self):
super(Myprocess,self).__init__()

def run(self):
print(‘hello‘,self.name,time.ctime())
time.sleep(1)

if __name__ == ‘__main__‘:
l=[]
for i in range(3):
p=Myprocess()
p.start()
l.append(p)

for p in l:
p.join()

print(‘ending‘)

运行效果:
hello Myprocess-1 Fri Jul 21 16:58:36 2017
hello Myprocess-2 Fri Jul 21 16:58:36 2017
hello Myprocess-3 Fri Jul 21 16:58:36 2017
ending

 

process类

注意:在windows中Process()必须放到# if __name__ == ‘__main__‘:下

原因:

由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
这是隐藏对Process()内部调用的原因,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。(args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号, kwargs表示调用对象的字典,kwargs={‘name‘:‘egon‘,‘age‘:18})

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():启动进程,并调用该子进程中的p.run() 

  run():strat()调用run方法,如果实例进程时未制定传入target,这start默认执行run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

  name:进程名字。

  pid:进程号。

 通过tasklist(Win)或者ps -elf |grep(linux)命令检测每一个进程号(PID)对应的进程名

进程间通讯 

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

进程对列Queue(队列就是管道加锁实现的

from multiprocessing import Queue,Process

def foo(q):
    q.put([11,‘hello‘,True])

if __name__ ==‘__main__‘:
    q=Queue()

    p=Process(target=foo,args=(q,))
    p.start()

    print(q.get())
运行效果:
[11, ‘hello‘, True]

 

 管道(pipe)

Pipe()返回的两个连接对象代表管道的两端。

每个连接对象都有send()和recv()方法(等等)。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。

注意:send()和recv()方法使用pickle模块对对象进行序列化。

conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法

请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。

Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

dumplex:默认管道是全双工的,如果将duplex设置成False,conn1只能用于接收,conn2只能用于发送。

from multiprocessing import Pipe,Process

def foo(sk):
    sk.send(‘hello world‘)
    print(sk.recv())

if __name__ == ‘__main__‘:
    sock,conn=Pipe()
    p=Process(target=foo,args=(sock,))
    p.start()

    print(conn.recv())
    conn.send(‘hi son‘)
运行效果:
hello world
hi son

 manager对象实现数据共享

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据

进程间通信应该尽量避免使用本节所讲的共享数据的方式

 

 

from multiprocessing import Manager,Process

def foo(l,n):
    l.append(n**2)

if __name__ == ‘__main__‘:
    manager=Manager()
    mlist=manager.list([11,22,33])

    l=[]
    for i in range(5):
        p=Process(target=foo,args=(mlist,i))
        p.start()
        l.append(p)

    for i in l:
        i.join()

    print(mlist)
运行效果:
[11, 22, 33, 1, 0, 9, 4, 16]

 

from multiprocessing import Manager,Process

def foo(dic,new_dic):
    dic.update(new_dic)

if __name__ == ‘__main__‘:
    manager=Manager()
    mdict=manager.dict({‘a‘:1,‘b‘:2})

    l=[]
    for i in range(3):
        p=Process(target=foo,args=(mdict,dict.fromkeys([‘c‘,‘d‘,‘e‘],i)))
        p.start()
        l.append(p)

    for p in l:
        p.join()

    print(mdict)
运行效果:
{‘a‘: 1, ‘b‘: 2, ‘c‘: 2, ‘d‘: 2, ‘e‘: 2}

 

进程池

开多进程的目的是为了并发,如果有多核,通常有几个核就开几个进程,进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行

但很明显需要并发执行的任务要远大于核数,这时我们就可以通过维护一个进程池来控制进程数目

当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

from multiprocessing import Pool
import time

def foo(n,):
    print(n)
    time.sleep(1)

if __name__ == ‘__main__‘:
    pool=Pool(5)
    for i in range(100):
        pool.apply_async(func=foo,args=(i,))

    pool.close()
    pool.join()

    print(‘ending‘)
运行效果:
可以理解该程序为:5人共搬100块砖,动作同步,5人每次共搬5块,休息1秒,继续下一次...共搬20次

协程(重点)

英文名Coroutine。

协程,又称微线程,是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的

yield与协程

首先回顾yield用法:

def foo():
    print(‘foo‘)
    yield
    print(‘foo2‘)
    
    return

def bar():
    print(bar)
    yield 
    print(‘bar2‘)
    
foo() #创建了一个生成器对象,不会执行代码
bar() #创建了一个生成器对象,不会执行代码

 

def foo():
    print(‘foo‘)
    yield 5
    print(‘foo2‘)
    
    yield 8

gen=foo()

ret=next(gen)
print(ret)
res=next(gen)
print(res)
运行效果:
foo
5
foo2
8

 

def foo():
    print(‘foo‘)
    n=yield 5
    print(‘n‘,n) #123
    print(‘foo2‘)

    yield 8

gen=foo()

ret=next(gen)
print(ret) #5
res=gen.send(‘123‘)
print(res) #8
运行效果:
foo
5
n 123
foo2
8

 基于yield生成器函数实现生产者消费者模型

"""
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
"""
import time

def consumer():
    r = ‘‘
    while True:
        n = yield r  # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
        if not n:
            return
        print(‘[CONSUMER] ←← Consuming %s...‘ % n)
        time.sleep(1)
        r = ‘200 OK‘
def produce(c):
    next(c)  #1、首先调用c.next()启动生成器
    n = 0
    while n < 5:
        n = n + 1
        print(‘[PRODUCER] →→ Producing %s...‘ % n)
        cr = c.send(n)  #2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
                        #4、produce拿到consumer处理的结果,继续生产下一条消息;
        print(‘[PRODUCER] Consumer return: %s‘ % cr)
    c.close()  #5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
if __name__==‘__main__‘:
    # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
    c = consumer()
    produce(c)
运行效果:
[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK

 greenlet

from greenlet import greenlet

def foo():
    print(‘ok1‘)
    gr2.switch()
    print(‘ok3‘)
    gr2.switch()

def bar():
    print(‘ok2‘)
    gr1.switch()
    print(‘ok4‘)

gr1=greenlet(foo)
gr2=greenlet(bar)
gr1.switch()
运行效果:
ok1
ok2
ok3
ok4

 gevent模块实现协程(yield、greenlet都无法实现遇到IO操作自动切换到其它协程,就用到了gevent模块(select机制)

由于IO操作非常耗时,经常使程序处于等待状态,gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成

import gevent

def foo():
    print(‘running in foo‘)
    gevent.sleep(2) #模拟IO操作
    print(‘switch to foo again‘)

def bar():
    print(‘switch to bar‘)
    gevent.sleep(1)
    print(‘switch to bar again‘)

gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
运行效果:
running in foo
switch to bar
switch to bar again
switch to foo again

 

g=gevent.spawn()创建一个协程对象g

spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的

对比普通函数执行和使用协程后的效果:

from gevent import monkey
monkey.patch_all()
# import gevent
import requests
import time

def f(url):
    response = requests.get(url)
    response_str=response.text
    print(‘get data %s---url[%s]‘ % (len(response_str), url))

start=time.time()

f(‘https://itk.org/‘)
f(‘https://www.github.com/‘)

print(time.time()-start)
运行效果:
get data 12323---url[https://itk.org/]
get data 56751---url[https://www.github.com/]
5.5523176193237305

 

from gevent import monkey
monkey.patch_all()
import gevent
import requests
import time

def f(url):
    response = requests.get(url)
    response_str=response.text
    print(‘get data %s---url[%s]‘ % (len(response_str), url))

start=time.time()

gevent.joinall([
        gevent.spawn(f, ‘https://itk.org/‘),
        gevent.spawn(f, ‘https://www.github.com/‘),
])

print(time.time()-start)
运行效果:
get data 12323---url[https://itk.org/]
get data 56751---url[https://www.github.com/]
3.939225435256958

 

需要强调的是:

  1. python的线程属于内核级别的,即由操作系统控制调度(如单线程一旦遇到io就被迫交出cpu执行权限,切换其他线程运行)

  2. 单线程内开启协程,一旦遇到io,从应用程序级别(而非操作系统)控制切换

对比操作系统控制线程的切换,用户在单线程内控制协程的切换,优点如下:

  1.  协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

  2. 单线程内就可以实现并发的效果,最大限度地利用cpu

缺点:

  1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
  2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

作业:

   

第一个作业:使用进程池爬取网页内容,自己往进程池累加进程,找到可以实现效果的最短时间

import re import requests
from multiprocessing import Pool import time def run(n): url=http://www.budejie.com/pic/ + str(n) response = requests.get(url).text ret = re.compile(<div class="j-r-list-c-desc">.*?<a href=http://www.mamicode.com/.*?>(?P
.*?).*?
, re.S) obj = ret.findall(response) print(obj) if __name__ ==__main__: s=time.time() pool=Pool(4) for i in range(1,11): pool.apply_async(func=run,args=(i,)) pool.close() pool.join() print(cost time:%s%(time.time()-s))






第二个作业:基于协程的爬虫示例:建议使用requests模块(urllib模块不太方便)(基于gevent库)
import re
import requests
import gevent
import time

def run(url):

    response = requests.get(url).text
    ret = re.compile(‘<div class="j-r-list-c-desc">.*?<a href=http://www.mamicode.com/.*?>(?P
.*?).*?
‘, re.S)>

 

Python--线程队列(queue)、multiprocessing模块(进程对列Queue、管道(pipe)、进程池)、协程