首页 > 代码库 > Redis源码分析(二十七)--- rio系统I/O的封装

Redis源码分析(二十七)--- rio系统I/O的封装

        I/O操作对于每个系统来说都是必不可少的一部分。而且I/O操作的好坏,在一定程度上也会影响着系统的效率问题。今天我学习了一下在Redis中的I/O是怎么处理的,同样的,Redis在他自己的系统中,也封装了一个I/O层。简称RIO。得先看看RIO中有什么东西喽:

struct _rio {
    /* Backend functions.
     * Since this functions do not tolerate short writes or reads the return
     * value is simplified to: zero on error, non zero on complete success. */
    /* 数据流的读方法 */
    size_t (*read)(struct _rio *, void *buf, size_t len);
    /* 数据流的写方法 */
    size_t (*write)(struct _rio *, const void *buf, size_t len);
    /* 获取当前的读写偏移量 */
    off_t (*tell)(struct _rio *);
    /* The update_cksum method if not NULL is used to compute the checksum of
     * all the data that was read or written so far. The method should be
     * designed so that can be called with the current checksum, and the buf
     * and len fields pointing to the new block of data to add to the checksum
     * computation. */
    /* 当读入新的数据块的时候,会更新当前的校验和 */
    void (*update_cksum)(struct _rio *, const void *buf, size_t len);

    /* The current checksum */
    /* 当前的校验和 */
    uint64_t cksum;

    /* number of bytes read or written */
    /* 当前读取的或写入的字节大小 */
    size_t processed_bytes;

    /* maximum single read or write chunk size */
    /* 最大的单次读写的大小 */
    size_t max_processing_chunk;

    /* Backend-specific vars. */
    /* rio中I/O变量 */
    union {
    	//buffer结构体
        struct {
        	//buffer具体内容
            sds ptr;
            //偏移量
            off_t pos;
        } buffer;
        //文件结构体
        struct {
            FILE *fp;
            off_t buffered; /* Bytes written since last fsync. */
            //同步的最小大小
            off_t autosync; /* fsync after 'autosync' bytes written. */
        } file;
    } io;
};
里面除了3个必须的方法,read,write方法,还有获取偏移量的tell方法,还有2个结构体变量,一个buffer结构体,一个file结构体,作者针对不同的I/O情况,做了不同的处理,当执行临时的I/O操作时,都与rio.buffer打交道,当与文件进行I/O操作时,则执行与rio.file之间的操作。下面看看rio统一定义的读写方法:

/* The following functions are our interface with the stream. They'll call the
 * actual implementation of read / write / tell, and will update the checksum
 * if needed. */
/* rio的写方法 */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    while (len) {
    	//判断当前操作字节长度是否超过最大长度
        size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        //写入新的数据时,更新校验和
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
        //执行写方法
        if (r->write(r,buf,bytes_to_write) == 0)
            return 0;
        buf = (char*)buf + bytes_to_write;
        len -= bytes_to_write;
        //操作字节数增加
        r->processed_bytes += bytes_to_write;
    }
    return 1;
}

/* rio的读方法 */
static inline size_t rioRead(rio *r, void *buf, size_t len) {
    while (len) {
    	//判断当前操作字节长度是否超过最大长度
        size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        //读数据方法
        if (r->read(r,buf,bytes_to_read) == 0)
            return 0;
        //读数据时,更新校验和
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
        buf = (char*)buf + bytes_to_read;
        len -= bytes_to_read;
        r->processed_bytes += bytes_to_read;
    }
    return 1;
}
这里有一个比较不错的地方,每次当有数据发生改变的时候,Redis都会做一个计算校验和的处理算法,表明了数据操作的改变动作,用的算法就是之前介绍过CRC64算法,针对RIO的buffer IO和File IO,Redis定义了2个RIO结构体:

/* 根据上面描述的方法,定义了BufferRio */
static const rio rioBufferIO = {
    rioBufferRead,
    rioBufferWrite,
    rioBufferTell,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};

/* 根据上面描述的方法,定义了FileRio */
static const rio rioFileIO = {
    rioFileRead,
    rioFileWrite,
    rioFileTell,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
里面分别定义了相对应的读写方法,比如buffer的Read方法和File的Read方法:

/* Returns 1 or 0 for success/failure. */
/* 读取rio中的buffer内容到传入的参数 */
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
    if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
        return 0; /* not enough buffer to return len bytes. */
    memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
    r->io.buffer.pos += len;
    return 1;
}
/* Returns 1 or 0 for success/failure. */
/* 读取rio中的fp文件内容 */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
    return fread(buf,len,1,r->io.file.fp);
}
作用的rio的对象变量不一样,最后在Redis的声明中给出了4种不同类型数据的写入方法:

/* rio写入不同类型数据方法,最终调用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);
size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);
举其中的一个方法实现:

/* Write multi bulk count in the format: "*<count>\r\n". */
/* rio写入不同类型数据方法,调用的是riowrite方法 */
size_t rioWriteBulkCount(rio *r, char prefix, int count) {
    char cbuf[128];
    int clen;

    cbuf[0] = prefix;
    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
    cbuf[clen++] = '\r';
    cbuf[clen++] = '\n';
    if (rioWrite(r,cbuf,clen) == 0) return 0;
    return clen;
}
调用的还是里面的rioWrite方法,根据你定义的是buffer IO还是File IO,.各自有各自不同的实现而已。在文件的write方法时,有一个细节,当你把内容读入到rio.file.buffer时,buffer超过给定的同步最小字节,你得必须将buffer内容刷新到文件中了。

/* Returns 1 or 0 for success/failure. */
/* 将buf写入rio中的file文件中 */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;

    retval = fwrite(buf,len,1,r->io.file.fp);
    r->io.file.buffered += len;

    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
    {
    	//判读是否需要同步
        fflush(r->io.file.fp);
        aof_fsync(fileno(r->io.file.fp));
        r->io.file.buffered = 0;
    }
    return retval;
}

Redis源码分析(二十七)--- rio系统I/O的封装