首页 > 代码库 > 个人第一个开源分布式项目distributeTemplate的实现二 分布式配置以及上下文的维护同步

个人第一个开源分布式项目distributeTemplate的实现二 分布式配置以及上下文的维护同步

 我们实现分布式 也是按照 这样一步一步来的,首先是公用的资源对象,第一个必须的公用资源 对象是配置,这配置交给用户外部填写 手动配置 并能维持同步 更新 所以需要一个配置对象 维护在内存里面,需要事件驱动监听配置文件的变化情况

ok下面来 看看代码是怎么做的 ,首先 配置 有多重配置方式 ini conf prop xml 各种方式本质上 我们分布式就是要各台主机 自己所在的节点 都能知道我在网络上情况,或者所可以找到像zookeeper 只要知道或者找到了 才能进行以后的通讯同步

  我们为了能够以后支持多重配置 ,所以先把配置定义为一个接口

public interface IConfig {
    public static final String DEFAULTFILE="distribute.conf";
/**
 * 
 * 从文件读取配置初始化
 * @param file
 * 添加(修改)人:zhuyuping
 */
public void readConfigFormFile(String file);
/**
 * 从用户提供的json字符串中初始化
 * @param json
 * 添加(修改)人:zhuyuping
 */
public void readConfigFormString(String json);
/**
 * 获得系统内存中维护的内存表
 * 
 * 添加(修改)人:zhuyuping
 */
public Table<InetSocketAddress, String, Object> getConfig();
/**
 * 摄入context上下文
 * @param context
 * 添加(修改)人:zhuyuping
 */
public void setContext(Context context);
/**
 * 获得上下文context
 * @return
 * 添加(修改)人:zhuyuping
 */
public Context getContext();
public BiMap<String, InetSocketAddress> getAlias();
}

主要 是3个方法 ,一个是读配置文件到内存对象中,拧一个就是内存对象写到文件中,对于分布式还需要一个同步 

有时候 我们可能需要历史版本 能够容错还原 所以可能需要一个版本号 记录当前版本,然后保持有可能多个节点上配置文件更改后,发送请求时候能够保持顺序的更新,这后面我会处理 加入一个分布式先进先出的队列的,这里暂时未加 后面 加了会补上的,还有上下文context 对象,有人会说 你为什么需要上下文对象去保存变量啊,好多框架 都有上下文这个对象,也许是拦截器模式 也许是门面模式 等等其他模式,但是 这里我都不是,其实上下文只是为了本地节点的各个功能代码段之间的一个桥梁,有句话叫做 两情若是久长时 又岂在朝朝暮暮 我们有鹊桥,他就是上下文最重要的本质 就是维护本地节点上下文贯穿 如果上下文 里面保存着门面 那么他就有门面的功能 便于用户随时获取门面对象 进行操作,ok我们来看看 context是怎么定义的

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 
 * @function:上下文接口  他只是存储用户上面类所有过程中的变量不是config配置而且分布式中不会同步的只会在单点上有效 切记  、、后期如果想支持xml 配置或者其他配置 可以添加策略模式 
 */
public interface Context {

	public final static String defaultConfig="distribute.conf";//默认配置名
	
	public void putValue(String key,Object value);
	
	public Object getValue(String key);
	
	public void setCurrHost(String host,int port);
	
	public InetSocketAddress getCurrHost();
	/**
	 * 获得默认配置文件
	 * @return
	 * 添加(修改)人:zhuyuping
	 */
	public String getDefaultFc();
//	/**
//	 * 设置默认属性文件的名称
//	 * @param pfile
//	 * 添加(修改)人:zhuyuping
//	 */
//	public void setDefaultFc(String pfile);
//	/**
//	 * 注入template 门面 便于后面直接通过上下文来使用  如果要整合spring ApplicationContextAware
//	 * @param distributedTemplate
//	 * 添加(修改)人:zhuyuping
//	 */
//	public void setTemplate(DistributedOperations distributedTemplate);
//	
//	public DistributedOperations getTemplate();
	
	
}

