首页 > 代码库 > 使用aicp实现事件回调模式
使用aicp实现事件回调模式
相对于asio事件等待池aiop的reactor模式,基于proactor模式的事件回调池aicp封转的更加的上层。
在类unix系统上
底层是基于aiop的实现,在一个线程里面进行事件等待,然后分发所有事件,在worker中处理完后调用回调返回到上层。
并对不同系统的aiop支持力度,进行针对性优化。- 如果aiop支持边缘触发(例如:epoll、kqueue),尽量启用边缘触发,以减少系统api的频繁调用。
- 使用sendfile对发送文件进行优化
- 针对linux系统,启用native file io,实现file的dma读写,节省cpu的时间。(目前还没实现)
在windows系统上
直接基于iocp进行封装,并针对windows的一些扩展socket api进行针对性优化。
- 使用DisconnectEx对socket的进行重用,实现socket池来管理,优化整体性能
- 使用TransmitFile对发送文件进行优化
- 在高版本系统上,优先使用GetQueuedCompletionStatusEx来批量获取事件,减少系统调用
不管在哪个平台上,只需要两个线程(worker线程+timer线程),就能实现高性能并发io读写。
其中timer线程主要处理投递事件的超时维护,如果io事件长时间不响应,则会超时取消,超时事件是可以根据不同事件,自己设置。
如果确实想要快速取消,也有接口安全的强制kill掉,并在对应的回调里面监听到killed事件。
针对timer,aicp中了两种定时器:
- tb_ltimer_t:低精度定时器,精度为1s,主要用于超时维护,效率很高,采用简化的timing-wheel算法
- tb_timer_t:通用高精度定时器,精度为1ms,使用最小堆维护,主要用于维护所有定时任务,以及各种io限速操作
并且整个aicp参考nginx对于gettimeofday的优化,也对其进行了缓存,只在master loop中进行更新,其他地方直接使用缓存的值。
虽然aicp只需要一个worker线程进行loop就足够了,但是同时它也支持多个线程同时开启loop,来提高分发处理效率。
具体的aicp架构,参见:asio架构
下面看看aicp是如何使用的:
所有aicp事件投递,都是由aico对象完成,每个aico对象一个事件对象,目前支持三种事件对象:
- socket
- file
- task(用于投递定时任务)
具体aico的事件投递接口,参见:asio的事件投递接口说明
/* ////////////////////////////////////////////////////////////////////////////////////// * includes */ #include "tbox/tbox.h" /* ////////////////////////////////////////////////////////////////////////////////////// * types */ typedef struct __tb_demo_context_t { // the file tb_file_ref_t file; // the aico tb_aico_ref_t aico; // the size tb_hize_t size; }tb_demo_context_t; /* ////////////////////////////////////////////////////////////////////////////////////// * implementation */ // aico对象关闭回调函数 static tb_bool_t tb_demo_aico_clos(tb_aice_t const* aice) { // check tb_assert_and_check_return_val(aice && aice->aico && aice->code == TB_AICE_CODE_CLOS, tb_false); // trace tb_trace_d("aico[%p]: clos: %s", aice->aico, tb_state_cstr(aice->state)); // exit aico tb_aico_exit(aice->aico); // ok return tb_true; } static tb_void_t tb_demo_context_exit(tb_demo_context_t* context) { if (context) { // clos aico if (context->aico) tb_aico_clos(context->aico, tb_demo_aico_clos, tb_null); context->aico = tb_null; // exit tb_free(context); } } // 发送文件事件回调函数 static tb_bool_t tb_demo_sock_sendf_func(tb_aice_t const* aice) { // check tb_assert_and_check_return_val(aice && aice->code == TB_AICE_CODE_SENDF, tb_false); // the context tb_demo_context_t* context = (tb_demo_context_t*)aice->priv; tb_assert_and_check_return_val(context, tb_false); // ok? if (aice->state == TB_STATE_OK) { // trace tb_trace_d("sendf[%p]: real: %lu, size: %llu", aice->aico, aice->u.sendf.real, aice->u.sendf.size); // save size context->size += aice->u.sendf.real; // 如果还没有发送完,则继续发送剩余文件数据 if (aice->u.sendf.real < aice->u.sendf.size) { // post sendf from file if (!tb_aico_sendf(aice->aico, context->file, context->size, aice->u.sendf.size - aice->u.sendf.real, tb_demo_sock_sendf_func, context)) return tb_false; } else { tb_trace_i("sendf[%p]: finished", aice->aico); tb_demo_context_exit(context); } } // closed or failed? else { tb_trace_i("sendf[%p]: state: %s", aice->aico, tb_state_cstr(aice->state)); tb_demo_context_exit(context); } // ok return tb_true; } // accept事件回调函数 static tb_bool_t tb_demo_sock_acpt_func(tb_aice_t const* aice) { // check tb_assert_and_check_return_val(aice && aice->code == TB_AICE_CODE_ACPT, tb_false); // the file path tb_char_t const* path = (tb_char_t const*)aice->priv; tb_assert_and_check_return_val(path, tb_false); // the aicp tb_aicp_ref_t aicp = tb_aico_aicp(aice->aico); tb_assert_and_check_return_val(aicp, tb_false); // acpt ok? if (aice->state == TB_STATE_OK) { // trace tb_trace_i("acpt[%p]: %p", aice->aico, aice->u.acpt.aico); // done tb_bool_t ok = tb_false; tb_demo_context_t* context = tb_null; do { // make context context = tb_malloc0_type(tb_demo_context_t); tb_assert_and_check_break(context); // init file context->file = tb_file_init(path, TB_FILE_MODE_RO | TB_FILE_MODE_ASIO); tb_assert_and_check_break(context->file); // 获取客户端连接的aico对象,用于发送数据 context->aico = aice->u.acpt.aico; tb_assert_and_check_break(context->aico); // 投递一个发送文件事件 if (!tb_aico_sendf(context->aico, context->file, 0ULL, tb_file_size(context->file), tb_demo_sock_sendf_func, context)) break; // ok ok = tb_true; } while (0); // failed? if (!ok) { // exit context if (context) tb_demo_context_exit(context); } } // failed? else { // exit loop tb_trace_i("acpt[%p]: state: %s", aice->aico, tb_state_cstr(aice->state)); // accept失败,关闭用于监听的aico对象 if (aice->aico) tb_aico_clos(aice->aico, tb_demo_aico_clos, tb_null); } // ok return tb_true; } /* ////////////////////////////////////////////////////////////////////////////////////// * main */ tb_int_t main(tb_int_t argc, tb_char_t** argv) { // check tb_assert_and_check_return_val(argv[1], 0); // done tb_aico_ref_t aico = tb_null; do { // 初始化tbox库 if (!tb_init(tb_null, tb_null, 0)) break; /* 初始化一个用于监听的aico对象 * * 注:这里为了简化代码,直接使用了全局的tb_aicp()对象, * 全局的aicp对象,用起来更加方便,内部回去自己开一个线程运行loop来驱动 * 一般用于客户端应用。 * * 如果想要更高的并发性能,启用更多的线程去驱动loop,需要手动调用tb_aicp_init,指定需要的并发量,来创建。 * 并且需要手动运行tb_aicp_loop */ aico = tb_aico_init(tb_aicp()); tb_assert_and_check_break(aico); // 打开这个aico对象,并为其创建一个tcp socket if (!tb_aico_open_sock_from_type(aico, TB_SOCKET_TYPE_TCP)) break; // 绑定监听端口 if (!tb_socket_bind(tb_aico_sock(aico), tb_null, 9090)) break; // 设置监听 if (!tb_socket_listen(tb_aico_sock(aico), 20)) break; /* 投递accept事件,仅需一次投递 * * 注: * 只有accept事件是一次投递,长期有效,只要有事件过来,就回去调用对应的回调函数 * 不需要重复投递,这样设计也是为了尽可能的接受更多地并发连接 */ if (!tb_aico_acpt(aico, tb_demo_sock_acpt_func, argv[1])) break; // 等待 getchar(); } while (0); // exit tbox tb_exit(); return 0; }
- TBOX项目详情
- TBOX项目源码
- TBOX项目文档
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。