首页 > 代码库 > redis集群实现(一)集群架构与初始化

redis集群实现(一)集群架构与初始化

redis是一个高可用、高性能、高可扩展性的基于内存也支持持久化存储的kv存储数据库,redis相比较于之前的kv存储memcached而言,不但支持的value类型大大增加,并且还支持数据的持久化,弥补了memcached的不能持久化的缺点,但是在3.0之前的redis并不支持集群功能,这也是redis3.0之前不能被大量部署的一个原因,但是由于3.0以后的redis支持了集群功能,redis就开始大量的替代之前的memcached,今天我从源代码层次学习下redis是怎么实现集群功能的。

我看的源代码是redis-3.0源代码,可以在下边这个链接下载到。

http://download.redis.io/releases/redis-3.0.0-rc1.tar.gz

redis的集群并不是类似于HDFS之类的namenodedatanode之类的架构,而是采用改进的一致性哈希算法来对数据进行分片,平均的分配到每一个master节点上,每一个master节点都有相对应的slave节点来复制master节点的数据,以便master宕机的时候来选举成为master节点。大体的架构如下图所示:


采取一致性哈希算法,保证每一块数据映射在0-16384的区间之上,然后这个区间的一部分分给一个master来服务(当然不会分的这么简单)。每一个client访问的时候就会访问对应的master,可能有人想问client是怎么知道数据在哪一个master上的,其实client也不知道,client会访问一个master,然后master发现数据不在这个master节点上,那么master就会告诉client存放client想要的数据所在的master地址,然后client就会访问到正确的master了。

那么,redis集群是怎么搭建起来的呢,难道是几十上百台机器同时开机自动连接的吗?当然不是,当只有一台机器的时候,可以认为这是一个只有一台机器的集群,然后client登录master执行cluster meet <ip> <port>来把指定ip地址的机器加入到集群里边。这样,这个集群就拥有两台机器了。就这样,一台一台的添加,就实现了大规模的redis集群。

首先看看和集群有关的数据结构,这些都是集群实现的基础。

// 节点状态结构体
struct clusterNode {

    // 创建节点的时间
    mstime_t ctime; 

    // 节点ID,通过随机数生成,长度为40,每一个字符都是一个16进制字符
    char name[REDIS_CLUSTER_NAMELEN];

    // 节点状态标识位,比如标识节点是主节点还是从节点。
    int flags;  

    // 节点当前的配置纪元
    uint64_t configEpoch; 

    // 这个node存储的数据槽位图,REDIS_CLUSTER_SLOTS就是redis集群分成的块数目,相当于上边的16384,如果值是1代表这个数据槽的数据存储在当前节点,如果是0表示不在这个节点。
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; 

    // 这个node存储的数据槽的数目
    int numslots;  

    // 如果本节点是主节点,这个字段表示从节点的数目
    int numslaves; 

    // 指针数组,指向各个从节点
    struct clusterNode **slaves;

    // 如果这是一个从节点,那么指向主节点
    struct clusterNode *slaveof;                                                                                            

    // 最后一次发送 PING数据包的时间
    mstime_t ping_sent;      

    // 最后一次接收 PONG数据包的时间戳
    mstime_t pong_received; 

    // 最后一次被设置为 FAIL状态的时间
    mstime_t fail_time;     

    // 最后一次给某个从节点投票的时间
    mstime_t voted_time;    

    // 最后一次从这个节点接收到复制偏移量的时间
    mstime_t repl_offset_time; 

    // 这个节点的复制偏移量
    long long repl_offset;     

    // 节点的 IP地址
    char ip[REDIS_IP_STR_LEN];  

    // 节点的端口号
    int port;               

    // 保存连接相关的信息
    clusterLink *link; 

    // 一个链表,记录了所有其他节点对该节点的下线报告
    list *fail_reports; 
};

然后是记录集群之间的连接的结构体

// clusterLink 包含了与其他节点进行通讯所需的全部信息
typedef struct clusterLink {                                                                                                                                  

    // 连接的创建时间
    mstime_t ctime;        

    // TCP 套接字描述符
    int fd;          

    // 输出缓冲区,保存着等待发送给其他节点的消息(message)。
    sds sndbuf;     

    // 输入缓冲区,保存着从其他节点接收到的消息。
    sds rcvbuf;     

    // 与这个连接相关联的节点,如果没有的话就为 NULL
    struct clusterNode *node;

} clusterLink;

然后是记录集群状态的结构体,每一个节点都有一个这个结构体,用来表示当前集群的状态。