这里其实 就是一个map 保存属性key value 而常用的就取出作为一个方法了

这个context 因为后面我们给用户一个继承的 这样便于 用户实现自己的上下文 或交给其他框架上下文 以及整合所以我们实现了一个抽象的 默认实现

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 下午5:58:37 
 * @function:抽象的上下文 主要是 管理context的资源 还有就是提供用户自定义 整合spring使用该类 //这里后期需整合策略 实现
 */
public abstract class AbstractContext implements Context{
    //?也可以使用LocalThread 也可以
	private Map<String,Object> context=Maps.newConcurrentMap();
	
	private InetSocketAddress currHost;//当前主机 比如192.168.0.1 8888
	
	private String dfConfig;//默认读取的配置文件 当用户 提供就修改 没有提供就默认 
	
    
	public AbstractContext(String dfConfig) {
		super();
		
		this.dfConfig = dfConfig;
		//当前classes 下的文件
		//currentPort
		this.currHost=new InetSocketAddress(ConfigFactory.load(dfConfig).getString("client.currentHost"),ConfigFactory.load(dfConfig).getInt("client.currentPort"));
	}

	
	
	@Override
	public InetSocketAddress getCurrHost() {
		
		return currHost;
	}

    

	@Override
	public void setCurrHost(String host,int port) {
		
		this.currHost=new InetSocketAddress(host, port);
		
	}



	@Override
	public String getDefaultFc() {
		
		return dfConfig!=null?dfConfig:defaultConfig;
	}



	public AbstractContext() {
		super();
		this.dfConfig=defaultConfig;

		this.currHost=new InetSocketAddress(ConfigFactory.load(defaultConfig).getString("client.currentHost"),ConfigFactory.load(defaultConfig).getInt("client.currentPort"));
	    
	}



	@Override
	public void putValue(String key, Object value) {
		
		context.put(key, value);
		
	}

	@Override
	public Object getValue(String key) {
		return context.get(key);
	}
	
	
	
}

 ok 很简单的 维护者

然后回到刚才的配置 ,首先 我们配置文件 需要从文件读取 到配置对象中 ,这是为了用户更改 或者初始化时候 吧配置文件初始化到配置内存对象中 然后这个内存对象将会在同步 配置文件 更改 网络通讯时候用到,在对于全局的所有节点的沦陷时候 单纯的context只维护本节点桥梁信息的 已经不够用了 因为他不会同步的,这就是为什么需要他的原因,我这里采用的配置文件时conf 也就是configLoad方式,后面我会逐步添加更多的支持方式 无非是xml 等读取问题,这并不重要 思想才是重要的

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 下午4:07:00  如果后期需要支持xml 其他 只需要使用策略模式 
 * @function:基类config 主要维持 配置基本信息 以及配置信息的同步 备份 同时内存中维持一张内存表table
 */
public abstract class AbstractConfig implements IConfig{
    /**
     * 当前的配置表 行 为主机别名 其中一定有一列为版本号 AotmicLong 以及配置的相关字段  值为相关的对象  
     */
protected Table<InetSocketAddress, String, Object> config=HashBasedTable.create();//table信息  table 信息 这里无需要用到 哪个ConcurrentHashMap<K, V> 因为这个只会加载读取 加载 
//不会修改,因为这个table当用户真的更新config后 会同步并同时刷到更新到文件中的 ,而且每次用户提供查询的配置
//是不会更新到文件里面的 
protected AtomicLong version=new AtomicLong(0);//初始化版本为0;//判断当前的版本 是佛在config 里面是否存在
protected BiMap<String,InetSocketAddress> alias=HashBiMap.create();
protected Context context;
     
