首页 > 代码库 > redis源码学习(客户端)
redis源码学习(客户端)
大概介绍
redis 客户端设计主要是存储客户的链接,请求,请求解析的命令,执行结果。先看server的结构和client的结构,server里面有多个client,相当于一个服务端可以连多个客户端,服务端根据事件触发模式依次处理客户端的请求。
server结构
struct redisServer { /* General */ // 配置文件的绝对路径 char *configfile; /* Absolute config file path, or NULL */ // serverCron() 每秒调用的次数 int hz; /* serverCron() calls frequency in hertz */ // 数据库 redisDb *db; // 命令表(受到 rename 配置选项的作用) dict *commands; /* Command table */ // 命令表(无 rename 配置选项的作用) dict *orig_commands; /* Command table before command renaming. */ // 事件状态 aeEventLoop *el; // 最近一次使用时钟 unsigned lruclock:REDIS_LRU_BITS; /* Clock for LRU eviction */ // 关闭服务器的标识 int shutdown_asap; /* SHUTDOWN needed ASAP */ // 在执行 serverCron() 时进行渐进式 rehash int activerehashing; /* Incremental rehash in serverCron() */ // 是否设置了密码 char *requirepass; /* Pass for AUTH command, or NULL */ // PID 文件 char *pidfile; /* PID file path */ // 架构类型 int arch_bits; /* 32 or 64 depending on sizeof(long) */ // serverCron() 函数的运行次数计数器 int cronloops; /* Number of times the cron function run */ // 本服务器的 RUN ID char runid[REDIS_RUN_ID_SIZE+1]; /* ID always different at every exec. */ // 服务器是否运行在 SENTINEL 模式 int sentinel_mode; /* True if this instance is a Sentinel. */ /* Networking */ // TCP 监听端口 int port; /* TCP listening port */ int tcp_backlog; /* TCP listen() backlog */ // 地址 char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */ // 地址数量 int bindaddr_count; /* Number of addresses in server.bindaddr[] */ // UNIX 套接字 char *unixsocket; /* UNIX socket path */ mode_t unixsocketperm; /* UNIX socket permission */ // 描述符 int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */ // 描述符数量 int ipfd_count; /* Used slots in ipfd[] */ // UNIX 套接字文件描述符 int sofd; /* Unix socket file descriptor */ int cfd[REDIS_BINDADDR_MAX];/* Cluster bus listening socket */ int cfd_count; /* Used slots in cfd[] */ // 一个链表,保存了所有客户端状态结构 list *clients; /* List of active clients */ // 链表,保存了所有待关闭的客户端 list *clients_to_close; /* Clients to close asynchronously */ // 链表,保存了所有从服务器,以及所有监视器 list *slaves, *monitors; /* List of slaves and MONITORs */ // 服务器的当前客户端,仅用于崩溃报告 redisClient *current_client; /* Current client, only used on crash report */ int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ // 网络错误 char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ // MIGRATE 缓存 dict *migrate_cached_sockets;/* MIGRATE cached sockets */ /* RDB / AOF loading information */ // 这个值为真时,表示服务器正在进行载入 int loading; /* We are loading data from disk if true */ // 正在载入的数据的大小 off_t loading_total_bytes; // 已载入数据的大小 off_t loading_loaded_bytes; // 开始进行载入的时间 time_t loading_start_time; off_t loading_process_events_interval_bytes; /* Fast pointers to often looked up command */ // 常用命令的快捷连接 struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, *rpopCommand; /* Fields used only for stats */ // 服务器启动时间 time_t stat_starttime; /* Server start time */ // 已处理命令的数量 long long stat_numcommands; /* Number of processed commands */ // 服务器接到的连接请求数量 long long stat_numconnections; /* Number of connections received */ // 已过期的键数量 long long stat_expiredkeys; /* Number of expired keys */ // 因为回收内存而被释放的过期键的数量 long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ // 成功查找键的次数 long long stat_keyspace_hits; /* Number of successful lookups of keys */ // 查找键失败的次数 long long stat_keyspace_misses; /* Number of failed lookups of keys */ // 已使用内存峰值 size_t stat_peak_memory; /* Max used memory record */ // 最后一次执行 fork() 时消耗的时间 long long stat_fork_time; /* Time needed to perform latest fork() */ // 服务器因为客户端数量过多而拒绝客户端连接的次数 long long stat_rejected_conn; /* Clients rejected because of maxclients */ // 执行 full sync 的次数 long long stat_sync_full; /* Number of full resyncs with slaves. */ // PSYNC 成功执行的次数 long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ // PSYNC 执行失败的次数 long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */ /* slowlog */ // 保存了所有慢查询日志的链表 list *slowlog; /* SLOWLOG list of commands */ // 下一条慢查询日志的 ID long long slowlog_entry_id; /* SLOWLOG current entry ID */ // 服务器配置 slowlog-log-slower-than 选项的值 long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ // 服务器配置 slowlog-max-len 选项的值 unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ size_t resident_set_size; /* RSS sampled in serverCron(). */ /* The following two are used to track instantaneous "load" in terms * of operations per second. */ // 最后一次进行抽样的时间 long long ops_sec_last_sample_time; /* Timestamp of last sample (in ms) */ // 最后一次抽样时,服务器已执行命令的数量 long long ops_sec_last_sample_ops; /* numcommands in last sample */ // 抽样结果 long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES]; // 数组索引,用于保存抽样结果,并在需要时回绕到 0 int ops_sec_idx; /* Configuration */ // 日志可见性 int verbosity; /* Loglevel in redis.conf */ // 客户端最大空转时间 int maxidletime; /* Client timeout in seconds */ // 是否开启 SO_KEEPALIVE 选项 int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ int daemonize; /* True if running as a daemon */ // 客户端输出缓冲区大小限制 // 数组的元素有 REDIS_CLIENT_LIMIT_NUM_CLASSES 个 // 每个代表一类客户端:普通、从服务器、pubsub,诸如此类 clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES]; /* AOF persistence */ // AOF 状态(开启/关闭/可写) int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */ // 所使用的 fsync 策略(每个写入/每秒/从不) int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */ int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ // 最后一次执行 BGREWRITEAOF 时, AOF 文件的大小 off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ // AOF 文件的当前字节大小 off_t aof_current_size; /* AOF current size. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ // 负责进行 AOF 重写的子进程 ID pid_t aof_child_pid; /* PID if rewriting process */ // AOF 重写缓存链表,链接着多个缓存块 list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ // AOF 缓冲区 sds aof_buf; /* AOF buffer, written before entering the event loop */ // AOF 文件的描述符 int aof_fd; /* File descriptor of currently selected AOF file */ // AOF 的当前目标数据库 int aof_selected_db; /* Currently selected DB in AOF */ // 推迟 write 操作的时间 time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ // 最后一直执行 fsync 的时间 time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ // AOF 重写的开始时间 time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ // 最后一次执行 BGREWRITEAOF 的结果 int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */ // 记录 AOF 的 write 操作被推迟了多少次 unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ // 指示是否需要每写入一定量的数据,就主动执行一次 fsync() int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */ int aof_last_write_status; /* REDIS_OK or REDIS_ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ /* RDB persistence */ // 自从上次 SAVE 执行以来,数据库被修改的次数 long long dirty; /* Changes to DB from the last save */ // BGSAVE 执行前的数据库被修改次数 long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ // 负责执行 BGSAVE 的子进程的 ID // 没在执行 BGSAVE 时,设为 -1 pid_t rdb_child_pid; /* PID of RDB saving child */ struct saveparam *saveparams; /* Save points array for RDB */ int saveparamslen; /* Number of saving points */ char *rdb_filename; /* Name of RDB file */ int rdb_compression; /* Use compression in RDB? */ int rdb_checksum; /* Use RDB checksum? */ // 最后一次完成 SAVE 的时间 time_t lastsave; /* Unix time of last successful save */ // 最后一次尝试执行 BGSAVE 的时间 time_t lastbgsave_try; /* Unix time of last attempted bgsave */ // 最近一次 BGSAVE 执行耗费的时间 time_t rdb_save_time_last; /* Time used by last RDB save run. */ // 数据库最近一次开始执行 BGSAVE 的时间 time_t rdb_save_time_start; /* Current RDB save start time. */ // 最后一次执行 SAVE 的状态 int lastbgsave_status; /* REDIS_OK or REDIS_ERR */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ /* Propagation of commands in AOF / replication */ redisOpArray also_propagate; /* Additional command to propagate. */ /* Logging */ char *logfile; /* Path of log file */ int syslog_enabled; /* Is syslog enabled? */ char *syslog_ident; /* Syslog ident */ int syslog_facility; /* Syslog facility */ /* Replication (master) */ int slaveseldb; /* Last SELECTed DB in replication output */ // 全局复制偏移量(一个累计值) long long master_repl_offset; /* Global replication offset */ // 主服务器发送 PING 的频率 int repl_ping_slave_period; /* Master pings the slave every N seconds */ // backlog 本身 char *repl_backlog; /* Replication backlog for partial syncs */ // backlog 的长度 long long repl_backlog_size; /* Backlog circular buffer size */ // backlog 中数据的长度 long long repl_backlog_histlen; /* Backlog actual data length */ // backlog 的当前索引 long long repl_backlog_idx; /* Backlog circular buffer current offset */ // backlog 中可以被还原的第一个字节的偏移量 long long repl_backlog_off; /* Replication offset of first byte in the backlog buffer. */ // backlog 的过期时间 time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ // 距离上一次有从服务器的时间 time_t repl_no_slaves_since; /* We have no slaves since that time. Only valid if server.slaves len is 0. */ // 是否开启最小数量从服务器写入功能 int repl_min_slaves_to_write; /* Min number of slaves to write. */ // 定义最小数量从服务器的最大延迟值 int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */ // 延迟良好的从服务器的数量 int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ /* Replication (slave) */ // 主服务器的验证密码 char *masterauth; /* AUTH with this password with master */ // 主服务器的地址 char *masterhost; /* Hostname of master */ // 主服务器的端口 int masterport; /* Port of master */ // 超时时间 int repl_timeout; /* Timeout after N seconds of master idle */ // 主服务器所对应的客户端 redisClient *master; /* Client that is master for this slave */ // 被缓存的主服务器,PSYNC 时使用 redisClient *cached_master; /* Cached master to be reused for PSYNC. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ // 复制的状态(服务器是从服务器时使用) int repl_state; /* Replication status if the instance is a slave */ // RDB 文件的大小 off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ // 已读 RDB 文件内容的字节数 off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ // 最近一次执行 fsync 时的偏移量 // 用于 sync_file_range 函数 off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ // 主服务器的套接字 int repl_transfer_s; /* Slave -> Master SYNC socket */ // 保存 RDB 文件的临时文件的描述符 int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ // 保存 RDB 文件的临时文件名字 char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ // 最近一次读入 RDB 内容的时间 time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ int repl_serve_stale_data; /* Serve stale data when link is down? */ // 是否只读从服务器? int repl_slave_ro; /* Slave is read only? */ // 连接断开的时长 time_t repl_down_since; /* Unix time at which link with master went down */ // 是否要在 SYNC 之后关闭 NODELAY ? int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ // 从服务器优先级 int slave_priority; /* Reported in INFO and used by Sentinel. */ // 本服务器(从服务器)当前主服务器的 RUN ID char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */ // 初始化偏移量 long long repl_master_initial_offset; /* Master PSYNC offset. */ /* Replication script cache. */ // 复制脚本缓存 // 字典 dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ // FIFO 队列 list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ // 缓存的大小 int repl_scriptcache_size; /* Max number of elements. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT command. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */ /* Limits */ int maxclients; /* Max number of simultaneous clients */ unsigned long long maxmemory; /* Max number of memory bytes to use */ int maxmemory_policy; /* Policy for key eviction */ int maxmemory_samples; /* Pricision of random sampling */ /* Blocked clients */ unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */ list *unblocked_clients; /* list of clients to unblock before next loop */ list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; int sort_alpha; int sort_bypattern; int sort_store; /* Zip structure config, see redis.conf for more information */ size_t hash_max_ziplist_entries; size_t hash_max_ziplist_value; size_t list_max_ziplist_entries; size_t list_max_ziplist_value; size_t set_max_intset_entries; size_t zset_max_ziplist_entries; size_t zset_max_ziplist_value; size_t hll_sparse_max_bytes; time_t unixtime; /* Unix time sampled every cron cycle. */ long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ /* Pubsub */ // 字典,键为频道,值为链表 // 链表中保存了所有订阅某个频道的客户端 // 新客户端总是被添加到链表的表尾 dict *pubsub_channels; /* Map channels to list of subscribed clients */ // 这个链表记录了客户端订阅的所有模式的名字 list *pubsub_patterns; /* A list of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of REDIS_NOTIFY... flags. */ /* Cluster */ int cluster_enabled; /* Is cluster enabled? */ mstime_t cluster_node_timeout; /* Cluster node timeout. */ char *cluster_configfile; /* Cluster auto-generated config file name. */ struct clusterState *cluster; /* State of the cluster */ int cluster_migration_barrier; /* Cluster replicas migration barrier. */ /* Scripting */ // Lua 环境 lua_State *lua; /* The Lua interpreter. We use just one for all clients */ // 复制执行 Lua 脚本中的 Redis 命令的伪客户端 redisClient *lua_client; /* The "fake client" to query Redis from Lua */ // 当前正在执行 EVAL 命令的客户端,如果没有就是 NULL redisClient *lua_caller; /* The client running EVAL right now, or NULL */ // 一个字典,值为 Lua 脚本,键为脚本的 SHA1 校验和 dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ // Lua 脚本的执行时限 mstime_t lua_time_limit; /* Script timeout in milliseconds */ // 脚本开始执行的时间 mstime_t lua_time_start; /* Start time of script, milliseconds time */ // 脚本是否执行过写命令 int lua_write_dirty; /* True if a write command was called during the execution of the current script. */ // 脚本是否执行过带有随机性质的命令 int lua_random_dirty; /* True if a random command was called during the execution of the current script. */ // 脚本是否超时 int lua_timedout; /* True if we reached the time limit for script execution. */ // 是否要杀死脚本 int lua_kill; /* Kill the script if true. */ /* Assert & bug reporting */ char *assert_failed; char *assert_file; int assert_line; int bug_report_start; /* True if bug report header was already logged. */ int watchdog_period; /* Software watchdog period in ms. 0 = off */ };client 结构
typedef struct redisClient { // 套接字描述符 int fd; // 当前正在使用的数据库 redisDb *db; // 当前正在使用的数据库的 id (号码) int dictid; // 客户端的名字 robj *name; /* As set by CLIENT SETNAME */ // 查询缓冲区 sds querybuf; // 查询缓冲区长度峰值 size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */ // 参数数量 int argc; // 参数对象数组 robj **argv; // 记录被客户端执行的命令 struct redisCommand *cmd, *lastcmd; // 请求的类型:内联命令还是多条命令 int reqtype; // 剩余未读取的命令内容数量 int multibulklen; /* number of multi bulk arguments left to read */ // 命令内容的长度 long bulklen; /* length of bulk argument in multi bulk request */ // 回复链表 list *reply; // 回复链表中对象的总大小 unsigned long reply_bytes; /* Tot bytes of objects in reply list */ // 已发送字节,处理 short write 用 int sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ // 创建客户端的时间 time_t ctime; /* Client creation time */ // 客户端最后一次和服务器互动的时间 time_t lastinteraction; /* time of the last interaction, used for timeout */ // 客户端的输出缓冲区超过软性限制的时间 time_t obuf_soft_limit_reached_time; // 客户端状态标志 int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ // 当 server.requirepass 不为 NULL 时 // 代表认证的状态 // 0 代表未认证, 1 代表已认证 int authenticated; /* when requirepass is non-NULL */ // 复制状态 int replstate; /* replication state if this is a slave */ // 用于保存主服务器传来的 RDB 文件的文件描述符 int repldbfd; /* replication DB file descriptor */ // 读取主服务器传来的 RDB 文件的偏移量 off_t repldboff; /* replication DB file offset */ // 主服务器传来的 RDB 文件的大小 off_t repldbsize; /* replication DB file size */ sds replpreamble; /* replication DB preamble. */ // 主服务器的复制偏移量 long long reploff; /* replication offset if this is our master */ // 从服务器最后一次发送 REPLCONF ACK 时的偏移量 long long repl_ack_off; /* replication ack offset, if this is a slave */ // 从服务器最后一次发送 REPLCONF ACK 的时间 long long repl_ack_time;/* replication ack time, if this is a slave */ // 主服务器的 master run ID // 保存在客户端,用于执行部分重同步 char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */ // 从服务器的监听端口号 int slave_listening_port; /* As configured with: SLAVECONF listening-port */ // 事务状态 multiState mstate; /* MULTI/EXEC state */ // 阻塞类型 int btype; /* Type of blocking op if REDIS_BLOCKED. */ // 阻塞状态 blockingState bpop; /* blocking state */ // 最后被写入的全局复制偏移量 long long woff; /* Last write global replication offset. */ // 被监视的键 list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ // 这个字典记录了客户端所有订阅的频道 // 键为频道名字,值为 NULL // 也即是,一个频道的集合 dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ // 链表,包含多个 pubsubPattern 结构 // 记录了所有订阅频道的客户端的信息 // 新 pubsubPattern 结构总是被添加到表尾 list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ sds peerid; /* Cached peer ID. */ /* Response buffer */ // 回复偏移量 int bufpos; // 回复缓冲区 char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient;客户端的请求过程剖析
客户端连接server,请求命令,返回结果的整个过程
整个server都是有事件控制,client请求连接,发送请求,server接收,解析,执行,返回结果都是有事件控制。
先从事件创建,触发,到连接,客户端创建说起。
1)int main()里面的initServer函数
2)initServer函数// 为 TCP 连接关联连接应答(accept)处理器 // 用于接受并应答客户端的 connect() 调用 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } }ipfd_count是服务监听端口的个数,为不同的监听端口创建不同的server fd
再使用各个不同的server fd创建文件事件,触发的函数为acceptTcpHandler(一旦有连接连接此其中一个监听端口,就会触发对应的event,就会触发acceptTcpHandler函数)
3)acceptTcpHandler函数/* * 创建一个 TCP 连接处理器 */ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[REDIS_IP_STR_LEN]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); while(max--) { // accept 客户端连接 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) redisLog(REDIS_WARNING, "Accepting client connection: %s", server.neterr); return; } redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); // 为客户端创建客户端状态(redisClient) acceptCommonHandler(cfd,0); } }4)请求连接,创建客户端:
networking.c: acceptCommonHandler函数/* * TCP 连接 accept 处理器 */ #define MAX_ACCEPTS_PER_CALL 1000 static void acceptCommonHandler(int fd, int flags) { // 创建客户端 redisClient *c; if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); close(fd); /* May be already closed, just ignore errors */ return; } /* If maxclient directive is set and this is one client more... close the * connection. Note that we create the client instead to check before * for this condition, since now the socket is already set in non-blocking * mode and we can send an error for free using the Kernel I/O */ // 如果新添加的客户端令服务器的最大客户端数量达到了 // 那么向新客户端写入错误信息,并关闭新客户端 // 先创建客户端,再进行数量检查是为了方便地进行错误信息写入 if (listLength(server.clients) > server.maxclients) { char *err = "-ERR max number of clients reached\r\n"; /* That's a best effort error message, don't check write errors */ if (write(c->fd,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } // 更新拒绝连接数 server.stat_rejected_conn++; freeClient(c); return; } // 更新连接次数 server.stat_numconnections++; // 设置 FLAG c->flags |= flags; }5)命令请求处理器
networking.c:readQueryFromClient
redisClient *createClient(int fd) { // 分配空间 redisClient *c = zmalloc(sizeof(redisClient)); /* passing -1 as fd it is possible to create a non connected client. * This is useful since all the Redis commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ // 当 fd 不为 -1 时,创建带网络连接的客户端 // 如果 fd 为 -1 ,那么创建无网络连接的伪客户端 // 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时 // 需要用到这种伪终端 if (fd != -1) { // 非阻塞 anetNonBlock(NULL,fd); // 禁用 Nagle 算法 anetEnableTcpNoDelay(NULL,fd); // 设置 keep alive if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 绑定读事件到事件 loop (开始接收命令请求) if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } //...... }createclient创建了一个请求可读的事件,事件的触发函数是readQueryFromClient,它就是命令请求处理器,下面可以看看它的功能:/* * 读取客户端的查询缓冲区内容 */ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = (redisClient*) privdata; int nread, readlen; size_t qblen; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); // 设置服务器的当前客户端 server.current_client = c; // 读入长度(默认为 16 MB) readlen = REDIS_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= REDIS_MBULK_BIG_ARG) { int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf); if (remaining < readlen) readlen = remaining; } // 获取查询缓冲区当前内容的长度 // 如果读取出现 short read ,那么可能会有内容滞留在读取缓冲区里面 // 这些滞留内容也许不能完整构成一个符合协议的命令, qblen = sdslen(c->querybuf); // 如果有需要,更新缓冲区内容长度的峰值(peak) if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; // 为查询缓冲区分配空间 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); // 读入内容到查询缓存 nread = read(fd, c->querybuf+qblen, readlen); // 读入出错 if (nread == -1) { if (errno == EAGAIN) { nread = 0; } else { redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } // 遇到 EOF } else if (nread == 0) { redisLog(REDIS_VERBOSE, "Client closed connection"); freeClient(c); return; } if (nread) { // 根据内容,更新查询缓冲区(SDS) free 和 len 属性 // 并将 '\0' 正确地放到内容的最后 sdsIncrLen(c->querybuf,nread); // 记录服务器和客户端最后一次互动的时间 c->lastinteraction = server.unixtime; // 如果客户端是 master 的话,更新它的复制偏移量 if (c->flags & REDIS_MASTER) c->reploff += nread; } else { // 在 nread == -1 且 errno == EAGAIN 时运行 server.current_client = NULL; return; } // 查询缓冲区长度超出服务器最大缓冲区长度 // 清空缓冲区并释放客户端 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } // 从查询缓存重读取内容,创建参数,并执行命令 // 函数会执行到缓存中的所有内容都被处理完为止 processInputBuffer(c); server.current_client = NULL; }6)命令解析器把数据read到client的querybuf,根据querybuf信息,解析成命令
// 处理客户端输入的命令内容 void processInputBuffer(redisClient *c) { /* Keep processing while there is something in the input buffer */ // 尽可能地处理查询缓冲区中的内容 // 如果读取出现 short read ,那么可能会有内容滞留在读取缓冲区里面 // 这些滞留内容也许不能完整构成一个符合协议的命令, // 需要等待下次读事件的就绪 while(sdslen(c->querybuf)) { /* Return if clients are paused. */ // 如果客户端正处于暂停状态,那么直接返回 if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return; /* Immediately abort if the client is in the middle of something. */ // REDIS_BLOCKED 状态表示客户端正在被阻塞 if (c->flags & REDIS_BLOCKED) return; /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is * written to the client. Make sure to not let the reply grow after * this flag has been set (i.e. don't process more commands). */ // 客户端已经设置了关闭 FLAG ,没有必要处理命令了 if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; /* Determine request type when unknown. */ // 判断请求的类型 // 两种类型的区别可以在 Redis 的通讯协议上查到: // http://redis.readthedocs.org/en/latest/topic/protocol.html // 简单来说,多条查询是一般客户端发送来的, // 而内联查询则是 TELNET 发送来的 if (!c->reqtype) { if (c->querybuf[0] == '*') { // 多条查询 c->reqtype = REDIS_REQ_MULTIBULK; } else { // 内联查询 c->reqtype = REDIS_REQ_INLINE; } } // 将缓冲区中的内容转换成命令,以及命令参数 if (c->reqtype == REDIS_REQ_INLINE) { if (processInlineBuffer(c) != REDIS_OK) break; } else if (c->reqtype == REDIS_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != REDIS_OK) break; } else { redisPanic("Unknown request type"); } /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { resetClient(c); } else { /* Only reset the client when the command was executed. */ // 执行命令,并重置客户端 if (processCommand(c) == REDIS_OK) resetClient(c); } } }
7)命令处理器:/* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the * server for a bulk read from the client. * * 这个函数执行时,我们已经读入了一个完整的命令到客户端, * 这个函数负责执行这个命令, * 或者服务器准备从客户端中进行一次读取。 * * If 1 is returned the client is still alive and valid and * other operations can be performed by the caller. Otherwise * if 0 is returned the client was destroyed (i.e. after QUIT). * * 如果这个函数返回 1 ,那么表示客户端在执行命令之后仍然存在, * 调用者可以继续执行其他操作。 * 否则,如果这个函数返回 0 ,那么表示客户端已经被销毁。 */ int processCommand(redisClient *c) { /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in * a regular command proc. */ // 特别处理 quit 命令 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= REDIS_CLOSE_AFTER_REPLY; return REDIS_ERR; } /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ // 查找命令,并进行命令合法性检查,以及命令参数个数检查 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { // 没找到指定的命令 flagTransaction(c); addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return REDIS_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { // 参数个数错误 flagTransaction(c); addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return REDIS_OK; } /* Check if the user is authenticated */ // 检查认证信息 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { flagTransaction(c); addReply(c,shared.noautherr); return REDIS_OK; } /* If cluster is enabled perform the cluster redirection here. * * 如果开启了集群模式,那么在这里进行转向操作。 * * However we don't perform the redirection if: * * 不过,如果有以下情况出现,那么节点不进行转向: * * 1) The sender of this command is our master. * 命令的发送者是本节点的主节点 * * 2) The command has no key arguments. * 命令没有 key 参数 */ if (server.cluster_enabled && !(c->flags & REDIS_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { int hashslot; // 集群已下线 if (server.cluster->state != REDIS_CLUSTER_OK) { flagTransaction(c); addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n")); return REDIS_OK; // 集群运作正常 } else { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); // 不能执行多键处理命令 if (n == NULL) { flagTransaction(c); if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) { addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) { /* The request spawns mutliple keys in the same slot, * but the slot is not "stable" currently as there is * a migration or import in progress. */ addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); } else { redisPanic("getNodeByQuery() unknown error."); } return REDIS_OK; // 命令针对的槽和键不是本节点处理的,进行转向 } else if (n != server.cluster->myself) { flagTransaction(c); // -<ASK or MOVED> <slot> <ip>:<port> // 例如 -ASK 10086 127.0.0.1:12345 addReplySds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d\r\n", (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", hashslot,n->ip,n->port)); return REDIS_OK; } // 如果执行到这里,说明键 key 所在的槽由本节点处理 // 或者客户端执行的是无参数命令 } } /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. */ // 如果设置了最大内存,那么检查内存是否超过限制,并做相应的操作 if (server.maxmemory) { // 如果内存已超过限制,那么尝试通过删除过期键来释放内存 int retval = freeMemoryIfNeeded(); // 如果即将要执行的命令可能占用大量内存(REDIS_CMD_DENYOOM) // 并且前面的内存释放失败的话 // 那么向客户端返回内存错误 if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) { flagTransaction(c); addReply(c, shared.oomerr); return REDIS_OK; } } /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ // 如果这是一个主服务器,并且这个服务器之前执行 BGSAVE 时发生了错误 // 那么不执行写命令 if (((server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == REDIS_ERR) || server.aof_last_write_status == REDIS_ERR) && server.masterhost == NULL && (c->cmd->flags & REDIS_CMD_WRITE || c->cmd->proc == pingCommand)) { flagTransaction(c); if (server.aof_last_write_status == REDIS_OK) addReply(c, shared.bgsaveerr); else addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno))); return REDIS_OK; } /* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ // 如果服务器没有足够多的状态良好服务器 // 并且 min-slaves-to-write 选项已打开 if (server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & REDIS_CMD_WRITE && server.repl_good_slaves_count < server.repl_min_slaves_to_write) { flagTransaction(c); addReply(c, shared.noreplicaserr); return REDIS_OK; } /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ // 如果这个服务器是一个只读 slave 的话,那么拒绝执行写命令 if (server.masterhost && server.repl_slave_ro && !(c->flags & REDIS_MASTER) && c->cmd->flags & REDIS_CMD_WRITE) { addReply(c, shared.roslaveerr); return REDIS_OK; } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ // 在订阅于发布模式的上下文中,只能执行订阅和退订相关的命令 if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); return REDIS_OK; } /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and * we are a slave with a broken link with master. */ if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED && server.repl_serve_stale_data =http://www.mamicode.com/= 0 &&>8)命令回复器networking.c/sendReplyToClient函数:
/* * 负责传送命令回复的写处理器 */ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; size_t objmem; robj *o; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); // 一直循环,直到回复缓冲区为空 // 或者指定条件满足为止 while(c->bufpos > 0 || listLength(c->reply)) { if (c->bufpos > 0) { // c->bufpos > 0 // 写入内容到套接字 // c->sentlen 是用来处理 short write 的 // 当出现 short write ,导致写入未能一次完成时, // c->buf+c->sentlen 就会偏移到正确(未写入)内容的位置上。 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); // 出错则跳出 if (nwritten <= 0) break; // 成功写入则更新写入计数器变量 c->sentlen += nwritten; totwritten += nwritten; /* If the buffer was sent, set bufpos to zero to continue with * the remainder of the reply. */ // 如果缓冲区中的内容已经全部写入完毕 // 那么清空客户端的两个计数器变量 if (c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } } else { // listLength(c->reply) != 0 // 取出位于链表最前面的对象 o = listNodeValue(listFirst(c->reply)); objlen = sdslen(o->ptr); objmem = getStringObjectSdsUsedMemory(o); // 略过空对象 if (objlen == 0) { listDelNode(c->reply,listFirst(c->reply)); c->reply_bytes -= objmem; continue; } // 写入内容到套接字 // c->sentlen 是用来处理 short write 的 // 当出现 short write ,导致写入未能一次完成时, // c->buf+c->sentlen 就会偏移到正确(未写入)内容的位置上。 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); // 写入出错则跳出 if (nwritten <= 0) break; // 成功写入则更新写入计数器变量 c->sentlen += nwritten; totwritten += nwritten; /* If we fully sent the object on head go to the next one */ // 如果缓冲区内容全部写入完毕,那么删除已写入完毕的节点 if (c->sentlen == objlen) { listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; c->reply_bytes -= objmem; } } /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it's a good idea to serve * other clients as well, even if a very large request comes from * super fast link that is always able to accept data (in real world * scenario think about 'KEYS *' against the loopback interface). * * 为了避免一个非常大的回复独占服务器, * 当写入的总数量大于 REDIS_MAX_WRITE_PER_EVENT , * 临时中断写入,将处理时间让给其他客户端, * 剩余的内容等下次写入就绪再继续写入 * * However if we are over the maxmemory limit we ignore that and * just deliver as much data as it is possible to deliver. * * 不过,如果服务器的内存占用已经超过了限制, * 那么为了将回复缓冲区中的内容尽快写入给客户端, * 然后释放回复缓冲区的空间来回收内存, * 这时即使写入量超过了 REDIS_MAX_WRITE_PER_EVENT , * 程序也继续进行写入 */ if (totwritten > REDIS_MAX_WRITE_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory)) break; } // 写入出错检查 if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { redisLog(REDIS_VERBOSE, "Error writing to client: %s", strerror(errno)); freeClient(c); return; } } if (totwritten > 0) { /* For clients representing masters we don't count sending data * as an interaction, since we always send REPLCONF ACK commands * that take some time to just fill the socket output buffer. * We just rely on data / pings received for timeout detection. */ if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime; } if (c->bufpos == 0 && listLength(c->reply) == 0) { c->sentlen = 0; // 删除 write handler aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ // 如果指定了写入之后关闭客户端 FLAG ,那么关闭客户端 if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c); } }
redis源码学习(客户端)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。