首页 > 代码库 > 队列,event,multiprocess
队列,event,multiprocess
队列:queue
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
有三种队列模式
- class
queue.
Queue
(maxsize=0) #先入先出
- class
queue.
LifoQueue
(maxsize=0) #last in fisrt out
- class
queue.
PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列
用法
>>> import queue >>> q=queue.Queue(maxsize=0) #如果maxsize小于0或者等于0表示队列无穷大 >>> q.qsize() 0 >>> q.put("1111") >>> q.qsize() #获得几个槽被占用了 1 >>> q.full() #如果队列的槽没有可利用的,则返回True False >>> q.empty() False >>> q.get() # 将queue中的item取出来 ‘1111‘ >>> q.empty() #如果队列的槽里没有item,则返回True True >>>
q.put(block=False) 相当于q.put_nowait()
q.get(block=False) 相当于q.get_nowait()
消费者生产者模式
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
import queue import threading q=queue.Queue(maxsize =0) #maxsize设置这个队列一共有无穷大 def producer(): for i in range(10): q.put("骨头%s" %i) #put后面可以接收很多基本的数据类型 print("开始等待所有的骨头被领走") q.join() #阻塞,当get任务全部被完成后才执行join后面的语句 print("骨头被领完了") def consumer(name): while not q.empty(): #如果槽里有东西 print("%s 吃了%s" %(name,q.get())) q.task_done()#告知任务完成,每一个get后面都需要加一个这个,如果程序里有join t= threading.Thread(target=producer) #获得一个线程对象 t.start() #启动线程 consumer(‘egon‘)
吃包子的例子
import random,queue,threading import time q=queue.Queue() def producer(name): count = 0 while count < 20: if q.qsize() < 8: time.sleep(random.randrange(2)) q.put(count) print("%s have make %s baozi" %(name,count)) else: print("baozi remain more than 8") count+=1 def consumer(name): count =0 while count < 8: time.sleep(random.randrange(3)) if not q.empty(): data = q.get() time.sleep(0.5) print(‘\033[32;1mConsumer %s has eat %s baozi...\033[0m‘ % (name, data)) else: print("no more baozi to eat") count+=1 t1=threading.Thread(target=producer,args=("lidachu1",)) t2=threading.Thread(target=producer,args=("lidachu2",)) c1=threading.Thread(target=consumer,args=("yuyang",)) c2=threading.Thread(target=consumer,args=("xiaowang",)) c3=threading.Thread(target=consumer,args=("shuaibai",)) t1.start() t2.start() c1.start() c2.start() c3.start() t1.join() t2.join() c1.join() c2.join() c3.join() print(q.qsize())
多进程multiprocess
multiprocessing
is a package that supports spawning processes using an API similar to the threading
module. The multiprocessing
package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
multiprocess 是一个能支持产生多个进程的包,使用一个与threading模块类似的API(application programming interface)接口。这个multiprocessing 包能够提供本地和远程的并发(并行)),有效的回避全局解释器锁通过使用子进程而不是线程。由于这个,这个multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它可以运行在UNIX和WIndows上。
multiprocess模块与threading模块的用法类似
import time import multiprocessing def add(): sum = 1 for i in range(2,100000): sum*=i print("sum:",sum) def multi(): count = 2 for i in range(21): count=count**2 print("count:",count) # s=time.time() # add() # multi() # print(time.time()-s) if __name__ == ‘__main__‘: p1=multiprocessing.Process(target=add) p2=multiprocessing.Process(target=multi) s=time.time() p1.start() p2.start() p1.join() p2.join() print(time.time()-s)
如果使用顺序执行这两个函数所需要的时间大致是20秒左右,如果使用多进程需要的时间是不到12秒。可以看出多进程的优势,前提是这两个函数的运行时间都差不多相同。
event事件
event=threading.Event() 用来进行两个或多个线程之间的交互
An event is a simple synchronization object; event是一个简单的同步对象
the event represents an internal flag, and threads event代表了内部的标志位和线程
can wait for the flag to be set, or set or clear the flag themselves. 能等待标志位被设置或者清除这个flag
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything. 如果flag被设置,wait方法不会起任何作用
If the flag is cleared, wait will block until it becomes set again. 如果flag为FALSE,wait方法会阻塞,直到flag设置为True。
Any number of threads may wait for the same event 任何数量的线程可能等待这相同的event
在初始化时,默认的标志位为False,
event.set() # 将flag设置为True
event.clear() #将flag设置为False
event.is_set() #返回一个bool 值,判断标志位的状态
event.wait() #等待
import threading import time,random event=threading.Event() def foo(): event.wait() time.sleep(1) print("first>>>") def bar(): sum = 0 for i in range (10): sum+=i print(sum) event.set() t1=threading.Thread(target=foo) t2=threading.Thread(target=bar) t1.start() t2.start()
红绿灯
# 2、设计一个关于红绿灯的线程,5个关于车的线程; # # 对于车线程,每隔一个随机秒数,判断红绿灯的状态,是红灯或者黄灯,打印waiting;是绿灯打印running。 # # 对于红绿灯线程: # 首先默认是绿灯,做一个计数器,十秒前,每隔一秒打印“light green”;第十秒到第十三秒,每隔一秒打印“light yellow”,13秒到20秒, # ‘light red’,20秒以后计数器清零。重新循环。 # # 知识点:event对象(提示:event对象即红绿灯,为true是即绿灯,false时为黄灯或者红 import threading import time,random def car(name): while True: time.sleep(random.randrange(20)) if event.is_set(): print(‘\033[41;1m--%s running---\033[0m‘ %name) elif not event.is_set(): print(‘\033[45;1m--%s waiting---\033[0m‘ %name) def light(): count = 0 event.set() while True: if count < 10: print("----light green----") time.sleep(1) count+=1 elif count >= 10 and count < 13: event.clear() print("----light yellow----") time.sleep(1) count+=1 elif count >= 13 and count < 20: print("----light red----") time.sleep(1) count+=1 elif count == 20: count =0 event.set() if __name__ == "__main__": event=threading.Event() t_light=threading.Thread(target=light) t_light.start() for i in range(5): t_car=threading.Thread(target=car,args=("car%s" %i,)) t_car.start()
队列,event,multiprocess