    public BiMap<String, InetSocketAddress> getAlias() {
return alias;
}
/**
     * context 需要提供当前主机 以及 
     * @param context
     */
public AbstractConfig(Context context) {
super();
this.context = context;
wrapConfig(ConfigFactory.load(context.getDefaultFc()));
}
@Override
public void setContext(Context context) {
this.context=context;
}
@Override
public Context getContext() {
return context;
}
@Override
public void readConfigFormFile(String file) {
Config config=TypeSafeConfigLoadUtils.loadFromFile(new File(file));
wrapConfig(config);
}
@Override
public void readConfigFormString(String json) {
Config config=TypeSafeConfigLoadUtils.loadFromString(json);
wrapConfig(config);
}
/**
 * 对config进行初始化 table
 * @param config
 * 添加(修改)人:zhuyuping
 */
protected abstract void wrapConfig(Config config);
/**
 * 把table 从内存中读取从新写入到配置文件中 
 * @param config
 * 添加(修改)人:zhuyuping
 */
protected abstract String wrapTable(Table<String, String, Object> config);
/**
 * 只保留最近的5个版本 可以回滚 更新最新的
 * 
 * 添加(修改)人:zhuyuping
 */
public void updateVersion(Long version){
 
}
/**
 * 版本数更新
 *  更新完后 需要 
 * 添加(修改)人:zhuyuping
 */
public void addVersion(){
Long v=version.getAndIncrement();
//TODO 需要通知所有节点 我要修改版本了 如果同时有几个人也这样 那么接受该节点下次更新的版本号,
    //在回调函数中 更新配置 随后同步 只保留5个版本 
}
    @Override
public Table<InetSocketAddress, String, Object> getConfig() {
return config;
}
/**
     * 
     * 提交对文件配置做出的修改
     * 添加(修改)人:zhuyuping
     */
    protected abstract void commit();
/**
 * 
 * 提交对配置的修改 如果一个人在一个节点上 更改了配置 需要核对版本 并从新更新本地的配置文件
 * 添加(修改)人:zhuyuping
 */
protected abstract void sync();
    
}

这里 我为了好维护 就直接使用guava的table ,其实你可以用List< map> 实现,这里 重要是获取所有主机列表的方法

然后就是配置文件读取后 写入context 对象 当然 上面说的读取配置到内存对象 ,内存对象写入到配置文件是基础

然后看看怎么写入的 我为了以后支持xml 所以这读取方式 写入方式 交给后面的子类实现类去实现 这样实现xml只要实现这2个方法即可

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 下午10:38:05 
 * @function:默认的配置  允许用户实现自定义的配置规则 只需要继承 AbstractConfig
 */
public class SimpleDistributeConfig extends AbstractConfig{
	public SimpleDistributeConfig(Context context) {
		super(context);
		
	}