typedef struct clusterState {
                                                                                                                                                              
    // 指向当前节点的指针
    clusterNode *myself;

    // 集群当前的配置纪元,用于实现故障转移
    uint64_t currentEpoch;

    // 集群当前的状态:是在线还是下线
    int state;

    // 集群中至少处理着一个槽的节点的数量。
    int size;   

    // 集群节点名单(包括 myself节点)
    // 字典的键为节点的名字,字典的值为 clusterNode结构
    dict *nodes;   

    // 节点黑名单,用于 CLUSTER FORGET命令
    // 防止被 FORGET的命令重新被添加到集群里面
    // (不过现在似乎没有在使用的样子,已废弃?还是尚未实现?)
    dict *nodes_black_list;

    // 记录要从当前节点迁移到目标节点的槽,以及迁移的目标节点
    // migrating_slots_to[i] = NULL 表示槽 i未被迁移
    // migrating_slots_to[i] = clusterNode_A 表示槽 i要从本节点迁移至节点 A
    clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];

    // 记录要从源节点迁移到本节点的槽,以及进行迁移的源节点
    // importing_slots_from[i] = NULL 表示槽 i未进行导入
    // importing_slots_from[i] = clusterNode_A 表示正从节点 A中导入槽 i
    clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];

    // 负责处理各个槽的节点
    // 例如 slots[i] = clusterNode_A表示槽 i 由节点 A处理
    clusterNode *slots[REDIS_CLUSTER_SLOTS];

    // 跳跃表,表中以槽作为分值,键作为成员,对槽进行有序排序
    // 当需要对某些槽进行区间(range)操作时,这个跳跃表可以提供方便
    // 具体操作定义在 db.c里面
    zskiplist *slots_to_keys;

    // 以下这些域被用于进行故障转移选举

    // 上次执行选举或者下次执行选举的时间
    mstime_t failover_auth_time;

    // 节点获得的投票数量
    int failover_auth_count; 

    // 如果值为 1,表示本节点已经向其他节点发送了投票请求
    int failover_auth_sent; 

    int failover_auth_rank;

    uint64_t failover_auth_epoch; 

    /* 共用的手动故障转移状态 */

    // 手动故障转移执行的时间限制
    mstime_t mf_end;           

    /* 主服务器的手动故障转移状态 */
    clusterNode *mf_slave;     

    /* 从服务器的手动故障转移状态 */
    long long mf_master_offset;    // 指示手动故障转移是否可以开始的标志值
    // 值为非 0时表示各个主服务器可以开始投票
    int mf_can_start;          

    /* The followign fields are uesd by masters to take state on elections. */
    /* 以下这些域由主服务器使用,用于记录选举时的状态 */

    // 集群最后一次进行投票的纪元
    uint64_t lastVoteEpoch;   

    // 在进入下个事件循环之前要做的事情,以各个 flag来记录
    int todo_before_sleep;

    // 通过 cluster连接发送的消息数量
    long long stats_bus_messages_sent; 

    // 通过 cluster接收到的消息数量
    long long stats_bus_messages_received; 
} clusterState;

基本的结构体都介绍完了,我们来看看集群的代码实现吧。

首先在看redis单节点的初始化代码,这是集群的第一步,首先启动单节点服务。

redis源代码里,redis.c文件里的main函数是redis-server的开始,由于我们只关心集群的实现代码,一些和集群关系不大的我就忽略了。

int main(int argc, char **argv) {

………………..

//说明用户指定了参数,我们需要检查用户是不是指定了配置文件

if (argc >= 2) {

……………………

//读取配置文件

loadServerConfig(configfile,options);

}

然后跳入loadServerConfig函数来进行字符串配置的解析。在loadServerConfig函数里有以下代码,如果配置项有cluster-enabled,我们就设置server.cluster_enabled1,表示集群功能的开启。

else if (!strcasecmp(argv[0],"cluster-enabled") && argc ==2) {                                                                                     

             if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) {

                 err = "argument must be ‘yes‘ or ‘no‘";goto loaderr;

             }

接着在main函数里边,读取玩配置文件,执行initServer()函数,在initServer函数里边,

// 如果服务器以 cluster模式打开,那么初始化 cluster

if (server.cluster_enabled) clusterInit();

接着我们进入clusterInit函数,看看单机集群设置的初始化代码

// 初始化集群
void clusterInit(void) {
    int saveconf = 0; 

    // 初始化配置,server.cluster就是clusterState结构体,每一个节点保存一个。
    server.cluster = zmalloc(sizeof(clusterState));
    //指向自己的节点指针
    server.cluster->myself = NULL;
    //初始配置纪元为0
    server.cluster->currentEpoch = 0; 
    //初始配置状态fail
    server.cluster->state = REDIS_CLUSTER_FAIL;
    //集群数目为1
    server.cluster->size = 1; 
    server.cluster->todo_before_sleep = 0; 
    //建立节点映射的哈希结构体
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    //节点的黑名单。。
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    //执行选举相关的变量初始化
    server.cluster->failover_auth_time = 0; 
    server.cluster->failover_auth_count = 0; 
    server.cluster->failover_auth_rank = 0; 
    server.cluster->failover_auth_epoch = 0; 
    server.cluster->lastVoteEpoch = 0; 
    server.cluster->stats_bus_messages_sent = 0; 
    server.cluster->stats_bus_messages_received = 0; 
    //初始化槽
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    //把槽数组清零
    clusterCloseAllSlots();

    /* 锁住集群配置文件,确保每个每个节点使用的是自己的配置文件 */
    if (clusterLockConfig(server.cluster_configfile) == REDIS_ERR)
        exit(1);

    /* 载入本节点的集群配置文件. */
    if (clusterLoadConfig(server.cluster_configfile) == REDIS_ERR) {
        /* 如果没有发现集群配置文件,就把自己加入到集群里. */
        myself = server.cluster->myself =
            createClusterNode(NULL,REDIS_NODE_MYSELF|REDIS_NODE_MASTER);
        redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }

    // 保存 nodes.conf文件
    if (saveconf) clusterSaveConfigOrDie(1);

    // 监听 TCP端口
    server.cfd_count = 0;

    if (server.port > (65535-REDIS_CLUSTER_PORT_INCR)) {
        redisLog(REDIS_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "lower than 55535.");
        exit(1);
    }
    //监听本节点的端口号
    if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == REDIS_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            // 关联监听事件处理器
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler,NULL) == AE_ERR)                                                                                                        
                    redisPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    // slots -> keys 映射是一个有序集合,基础实现是跳跃链表,分值是槽号,返回的是key值
    server.cluster->slots_to_keys = zslCreate();
    resetManualFailover();
}
好了,至此,和集群相关的初始化就结束了,以后我再写一些添加删除节点以及故障恢复相关的文章,欢迎大家提问哦~~






redis集群实现(一)集群架构与初始化