首页 > 代码库 > I/O复用之select

I/O复用之select

概述

很多时候我们需要一个进程有着一种让内核一旦发现进程指定的一个或多个I/O条件就绪就通知进程的能力,这种能力就叫做I/O复用

首先Unix下可用的I/O模型共有5种:

  • 阻塞式I/O
    即等待的数据如果没到那么就一直沉睡,直到需要的数据到达或者发生错误才返回。
    技术分享

  • 非阻塞式I/O

    当所请求的I/O操作非得把本进程投入睡眠才能完成时,不要把本进程投入睡眠,而是返回一个错误。

技术分享

  • I/O复用
    阻塞在I/O复用的系统调用上,而不是阻塞在真正的I/O系统调用上
    技术分享
    图中进程阻塞与select调用,等数据报套接字变为可读时,select就会返回可读提示,于是调用相应的真正的I/O系统调用将数据进行复制。

  • 信号驱动式I/O
    以后补充

  • 异步I/O
    以后补充

其中前四种I/O模型都是同步I/O模型,即其中真正的I/O操作会阻塞进程,最后一种异步I/O模型则与异步I/O相匹配

异步I/O:当一个异步过程调用发出后,调用者不能立刻得到结果,实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。使用异步I/O,在发起I/O请求到实际使用数据这段时间内,程序还可以继续做其他事情。

select函数

该函数允许进程指示内核等待多个时间中的任何一个发生,并只在有一个或多个时间发生或经历一段指定的时间后才唤醒它。

具体调用代码

#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *execptset, const struct timeval *timeout);
//返回:如果有了就绪描述符就返回其数目,超时则返回0,出错返回-1

struct timeval{
    long tv_sec;//秒数
    long tv_usec;//微秒数
}

参数解析:

  • int maxfdp1
    指定待测试的描述符个数,值为待测试的最大描述符加1。
    这个参数的存在是为了提高效率,每个fd_set中都有表示大量描述符的空间,但是一个普通进程用的数量却很少,内核通过在进程与内核之间不复制描述符不必要的部分,从而不测试总为0的那些位来提高效率。

  • fd_set *readset、fd_set *writeset、fd_set*execptset
    指定要让内核测试读、写以及异常条件的描述符。
    对于异常条件的支持只有:1、某个套接字的带外数据到达;2、某个已置为分组模式的伪终端存在可从其主端读取的控制状态信息()。

  • struct timeval *timeout
    告知内核等待所指定描述符中的任何一个就绪可花多长时间。
    (1)永远等下去,只在有一个描述符准备好I/O时才返回,那么设为NULL
    (2)等待一段固定时间,情形和第一个一样,但是不超过由该参数指向timeval结构中指定的秒数和微秒数
    (3)根本不等待,检查描述符后立即返回,称为轮询,使参数指向一个定时器值为0的timeval结构
    select等待的状况会被进程在等待期间捕获的信号中断,之后从信号处理函数处返回,并不会默认自动重启,另外select的返回的值是EINTR错误。

    关于fd_set

/*
 * Those macros may have been defined in <gnu/types.h>. But we always
 * use the ones here. 
 */
#undef __NFDBITS
#define __NFDBITS   (8 * sizeof(unsigned long))/*无符号长整形4字节,总计32位*/

#undef __FD_SETSIZE
#define __FD_SETSIZE    1024

#undef __FDSET_LONGS
#define __FDSET_LONGS   (__FD_SETSIZE/__NFDBITS)/*1024/32=32*/

#undef __FDELT
#define __FDELT(d)  ((d) / __NFDBITS)

#undef __FDMASK
#define __FDMASK(d) (1UL << ((d) % __NFDBITS))

typedef struct {
    unsigned long fds_bits [__FDSET_LONGS];/*32个元素的数组*/
} __kernel_fd_set;

以上是在Linux内核中对fd_set的具体定义,可以看到在Linux中fd_set是一个结构体,结构体内容具体为长度为32(1024/32)的无符号长整形数组,每个无符号长整形数据的位数为32,于是整个fd_set的长度位数为1024位,这也是默认情况下select能够检测的描述符的最大个数。(据描述这个数可以增大,但是可能会出现一些不可预料的情况)
具体来说,当我们需要检测{0、1、4、5}这几个描述符的读是否就绪时,就需要将readset参数中的相应位数置为1,从而让select能够对它们进行检测,另外select进行检测时,并不是按照我们制定的这几个描述符进行检测的,实际上它会根据第一个参数的maxdfp1来具体指定要检测多少个描述符,于是选择从fd_set中第0位开始,一直到指定的maxdfp1-1位检测。

