首页 > 代码库 > IO多路复用及ThreadingTCPServer源码阅读

IO多路复用及ThreadingTCPServer源码阅读

IO多路复用

socket模块是阻塞的,通过socket建立的服务端可以接收多个请求,但只能同时处理一个请求,其他请求都被阻塞。可以通过IO多路复用解决这个问题,socketserver内部使用的就是IO多路复用以及多线程和多进程。

IO多路复用就是指通过一种机制可以监视多个描述符,一旦某个描述符就绪(读写就绪),就通知程序进行相应读写操作。

Linux中的select,poll,epoll都是IO多路复用的机制。

select
 
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
 
poll
 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
 
epoll
 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

Python

python中有一个select模块,其中提供了:select,poll,epoll三个方法,分别调用系统的select,poll和epoll从而实现IO多路复用。

1 Windows Python:
2     提供: select
3 Mac Python:
4     提供: select
5 Linux Python:
6     提供: select、poll、epoll

谈一下select方法:

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间)
 
参数: 可接受四个参数(前三个必须)
返回值:三个列表
 
select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中
2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中
3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中
4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化
   当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。

例子:
通过select和socket模块实现的伪IO多路复用,客户端输入什么,服务端就返回response + 客户端输入内容

技术分享
#!/usr/bin/env python
# coding=utf-8

import socket
import select

sk = socket.socket()
sk.bind((127.0.0.1, 9999, ))
sk.listen(5)
inputs = [sk, ]
messages = {}
# messages = {
#  hexm: [消息1,消息2】
#  zhuxj: [消息1,消息2】
# }
# inputs = [sk, hexm, zhuxj, ly] # 服务端sk,客户端对象
outputs = []
while True:
    # sk监听哪个对象,只要有变化,新连接来了,rlist = [sk], 否则rlist=[], 如果一个连接sk来了,rlist=[sk],如果两个sk1,sk2同时来了,rlist=【sk1,sk2】
    # 1 超时时间

    # 监听sk(服务器端)对象,如果sk对象发生变化,表示有客户端来连接了,此时rlist值为服务端[sk]
    # 监听conn对象,如果conn发生变化,表示客户端有新消息发送过来,此时rlist值为[客户端]
    rlist, wlist, e = select.select(inputs, outputs, [], 1)
    # wlist所有给我发消息的人
    # r就是sk
    # rlist = [hexm]
    # rlist = [zhuxij, ly]
    print(len(inputs), len(rlist), len(outputs)) # inputs里面多少对象, rlist表示多少客户端对象发生变化
    # 只有连接进来才for循环,不然rlist一直为空
    for r in rlist:
        if r == sk:
            # 新客户端来连接
            print(r)
            # conn 是socket对象, 每个客户端的socket对象
            conn, address = r.accept()
            conn.sendall(bytes(hello, encoding=utf-8))
            # 新客户来连接,
            messages[conn] = []
            inputs.append(conn)
        else:
            print(-----)
            try:
                ret = r.recv(1024)
                if not ret:  # 接受空消息,主动抛出异常,断开连接
                    raise Exception(断开连接)
                else:
                    outputs.append(r)
                    # 客户发的消息加入这个客户的消息列表
                    messages[r].append(ret)
            # 有人发消息
            except Exception as e:
                # 断开连接,移除
                inputs.remove(r)
                del messages[r]  # 删除这个用户消息

    # 所有给我发过消息的人
    for w in wlist:
        msg = messages[w].pop()
        resp = msg + bytes(response, encoding=utf-8)
        w.sendall(resp)
        outputs.remove(w)
server
技术分享
#!/usr/bin/env python
# coding=utf-8

import socket
sk = socket.socket()
sk.connect(("127.0.0.1", 9999))
data = sk.recv(1024)
print(data)
while True:
    ret = input(">>>")
    print(ret)
    sk.sendall(bytes(ret, encoding=utf-8))
    print(sk.recv(1024))
sk.close()
View Code

此处的Socket服务端相比与原生的Socket,他支持当某一个请求不再发送数据时,服务器端不会等待而是可以去处理其他请求的数据。但是,如果每个请求的耗时比较长时,select版本的服务器端也无法完成同时操作。

ThreadingTCPServer

 

IO多路复用及ThreadingTCPServer源码阅读