首页 > 代码库 > IO多路复用深入浅出

IO多路复用深入浅出

前言

从零单排高性能问题,这次轮到异步通信了。这个领域入门有点难,需要了解UNIX五种IO模型和TCP协议,熟练使用三大异步通信框架:Netty、NodeJS、Tornado。目前在Linux上,这些标榜异步的通信框架底层大多用的并不是异步IO模型,而是IO多路复用中的epoll。因为Python提供了对Linux内核API的友好封装,所以我选择Python来学习IO多路复用。

IO多路复用

  1. select

    举一个EchoServer的例子,客户端发送任何内容,服务端会原模原样返回。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """Echo server built on select.select(): whatever a client sends is sent back.

    Created on Feb 16, 2016
    @author: mountain
    """
    import collections
    import select
    import socket


    def send_pending(sock, pending):
        """Send the oldest chunk of *pending* (a deque of bytes) on *sock*.

        A non-blocking send() may accept only part of a chunk; the unsent tail
        is pushed back to the front of *pending* so the echoed data stays in
        order. Returns True when nothing is left to send.
        """
        if pending:
            chunk = pending.popleft()
            sent = sock.send(chunk)
            if sent < len(chunk):
                pending.appendleft(chunk[sent:])
        return not pending


    def main(host="localhost", port=1234, backlog=5):
        """Listen on (host, port) and echo data back to every client, forever."""
        # AF_INET selects IPv4 (AF_INET6 would select IPv6); SOCK_STREAM selects
        # stream-oriented TCP (SOCK_DGRAM would select datagram-oriented UDP).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.setblocking(False)
            server_address = (host, port)
            server.bind(server_address)
            # backlog bounds how many connections may wait for accept(); beyond
            # that, new connections may be refused or delayed depending on the OS.
            server.listen(backlog)

            inputs = [server]  # sockets watched for read events
            outputs = []       # sockets watched for write events
            msg_queues = {}    # per-connection deque of bytes waiting to be echoed

            def close_conn(sock):
                """Stop watching *sock*, drop its queue and close it (idempotent)."""
                if sock in outputs:
                    outputs.remove(sock)
                if sock in inputs:
                    inputs.remove(sock)
                msg_queues.pop(sock, None)
                sock.close()

            print("server is listening on %s:%s." % server_address)
            while inputs:
                # The third list reports exceptional conditions, so watch the same
                # sockets as for reading. An optional fourth argument is a timeout
                # in seconds; without it select() blocks until some event occurs.
                readable, writable, exceptional = select.select(
                    inputs, outputs, inputs)
                for sock in readable:
                    if sock is server:
                        # A client's connect() is a read event on the listening
                        # socket; the accepted connection joins the read list.
                        conn, addr = sock.accept()
                        conn.setblocking(False)
                        inputs.append(conn)
                        msg_queues[conn] = collections.deque()
                        print("server accepts a conn.")
                        continue
                    try:
                        # Read at most 1 KiB of what the client sent.
                        data = sock.recv(1024)
                    except OSError:
                        # e.g. connection reset by peer: treat it like a close.
                        data = b""
                    if data:
                        msg_queues[sock].append(data)
                        if sock not in outputs:
                            # Unlike reads, a write event fires whenever the socket
                            # *can* be written, so only watch it while data is queued.
                            outputs.append(sock)
                    else:
                        # An empty read means the client closed the connection.
                        print("server closes a conn.")
                        close_conn(sock)
                for sock in writable:
                    if sock not in msg_queues:
                        # Already closed by the read loop above in this round.
                        continue
                    try:
                        done = send_pending(sock, msg_queues[sock])
                    except OSError:
                        close_conn(sock)
                        continue
                    if done:
                        outputs.remove(sock)
                for sock in exceptional:
                    close_conn(sock)


    if __name__ == "__main__":
        main()
    [mountain@king ~/workspace/wire]$ telnet localhost 1234
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    1
    1

    select有3个缺点:

    1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
    2. 每次调用select后,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
      这点从python的例子里看不出来,因为python select api更加友好,直接返回就绪的socket列表。事实上linux内核select api返回的只是就绪fd的数目,并且会原地改写传入的fd_set,调用方还要再用FD_ISSET逐个检查所有fd才能知道哪些就绪,下次调用前还得重新填充fd_set:
      /* n is the highest-numbered descriptor in any of the three sets, plus 1.
         Returns the number of ready descriptors (0 on timeout, -1 on error).
         The fd_sets are modified in place to mark only the ready descriptors,
         so the caller must test every descriptor with FD_ISSET() to find them
         and must rebuild the sets before the next call. */
      int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
    3. 能监听的fd数量有限:fd_set是一个大小为FD_SETSIZE(默认1024)的位图,数值大于等于FD_SETSIZE的fd无法放入。
  2. poll

    采用poll重新实现EchoServer,只要搞懂了select,poll也不难,只是api的参数不太一样而已。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """Echo server built on select.poll() (POSIX only; unavailable on Windows).

    Created on Feb 27, 2016
    @author: mountain
    """
    import collections
    import select
    import socket

    # POLLIN: There is data to read
    # POLLPRI: There is urgent data to read
    # POLLOUT: Ready for output
    # POLLERR: Error condition of some sort
    # POLLHUP: Hung up
    # POLLNVAL: Invalid request: descriptor not open
    READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
    READ_WRITE = READ_ONLY | select.POLLOUT
    # Conditions after which a connection is no longer usable.
    CLOSE_FLAGS = select.POLLHUP | select.POLLERR | select.POLLNVAL


    def send_pending(sock, pending):
        """Send the oldest chunk of *pending* (a deque of bytes) on *sock*.

        A non-blocking send() may accept only part of a chunk; the unsent tail
        is pushed back to the front of *pending* so the echoed data stays in
        order. Returns True when nothing is left to send.
        """
        if pending:
            chunk = pending.popleft()
            sent = sock.send(chunk)
            if sent < len(chunk):
                pending.appendleft(chunk[sent:])
        return not pending


    def main(host="localhost", port=1234, backlog=5, timeout=1000 * 60):
        """Listen on (host, port) and echo data back to every client, forever.

        *timeout* is passed to poll() in milliseconds; when it expires poll()
        returns no events and the loop simply polls again.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.setblocking(False)
            server_address = (host, port)
            server.bind(server_address)
            server.listen(backlog)
            print('server is listening on %s port %s' % server_address)

            msg_queues = {}  # per-connection deque of bytes waiting to be echoed
            poller = select.poll()
            # Register the events to watch on the listening socket.
            poller.register(server, READ_ONLY)
            # poll() reports raw file descriptors; map them back to sockets.
            fd_to_socket = {server.fileno(): server}

            def close_conn(sock):
                """Unregister *sock*, forget its bookkeeping and close it."""
                # Unregister and read fileno() before close(), which
                # invalidates the descriptor.
                poller.unregister(sock)
                fd_to_socket.pop(sock.fileno(), None)
                msg_queues.pop(sock, None)
                sock.close()

            while True:
                for fd, flag in poller.poll(timeout):
                    sock = fd_to_socket[fd]
                    if flag & (select.POLLIN | select.POLLPRI):
                        if sock is server:
                            conn, client_address = sock.accept()
                            conn.setblocking(False)
                            fd_to_socket[conn.fileno()] = conn
                            poller.register(conn, READ_ONLY)
                            msg_queues[conn] = collections.deque()
                            continue
                        try:
                            data = sock.recv(1024)
                        except OSError:
                            # e.g. connection reset by peer: treat it like a close.
                            data = b""
                        if data:
                            msg_queues[sock].append(data)
                            # Also watch writability until the echo has been sent.
                            poller.modify(sock, READ_WRITE)
                        else:
                            # An empty read means the client closed the connection.
                            close_conn(sock)
                    elif flag & CLOSE_FLAGS:
                        # Checked before POLLOUT: an errored socket may also report
                        # POLLOUT, and sending on it would fail.
                        close_conn(sock)
                    elif flag & select.POLLOUT:
                        try:
                            done = send_pending(sock, msg_queues[sock])
                        except OSError:
                            close_conn(sock)
                            continue
                        if done:
                            poller.modify(sock, READ_ONLY)


    if __name__ == "__main__":
        main()

    poll解决了select的第三个缺点,fd数量不再受FD_SETSIZE限制,但前两个缺点依然存在(每次调用仍要把整个pollfd数组拷贝进内核,内核仍要遍历所有fd);同时它也失去了select的跨平台特性(例如Windows上的Python没有select.poll)。它的linux内核api是这样的:

    struct pollfd {
        int fd;         /* file descriptor */
        short events;   /* requested events to watch */
        short revents;  /* returned events witnessed */
    };

    /* Waits until one of the nfds entries in fds is ready. timeout is in
       milliseconds (-1 blocks indefinitely). Returns the number of entries
       whose revents is non-zero, 0 on timeout, -1 on error.
       Note: <poll.h> declares the nfds parameter with type nfds_t. */
    int poll (struct pollfd *fds, unsigned int nfds, int timeout);
  3. epoll

    用法与poll几乎一样。不过epoll只在Linux上可用,而且epoll.poll()的timeout单位是秒,而poll.poll()的timeout单位是毫秒(所以上面poll例子里写的是1000 * 60)。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """Echo server built on select.epoll() (Linux only).

    Created on Feb 28, 2016
    @author: mountain
    """
    import collections
    import select
    import socket

    READ_ONLY = select.EPOLLIN | select.EPOLLPRI
    READ_WRITE = READ_ONLY | select.EPOLLOUT
    # epoll reports hang-up and error conditions even when not requested.
    CLOSE_FLAGS = select.EPOLLHUP | select.EPOLLERR


    def send_pending(sock, pending):
        """Send the oldest chunk of *pending* (a deque of bytes) on *sock*.

        A non-blocking send() may accept only part of a chunk; the unsent tail
        is pushed back to the front of *pending* so the echoed data stays in
        order. Returns True when nothing is left to send.
        """
        if pending:
            chunk = pending.popleft()
            sent = sock.send(chunk)
            if sent < len(chunk):
                pending.appendleft(chunk[sent:])
        return not pending


    def main(host="localhost", port=1234, backlog=5, timeout=60):
        """Listen on (host, port) and echo data back to every client, forever.

        *timeout* is passed to epoll.poll() in seconds (poll.poll() uses
        milliseconds); when it expires no events are returned and the loop
        simply waits again.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server, \
                select.epoll() as epoll:
            server.setblocking(False)
            server_address = (host, port)
            server.bind(server_address)
            server.listen(backlog)
            print('server is listening on %s port %s' % server_address)

            msg_queues = {}  # per-connection deque of bytes waiting to be echoed
            # Register the events to watch on the listening socket.
            epoll.register(server, READ_ONLY)
            # epoll reports raw file descriptors; map them back to sockets.
            fd_to_socket = {server.fileno(): server}

            def close_conn(sock):
                """Unregister *sock*, forget its bookkeeping and close it."""
                # Unregister and read fileno() before close(), which
                # invalidates the descriptor.
                epoll.unregister(sock)
                fd_to_socket.pop(sock.fileno(), None)
                msg_queues.pop(sock, None)
                sock.close()

            while True:
                for fd, flag in epoll.poll(timeout):
                    sock = fd_to_socket[fd]
                    if flag & READ_ONLY:
                        if sock is server:
                            conn, client_address = sock.accept()
                            conn.setblocking(False)
                            fd_to_socket[conn.fileno()] = conn
                            epoll.register(conn, READ_ONLY)
                            msg_queues[conn] = collections.deque()
                            continue
                        try:
                            data = sock.recv(1024)
                        except OSError:
                            # e.g. connection reset by peer: treat it like a close.
                            data = b""
                        if data:
                            msg_queues[sock].append(data)
                            # Also watch writability until the echo has been sent.
                            epoll.modify(sock, READ_WRITE)
                        else:
                            # An empty read means the client closed the connection.
                            close_conn(sock)
                    elif flag & CLOSE_FLAGS:
                        # Checked before EPOLLOUT: an errored socket may also report
                        # EPOLLOUT, and sending on it would fail.
                        close_conn(sock)
                    elif flag & select.EPOLLOUT:
                        try:
                            done = send_pending(sock, msg_queues[sock])
                        except OSError:
                            close_conn(sock)
                            continue
                        if done:
                            epoll.modify(sock, READ_ONLY)


    if __name__ == "__main__":
        main()

    epoll解决了select的三个缺点,是目前Linux上最好的IO多路复用解决方案(BSD/macOS上与之对应的是kqueue)。为了更好地理解epoll,我们来看一下linux内核api的用法。

    /* Create an epoll instance and return a file descriptor referring to it.
       Since Linux 2.6.8 `size` is ignored (it must still be greater than 0);
       new code should prefer epoll_create1(int flags). */
    int epoll_create(int size);

    /* Register (EPOLL_CTL_ADD), change (EPOLL_CTL_MOD) or remove
       (EPOLL_CTL_DEL) interest in `fd`. Each fd is copied into the kernel
       once here, instead of on every wait as with select/poll. */
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    /* Wait for I/O events. When an event occurs, a kernel callback puts the
       ready fd on a ready list and wakes epoll_wait, which then only has to
       walk that ready list, whereas select and poll walk every fd -- hence
       the efficiency gap. timeout is in milliseconds (-1 blocks); returns the
       number of ready events stored in `events` (at most maxevents),
       0 on timeout, -1 on error. */
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

IO多路复用深入浅出