首页 > 代码库 > 基础入门_Python-模块和包.Gevent事件/队列/组/池/信号量/子进程?
基础入门_Python-模块和包.Gevent事件/队列/组/池/信号量/子进程?
常用结构:
1.Event类,事件主要用于Greenlet之间的异步通信
e = gevent.event.Event() -> Event
说明: 创建一个信号对象
e.set() -> None
说明: 设置标志位
e.clear() -> None
说明: 清除标志位
e.wait() -> None
说明: 阻塞直至标志位被设置
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import gevent from gevent.event import Event # 说明: 导入其它模块 def setter(e): print ‘I‘ gevent.sleep(3) print ‘LOVE‘ e.set() def waiter(e): e.wait() print ‘You‘ if __name__ == ‘__main__‘: e = Event() gevent.joinall([ gevent.spawn(setter, e), gevent.spawn(waiter, e), gevent.spawn(waiter, e), gevent.spawn(waiter, e), gevent.spawn(waiter, e) ])
a = gevent.event.AsyncResult() -> AsyncResult
说明: 创建一个扩展可携带数据的信号对象
a.set(value=http://www.mamicode.com/None) -> None
说明: 设置带数据的标志位
a.e.clear() -> None
说明: 清除带数据的标志位
a.get(block=True, timeout=None) -> obj
说明: 阻塞直至标志位被设置并返回value值,可设置timeout到点抛出Timeout异常
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import gevent from gevent.event import AsyncResult # 说明: 导入其它模块 def setter(a): print ‘I‘ gevent.sleep(3) print ‘LOVE‘ a.set(‘I LOVE‘) def waiter(a): value = a.get() if e.successful: print ‘found notice: recv data from setter %s.‘ % (value,) print ‘You‘ if __name__ == ‘__main__‘: a = AsyncResult() gevent.joinall([ gevent.spawn(setter, a), gevent.spawn(waiter, a), gevent.spawn(waiter, a), gevent.spawn(waiter, a), gevent.spawn(waiter, a) ])
2. Queue类,常用用于Greenlet之间的异步共享
q = gevent.queue.Queue(maxsize=None, items=None) -> Queue
说明: 创建一个指定大小包含指定items的队列对象
q.empty() -> Boolean
说明: 队列是否为空
q.full() -> Boolean
说明: 队列是否已满,如果初始化时maxsize为None时队列永远不会满,除非内存耗尽
q.get(block=True, timeout=None) -> obj
说明: 队列尾部弹出并返回末尾元素,block为True如果队列为空会一直等待,否则会抛出gevent.queue.Empty异常
q.get_nowait() -> obj
说明: 同q.get(block=False),如果队列为空直接返回gevent.queue.Empty
q.put(item, block=True, timeout=None) -> None
说明: 队列头部插入item,如果block为True则等待队列有空间时再插入,如果超出timeout则抛出Timeout异常,否则直接抛出gevent.queue.Full异常
q.put_nowait(self, item) -> None
说明: 同q.put(block=False),如果队列满直接返回gevent.queue.Full
q.qsize() -> int
说明: 返回队列的元素数
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import gevent from gevent.queue import Queue # 说明: 导入其它模块 def boss(q): for _ in xrange(25): q.put_nowait(_) def work(name, q): while not q.empty(): task = q.get() print ‘found notice: worker %s got task-%s‘ % (name, task) gevent.sleep(0) if __name__ == ‘__main__‘: q = Queue() g = gevent.spawn(boss, q) g.join() workers = [] for _ in (‘limanman‘, ‘liuzhen‘, ‘zhenbao‘): workers.append(gevent.spawn(work, _, q)) gevent.joinall(workers)
说明: 如上演示了一个老板平均分配任务给指定数目的员工的例子,为了实现每个员工都能拿到任务,我们手动的gevent.sleep(0)互相切换(关键是调用函数中没有确定性的阻塞函数)实现.
3. Group类,常用于不限制数量的管理异步任务的分组且可搜集运行结果
g = gevent.pool.Group(*args) -> Group
说明: 创建一个组对象,其实就是一个不限greenlets数量的pool,可以随时添加/删除/关闭greenlet对象
g.add(greenlet) -> None
说明: 向组中添加一个greenlet对象
g.discard(greenlet) -> None
说明: 从组中删除一个greenlet对象
g.join(timeout=None, raise_error=False) -> None
说明: 等待组中所有的greenlets都执行完毕再进行下一步
g.kill(exception=<class ‘greenlet.GreenletExit‘>, block=True, timeout=None) -> None
说明: 关闭组内所有运行中的greenlet对象
g.killone(self, greenlet, exception=<class ‘greenlet.GreenletExit‘>, block=True, timeout=None) -> None
说明: 关闭组内指定运行的greenlet对象
g.apply(self, func, args=None, kwds=None) -> obj
说明: 同apply单次调用func并返回运行结果
g.apply_async(self, func, args=None, kwds=None, callback=None) -> greenlet
说明: 同上但是返回的不是直接的运行结果而是异步对象greenlet,需要再次调用其.get()方法才能获取最终结果
g.imap(self, func, *iterables, **kwargs) -> IMap
g.imap_unordered(self, func, *iterables, **kwargs) -> IMapUnordered
g.map(self, func, iterable) -> list
g.map_async(self, func, iterable, callback=None) -> GreenletGroup
说明: 为了简化协程的使用,调用map/imap实现任务快速分组,基本上返回的对象都需要再次调用get()方法获取最终运行结果,经测试发现调用函数传递多个参数时通过map/imap类方法传参时有bug,已反馈GitHub.
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import gevent import itertools from gevent.pool import Group from gevent.queue import Queue # 说明: 导入其它模块 def add_worker(name, q): while not q.empty(): task = q.get() print ‘found notice: worker(%s:%s) got task(%s)‘ % (name, gevent.getcurrent(), task) gevent.sleep(0) if __name__ == ‘__main__‘: q = Queue() g = Group() # 添加任务 for _ in xrange(100): q.put_nowait(_) # 通用方法 for worker in (‘langlang‘, ‘fengfeng‘, ‘shuishui‘): g.add(gevent.spawn(add_worker, worker, q)) g.join()
4. Pool类,常用于限制数量的管理异步任务,在受限于网络和IO的任务时候比较有优势
p = gevent.pool.Pool(size=None, greenlet_class=None) -> Pool
说明: 创建一个协程池对象,可以指定其实协程数
p.add(greenlet) -> None
说明: 尝试向协程池中添加greenlet对象,阻塞直到有池中有协程完毕有剩余空间
p.free_count() -> int
说明: 返回池中剩余的空间,也就是可以add的greenlet数
p.full() -> boolean
说明: 返回池是否已满
p.start(greenlet) -> None
说明: 启动未启动的greenlet且将其加入pool监控
p.imap(self, func, *iterables, **kwargs) -> IMap
p.imap_unordered(self, func, *iterables, **kwargs) -> IMapUnordered
p.map(self, func, iterable) -> list
p.map_async(self, func, iterable, callback=None) -> GreenletGroup
说明: 为了简化协程的使用,调用map/imap实现任务快速分组,基本上返回的对象都需要再次调用get()方法获取最终运行结果,经测试发现调用函数传递多个参数时通过map/imap类方法传参时有bug,已反馈GitHub.
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import gevent from gevent.pool import Pool # 说明: 导入其它模块 def pool_size(p): print ‘found notice: current pool size: %d free %d‘ % (len(p), p.free_count()) if __name__ == ‘__main__‘: p = Pool(2) p.map(pool_size, [p]*3)
5. BoundedSemaphore类,常用于限制资源同时被多少个协程访问,通过限制同时发放锁的数量来限制资源访问
b = gevent.lock.BoundedSemaphore(n) -> BoundedSemaphore
说明: 创建一个信号量对象,限制资源同时只能被n个协程访问,其实就是每次分发n把锁,只有有人释放了锁才会重新追加分发锁
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import gevent from gevent.pool import Pool from gevent.lock import BoundedSemaphore # 说明: 导入其它模块 b = BoundedSemaphore(2) def work(name): with b: print ‘found notice: worker %s acquire sem lock.‘ % (name,) gevent.sleep(0) print ‘found notice: worker %s release sem lock.‘ % (name,) if __name__ == ‘__main__‘: p = Pool() p.map(work, [‘李满满‘, ‘刘珍珍‘, ‘罗诗瑶‘])
局部变量:
说明: Gevent内部实现以greenlet的getcurrent()为键,在一个私有命名空间寻址的全局查找,使得local对象可以在不同的greenlet对象中存在且相互隔离,很多继承了gevent的web框架将HTTP会话对象以局部变量的形式存储在gevent内
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import time import gevent from gevent.local import local # 说明: 导入其它模块 def session_manager(l, user): l.name = user l.timestamp = time.time() print ‘found notice: user %s login in and online.‘ % (user,) gevent.sleep(5) l.name = user l.timestamp = time.time() print ‘found notice: user %s session expire outs.‘ % (user,) print ‘‘‘ name: %s status: offline timestamp: %s ‘‘‘ % (l.name, l.timestamp) if __name__ == ‘__main__‘: l = local() gevent.joinall([ gevent.spawn(session_manager, l, ‘李满满‘), gevent.spawn(session_manager, l, ‘刘珍珍‘), ])
进程相关:
1. gevent.subprocess类,常用于作为subprocess模块增强版使用,支持异步协作式等待子进程
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import time import gevent from gevent.subprocess import PIPE, Popen # 说明: 导入其它模块 def task001(): while True: print time.time() gevent.sleep(1) if __name__ == ‘__main__‘: task = gevent.spawn(task001) task.start() # 中间插入子进程 p = Popen([‘ls;sleep 5‘], shell=True, stdout=PIPE, stderr=PIPE) out, err = p.communicate() print out # 等待协程的结束 task.join()
说明: 如上实例简单演示了支持异步协作子进程gevent.subprocess,首先task.start()启动协程调用但不会等待调用结束然后执行Popen()子进程,默认的Popen会阻塞主进程,但是在这里我们可以看到程序并没有阻塞而是继续执行task001,由于我们在最后task.join()会等待task001结束,而task001是一个死循环,所以一旦Popen子进程阻塞,立马会切换到task001执行,所以程序并不会阻塞
本文出自 “@Can Up and No BB...” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1862487
基础入门_Python-模块和包.Gevent事件/队列/组/池/信号量/子进程?