首页 > 代码库 > python queue

python queue

 

 

 

线程中的Queue

 1 import time
 2 import threading
 3 import queue
 4 import random
 5 
 6 def putMessage():
 7     for i in "Hello World!!!":
 8         q.put(i)
 9         time.sleep(random.random())
10         # print("size:%s"%q.qsize())            # 查看队列长度
11         #
12         # print("full:%s"%q.full())             # 查看队列是否为满的状态
13         #
14         # print("empty:%s"%q.empty())        # 查看队列是否为空的状态
15 
16 
17 def getMessage():
18     while True:
19         if not q.empty():
20             print(q.get())
21         else:
22             time.sleep(random.random())
23 
24 
25 if __name__ == "__main__":
26     q = queue.Queue()
27 
28     t1 = threading.Thread(target=putMessage)
29     t1.setDaemon(True)
30     t1.start()
31 
32     t2 = threading.Thread(target=getMessage)
33     t2.setDaemon(True)
34     t2.start()
35 
36     time.sleep(10)

 

进程中的Queue

 1 from multiprocessing import Queue
 2 
 3 q = Queue(3)    # 初始化一个Queue对象,最多可以put三条信息,如果不写3,那么久无限制
 4 
 5 q.put("Message01")         # 添加信息的方法
 6 q.put("Message02")
 7 print(q.full())          # 查看 队列 是否满了的方法
 8 
 9 q.put("Message03")
10 print(q.full())
11 
12 # 因为队列已经满了,所以下面的消息会出现异常,第一个 try 会等待2秒后再抛出异常,
13 # 第二个 try 会立刻抛出异常
14 try:
15     q.put("Message04", True, 2)
16 except:
17     print("消息队列已满,现有消息数量:%s"%q.qsize())
18 
19 try:
20     q.put_nowait("Message04")
21 except:
22     print("消息队列已满,现有消息数量:%s"%q.qsize())
23 
24 # 推荐使用的方式,先判断队列是否已满,再写入
25 if not q.full():
26     q.put_nowait("Message04")
27 
28 # 读取消息的时候,先判断消息队列是否为空,再读取
29 if not q.empty():
30     for i in range(q.qsize()):
31         print(q.get_nowait())

队列:
  为什么要用队列?列表也很好用啊。:数据安全
  创建方法:
    模式1:FIFO -- queue.Queue()
    模式2:FILO -- queue.LifoQueue()
    模式3:priorty -- queue.PriorityQueue()
      q.put([1, ‘hello‘])
      q.put([2, ‘world‘])
      级别 1 比 2 低, 1 先出来

  方法的参数:
    put()
      调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常
    get()
      调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

  其它方法:
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号

    q.join() 实际上意味着等到队列为空,再执行别的操作
    # join多少次,就需要用几次 task_done

多进程优点:

  1. 可以利用多核实现并行运算

缺点:

  1. 开销大
  2. 通信困难

 

管道Pipe

multiprocessing.Pipe([duplex])
返回2个连接对象(conn1, conn2),代表管道的两端,
默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.

主要用到的方法:
send() 发送数据
recv() 接收数据
 1 import multiprocessing
 2 
 3 
 4 from multiprocessing import Process, Pipe
 5 
 6 def send(pipe):
 7     pipe.send([spam] + [42, egg])
 8     pipe.close()
 9 
10 def talk(pipe):
11     pipe.send(dict(name = Bob, spam = 42))
12     reply = pipe.recv()
13     print(talker got:, reply)
14 
15 if __name__ == __main__:
16     (con1, con2) = Pipe()
17     sender = Process(target = send, name = send, args = (con1, ))
18     sender.start()
19     print("con2 got: %s" % con2.recv())#从send收到消息
20     con2.close()
21 
22     (parentEnd, childEnd) = Pipe()
23     child = Process(target = talk, name = talk, args = (childEnd,))
24     child.start()
25     print(parent got:, parentEnd.recv())
26     parentEnd.send({x * 2 for x in spam})
27     child.join()
28     print(parent exit)

 

进程间的信息共享Manage

Python中进程间共享数据,处理基本的queue,pipe外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。

Manager支持的类型有list,dict,Namespace, Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
 1 import multiprocessing
 2 import time
 3 
 4 
 5 def worker(d, key, value):
 6     d[key] = value
 7 
 8 if __name__ == __main__:
 9     mgr = multiprocessing.Manager()