select具体实现

对于select的实现,实际上select具体调用情况如下
sys_select => core_sys_select => do_select => poll => sock_poll => tcp_poll
我只读了其中比较重要的部分,版本为2.6.32,其中最为重要的就是do_select,它具体体现了select的实现机制

/*fs/Select.c/sys_select*/

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
        fd_set __user *, exp, struct timeval __user *, tvp)/*select的实现,从用户空间拷贝各个fd_set到内核空间*/
{
    struct timespec end_time, *to = NULL;
    struct timeval tv;
    int ret;

    if (tvp) {/*如果有设定超时*/
        if (copy_from_user(&tv, tvp, sizeof(tv)))/*从tvp拷贝至tv,从用户空间拷贝数据至内核空间*/
            return -EFAULT;

        to = &end_time;
        if (poll_select_set_timeout(to,/*配置超时时间,即to中保存的是超时时间*/
                tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),/* 微秒数/每秒多少微秒 */
                (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))/* 秒数*每秒多少微秒*每微秒多少纳秒 */
            return -EINVAL;
    }
    /*到上面为止,简单来说,处理了时间方面的问题,判断三种时间的状况,决定select的执行时间*/
    ret = core_sys_select(n, inp, outp, exp, to);/*核心代码,调用core_sys_select,inp为读集,outp为写集,exp为异常集*/
    ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
    /*将内核空间中关于离超时时间还剩多少时间的结构体数据拷贝至用户空间,以及一些关于select特殊的处理,因为这里所以造成了select中三个参数的值-结果参数*/

    return ret;/*文件描述符个数*/
}

以上为sys_select的实现,可以看出其主要对调用select传入的时间进行了具体的操作,并调用core_sys_select进行下面的工作,另外它还负责计算出离超时还剩多少时间并拷贝进入传入时间的结构体中。

/*fs/Select.c/core_sys_select*/

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
               fd_set __user *exp, struct timespec *end_time)
{
    fd_set_bits fds;/*用于指向描述符集的结构体,结构体内总计6个指针,对、写、异常都有两个集,一个是输入,一个用于输出*/
    void *bits;
    int ret, max_fds;
    unsigned int size;
    struct fdtable *fdt;
    /* Allocate small arguments on the stack to save memory and be faster */
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];/*在栈中分配的空间,256/4=64  为什么是256?*/

    ret = -EINVAL;
    if (n < 0)/*没有文件描述符,返回出错*/
        goto out_nofds;

    /* max_fds can increase, so grab it once to avoid race */
    rcu_read_lock();/*读锁,安全获取文件描述符,不允许写操作修改*/
    fdt = files_fdtable(current->files);/*获取当前进程的文件描述符表*/
    max_fds = fdt->max_fds;/*最大文件描述符的具体数值数*/
    rcu_read_unlock();/*获取完成后解锁*/
    if (n > max_fds)/*判断如果当前进程中文件描述符的数目没有用户所传入的数目那么多的话,进行修改*/
        n = max_fds;

    /*
     * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
     * since we used fdset we need to allocate memory in units of
     * long-words. 
     */
    size = FDS_BYTES(n);/*根据n的大小来计算出每个位图存下所有的需要检测的描述符需要多少字节*/
    bits = stack_fds;
    /*判断stack_fds的空间大小够不够6个fd_set(3个fd_set的输入输出),不够的话就用kmalloc进行重新分配*/
    if (size > sizeof(stack_fds) / 6) {
        /* Not enough space in on-stack array; must use kmalloc */
        ret = -ENOMEM;
        bits = kmalloc(6 * size, GFP_KERNEL);
        if (!bits)
            goto out_nofds;
    }
    /*分配每个位图指定的内存空间,指针指向就行了,体现了之前定义fd_set_bits结构的作用,使得6个位图很清楚*/
    fds.in      = bits;
    fds.out     = bits +   size;
    fds.ex      = bits + 2*size;
    fds.res_in  = bits + 3*size;
    fds.res_out = bits + 4*size;
    fds.res_ex  = bits + 5*size;

    if ((ret = get_fd_set(n, inp, fds.in)) ||/*调用了copy_from_user从用户空间拷贝fd_set*/
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    zero_fd_set(n, fds.res_in);/*清0记录结果的内存空间*/
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);

    ret = do_select(n, &fds, end_time);/*核心代码,调用do_select,select的实现原理,甩锅*/

    if (ret < 0)/*ret小于0的情况,出错*/
        goto out;
    if (!ret) {/*针对ret返回值为0的情况*/
        ret = -ERESTARTNOHAND;/*这里首先将标志置上,这个标志的意思是被调用被中断,下面判断是否有待处理信号要处理,有的话就退出,显示的状态就会是ERESTARTNOHAND的状态,对应之前的sys_select中返回EINTR,与应用的描述相对应*/
        if (signal_pending(current))/*对于当前进程有信号要处理的情况下*/
            goto out;
        ret = 0;/*没有待处理信号,即不需要处理其他事件,就将刚置上的状态清0*/
    }

    if (set_fd_set(n, inp, fds.res_in) ||
        set_fd_set(n, outp, fds.res_out) ||
        set_fd_set(n, exp, fds.res_ex))/*刚刚两种情况都没发生,那么就代表运行成功,将输出的3个位图拷贝至用户空间*/
        ret = -EFAULT;

