首页 > 代码库 > 项目中的libevent

项目中的libevent

单线程libevent模式项目里面是多线程版的,我先理解下单线程的。//client1.调用NGP::init()bool NGP::init(NGPcontext context){    _context = context;    //_TcpLink = NEWSP(TcpLink);    _TcpLink = NEWSP(TcpLinkEx);    _TcpLink->Init(this);    return true;}2.初始化Libeventbool LibEvtServer::init(I_NetServerEvent* event, int start, int size){    m_ids = new ChannelIDGenerator();    m_ids->init(start, size);    m_allChannels.resize(m_ids->getSize());    m_event = event;    //使用windows模式的线程和锁    int hr = evthread_use_windows_threads();//当前是两个线程一个主线程一个派发线程        //一个线程只有一个event_base,对应的是一个struct event_base结构体和相应的事件管理器,这个事件管理器管理跟这个event_base绑定的事件    m_base = event_base_new();//创建event_base    if (!m_base) {        fprintf(stderr, "Could not initialize libevent!\n");        return false;    }    return true;}3.监听,搞不懂客户端为什么要监听bool LibEvtServer::listen(int* port){    struct sockaddr_in sin;    memset(&sin, 0, sizeof(sin));    sin.sin_family = AF_INET;    if(-1 == *port)        sin.sin_port = htons(10000);    else        sin.sin_port = htons(*port);        //如果libevent分配和绑定套接字就调用evconnlistener_new_bind,这个相当于原始的socket()和bind()和listen(),然后有连接会调用listener_cb    //其中调用了evutil_make_socket_nonblocking,将socket设置成无阻塞的,bind绑定,evconnlistener_new创建监听器,然后返回监听器    //连接监听器使用event_base来得知什么时候在监听套接字上有TCP连接,当有连接时会调用回调函数    m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this,//evconnlistener_new_bind提供了监听和接受TCP的一个方法,这个是要libevent分配和绑定套接字        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,        (struct sockaddr*)&sin,        sizeof(sin));    if (!m_listener)     {        return false;    }    if( -1 == *port)        *port = ntohs(sin.sin_port);    if (!m_listener) {        fprintf(stderr, "Could not create a listener!\n");        return false;    }    m_spThread.reset(new std::thread([this]    {        //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);        //event_base_loop(m_base, EVLOOP_ONCE);        //循环在epoll/kqueue系统调用上,当事件发生时自动调用绑定的事件,但这些事件需要绑定到这个event_base上面        event_base_dispatch(m_base);//启动event_base的循环,event_base_dispatch内部就是一个while循环,项目直接开了一个线程取做这个        if(WSAENOTSOCK == WSAGetLastError())        {            Plug::PlugMessageBox(L"操作无效套接字啊!");        }                Plug::PlugMessageBox(L"Libevent派发线程退出!");    }));    return true;}4.客户端连接bool LibEvtServer::connet_to(int ip, int port){    //创建基于socket的bufferevent,并将其托管给event_base    //buffevent从单词可以看出是在event上的buffer,即在read/write两个事件上有input和output缓冲区    //是文件描述符,决定哪个文件被写入写出,可以将socket看成文件描述符,但不允许设置成pipe(2),为安全起见可以设置成-1,然后 set it with bufferevent_setfd or bufferevent_socket_connect().    auto bev = bufferevent_socket_new(m_base, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); //创建一个bufferevent,关联sockfd,托管给event_base    if (!bev){          std::cout << "bufferevent_socket_new error!" << std::endl;          return false;      }      struct sockaddr_in sin;      sin.sin_family = AF_INET;      sin.sin_addr.s_addr = ntohl(ip);    sin.sin_port = htons(port);          //连接    //If the bufferevent does not already have a socket set, we allocate a new socket here and make it nonblocking before we begin.    int hr = bufferevent_socket_connect(bev, (struct sockaddr*)&sin, sizeof(sin));    if(0 != hr)        return false;    std::lock_guard<std::mutex> lock(m_offline_mtx);//跟申请释放channel id互锁    auto c2 = CreateChannel(bev);        //设置bufferevent上的回调    bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, c2);//设置读写事件    //启动bufferevent    bufferevent_enable(bev, EV_READ | EV_WRITE);//启动读写事件,将读写事件放入监听队列poll中    return true;}//-------------------------------------到此client方面就做好了libevent方面的准备-------------------------//server/* 成员函数 */bool LibEvtServer::init(I_NetServerEvent* event, int start, int size){    m_ids = new ChannelIDGenerator();    m_ids->init(start, size);    m_allChannels.resize(m_ids->getSize());    m_event = event;    //event支持多线程的初始化函数    int hr = evthread_use_windows_threads();    m_base = event_base_new();    if (!m_base) {        fprintf(stderr, "Could not initialize libevent!\n");        return false;    }    return true;}//侦听端口,-1表示向系统申请一个任意可用端口bool LibEvtServer::listen(int* port){    struct sockaddr_in sin;    memset(&sin, 0, sizeof(sin));    sin.sin_family = AF_INET;    if(-1 == *port)        sin.sin_port = htons(10000);    else        sin.sin_port = htons(*port);    m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this,        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,        (struct sockaddr*)&sin,        sizeof(sin));    if (!m_listener)     {        return false;    }    if( -1 == *port)        *port = ntohs(sin.sin_port);    if (!m_listener) {        fprintf(stderr, "Could not create a listener!\n");        return false;    }    m_spThread.reset(new std::thread([this]    {        //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);        //event_base_loop(m_base, EVLOOP_ONCE);        event_base_dispatch(m_base);        if(WSAENOTSOCK == WSAGetLastError())        {            Plug::PlugMessageBox(L"操作无效套接字啊!");        }                Plug::PlugMessageBox(L"Libevent派发线程退出!");    }));    return true;}//然后在这个监听器里面的回调函数里面/* 新连接事件处理 */typedef void (*evconnlistener_cb)(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *);//这个是回调的原型static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,    struct sockaddr *sa, int socklen, void *user_data){    auto ser = (LibEvtServer*)user_data;    ser->listener_cb(listener, fd, sa, socklen, user_data);}void LibEvtServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd,    struct sockaddr *sa, int socklen, void *user_data){    //获取listener相关的event_base    auto base = evconnlistener_get_base(listener);    //建立socket的bufferevent    auto bev = bufferevent_socket_new(base, fd, BEV_OPT_THREADSAFE);//BEV_OPT_CLOSE_ON_FREE |     if (!bev)     {        fprintf(stderr, "Error constructing bufferevent!");        event_base_loopbreak(base);        return ;    }    auto c2 = CreateChannel(bev);        //设置回调    bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2);    //启动bufferevent    bufferevent_enable(bev, EV_READ | EV_WRITE );}//看来服务器过程和客户端差不多,//libevent过程,基于event_base的socket bufferevent,在event_base_dispath里面不知道用的是select还是iocp,然后发现socket状态发生变化可读,可写,或者//错误都会调用相应的注册时间,建立的socket都是不阻塞的。还有个这个读写回调到底是什么时候调用的,我还不太明白,和那个读写缓冲区到底什么区别.

 

项目中的libevent