首页 > 代码库 > [gevent源码分析] libev cython绑定core.pyx
[gevent源码分析] libev cython绑定core.pyx
gevent core就是封装了libev,使用了cython的语法,感兴趣童鞋可以好好研究研究。其实libev是有python的封装
pyev(https://pythonhosted.org/pyev/),不过pyev是使用C来写扩展的,代码巨复杂。在看core.pyx代码之前先学习一下
core.pyx用到的cython知识。
一: cython基础知识
1.cdef, def, cpdef的区别
cdef用于定义C中的函数,变量,如cdef int i;而def知识python中的函数定义方法,一般只是为了提供python的访问。
cdef定义的函数,变量在python环境中是访问不了的,要么提供一个def的包装方法,要么用cpdef。cpdef只用于定义函数,
速度比cdef稍慢,主要因为cpdef定义的类函数支持重载,调用的时候需要查找虚函数表,cpdef同时生成供cython和python
调用的函数。
2.明确的类型声明
为了提高速度和可读性,cython建议所有的变量加上类型,包括python中的类型。如定义列表,cdef list result;
如果是类可用cdef object p;
3.class如何定义,public的用法
我们看下core.pyx提供的class loop的定义
cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]: cdef libev.ev_loop* _ptr cdef public object error_handler cdef libev.ev_prepare _prepare cdef public list _callbacks cdef libev.ev_timer _timer0你可能好奇,loop后面的中括号是干嘛用的?我们想想python中int类型的PyIntObject,
typedef struct { PyObject_HEAD long ob_ival; } PyIntObject;同样我们会生PyGeventLoopObject对象
struct PyGeventLoopObject { PyObject_HEAD struct __pyx_vtabstruct_6gevent_4core_loop *__pyx_vtab; struct ev_loop *_ptr; PyObject *error_handler; struct ev_prepare _prepare; PyObject *_callbacks; struct ev_timer _timer0; }
而PyGeventLoop_Type就是我们用来定义一个类的类型对象
DL_EXPORT(PyTypeObject) PyGeventLoop_Type = { PyVarObject_HEAD_INIT(0, 0) __Pyx_NAMESTR("gevent.core.loop"), /*tp_name*/ sizeof(struct PyGeventLoopObject), /*tp_basicsize*/ 0, /*tp_itemsize*/ __pyx_tp_dealloc_6gevent_4core_loop, /*tp_dealloc*/ 0, /*tp_print*/ 0, /*tp_getattr*/ 0, /*tp_setattr*/ ... }我们再来看看public的问题,这里的public也就是说这个class将会提供给其它.c文件使用,所以会生成对应的.h文件。
而cdef public list _callbacks,这个public和函数说明cpdef有点类似,也就是提供了在python中访问_callbacks的方法。
二: 源码一览
def __init__(self, object flags=None, object default=None, size_t ptr=0): passflags: 确定后端使用的异步IO模型,如"select, epoll",可直接字符串也可数字(需参考libev/ev.h)
default:是否使用libev的默认loop,否则将创建一个新的loop
可通过loop.backend确定是否和你设置一致,loop.backend_int返回libev内部对应序号
如:
from gevent import core flag = "select" loop=core.loop(flag) assert loop.backend == flag assert core._flags_to_int(flag) == loop.backend_int
所有watcher都通过start启动,并传递回调函数
1.io:
loop.io(int fd, int events, ref=True, priority=None)
fd: 文件描述符,可通过sock.fileno()获取
events: 事件 1:read 2:write 3.read_write
下面两个参数所有watcher都适用
ref: 是否增加mainLoop的引用次数,默认是增加的。在libev中watcher.start都会增加引用次数,watcher.stop都会减少引用次数。当libev发现引用次数为0,也就没有需要监视的watcher,循环就会退出。
priority: 设置优先级
2.timer定时器
loop.timer(double after, double repeat=0.0, ref=True, priority=None)
after: 多久后启动
repeat: 多次重复之间间隔
可通过一下小程序看看:
def f(): print time.time() print ‘eeeee‘ from gevent.core import loop l = loop() timer = l.timer(2,3) #2秒后启动,3秒后再次启动 print time.time() timer.start(f) l.run()
loop.signal(int signum, ref=True, priority=None)
hub中有封装signal,使用如下:
def f(): raise ValueError(‘signal‘) sig = gevent.signal(signal.SIGALRM, f) assert sig.ref is False signal.alarm(1) try: gevent.sleep(2) raise AssertionError(‘must not run here‘) except ValueError: assert str(sys.exc_info()[1]) == ‘signal‘和其它watcher不同的是ref默认是False,因为信号并不是必须的,所以循环不需等待信号发生。
4.async 唤醒线程
loop.async(ref=True, priority=None)
这主要是通过管道实现的,async.send方法将向管道发送数据,循环检查到读事件唤醒线程.
loop.async(ref=True, priority=None)
这主要是通过管道实现的,async.send方法将向管道发送数据,循环检查到读事件唤醒线程.
hub = gevent.get_hub() watcher = hub.loop.async() gevent.spawn_later(0.1, thread.start_new_thread, watcher.send, ()) start = time.time() with gevent.Timeout(0.3): hub.wait(watcher)gevent中线程池中使用了async,当worker线程运行回调函数后,设置返回值,通过async.send唤醒hub主线程
5.fork 子进程事件
loop.fork(ref=True, priority=None)
当调用fork时将会回调注册的子进程watcher,但必须得调用libev.ev_loop_fork才有效,
而且要在子进程中使用libev也必须要调用libev.ev_loop_fork
loop.fork(ref=True, priority=None)
当调用fork时将会回调注册的子进程watcher,但必须得调用libev.ev_loop_fork才有效,
而且要在子进程中使用libev也必须要调用libev.ev_loop_fork
在gevent的threadpool中使用了fork监视器
self.fork_watcher = hub.loop.fork(ref=False) self.fork_watcher.start(self._on_fork) def _on_fork(self): # fork() only leaves one thread; also screws up locks; # let‘s re-create locks and threads pid = os.getpid() if pid != self.pid: self.pid = pid # Do not mix fork() and threads; since fork() only copies one thread # all objects referenced by other threads has refcount that will never # go down to 0. self._init(self._maxsize)回调_on_fork目的就是重新初始化线程池,但是刚才说了子进程要有效必须要调用libev.ev_loop_fork,
这又是在在哪里调用的呢?
if hasattr(os, ‘fork‘): _fork = os.fork def fork(): result = _fork() if not result: #子进程 reinit() #调用libev.ev_loop_fork return result问题的关键就是gevent/os.py中重定义了fork函数,当fork返回0,也就是子进程,将调用reinit
最后真正调用的就是core.pyx的loop.reinit
def reinit(self): if self._ptr: libev.ev_loop_fork(self._ptr)gevent中的线程池在gevent中使用的很广,尤其是windows中,如dns请求,os.read write都是通过线程池,
花点时间看看threadpool.py源码,会收获很多。
6.ev_prepare 每次event loop之前事件
loop.prepare(ref=True, priority=None)
还记得上面timeout中说的,在loop中回调比定时器优先级高,在loop中是没有添加回调的,gevent是通过
ev_prepare实现的。
loop.prepare(ref=True, priority=None)
还记得上面timeout中说的,在loop中回调比定时器优先级高,在loop中是没有添加回调的,gevent是通过
ev_prepare实现的。
gevent loop.run_callback实现原理:
loop.run_callback返回的是一个callback对象,具有stop(),pending属性,也就是说如果回调还没运行,我们可以通过stop()方法停止。
事例代码如下:
但如果是其它watcher就没有机会调用了
7.ev_check 每次event loop之后事件
loop.check(ref=True, priority=None)
这个和ev_prepare刚好相反
8.stat 文件属性变化
loop.stat(path, float interval=0.0, ref=True, priority=None)
1.loop.run_callback会向loop._callbacks中添加回调 2.在loop的__init__中初始化prepare: libev.ev_prepare_init(&self._prepare, <void*>gevent_run_callbacks) 注册回调为gevent_run_callbacks 3.在gevent_run_callbacks中会调用loop的_run_callbacks result = ((struct __pyx_vtabstruct_6gevent_4core_loop *)loop->__pyx_vtab)->_run_callbacks(loop); 4.loop的_run_callbacks中会逐个调用_callbacks中的回调这也就是为什么说callback优先级高的原因。
loop.run_callback返回的是一个callback对象,具有stop(),pending属性,也就是说如果回调还没运行,我们可以通过stop()方法停止。
事例代码如下:
def f(a): a.append(1) from gevent.hub import get_hub loop = get_hub().loop a= [] f = loop.run_callback(f,a) f.stop() gevent.sleep(0) assert not f.pending #没有阻塞可能是已运行或被停止 assert not a如果注释掉f.stop(),那么a是[1],因为gevent.sleep(0)也是直接run_callback,肯定是谁先加入谁先调用,
但如果是其它watcher就没有机会调用了
7.ev_check 每次event loop之后事件
loop.check(ref=True, priority=None)
这个和ev_prepare刚好相反
8.stat 文件属性变化
loop.stat(path, float interval=0.0, ref=True, priority=None)
interval说明期望多久以后libev开始检测文件状态变化
开两个窗口,一个运行该程序,另一个可touch cs.log文件,文件有无也是状态变化
开两个窗口,一个运行该程序,另一个可touch cs.log文件,文件有无也是状态变化 hub = gevent.get_hub() filename = ‘cs.log‘ watcher = hub.loop.stat(filename,2) #2s以后才监听文件状态 def f(): print os.path.exists(filename) watcher.start(f) gevent.sleep(100)
core.pyx中封装的watcher差不多都介绍完了,我们看一下libev的主循环ev_run
int ev_run (EV_P_ int flags) { do { ...... } while (expect_true ( activecnt && !loop_done && !(flags & (EVRUN_ONCE | EVRUN_NOWAIT)) )); return activecnt; }其中activecnt就是我们上面说的loop的引用计数,所以除非特殊情况ref最好为True。
core.pyx细节问题还需要小伙伴们自己去研究,基本的使用上面已经说得很明白了。gevent还剩下什么呢?也许threadpool,dns也会具体分析一下,敬请期待。
[gevent源码分析] libev cython绑定core.pyx
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。