首页 > 代码库 > 利用zookeeper的发布/订阅模式实现配置动态变更

利用zookeeper的发布/订阅模式实现配置动态变更

??ZooKeeper的Watcher事件机制可以说分布式场景下的观察者模式的实现。基于这个watcher事件机制,配合注册到特定的ZNode节点,可以实现java应用的配置运行时的变更。在学习zookeeper之前,听同事说配置可以在运行时动态变更,觉得不可思议。研习了zookeeper之后,实现这个功能是很easy的。
??发布/订阅系统设计起来无非两种模式,推和拉。
1. 推模式,服务端负责把变更的数据推给订阅的客户端。Web即时通信里的Comet技术便可以实现这种功能。
2. 拉模式,也就是客户端定时轮询服务端。拉模式不仅有延迟,给服务端带来很大压力,而且十分低效。

??zookeeper采用的是推拉结合的模式
1. 客户端订阅znode节点
2. 被订阅的节点发生变化后,zookeeper服务端向客户端发生数据变更的watcher事件通知
3. 客户端接收到watcher通知后,主动从服务端拉取变更的数据

??阿里的配置变更中间件diamond,同样是基于推拉结合的模式来实现数据的动态变更。初学zookeeper,写了一个数据库配置动态变更的demo。zookeeper自带的Watcher注册后,数据变更一次便会自动取消注册。这个设计实在反人类,大多数的开发者的需求肯定是注册一次,服务终生。所以转向开源的ZkClient客户端
??首先本地需要启动一个zookeeper的服务端,并且有一个“/db”节点,我用这个节点存储数据库配置信息。
??客户端工程需要引入zookeeper和zkclient的依赖。

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.9</version>
</dependency>
<dependency>
    <groupId>com.github.adyliu</groupId>
    <artifactId>zkclient</artifactId>
    <version>2.1.1</version>
</dependency>

??客户端demo

import com.github.zkclient.IZkDataListener;
import com.github.zkclient.ZkClient;

import java.io.IOException;

/**
 * project  : zk
 * package  : PACKAGE_NAME
 * author   : lvsheng
 * date     : 2016/10/5 下午11:17
 */
public class ZkClientTest {

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181");
        zkClient.subscribeDataChanges("/db", new IZkDataListener() {
            public void handleDataChange(String dataPath, byte[] data) throws Exception {
                System.out.println(new String(data));
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(dataPath);
            }
        });

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

??现在我们来测试这个功能,在控制台依次输入如下命令:

[zk: localhost:2181(CONNECTED) 48] set /db chat.jdbc.driver=com.mysql.jdbc.Driver|chat.jdbc.url=jdbc:mysql://192.168.146.120:3306/chat_test?useUnicode=true&amp;amp;characterEncoding=GBK|chat.jdbc.maxActive=5

[zk: localhost:2181(CONNECTED) 49] set /db chat.jdbc.driver=com.mysql.jdbc.Driver|chat.jdbc.url=jdbc:mysql://127.0.0.1:3306/chat_test?useUnicode=true&amp;amp;characterEncoding=GBK|chat.jdbc.maxActive=5          

??观察Java代码的输出

chat.jdbc.driver=com.mysql.jdbc.Driver|chat.jdbc.url=jdbc:mysql://192.168.146.120:3306/chat_test?useUnicode=true&amp;amp;characterEncoding=GBK|chat.jdbc.maxActive=5
chat.jdbc.driver=com.mysql.jdbc.Driver|chat.jdbc.url=jdbc:mysql://127.0.0.1:3306/chat_test?useUnicode=true&amp;amp;characterEncoding=GBK|chat.jdbc.maxActive=5

??数据库配置的ip被成功的动态修改。

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    利用zookeeper的发布/订阅模式实现配置动态变更