首页 > 代码库 > redis源码分析(4)——发送响应内容
redis源码分析(4)——发送响应内容
前一篇介绍了redis处理请求的过程,接下来是如何发送响应内容。
在请求处理完之后,进行响应时,需要调用addReplyXXX族函数,具体包括:
void addReply(redisClient *c, robj *obj)
void addReplySds(redisClient *c, sds s)
void addReplyString(redisClient *c, char *s, size_t len)
这几个函数又会被封装成addReplyBulk等函数。这里以addReply为例进行分析。
首先,先介绍一下北京知识。redis将响应内容组织成两部分,一个固定大小的buffer(16KB),一个响应内容对象的链表。在链表为空并且buffer有足够空间时,则将响应添加到buffer中,否则创建一个节点追加到链表上。固定buffer和响应链表,整体上构成了一个队列。这么组织的好处是,既可以节省内存(不需一开始预先分配大块内存,动态内存的由链表cover),并且可以避免频繁分配、回收内存(在响应内容小于buffer大小时)。
在addReply函数中,首先会调用prepareClientToWrite,在响应前进行初始化工作。
if (prepareClientToWrite(c) != REDIS_OK) return;prepareClientToWrite函数在每次发送响应时调用,并且要在向响应buffer添加数据之前。只有在返回REDIS_OK时,才能向响应buffer输出内容。在响应buffer为空时(固定buffer以及链表均为空)时,向事件循环注册写事件,回调函数是sendReplyToClient。
/* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * * If the client should receive new data (normal clients will) the function * returns REDIS_OK, and make sure to install the write handler in our event * loop so that when the socket is writable new data gets written. * * If the client should not receive new data, because it is a fake client, * a master, a slave not yet online, or because the setup of the write handler * failed, the function returns REDIS_ERR. * * Typically gets called every time a reply is built, before adding more * data to the clients output buffers. If the function returns REDIS_ERR no * data should be appended to the output buffers. */ int prepareClientToWrite(redisClient *c) { if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK; if ((c->flags & REDIS_MASTER) && !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR; if (c->fd <= 0) return REDIS_ERR; /* Fake client */ // <MM> // 在没有向output buf输出之前,才可以注册write event handler // </MM> if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) return REDIS_ERR; return REDIS_OK; }接下来,需要将响应内容添加到output buffer中。总体思路是,先尝试向固定buffer添加,添加失败的话,在尝试添加到响应链表。
if (obj->encoding == REDIS_ENCODING_RAW) { if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) _addReplyObjectToList(c,obj);先看一下添加到固定buffer的逻辑。这个函数很简单,主要检查两个地方,一是输出链表是否为空,二是buffer的剩余空间是否足够大。如果两个都满足,则可以添加到buffer。最后,拷贝内容,并更新bufpos(用于指向buffer中下一个空闲位置)。
int _addReplyToBuffer(redisClient *c, char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK; /* If there already are entries in the reply list, we cannot * add anything more to the static buffer. */ if (listLength(c->reply) > 0) return REDIS_ERR; /* Check that the buffer has enough space available for this string. */ if (len > available) return REDIS_ERR; memcpy(c->buf+c->bufpos,s,len); c->bufpos+=len; return REDIS_OK; }接下来是添加到输出链表函数_addReplyObjectToList。逻辑也比较简单,就是链表追加的操作。如果链表为空的话,直接在尾部添加一个节点。redisClient->reply_bytes记录输出链表总大小,这里更新一下。
if (listLength(c->reply) == 0) { incrRefCount(o); listAddNodeTail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); }
如果链表不为空,首先会尝试将响应内容追加到尾节点,否则创建一个新的节点追加到链表尾部。
如果尾节点和当前响应内容大小之和小于REDIS_REPLY_CHUNK_BYTES(16KB),则合并成一个节点。这么做可以提高链表的有效载荷(有效数据大小/链表元数据大小),节省内存。
如果尾节点和当前响应内容大小之和小于REDIS_REPLY_CHUNK_BYTES(16KB),则合并成一个节点。这么做可以提高链表的有效载荷(有效数据大小/链表元数据大小),节省内存。
/* Append to this object when possible. */ if (tail->ptr != NULL && sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) { c->reply_bytes -= zmalloc_size_sds(tail->ptr); tail = dupLastObjectIfNeeded(c->reply); tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); c->reply_bytes += zmalloc_size_sds(tail->ptr); }如果超过REDIS_REPLY_CHUNK_BYTES,则新增一个节点,并追加到尾部。
else { incrRefCount(o); listAddNodeTail(c->reply,o); c->reply_bytes += zmalloc_size_sds(o->ptr); }如果不限制客户端的响应buffer,那么大量的活跃客户端就会导致redis的内存爆掉(比如在操作频繁的实例上,打开了很多的monitor客户端)。所以需要限制响应buffer的大小,在超过限制时,关闭该客户端。_addReplyObjectToList的最后一行代码就是完成这个功能的。
asyncCloseClientOnOutputBufferLimitReached(c);这个函数就是检查一下整个reply链表的内存大小是否超过软限制或硬限制。然后异步关闭该客户端(添加到链表,在serverCron进行关闭),之所以异步关闭,是因为没有立即停止请求处理,后续还有可能向响应buffer输出内容。具体的条件是大小超过硬限制,或者间隔n秒连续超过软限制。
/* Asynchronously close a client if soft or hard limit is reached on the * output buffer size. The caller can check if the client will be closed * checking if the client REDIS_CLOSE_ASAP flag is set. * * Note: we need to close the client asynchronously because this function is * called from contexts where the client can‘t be freed safely, i.e. from the * lower level functions pushing data inside the client output buffers. */ void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) { // <MM> // assert不超过4GB - 64KB // </MM> redisAssert(c->reply_bytes < ULONG_MAX-(1024*64)); if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); freeClientAsync(c); redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); sdsfree(client); } }上面就是响应内容输出到buffer的过程,下面看一下发送buffer的过程。
之前介绍过,在每次响应之前都需要调用prepareClientToWrite函数,进行响应前的准备。这个函数中会为客户端添加写事件处理函数sendReplyToClient,在下一次事件循环时会调用。也就是说,请求的处理和响应的发送是在两个事件循环中完成的。之后介绍AOF时,会知道通过在两次事件循环之间调用beforeSleep,以完成AOF的输出。
sendReplyToClient函数就是按顺序将响应buffer中的内容,发送出去,内部是一个while循环,条件是固定buffer或者响应链表不为空。
while(c->bufpos > 0 || listLength(c->reply)) { ….. }循环内部首先是尝试发送固定buffer,totwritten记录一次sendReplyToClient函数发送数据的总大小。redisClient->sentlen在发送固定buffer和链表时是复用的。在发送固定buffer时,如果sentlen等于bufpos,则固定buffer发送完,接下来发送reply链表。
if (c->bufpos > 0) { // <MM> // buffer不空 // </MM> nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; /* If the buffer was sent, set bufpos to zero to continue with * the remainder of the reply. */ if (c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } }发送reply链表时,会从链表头部开始依次发送。首先,取头结点,计算对象的长度。
// <MM> // 将reply链表的头结点发送 // </MM> o = listNodeValue(listFirst(c->reply)); objlen = sdslen(o->ptr); objmem = zmalloc_size_sds(o->ptr);如果节点内容为空,则直接将该节点删除。
if (objlen == 0) { listDelNode(c->reply,listFirst(c->reply)); continue; }接下来调用write系统调用,sentlen表示当前该对象已发送的长度。每次调用write发送尚未发送的内容(objlen - c->sentlen)。同样,这里需要更新totwritten。
nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten;如果sentlen==objlen,说明这个reply节点的数据发送完成,需要删除该节点。
/* If we fully sent the object on head go to the next one */ if (c->sentlen == objlen) { listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; c->reply_bytes -= objmem; }为了所有客户端的公平,redis限制每次事件循环中,每个客户端最大发送REDIS_MAX_WRITE_PER_EVENT(64KB)。但是,这也有例外,如果当前redis设置了最大内存,并且已经超过该限制,这里会尽量多得发送数据,以尽快释放空间。
/* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it‘s a good idea to serve * other clients as well, even if a very large request comes from * super fast link that is always able to accept data (in real world * scenario think about ‘KEYS *‘ against the loopback interface). * * However if we are over the maxmemory limit we ignore that and * just deliver as much data as it is possible to deliver. */ if (totwritten > REDIS_MAX_WRITE_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory)) break;跳出while循环,首先会判断write是否出错。如果出错,记录日志并停止这个客户端。
if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { redisLog(REDIS_VERBOSE, "Error writing to client: %s", strerror(errno)); freeClient(c); return; } }如果发送出数据,则认为该客户端还是活跃的更新一下redisClient->lastinteraction(用于清理timeout的客户端)。
if (totwritten > 0) { /* For clients representing masters we don‘t count sending data * as an interaction, since we always send REPLCONF ACK commands * that take some time to just fill the socket output buffer. * We just rely on data / pings received for timeout detection. */ if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime; }最后,判断一下响应内容是不是全部发送完。如果发送完,则将删除写事件。否则,下一轮事件循环继续发送响应内容,指导完成为止。
// <MM> // 当client的响应buf输出完毕后,删除对应的write event // </MM> if (c->bufpos == 0 && listLength(c->reply) == 0) { c->sentlen = 0; aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c); }请求和响应的过程已经完成,下一篇介绍一下AOF。
redis源码分析(4)——发送响应内容
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。