10 
11     d = mgr.dict()
12     jobs = [multiprocessing.Process(target=worker, args=(d, i, i*2))
13              for i in range(10)
14              ]
15     for j in jobs:
16         j.start()
17     for j in jobs:
18         j.join()
19     print(Results: )
20     for key, value in enumerate(dict(d)):
21         print("%s=%s:%s" % (key, value, d[value]))
22 
23 
24     print("================================================================")
25 
26     manager = multiprocessing.Manager()
27     Global = manager.Namespace()
28     Global.x = 10
29     Global.y = hello
30     print(Global)
31 
32     print("==================================================================")

 

问题:

列表不可变
在学习python多进程管理manager时候,当不使用join对当前进程(主进程)进行阻塞时会报错:

这样进行一下总结:在使用manager管理/进行多进程及其数据交互时候,必须对每一个manager内的进程进行join-------待所有子进程完成后再回到主进程。

 

多进程之进程池

 1 import time
 2 from multiprocessing import Pool
 3 
 4 def worker():
 5     for i in range(10):
 6         print("From worker %s"%i)
 7         time.sleep(0.5)
 8 
 9 def foo():
10     for i in range(10):
11         print("From foo %s"%i)
12         time.sleep(0.5)
13 
14 def bar():
15     for i in range(10):
16         print("From bar %s"%i)
17         time.sleep(0.5)
18 
19 if __name__ == "__main__":
20     pool = Pool(4)            # 创建Pool对象, 3 表示同时最多可以增加 3 条进程
21     pool.apply_async(worker)
22     pool.apply_async(worker)
23     pool.apply_async(worker)
24     pool.apply_async(foo)
25     pool.apply_async(foo)
26     pool.apply_async(foo)
27     pool.apply_async(bar)
28     pool.apply_async(bar)
29     pool.apply_async(bar)
30 
31     pool.close()                  # 关闭进程池,禁止添加任务
32     pool.join()          # 等待子进程结束后,主进程才往下走
33     print("Is done...")

 

并发之协程

 1 import time
 2 
 3 def consumer():
 4     r = ‘‘
 5     while True:
 6         # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
 7         #    yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
 8         #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
 9         #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
10         n = yield r
11         if not n:
12             return
13         print([CONSUMER] ←← Consuming %s... % n)
14         time.sleep(1)
15         r = 200 OK
16 def produce(c):
17     # 1、首先调用c.next()启动生成器
18     next(c)
19     n = 0
20     while n < 5:
21         n = n + 1
22         print([PRODUCER] →→ Producing %s... % n)
23         # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
24         cr = c.send(n)
25         # 4、produce拿到consumer处理的结果,继续生产下一条消息;
26         print([PRODUCER] Consumer return: %s % cr)
27     # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
28     c.close()
29 if __name__==__main__:
30     # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
31     c = consumer()
32     produce(c)

 

协程封装之greenlet

 1 import greenlet
 2 import time
 3 import random
 4 
 5 """
 6 创建方法:greenlet.greenlet(self, run=None, parent=None)
 7 主要方法:
 8         a.switch()    切换到 a 里面执行
 9 """
10 def foo():
11     for i in range(10):
12         print("foo:",i)
13         time.sleep(random.random())
14         gb.switch()
15 
16 
17 def bar():
18     for i in range(10):
19         print("bar:", i)
20         time.sleep(random.random())
21         gf.switch()
22 
23 if __name__ == "__main__":
24     gf = greenlet.greenlet(foo)
25     gb = greenlet.greenlet(bar)
26 
27     gf.switch()

 

协程封装之 gevent

 1 import gevent
 2 from gevent import monkey
 3 import time
 4 import random
 5 
 6 monkey.patch_all()      #  如果遇到 IO 阻塞,那么就切换到下一个 协程的程序
 7 
 8 def foo():
 9     for i in range(10):
10         print("foo:",i)
11         time.sleep(random.random())
12 
13 
14 def bar():
15     for i in range(10):
16         print("bar:", i)
17         time.sleep(random.random())
18 
19 
20 if __name__ == "__main__":
21     gevent.joinall([gevent.spawn(foo),
22                     gevent.spawn(bar)])
23 
24 
25     # 固定用法,将里面的函数放入到 协程的执行序列中

 

python queue