首页 > 代码库 > Gevent源码之loop的实现
Gevent源码之loop的实现
gevent之所以性能好,最主要就得益于对libev的封装,这里就来看看这部分具体的实现。。。
稍微看一下libev的用法就知道,libev将各种事件都定义为了watcher,这里包括了定时,io等等。。
在gevent主要就是对libev的loop以及watcher进行了封装。。这部分采用的是cython来写的。。
通过以前看的gevent的代码可以知道,所有建立的协程都有一个共同的parent协程,也就是hub协程,他有一个loop对象,其实就可以理解为gevnet通过hub协程来管理整个loop的运行。。。
loop是采用cython来编写的,代码在core.pyx里面。。。
先来看看它的一些属性的定义:
cdef libev.ev_loop* _ptr #libev的loop的引用 cdef public object error_handler cdef libev.ev_prepare _prepare #这个prepare事件,每次在loop之前,都会调用它的回调 cdef public list _callbacks #当前loop对象上面挂起的所有回调的链表 cdef libev.ev_timer _timer0 #一个超时为0的timer,用于让loop立即返回
对于这几个属性,上面的注释应该很清楚了。。。嗯,这里要感叹一下,用cython来写python的扩展还真的很方便。。。
好了,这里来看看构造函数吧:
#构造函数 def __init__(self, object flags=None, object default=None, size_t ptr=0): cdef unsigned int c_flags cdef object old_handler = None #这个是注册每次event loop 之前的回调,每次loop开始之前,就会执行这里 #其实最终调用的时_run_callbacks方法 libev.ev_prepare_init(&self._prepare, <void*>gevent_run_callbacks) #ifdef _WIN32 libev.ev_timer_init(&self._periodic_signal_checker, <void*>gevent_periodic_signal_check, 0.3, 0.3) #endif #注册这个timer的回调,这个回调其实什么都不做,主要的目的就是让event loop的循环立即退出,去处理回调 libev.ev_timer_init(&self._timer0, <void*>gevent_noop, 0.0, 0.0) if ptr: self._ptr = <libev.ev_loop*>ptr else: c_flags = _flags_to_int(flags) _check_flags(c_flags) c_flags |= libev.EVFLAG_NOENV if default is None: default = True if _default_loop_destroyed: default = False if default: self._ptr = libev.gevent_ev_default_loop(c_flags) if not self._ptr: raise SystemError("ev_default_loop(%s) failed" % (c_flags, )) #ifdef _WIN32 libev.ev_timer_start(self._ptr, &self._periodic_signal_checker) libev.ev_unref(self._ptr) #endif else: #创建loop的引用,一般情况下都是在这里创建的 self._ptr = libev.ev_loop_new(c_flags) if not self._ptr: raise SystemError("ev_loop_new(%s) failed" % (c_flags, )) if default or __SYSERR_CALLBACK is None: set_syserr_cb(self._handle_syserr) #在loop上面启动prepare,这样可以保证每次loop之前都执行了prepare的回调,也就是执行在loop上面注册的callback libev.ev_prepare_start(self._ptr, &self._prepare) libev.ev_unref(self._ptr) self._callbacks = [] #初始化回调链表
这个还是很简单的,关于libev方面的东西这里就不细说了,代码首先初始化了prepare事件,并将其的回调设置为了gevent_run_callbacks方法,这个方法其实最终又是调用的当前loop对象的_run_callbacks方法来处理所有在当前loop上面挂起的回调。。。这样也就是说,每一次在运行loop的wait之前,都会先处理在当前loop上面挂起的回调。。这里就包括很多协程的switch啥的。。。
另外这里可以看到初始化了一个超时为0的timer,这个事干嘛用的呢,嗯,例如我们在运行当前loop上面挂起的所有的回调之后,这些回调中有可能又在当前loop上面挂起回调,为了让eventloop可以理解返回,接下来处理这些挂起的回调,那么就在当前eventloop上面挂起一个超时为0的timer,可以让loop立刻返回。
接下来就是构建libev的loop的过程,最后可以看到调用了ev_prepare_start方法来启动了prepare事件。。。
嗯,接下来看看loop对象是怎么处理挂在当前loop上的回调的,也就是_run_callbacks方法:
#在gevent_run_callbacks中其实也是调用这个方法来具体的运行所有的回调 #在每一次运行loop之前,都会执行这些回调 cdef _run_callbacks(self): cdef callback cb cdef object callbacks cdef int count = 1000 libev.ev_timer_stop(self._ptr, &self._timer0) while self._callbacks and count > 0: callbacks = self._callbacks self._callbacks = [] for cb in callbacks: libev.ev_unref(self._ptr) gevent_call(self, cb) count -= 1 #在运行回调的时候有可能又加入了回调,所以这里进行定时 #因为这个定时的超时是0,所以让loop立即返回,可以处理回调 if self._callbacks: libev.ev_timer_start(self._ptr, &self._timer0)
其实代码比较简单吧,无非就是遍历回调的队列,然后执行,然后判断是否加入了新的回调,如果有的话,那么就启动那个超时为0的timer,让loop可以立即返回接着处理回调。。。
接下来再来看看loop的run方法吧。。。在hub协程中每次也就是调用这个方法来处理的:
#运行loop对象 def run(self, nowait=False, once=False): CHECK_LOOP2(self) cdef unsigned int flags = 0 if nowait: flags |= libev.EVRUN_NOWAIT if once: flags |= libev.EVRUN_ONCE with nogil: #这里主要就是运行libev的loop libev.ev_run(self._ptr, flags)
这里代码也算是简单吧,其实就是运行libev的事件循环。。。。
好了。。。其实到这里loop的基本上的东西 就说的差不多了。。。。
那么接下来来看看loop上面是如何封装事件的吧。。。这里就拿IO事件来举例子了。。。。
对于I/O watcher的创建,一般情况下是在loop上面调用io方法:
#创建I/O watcher #ifdef _WIN32 def io(self, libev.vfd_socket_t fd, int events, ref=True, priority=None): return io(self, fd, events, ref, priority) #else def io(self, int fd, int events, ref=True, priority=None): return io(self, fd, events, ref, priority) #endif
这里根据系统的不同分成两种,不过我们只需要关心下面一种就好了,windows的就暂时不考虑了。。。这里其实就是构建了一个I/O类型的watcher,构造函数,第一个是当前loop的引用,第二个是文件描述符,第三个是感兴趣的事件(读或者写)。。。
这里来看看IO watcher的源码定义吧:
#I/Owatcher的定义 cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]: WATCHER_BASE(io) #通过这个宏定义了一些基本的属性,例如libev的 watcher 引用等 #这个其实其实就是启动watcher def start(self, object callback, *args, pass_events=False): CHECK_LOOP2(self.loop) if callback is None: raise TypeError('callback must be callable, not None') self.callback = callback if pass_events: self.args = (GEVENT_CORE_EVENTS, ) + args else: self.args = args LIBEV_UNREF libev.ev_io_start(self.loop._ptr, &self._watcher) #在libev的loop上面启动这个io watcher PYTHON_INCREF ACTIVE PENDING #ifdef _WIN32 #io watcher的构造 def __init__(self, loop loop, libev.vfd_socket_t fd, int events, ref=True, priority=None): if events & ~(libev.EV__IOFDSET | libev.EV_READ | libev.EV_WRITE): raise ValueError('illegal event mask: %r' % events) cdef int vfd = libev.vfd_open(fd) libev.vfd_free(self._watcher.fd) #初始化,并设置回调为gevent_callback_io #在回调里面,会执行最开始watcher注册的回调,并会取消读写事件的注册 libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, events) self.loop = loop if ref: self._flags = 0 else: self._flags = 4 if priority is not None: libev.ev_set_priority(&self._watcher, priority) #else #一般就考虑这里的构造就好了,三个重要的参数分别是loop对象的引用,文件描述符,事件类型,一般情况下都不会带有优先级的 def __init__(self, loop loop, int fd, int events, ref=True, priority=None): if fd < 0: raise ValueError('fd must be non-negative: %r' % fd) if events & ~(libev.EV__IOFDSET | libev.EV_READ | libev.EV_WRITE): raise ValueError('illegal event mask: %r' % events) #调用libev的方法来初始化I/O watcher libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, events) self.loop = loop if ref: self._flags = 0 else: self._flags = 4 if priority is not None: libev.ev_set_priority(&self._watcher, priority) #endif property fd: def __get__(self): return libev.vfd_get(self._watcher.fd) def __set__(self, long fd): if libev.ev_is_active(&self._watcher): raise AttributeError("'io' watcher attribute 'fd' is read-only while watcher is active") cdef int vfd = libev.vfd_open(fd) libev.vfd_free(self._watcher.fd) libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, self._watcher.events) property events: def __get__(self): return self._watcher.events def __set__(self, int events): if libev.ev_is_active(&self._watcher): raise AttributeError("'io' watcher attribute 'events' is read-only while watcher is active") libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, self._watcher.fd, events) property events_str: def __get__(self): return _events_to_str(self._watcher.events) def _format(self): return ' fd=%s events=%s' % (self.fd, self.events_str)
这里要注意WATCHER_BASE(io)是一个宏定义,他定义了一些基本的属性,例如libev的watcher引用等。。。
从构造函数中可以看到调用了libev的ev_io_init方法来初始化内部的I/O watcher引用,并设置了其回调为gevent_callback_io方法,在start方法中则调用了libev的ev_io_start方法来启动这个watcher,并记录了回调的方法。。。这里如果对libev有基本了解的话,通过上述代码就已经了解了I/O watcher的运行原理。。。
不过这里感觉有必要来说一下gevent_callback_io这个回调函数,也就是我们感兴趣的I/O事件发生了之后,会调用这个方法来处理。。。。
它的定义在callbacks.h以及callbacks.c中。。。而且是通过宏定义的方式来定义了很多类似的方法,例如gevent_callback_timer啥的。。。关于这部分的内容就不细说了。。直接来看看究竟调用的时什么代码吧:
//首先获取wather对象,然后调用回调 #define DEFINE_CALLBACK(WATCHER_LC, WATCHER_TYPE) static void gevent_callback_##WATCHER_LC(struct ev_loop *_loop, void *c_watcher, int revents) { struct PyGevent##WATCHER_TYPE##Object* watcher = GET_OBJECT(PyGevent##WATCHER_TYPE##Object, c_watcher, _watcher); gevent_callback(watcher->loop, watcher->_callback, watcher->args, (PyObject*)watcher, c_watcher, revents); }
首先获取对应的watcher对象,然后调用gevent_callback方法来调用在watcher上面注册的回调。。。
//对于所有的事件的回调,其实最终都是通过这个函数来处理的 static void gevent_callback(struct PyGeventLoopObject* loop, PyObject* callback, PyObject* args, PyObject* watcher, void *c_watcher, int revents) { GIL_DECLARE; PyObject *result, *py_events; long length; py_events = 0; GIL_ENSURE; Py_INCREF(loop); Py_INCREF(callback); Py_INCREF(args); Py_INCREF(watcher); gevent_check_signals(loop); if (args == Py_None) { args = __pyx_empty_tuple; } length = PyTuple_Size(args); if (length < 0) { gevent_handle_error(loop, watcher); goto end; } if (length > 0 && PyTuple_GET_ITEM(args, 0) == GEVENT_CORE_EVENTS) { py_events = PyInt_FromLong(revents); if (!py_events) { gevent_handle_error(loop, watcher); goto end; } PyTuple_SET_ITEM(args, 0, py_events); } else { py_events = NULL; } result = PyObject_Call(callback, args, NULL); //执行回调 if (result) { Py_DECREF(result); } else { gevent_handle_error(loop, watcher); if (revents & (EV_READ|EV_WRITE)) { //这里可以看出,如果注册了读写事件,那么会取消,也就标示每一次读写,都需要重写注册事件 /* io watcher: not stopping it may cause the failing callback to be called repeatedly */ gevent_stop(watcher, loop); goto end; } } if (!ev_is_active(c_watcher)) { /* Watcher was stopped, maybe by libev. Let's call stop() to clean up * 'callback' and 'args' properties, do Py_DECREF() and ev_ref() if necessary. * BTW, we don't need to check for EV_ERROR, because libev stops the watcher in that case. */ gevent_stop(watcher, loop); } end: if (py_events) { Py_DECREF(py_events); PyTuple_SET_ITEM(args, 0, GEVENT_CORE_EVENTS); } Py_DECREF(watcher); Py_DECREF(args); Py_DECREF(callback); Py_DECREF(loop); GIL_RELEASE; }
这里吧gevent_callback方法的代码列出来了。。。是因为有一部分需要注意,对于I/O事件类型,处理了回调之后,将会取消在loop上面挂起的I/O事件。。。
这样的话,对于每一次读写,都需要在loop上面注册。。。。嗯。。不过好处就是。。提供了同步的变成范式。。。
好了,到这里gevent的loop的实现就差不多了。。。
Gevent源码之loop的实现