首页 > 代码库 > [gevent源码分析] libev cython绑定core.pyx

[gevent源码分析] libev cython绑定core.pyx

gevent core就是封装了libev,使用了cython的语法,感兴趣童鞋可以好好研究研究。其实libev是有python的封装

pyev(https://pythonhosted.org/pyev/),不过pyev是使用C来写扩展的,代码巨复杂。在看core.pyx代码之前先学习一下

core.pyx用到的cython知识。


一: cython基础知识

1.cdef, def, cpdef的区别
cdef用于定义C中的函数,变量,如cdef int i;而def知识python中的函数定义方法,一般只是为了提供python的访问。
cdef定义的函数,变量在python环境中是访问不了的,要么提供一个def的包装方法,要么用cpdef。cpdef只用于定义函数,
速度比cdef稍慢,主要因为cpdef定义的类函数支持重载,调用的时候需要查找虚函数表,cpdef同时生成供cython和python
调用的函数

2.明确的类型声明
为了提高速度和可读性,cython建议所有的变量加上类型,包括python中的类型。如定义列表,cdef list result;
如果是类可用cdef object p;

3.class如何定义,public的用法
我们看下core.pyx提供的class loop的定义
cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
    cdef libev.ev_loop* _ptr
    cdef public object error_handler
    cdef libev.ev_prepare _prepare
    cdef public list _callbacks
    cdef libev.ev_timer _timer0
你可能好奇,loop后面的中括号是干嘛用的?我们想想python中int类型的PyIntObject,
typedef struct {
    PyObject_HEAD
    long ob_ival;
} PyIntObject;
同样我们会生PyGeventLoopObject对象
struct PyGeventLoopObject {
  PyObject_HEAD
  struct __pyx_vtabstruct_6gevent_4core_loop *__pyx_vtab;
  struct ev_loop *_ptr;
  PyObject *error_handler;
  struct ev_prepare _prepare;
  PyObject *_callbacks;
  struct ev_timer _timer0;
}
而PyGeventLoop_Type就是我们用来定义一个类的类型对象
DL_EXPORT(PyTypeObject) PyGeventLoop_Type = {
  PyVarObject_HEAD_INIT(0, 0)
  __Pyx_NAMESTR("gevent.core.loop"), /*tp_name*/
  sizeof(struct PyGeventLoopObject), /*tp_basicsize*/
  0, /*tp_itemsize*/
  __pyx_tp_dealloc_6gevent_4core_loop, /*tp_dealloc*/
  0, /*tp_print*/
  0, /*tp_getattr*/
  0, /*tp_setattr*/
...
}
我们再来看看public的问题,这里的public也就是说这个class将会提供给其它.c文件使用,所以会生成对应的.h文件。
而cdef public list _callbacks,这个public和函数说明cpdef有点类似,也就是提供了在python中访问_callbacks的方法。

二: 源码一览

def __init__(self, object flags=None, object default=None, size_t ptr=0):
        pass
flags: 确定后端使用的异步IO模型,如"select, epoll",可直接字符串也可数字(需参考libev/ev.h)
default:是否使用libev的默认loop,否则将创建一个新的loop
可通过loop.backend确定是否和你设置一致,loop.backend_int返回libev内部对应序号
如:
from gevent import core
flag = "select"
loop=core.loop(flag)
assert loop.backend == flag
assert core._flags_to_int(flag) == loop.backend_int

libev支持的watcher:
所有watcher都通过start启动,并传递回调函数
1.io:
    loop.io(int fd, int events, ref=True, priority=None)
        fd: 文件描述符,可通过sock.fileno()获取
        events: 事件 1:read 2:write 3.read_write

        下面两个参数所有watcher都适用
        ref: 是否增加mainLoop的引用次数,默认是增加的。在libev中watcher.start都会增加引用次数,watcher.stop都会减少引用次数。当libev发现引用次数为0,也就没有需要监视的watcher,循环就会退出。
        priority: 设置优先级

2.timer定时器
    loop.timer(double after, double repeat=0.0, ref=True, priority=None)
        after: 多久后启动
        repeat: 多次重复之间间隔
    可通过一下小程序看看:

    def f():
        print time.time()
        print ‘eeeee‘
    from gevent.core import loop
    l = loop()
    timer = l.timer(2,3) #2秒后启动,3秒后再次启动
    print time.time()
    timer.start(f)
    l.run()

3.signer信号 收到信号处理方式
    loop.signal(int signum, ref=True, priority=None)
hub中有封装signal,使用如下:
def f():
    raise ValueError(‘signal‘)
sig = gevent.signal(signal.SIGALRM, f)
assert sig.ref is False
signal.alarm(1)
try:
    gevent.sleep(2)
    raise AssertionError(‘must not run here‘)
except ValueError:
    assert str(sys.exc_info()[1]) == ‘signal‘
和其它watcher不同的是ref默认是False,因为信号并不是必须的,所以循环不需等待信号发生。

4.async 唤醒线程
    loop.async(ref=True, priority=None)
    这主要是通过管道实现的,async.send方法将向管道发送数据,循环检查到读事件唤醒线程.
    hub = gevent.get_hub()
    watcher = hub.loop.async()
    gevent.spawn_later(0.1, thread.start_new_thread, watcher.send, ())
    start = time.time()
    with gevent.Timeout(0.3):
        hub.wait(watcher)
gevent中线程池中使用了async,当worker线程运行回调函数后,设置返回值,通过async.send唤醒hub主线程

5.fork 子进程事件
    loop.fork(ref=True, priority=None)
    当调用fork时将会回调注册的子进程watcher,但必须得调用libev.ev_loop_fork才有效,
而且要在子进程中使用libev也必须要调用libev.ev_loop_fork
    
在gevent的threadpool中使用了fork监视器
    self.fork_watcher = hub.loop.fork(ref=False)
    self.fork_watcher.start(self._on_fork)
    def _on_fork(self):
        # fork() only leaves one thread; also screws up locks;
        # let‘s re-create locks and threads
        pid = os.getpid()
        if pid != self.pid:
            self.pid = pid
            # Do not mix fork() and threads; since fork() only copies one thread
            # all objects referenced by other threads has refcount that will never
            # go down to 0.
            self._init(self._maxsize)
    回调_on_fork目的就是重新初始化线程池,但是刚才说了子进程要有效必须要调用libev.ev_loop_fork,
这又是在在哪里调用的呢?
if hasattr(os, ‘fork‘):
    _fork = os.fork

    def fork():
        result = _fork()
        if not result: #子进程
            reinit() #调用libev.ev_loop_fork
        return result
问题的关键就是gevent/os.py中重定义了fork函数,当fork返回0,也就是子进程,将调用reinit
最后真正调用的就是core.pyx的loop.reinit
    def reinit(self):
        if self._ptr:
            libev.ev_loop_fork(self._ptr)
gevent中的线程池在gevent中使用的很广,尤其是windows中,如dns请求,os.read write都是通过线程池,
花点时间看看threadpool.py源码,会收获很多。

6.ev_prepare  每次event loop之前事件
    loop.prepare(ref=True, priority=None)
    还记得上面timeout中说的,在loop中回调比定时器优先级高,在loop中是没有添加回调的,gevent是通过
    ev_prepare实现的。

gevent loop.run_callback实现原理:
    1.loop.run_callback会向loop._callbacks中添加回调
    2.在loop的__init__中初始化prepare: libev.ev_prepare_init(&self._prepare, <void*>gevent_run_callbacks)
        注册回调为gevent_run_callbacks
    3.在gevent_run_callbacks中会调用loop的_run_callbacks
        result = ((struct __pyx_vtabstruct_6gevent_4core_loop *)loop->__pyx_vtab)->_run_callbacks(loop);
    4.loop的_run_callbacks中会逐个调用_callbacks中的回调
这也就是为什么说callback优先级高的原因。

loop.run_callback返回的是一个callback对象,具有stop(),pending属性,也就是说如果回调还没运行,我们可以通过stop()方法停止。
事例代码如下:
def f(a):
    a.append(1)

from gevent.hub import get_hub
loop = get_hub().loop
a= []
f = loop.run_callback(f,a)
f.stop()
gevent.sleep(0)
assert not f.pending #没有阻塞可能是已运行或被停止
assert not a
如果注释掉f.stop(),那么a是[1],因为gevent.sleep(0)也是直接run_callback,肯定是谁先加入谁先调用,
但如果是其它watcher就没有机会调用了

7.ev_check 每次event loop之后事件
    loop.check(ref=True, priority=None)
    这个和ev_prepare刚好相反

8.stat 文件属性变化
    loop.stat(path, float interval=0.0, ref=True, priority=None)
    interval说明期望多久以后libev开始检测文件状态变化

开两个窗口,一个运行该程序,另一个可touch cs.log文件,文件有无也是状态变化
开两个窗口,一个运行该程序,另一个可touch cs.log文件,文件有无也是状态变化
hub = gevent.get_hub()
filename = ‘cs.log‘
watcher = hub.loop.stat(filename,2) #2s以后才监听文件状态
def f():
    print os.path.exists(filename)
watcher.start(f)
gevent.sleep(100)

core.pyx中封装的watcher差不多都介绍完了,我们看一下libev的主循环ev_run
int
ev_run (EV_P_ int flags)
{
  do
    {
      ......
    }
  while (expect_true (
    activecnt
    && !loop_done
    && !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
  ));

  return activecnt;
}
其中activecnt就是我们上面说的loop的引用计数,所以除非特殊情况ref最好为True。

core.pyx细节问题还需要小伙伴们自己去研究,基本的使用上面已经说得很明白了。gevent还剩下什么呢?也许threadpool,dns也会具体分析一下,敬请期待。




[gevent源码分析] libev cython绑定core.pyx