out:
    if (bits != stack_fds)
        kfree(bits);/*如果前面因空间不够大而调用了kmalloc分配了空间后,用完释放*/
out_nofds:
    return ret;
}

core_sys_select函数中,主要负责处理描述符集,计算出它们需要的空间并进行分配空间,将数据从用户空间拷贝进入内核空间,这里实际上用到了6个位图,3个用于输出(res_in、res_out、res_ex),3个用于输入(in、out、ex),并调用了do_select进行具体操作,对其返回值进行具体判断从而针对不同情况进行对应的操作

接下来就是最主要的do_select部分了

/*fs/Select.c/do_select*/

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
    ktime_t expire, *to = NULL;
    struct poll_wqueues table;/*辅助实现fd轮询的数据结构*/
    poll_table *wait;
    int retval, i, timed_out = 0;
    unsigned long slack = 0;

    rcu_read_lock();
    retval = max_select_fd(n, fds);/*检查用户打开的fd,与位图上对应的fd都必须打开,返回为最大fd*/
    rcu_read_unlock();

    if (retval < 0)
        return retval;
    n = retval;

    poll_initwait(&table);/*初始化poll_wqueues中的6个成员,最重要是其中初始化了poll_wqueues.poll_table.qproc的函数指针,这在后面调用具体的poll中要调用poll_wait中要使用的函数*/
    wait = &table.pt;
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
        wait = NULL;
        timed_out = 1;/*判断系统的时间,如果系统中传入的时间为0,那么就设置为1,即select不会进行阻塞,直接返回*/
    }

    if (end_time && !timed_out)/*系统传入了时间的结构体且其中的时间不为0就代表需要有超时时间的设定,这里将数据进行最后的转换方便使用*/
        slack = estimate_accuracy(end_time);/*对超时时间进行了转换?*/

    retval = 0;/*用于保存已经准备好的描述符数,初始为0*/
    for (;;) {
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;/*无符号长整形一次跳4字节,即32位*/

        set_current_state(TASK_INTERRUPTIBLE);/*设置可中断的睡眠状态*/

        inp = fds->in; outp = fds->out; exp = fds->ex;/*输入的描述符集*/
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;/*输出的描述符集*/

        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {/*对总共要检测的n个描述符的循环,每次循环将输出的位置向后移32位,即跳过4个字节,与输入的位图的位置相对应*/
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            const struct file_operations *f_op = NULL;
            struct file *file = NULL;

            in = *inp++; out = *outp++; ex = *exp++;/*取出了目前的循环里面32个文件描述符对应的位图,并对输入的三个描述符集自增,从而跳到下一个32位的位图上*/
            all_bits = in | out | ex;/*这里实际上在判断是否所有的都没有要检测的状态,或逻辑上为0即三个参数全为0*/
            if (all_bits == 0) {/*对于32个描述符中没有要检测的状态的情况,就设置当前检查的描述符的个数,并跳入下次循环*/
                i += __NFDBITS;/*定义循环的数量,每次查32个文件描述符,总共需要循环查几次*/
                continue;
            }

            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {/*每次32个fd的循环中对于有需要检测的状态的存在,每次循环bit会左移一位*/
                int fput_needed;
                if (i >= n)/*检测是否超出了最大要检测的fd*/
                    break;
                if (!(bit & all_bits))/*测试每一位,bit初始为1,从0001开始向左每次移一位,与all_bits进行与逻辑操作,从而判断是否需要检测这一位代表的文件描述符,当结果为0,即此位代表的文件描述符不需要检测*/
                    continue;
                file = fget_light(i, &fput_needed);/*返回i对应的file结构体,将引用计数f_count+1,与后面的fput_light对应*/
                if (file) {/*如果file结构体存在*/
                    f_op = file->f_op;
                    mask = DEFAULT_POLLMASK;
                    if (f_op && f_op->poll) {/*这里socket中对应的是sock_poll*/
                        wait_key_set(wait, in, out, bit);
                        mask = (*f_op->poll)(file, wait);/*这里进入了以后将让本进程沉睡?直到poll执行成功了以后就会将本进程唤醒?*/
                    }
                    fput_light(file, fput_needed);/*释放file结构体,将引用计数-1*/
                    /*根据poll的结果,判断状态,对具体的位进行设置*/
                    if ((mask & POLLIN_SET) && (in & bit)) {/*对应的描述符可读*/
                        res_in |= bit;
                        retval++;/*最后返回的那个值是in、out、ex三个集合就绪的描述符的总和,因此每次都要retval++*/
                        wait = NULL;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {/*对应的描述符可写*/
                        res_out |= bit;
                        retval++;
                        wait = NULL;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {/*对应的描述符有异常*/
                        res_ex |= bit;
                        retval++;
                        wait = NULL;
                    }
                }
            }
            if (res_in)
                *rinp = res_in;/*将poll的结果写回至位图*/
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
            cond_resched();/*不懂,要定义了need_resched标志后才能生效?作用为调度一个新程序投入运行?增加一个抢占点?*/
        }
        wait = NULL;
        if (retval || timed_out || signal_pending(current))/*有就绪的文件描述符or运行时间到了or有信号需要处理*//*也对应超时的情况下timeout设置为1了于是这里会直接结束,跳出死循环*/
            break;/*跳出死循环*/
        if (table.error) {/*等待队列出现问题?*/
            retval = table.error;
            break;
        }

        /*
         * If this is the first loop and we have a timeout
         * given, then we convert to ktime_t and set the to
         * pointer to the expiry value.
         */
        if (end_time && !to) {
            expire = timespec_to_ktime(*end_time);
            to = &expire;
        }

        /*select在第一次循环完成后,如果没有设置超时时间即立刻返回的情况下,不进入睡眠直接结束select*/
        if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,/*但是在设置了超时时间or一直阻塞的情况下,这里就会进入睡眠状态,之后如果超时了,那么就会返回0,如果是被唤醒了就会返回EINTR*/
                       to, slack))
            timed_out = 1;/*超时的情况下,设置为1,从而退出for这个死循环*/
    }

    poll_freewait(&table);/*清理驱动的等待队列?*/

    return retval;
}

具体的do_select中可以看出,存在for死循环,遍历所有指定好的描述符范围(0 - maxdfp1-1)内所有的位,每次循环获取32位的位图,然后其中如果有需要检测状态的文件描述符,就对其中每一位进行检测,即轮询,并进一步获取置1的位代表的描述符对应的file结构体,从file结构体的f_op->poll,找到具体指定的poll(socket具体指定的文件类型为socket_file_ops,其中poll处指定为sock_poll),之后需要将返回的状态值与三个宏定义(可读、可写、有异常)进行比较来确定描述符读、写or异常是否就绪。

对于do_select后面有一部分的操作仍有疑问,比如驱动方面关于等待队列具体如何实现、抢占点为什么要添加、sock_poll中如何查询等问题,不过对于select的实现方面有了大致的理解。

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    I/O复用之select