
ZooKeeper in Practice (6): Cluster Monitoring and Master Election

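1. Cluster machine monitoring

This pattern is typically used where there are strict requirements on machine status and availability in a cluster, and where changes in cluster membership must be handled quickly. Such setups usually include a monitoring system that checks in real time whether each machine is alive.

Two ZooKeeper features (watches on reads, and ephemeral nodes) are enough to build such a liveness monitoring system:

1. When a client registers a Watcher on the children of node x, it is notified once the children of x change.
2. A node created as EPHEMERAL disappears as soon as the session between the client and the server ends or expires.

Together, these two features make it possible to monitor clients' state changes and to detect when they come online or go offline.

For example, the monitoring system registers a Watcher on /Monitor, and every machine that is added dynamically creates an EPHEMERAL node /Monitor/{hostname} under it. The monitoring system then learns about machines joining and leaving in real time; what happens next is up to the monitoring system's own business logic.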
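A child-change notification only says that the children of /Monitor changed, not which machine joined or left. One way to recover that is to keep the previous child list and compare it with the new one. The sketch below is plain C and does not call ZooKeeper at all; diff_children and contains are hypothetical helper names, and the host names in the usage note are made up.

```c
#include <assert.h>
#include <string.h>

/* Returns 1 if name occurs in list[0..n-1], else 0. */
static int contains(const char **list, int n, const char *name)
{
    for (int i = 0; i < n; ++i) {
        if (strcmp(list[i], name) == 0) return 1;
    }
    return 0;
}

/*
 * Hypothetical helper: stores in out[] the names that are in a[] but not
 * in b[], up to out_cap entries, and returns how many were stored.
 * The pointers written to out[] alias the entries of a[].
 */
int diff_children(const char **a, int na,
                  const char **b, int nb,
                  const char **out, int out_cap)
{
    int n = 0;
    assert(out != NULL);
    for (int i = 0; i < na && n < out_cap; ++i) {
        if (!contains(b, nb, a[i])) out[n++] = a[i];
    }
    return n;
}
```

Calling diff_children(now, ..., prev, ...) yields the machines that joined, and swapping the two lists yields the machines that left. With prev = {host-a, host-b} and now = {host-b, host-c}, the first call reports host-c and the second reports host-a.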

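2. Master election

In a distributed environment, some business logic only needs to be executed by one machine in the cluster, with the other machines sharing the result. This avoids a lot of duplicated computation and improves performance, which is why a master election is needed.

ZooKeeper's strong consistency guarantees that node creation is globally unique even under highly concurrent distributed access: if several clients request creation of /currentMaster at the same time, exactly one of them succeeds. This property makes it easy to run an election in a distributed environment.

Alternatively, ZooKeeper's EPHEMERAL_SEQUENTIAL nodes allow a dynamic election: every client creates an EPHEMERAL_SEQUENTIAL node under /Master/. Because ZooKeeper guarantees the ordering of SEQUENTIAL nodes, we can simply treat the client whose node has the smallest sequence number as the Master, and the election is done.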
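The "smallest sequence number wins" rule can be sketched without touching the ZooKeeper API. ZooKeeper appends a 10-digit, zero-padded counter to the name of a sequential node, so the sketch reads the last 10 characters as the sequence number. sequence_of and smallest_sequence are hypothetical helper names, and the node names in the usage note are invented.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define SEQ_DIGITS 10   /* width of the counter ZooKeeper appends */

/*
 * Returns the sequence number encoded in the last SEQ_DIGITS characters
 * of a sequential node name, or -1 if the name does not end that way.
 */
long sequence_of(const char *name)
{
    size_t len;
    assert(name != NULL);
    len = strlen(name);
    if (len < SEQ_DIGITS) return -1;
    for (size_t i = len - SEQ_DIGITS; i < len; ++i) {
        if (name[i] < '0' || name[i] > '9') return -1;
    }
    return strtol(name + len - SEQ_DIGITS, NULL, 10);
}

/* Returns the index of the child with the smallest sequence number,
 * or -1 if no child carries a valid sequence suffix. */
int smallest_sequence(const char **children, int count)
{
    int best = -1;
    long best_seq = -1;
    for (int i = 0; i < count; ++i) {
        long seq = sequence_of(children[i]);
        if (seq < 0) continue;                 /* skip non-sequential names */
        if (best < 0 || seq < best_seq) {
            best = i;
            best_seq = seq;
        }
    }
    return best;
}
```

For children n-0000000012, n-0000000003 and n-0000000007, smallest_sequence returns index 1. Parsing the number rather than comparing whole names keeps the rule valid even when clients use different name prefixes.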

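3. Scenario analysis

Suppose we want to monitor a group of active business processes in a cluster, and to pick one of them as the monitoring Master. Each process is identified by its IP address plus process ID, i.e. {ip:pid}. When a new business process comes online, it creates an ephemeral sequential (EPHEMERAL_SEQUENTIAL) node under /Monitor and fetches the list of children of /Monitor. If its own node has the smallest sequence number, it promotes itself to Master; otherwise it remains an ordinary business process. When a process exits, its node is deleted automatically and the remaining processes retry the election, so a new Master is promoted after the old Master exits.

For example, suppose the cluster starts with no processes:

  1. Process A1 starts and creates /Monitor/proc-1 under /Monitor. Since it is the only child of /Monitor, A1 is promoted to Master.
  2. Process A2 starts and creates /Monitor/proc-2 under /Monitor. It loses the election and runs as a Slave. Meanwhile A1, which watches the children of /Monitor, is notified that a new process was created and runs show_list.
  3. Process A3 starts and creates /Monitor/proc-3 under /Monitor. It loses the election and runs as a Slave. A1 is again notified of the child change and runs show_list.
  4. Process A1 is killed. The other processes observe the child-change event on /Monitor and retry the election. A2 now holds the smallest sequence number, so A2 wins and becomes Master, while A3 stays a Slave.
  5. Process A4 starts and creates /Monitor/proc-4 under /Monitor. It loses the election and runs as a Slave. A2, now watching as Master, is notified that a new process was created and runs show_list.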

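The execution is summarized in the table below, one row per step ((M) marks the process that is Master at that step; - means the process takes no action):

Step  A1                     A2             A3      A4
1     create, show_list(M)
2     show_list(M)           create
3     show_list(M)           -              create
4     killed                 show_list(M)   -
5     -                      show_list(M)   -       create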
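The walk-through can be replayed with a toy in-memory model of /Monitor, which makes the re-election after A1 is killed easy to check. This is only a simulation under simplifying assumptions (sequence numbers are small integers starting at 1, and there is no networking or ZooKeeper involved); struct monitor_sim and the sim_* functions are hypothetical names.

```c
#include <assert.h>

#define MAX_PROCS 16

/* Toy model of /Monitor: alive[i] != 0 means node proc-i still exists. */
struct monitor_sim {
    int alive[MAX_PROCS];
    int next_seq;          /* sequence number handed to the next joiner */
};

/* A process joins and receives the next sequence number,
 * like a create with the SEQUENTIAL flag. */
int sim_join(struct monitor_sim *s)
{
    assert(s->next_seq < MAX_PROCS);
    s->alive[s->next_seq] = 1;
    return s->next_seq++;
}

/* A process exits: its ephemeral node disappears. */
void sim_leave(struct monitor_sim *s, int seq)
{
    s->alive[seq] = 0;
}

/* The Master is the live process with the smallest sequence number;
 * returns -1 when no process is alive. */
int sim_master(const struct monitor_sim *s)
{
    for (int i = 0; i < s->next_seq; ++i) {
        if (s->alive[i]) return i;
    }
    return -1;
}
```

Starting from `struct monitor_sim s = {{0}, 1};`, three calls to sim_join give A1, A2 and A3 the numbers 1, 2 and 3 with A1 as Master; after sim_leave(&s, 1) the Master becomes 2, and a later sim_join for A4 does not change that.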

4. Hands-on

First, obtain the local machine's IP address and the current process ID (PID), and return them through ip_pid.

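void getlocalhost(char *ip_pid,int len)
{
    char hostname[64] = {0};
    /* fallback address, used only if the hostname cannot be resolved */
    const char *localhost = "127.0.0.1";
    struct hostent *hent ;

    gethostname(hostname,sizeof(hostname)-1);
    hent = gethostbyname(hostname);

    /* gethostbyname returns NULL on failure, so check before dereferencing */
    if(hent != NULL && hent->h_addr_list[0] != NULL){
        localhost = inet_ntoa(*((struct in_addr*)(hent->h_addr_list[0])));
    }

    /* getpid() returns pid_t; cast it so it matches the format specifier */
    snprintf(ip_pid,len,"%s:%ld",localhost,(long)getpid());
}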
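Since every node stores its owner as a single ip:pid string, a reader of that data may want the two parts separately, for example to print them in two columns. The following is a minimal sketch of such a parser; split_ip_pid is a hypothetical helper that is not part of the original program, and it assumes the pid is whatever follows the last ':' in the string.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Splits "ip:pid" into its two parts.
 * Returns 0 on success, or -1 if there is no ':' separator, the pid is
 * empty or not a number, or the ip part does not fit into ip[ip_cap].
 */
int split_ip_pid(const char *ip_pid, char *ip, size_t ip_cap, long *pid)
{
    const char *colon;
    char *end = NULL;
    size_t ip_len;

    assert(ip_pid != NULL && ip != NULL && pid != NULL);

    colon = strrchr(ip_pid, ':');          /* the last ':' precedes the pid */
    if (colon == NULL) return -1;

    ip_len = (size_t)(colon - ip_pid);
    if (ip_len == 0 || ip_len >= ip_cap) return -1;

    *pid = strtol(colon + 1, &end, 10);
    if (end == colon + 1 || *end != '\0') return -1;   /* empty or trailing junk */

    memcpy(ip, ip_pid, ip_len);
    ip[ip_len] = '\0';
    return 0;
}
```

For the input 172.17.0.36:4321 the function fills ip with 172.17.0.36 and sets pid to 4321; an input without a colon, or with nothing after the colon, is rejected with -1.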

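The election function fetches all children of path, picks the one with the smallest sequence number, and reads its ip_pid. If that value matches the current process, the current process has been elected Master and the global variable g_mode is set to MODE_MONITOR; otherwise g_mode is left unchanged.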

void choose_mater(zhandle_t *zkhandle,const char *path)
{
    /* zero-initialise so the cleanup below is safe even if the call fails */
    struct String_vector procs = {0, NULL};
    int i = 0;
    /* watch=1 re-registers the global watcher on the children of path */
    int ret = zoo_get_children(zkhandle,path,1,&procs);

    if(ret != ZOK || procs.count == 0){
        fprintf(stderr,"failed to get the children of path %s!\n",path);
    }else{
        char master_path[512] ={0};
        char ip_pid[64] = {0};
        /* zoo_get does not null-terminate the data, so keep one byte spare */
        int ip_pid_len = sizeof(ip_pid) - 1;

        char master[512]={0};
        char localhost[512]={0};

        getlocalhost(localhost,sizeof(localhost));

        /* all children share the prefix "proc-" and a zero-padded sequence
           suffix, so strcmp order equals sequence order */
        strncpy(master,procs.data[0],sizeof(master)-1);
        for(i = 1; i < procs.count; ++i){
            if(strcmp(master,procs.data[i])>0){
                strncpy(master,procs.data[i],sizeof(master)-1);
            }
        }

        snprintf(master_path,sizeof(master_path),"%s/%s",path,master);

        ret = zoo_get(zkhandle,master_path,0,ip_pid,&ip_pid_len,NULL);
        if(ret != ZOK){
            fprintf(stderr,"failed to get the data of path %s!\n",master_path);
        }else if(strcmp(ip_pid,localhost)==0){
            g_mode = MODE_MONITOR;
        }

    }

    /* frees every entry and the array itself; a no-op on an empty vector */
    deallocate_String_vector(&procs);

}
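In choose_mater every process watches the whole child list, so each join or exit wakes all of them. A commonly described refinement of this election recipe, which is not what the code in this article does, is for each process to watch only the node that sorts immediately before its own; a process whose node has no predecessor is the Master. The sketch below covers just the predecessor lookup in plain C. find_predecessor is a hypothetical helper, and it assumes that self is the bare child name (the final path component of what zoo_create returns in path_buffer) and that all names share one prefix.

```c
#include <assert.h>
#include <string.h>

/*
 * Returns the index of the child that sorts immediately before self,
 * or -1 if no child is smaller than self (self would be the Master).
 * Assumes every name has the same prefix and a zero-padded sequence
 * suffix, so strcmp order equals sequence order.
 */
int find_predecessor(const char **children, int count, const char *self)
{
    int pred = -1;
    assert(self != NULL);
    for (int i = 0; i < count; ++i) {
        if (strcmp(children[i], self) >= 0) continue;   /* not smaller than self */
        if (pred < 0 || strcmp(children[i], children[pred]) > 0) {
            pred = i;                                   /* closest so far */
        }
    }
    return pred;
}
```

With children proc-0000000005, proc-0000000002 and proc-0000000009, the process owning proc-0000000009 would watch index 0 (proc-0000000005), and the owner of proc-0000000002 gets -1 and acts as Master. A Master that also has to print the member list would still need a watch on the children of /Monitor.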

show_list is the Master's routine: it prints the ip_pid of every child node under path.


void show_list(zhandle_t *zkhandle,const char *path)
{

    /* zero-initialise so the cleanup below is safe even if the call fails */
    struct String_vector procs = {0, NULL};
    int i = 0;
    char localhost[512]={0};

    getlocalhost(localhost,sizeof(localhost));

    /* watch=1 re-registers the global watcher on the children of path */
    int ret = zoo_get_children(zkhandle,path,1,&procs);

    if(ret != ZOK){
        fprintf(stderr,"failed to get the children of path %s!\n",path);
    }else{
        char child_path[512] ={0};
        char ip_pid[64] = {0};
        int ip_pid_len = 0;
        printf("--------------\n");
        printf("ip\tpid\n");
        for(i = 0; i < procs.count; ++i){
            snprintf(child_path,sizeof(child_path),"%s/%s",path,procs.data[i]);
            /* zoo_get overwrites ip_pid_len and does not terminate the data,
               so reset the buffer and the length for every child */
            memset(ip_pid,0,sizeof(ip_pid));
            ip_pid_len = sizeof(ip_pid) - 1;
            ret = zoo_get(zkhandle,child_path,0,ip_pid,&ip_pid_len,NULL);
            if(ret != ZOK){
                fprintf(stderr,"failed to get the data of path %s!\n",child_path);
            }else if(strcmp(ip_pid,localhost)==0){
                /* only the Master runs show_list, so the local entry is the Master */
                printf("%s(Master)\n",ip_pid);
            }else{
                printf("%s\n",ip_pid);
            }
        }
    }

    /* frees every entry and the array itself; a no-op on an empty vector */
    deallocate_String_vector(&procs);
}

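The watcher function is shown below. When the children of path change, it retries the election; if the current process is elected Master, it immediately calls show_list to print the ip_pid of every child of path.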

void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)  
{  
/*  
    printf("watcher event\n");  
    printf("type: %d\n", type);  
    printf("state: %d\n", state);  
    printf("path: %s\n", path);  
    printf("watcherCtx: %s\n", (char *)watcherCtx);  
*/  

    if(type == ZOO_CHILD_EVENT &&
       state == ZOO_CONNECTED_STATE ){

        choose_mater(zh,path);
        if(g_mode == MODE_MONITOR){
            show_list(zh,path);
        }
    }
}

The complete code follows:
1. monitor.c

#include<stdio.h>
#include<stdlib.h>   /* exit */
#include<string.h>
#include<unistd.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include"zookeeper.h"
#include"zookeeper_log.h"

enum WORK_MODE{MODE_MONITOR,MODE_WORKER} g_mode;
char g_host[512]= "172.17.0.36:2181";  

//watch function when child list changed
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
//show all process ip:pid
void show_list(zhandle_t *zkhandle,const char *path);
//if success,the g_mode will become MODE_MONITOR
void choose_mater(zhandle_t *zkhandle,const char *path);
//get localhost ip:pid
void getlocalhost(char *ip_pid,int len);

void print_usage();
void get_option(int argc,const char* argv[]);

/**********util**********************/
void print_usage()
{
    printf("Usage : [monitor] [-h] [-m] [-s ip:port] \n");
    printf("        -h Show help\n");
    printf("        -m set monitor mode\n");
    printf("        -s zookeeper server ip:port\n");
    printf("For example:\n");
    printf("monitor -m -s172.17.0.36:2181 \n");
}

void get_option(int argc,const char* argv[])
{
    extern char    *optarg;
    int            optch;
    /* the leading ':' makes getopt return ':' when an option argument is missing */
    const char    optstring[] = ":hms:";

    //default
    g_mode = MODE_WORKER;

    while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 )
    {
        switch( optch )
        {
        case 'h':
            print_usage();
            exit(-1);
        case '?':
            print_usage();
            printf("unknown parameter: %c\n", optopt);
            exit(-1);
        case ':':
            print_usage();
            printf("need parameter: %c\n", optopt);
            exit(-1);
        case 'm':
            g_mode = MODE_MONITOR;
            break;
        case 's':
            /* strncpy does not terminate on truncation, so do it explicitly */
            strncpy(g_host,optarg,sizeof(g_host)-1);
            g_host[sizeof(g_host)-1] = '\0';
            break;
        default:
            break;
        }
    }
}
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)  
{  
/*  
    printf("watcher event\n");  
    printf("type: %d\n", type);  
    printf("state: %d\n", state);  
    printf("path: %s\n", path);  
    printf("watcherCtx: %s\n", (char *)watcherCtx);  
*/  

    if(type == ZOO_CHILD_EVENT &&
       state == ZOO_CONNECTED_STATE ){

        choose_mater(zh,path);
        if(g_mode == MODE_MONITOR){
            show_list(zh,path);
        }
    }
}  
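void getlocalhost(char *ip_pid,int len)
{
    char hostname[64] = {0};
    /* fallback address, used only if the hostname cannot be resolved */
    const char *localhost = "127.0.0.1";
    struct hostent *hent ;

    gethostname(hostname,sizeof(hostname)-1);
    hent = gethostbyname(hostname);

    /* gethostbyname returns NULL on failure, so check before dereferencing */
    if(hent != NULL && hent->h_addr_list[0] != NULL){
        localhost = inet_ntoa(*((struct in_addr*)(hent->h_addr_list[0])));
    }

    /* getpid() returns pid_t; cast it so it matches the format specifier */
    snprintf(ip_pid,len,"%s:%ld",localhost,(long)getpid());
}

void choose_mater(zhandle_t *zkhandle,const char *path)
{
    /* zero-initialise so the cleanup below is safe even if the call fails */
    struct String_vector procs = {0, NULL};
    int i = 0;
    /* watch=1 re-registers the global watcher on the children of path */
    int ret = zoo_get_children(zkhandle,path,1,&procs);

    if(ret != ZOK || procs.count == 0){
        fprintf(stderr,"failed to get the children of path %s!\n",path);
    }else{
        char master_path[512] ={0};
        char ip_pid[64] = {0};
        /* zoo_get does not null-terminate the data, so keep one byte spare */
        int ip_pid_len = sizeof(ip_pid) - 1;

        char master[512]={0};
        char localhost[512]={0};

        getlocalhost(localhost,sizeof(localhost));

        /* all children share the prefix "proc-" and a zero-padded sequence
           suffix, so strcmp order equals sequence order */
        strncpy(master,procs.data[0],sizeof(master)-1);
        for(i = 1; i < procs.count; ++i){
            if(strcmp(master,procs.data[i])>0){
                strncpy(master,procs.data[i],sizeof(master)-1);
            }
        }

        snprintf(master_path,sizeof(master_path),"%s/%s",path,master);

        ret = zoo_get(zkhandle,master_path,0,ip_pid,&ip_pid_len,NULL);
        if(ret != ZOK){
            fprintf(stderr,"failed to get the data of path %s!\n",master_path);
        }else if(strcmp(ip_pid,localhost)==0){
            g_mode = MODE_MONITOR;
        }

    }

    /* frees every entry and the array itself; a no-op on an empty vector */
    deallocate_String_vector(&procs);

}
void show_list(zhandle_t *zkhandle,const char *path)
{

    /* zero-initialise so the cleanup below is safe even if the call fails */
    struct String_vector procs = {0, NULL};
    int i = 0;
    char localhost[512]={0};

    getlocalhost(localhost,sizeof(localhost));

    /* watch=1 re-registers the global watcher on the children of path */
    int ret = zoo_get_children(zkhandle,path,1,&procs);

    if(ret != ZOK){
        fprintf(stderr,"failed to get the children of path %s!\n",path);
    }else{
        char child_path[512] ={0};
        char ip_pid[64] = {0};
        int ip_pid_len = 0;
        printf("--------------\n");
        printf("ip\tpid\n");
        for(i = 0; i < procs.count; ++i){
            snprintf(child_path,sizeof(child_path),"%s/%s",path,procs.data[i]);
            /* zoo_get overwrites ip_pid_len and does not terminate the data,
               so reset the buffer and the length for every child */
            memset(ip_pid,0,sizeof(ip_pid));
            ip_pid_len = sizeof(ip_pid) - 1;
            ret = zoo_get(zkhandle,child_path,0,ip_pid,&ip_pid_len,NULL);
            if(ret != ZOK){
                fprintf(stderr,"failed to get the data of path %s!\n",child_path);
            }else if(strcmp(ip_pid,localhost)==0){
                /* only the Master runs show_list, so the local entry is the Master */
                printf("%s(Master)\n",ip_pid);
            }else{
                printf("%s\n",ip_pid);
            }
        }
    }

    /* frees every entry and the array itself; a no-op on an empty vector */
    deallocate_String_vector(&procs);
}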

int main(int argc, const char *argv[])  
{  
    int timeout = 30000;  
    char path_buffer[512];  
    int bufferlen=sizeof(path_buffer);  

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); // set the log level to suppress unrelated output

    get_option(argc,argv);

    zhandle_t* zkhandle = zookeeper_init(g_host,zktest_watcher_g, timeout, 0, (char *)"Monitor Test", 0);  

    if (zkhandle ==NULL)  
    {  
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");  
        exit(EXIT_FAILURE);  
    }  

    char path[512]="/Monitor";

    int ret = zoo_exists(zkhandle,path,0,NULL); 
    if(ret != ZOK){
        ret = zoo_create(zkhandle,path,"1.0",strlen("1.0"),  
                          &ZOO_OPEN_ACL_UNSAFE,0,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the path %s!\n",path);
        }else{
            printf("create path %s successfully!\n",path);
        }
    }

    if(ret == ZOK && g_mode == MODE_WORKER){

        char localhost[512]={0};
        getlocalhost(localhost,sizeof(localhost));

        char child_path[512];
        sprintf(child_path,"%s/proc-",path);
        ret = zoo_create(zkhandle,child_path,localhost,strlen(localhost),  
                          &ZOO_OPEN_ACL_UNSAFE,ZOO_SEQUENCE|ZOO_EPHEMERAL,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the child_path %s,buffer:%s!\n",child_path,path_buffer);
        }else{
            printf("create child path %s successfully!\n",path_buffer);
        }
        choose_mater(zkhandle,path);

    }

    if(g_mode == MODE_MONITOR){
        show_list(zkhandle,path);
    }

    getchar();

    zookeeper_close(zkhandle); 

    return 0;
}

2.Makefile

CC=gcc
CFLAGS=-g 
ZOOKEEPER_INSTALL=/usr/local
ZOOKEEPER_INC=-I${ZOOKEEPER_INSTALL}/include/zookeeper
ZOOKEEPER_LIB= -L${ZOOKEEPER_INSTALL}/lib -lzookeeper_mt

APP=monitor
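# recipe lines must start with a tab; the multi-threaded client library
# (zookeeper_mt) is meant to be used with -DTHREADED
all:
	${CC} monitor.c -DTHREADED ${CFLAGS} ${ZOOKEEPER_INC} ${ZOOKEEPER_LIB} -o ${APP}
clean:
	rm -f ${APP}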

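The program can be started several times on a single machine, since every instance has a different process ID, or it can be started on several machines in a cluster.
The -s option gives the IP and port of the ZooKeeper server (not the IP and port of the Master).
The -m option makes the process a standalone monitor. Note that a process started with this option does not take part in the election, because it does not create a node under /Monitor.
Example run:
monitor -s172.17.0.36:2181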