	@Override
	protected void wrapConfig(Config configObj) {
		//得到所有的节点
		List<? extends ConfigObject> nodes=configObj.getObjectList("server.hosts");
		int i=0;
		for (ConfigObject node : nodes) {
	
		   i++;
		   //如果后期添加其他mysql 等支持 这里就需要添加判断
		  
		   //Integer localport=node.containsKey("localPort")?Integer.parseInt(node.get("localPort").render()):LOCALPORT;//Integer.parseInt(node.get("localPort").render());
		   Integer remotePort=Integer.parseInt(node.get("remotePort").render());
		   String remoteIp=node.get("remoteHost").unwrapped().toString();//远程主机的ip
		   //开始初始化配置table
		   String name=node.containsKey("name")?node.get("name").unwrapped().toString():remoteIp;//主机别名
		   InetSocketAddress remoteHost=new InetSocketAddress(remoteIp, remotePort);
		   super.alias.put(name, remoteHost);
		   super.config.put(remoteHost,"version", super.version.incrementAndGet());
		   super.config.put(remoteHost, "remoteHost", remoteHost);
		   //super.config.put(remoteHost, "localPort", localport);
		   super.config.put(remoteHost, "remotePort", remotePort);
		   super.config.put(remoteHost, "name", name);
		   
		 
		   if(node.containsKey("file")){
			   HashMap fcs=(HashMap) node.get("file").unwrapped();
			   String syncPath=fcs.get("syncPath").toString();//文件同步的路径
			  // System.out.println("SimpleDistributeConfig.wrapConfig() "+syncPath);
			   super.config.put(remoteHost, "file", syncPath);//以后配置多的时候 分装成一个bean存入
		   }
		   
		   
		}
		String chost=configObj.getString("client.currentHost");
		int port=configObj.getInt("client.currentPort");
		super.context.setCurrHost(chost, port);
		
		//config.root().containsKey(key)
		//config.getString(path);
	}
	
	
	@Override
	protected String wrapTable(Table<String, String, Object> table) {
		StringBuilder sb=new StringBuilder("server{");
		
		sb.append("\n\t").append("hosts=[");
		Set<String> rows=table.rowKeySet();
		int size=rows.size();
		int i=0;
		for (String row : rows) {
			i++;
			Map<String,Object> map=table.row(row);
			if(!map.containsKey("remoteHost")) continue;
			sb.append("\t {");
			Object remoteHost=map.get("remoteHost");	
			Object remotePort=map.get("remotePort");
			//Object localPort=map.get("localPort");
			Object name=map.get("name");
			sb.append("name=").append(name).append("\n\t");
			//sb.append("localPort=").append(localPort).append("\n\t");
			sb.append("remoteHost=").append(remoteHost).append("\n\t");
			sb.append("remotePort=").append(remotePort).append("\n\t");
		    if(map.containsKey("file")){
		    	sb.append("file{").append("\n\t").append("syncPath=").append(map.get("syncPath")).append("}");
		    }
		   
		    sb.append("\t }");
		    if(i!=size){
		    	sb.append(",");
		    }
		}
	   sb.append("]").append("\n\t").append("}");	
	  //继续 保存client
	  
	   sb.append("\n\t").append("client{").append("\n\t").append("currentHost=").append(context.getCurrHost().getHostString()).append("\n\t");
	   sb.append("\n\t").append("currentPort=").append(context.getCurrHost().getPort()).append("\n\t");
	   sb.append("}");
	   
	   return sb.toString();
		
	}

	@Override
	protected void commit() {
		
		
	}

	@Override
	protected void sync() {
		
		
	}

	public static void main(String[] args) {
	
		
		
	}

}

ok 基本的简单的 配置 以及 上下文 这些用来同步的 用来做桥梁的都已经做好了 下面就是怎么监听配置文件的更改,有人说怎么监听一个配置文件更改啊,其实 文件有个属性 最后更改的时间,只要监听这个就可以 初级版本没有加上更改,这个后面可以随时加上 而且为了以后好更改 只写了相关方式没有 加上

import java.io.File;

public interface FileListener {
	public void fileChanged(File file);
}

然后用定时器 timeer 或者线程池定时线程去轮训他 ,

然后比较时间 一个一个通知事件  (图片传反了)

这里 注意弱应用 ,其实 对于这些时间 我建议用LinkTransferQueue 他是FIFO 先进先出的无阻晒的队列 然后队列 加上若引用,保证时间的一些顺序,

然后 通知了接口 在接口里面我们在做相应的更改配置 同步配置 的相关操作,这一章基本思想就是

 同步 资源 配置 上下文为桥梁 事件驱动为引擎 

还有支出的是 timer excutors.newSchulerXXXX  还有很多方式实现轮训的方式  这种方式 也可以实现心跳线 

同时 告诉大家的事 Apache、 camel| 里面的事件驱动文件部分 核心代码就是上面 这一小部分 

代码 没有贴完整 ,但是代码已经托管到  http://git.oschina.net/zhuyuping/distributeTemplate