首页 > 代码库 > 个人第一个开源分布式项目distributeTemplate的实现三 网络通讯netty传输大文件

个人第一个开源分布式项目distributeTemplate的实现三 网络通讯netty传输大文件

 今天 我将讲讲网络通讯,这里我初始版本 由于采用的事Netty框架  所以 这里讲网络Netty在我们这里是怎么使用的,下周开始添加rpc lucene内容了 实现之后的0.2 0.3版本,后面将会去掉netty依赖 采用原生的NIO2 (aio) 异步非阻塞方式 实现自己网络通讯,也就是说 这部分可能会实现一个简单的但是比netty精简高效的网络框架,后期做出来 可能会单独开一个分支开源出来,netty说白了 就是 事件驱动 以及 NIO 加一些协议 以及 异常 处理,废话不多说了。

我最近 有个想法 就是想添加一个分布式锁的功能  我最近正在试图 去实现 分布式锁 ,因为最近要做RPC 顺便去试图实现以下分布式锁 试试,当然 初级的排它锁 先实现掉 ,后面的全程队列 也就是rabbitmq那些消息中间件那样的话 也会去试图实现的 

        前2天 我加了 一个配置类,为了更好的实现用户对配置的管理 

      

public  class ConfigBuilder {

	private static ConfigBuilder configBuilder=new ConfigBuilder();
	//主机列表
	private List<InetSocketAddress> address=Lists.newArrayList();
	
	private InetSocketAddress currentHost=null;
	
	private List<ServiceConfig> services=Lists.newArrayList();
	//别名
	protected BiMap<String,InetSocketAddress> alias=HashBiMap.create();
	
	private String configFile;
	
    public ConfigBuilder() {
    	configFile=Context.defaultConfig;
    	Config config=ConfigFactory.load(Context.defaultConfig);
    	initConfig(config);
    }

	private void initConfig(Config config) {
		List<? extends ConfigObject> nodes=config.getObjectList("server.hosts");
    	for (ConfigObject node : nodes) {
    		  Integer remotePort=Integer.parseInt(node.get("remotePort").render());
   		      String remoteIp=node.get("remoteHost").unwrapped().toString();//远程主机的ip
   		      String name=node.containsKey("name")?node.get("name").unwrapped().toString():remoteIp;//主机别名
   		      InetSocketAddress host=new InetSocketAddress(remoteIp, remotePort);
   		      address.add(host);
    	      alias.put(name, host);
    	      //TODO 获取
    	      for(ServiceType serviceType : ServiceConfig.ServiceType.values()){
    	    	   String serviceName=serviceType.name().toLowerCase();
    	    	  if(node.containsKey(serviceName)){
    	    		  HashMap fcs=(HashMap) node.get(serviceName).unwrapped();
    	    		  ServiceConfig serviceConfig=new ServiceConfig();
    	    		  serviceConfig.setServiceType(serviceType);
    	    		  serviceConfig.setInfo(fcs);
    	    		  services.add(serviceConfig);
    	    	  }
    	      }
    	     
    	}
    	String chost=config.getString("client.currentHost");
		int port=config.getInt("client.currentPort");
		currentHost=new InetSocketAddress(chost, port);
	}
	
	public ConfigBuilder(String configFile) {
		this.configFile=configFile;
		Config config=ConfigFactory.load(configFile);
		initConfig(config);
	}
	public static ConfigBuilder getInstance(){
		if(configBuilder==null){
			return new ConfigBuilder();
		}else{
			return configBuilder;
		}
		
	}
	public static ConfigBuilder getInstance(String configFile){
		if(configBuilder==null){
			return new ConfigBuilder(configFile);
		}else{
			return configBuilder;
		}
		
	}
	/**
	 * 添加服务器主机
	 * @param host
	 * @param port
	 * @return
	 * 添加(修改)人:zhuyuping
	 */
	public ConfigBuilder addHost(String host,int port){
		InetSocketAddress h=new InetSocketAddress(host, port);
		address.add(h);
		alias.put(host, h);
		return this;
	}
	/**
	 * 添加主机并设置 主机别名
	 * @param host
	 * @param port
	 * @param nname
	 * @return
	 * 添加(修改)人:zhuyuping
	 */
    public ConfigBuilder addHost(String host,int port,String nname){
	      addHost(host, port);
	      alias.put(nname, new InetSocketAddress(host, port));
		return this;
	}
    /**
     * 移除主机
     * @param host
     * @param port
     * @return
     * 添加(修改)人:zhuyuping
     */
    public ConfigBuilder removeHost(String host,int port){
    	InetSocketAddress h=new InetSocketAddress(host, port);
		alias.inverse().remove(h);
		address.remove(h);
		return this;
	}
    /**
     * 设置当前自身的主机名 与端口 
     * @param host
     * @param port
     * @return
     * 添加(修改)人:zhuyuping
     */
    public ConfigBuilder setSelf(String host,int port){
		currentHost=new InetSocketAddress(host, port);
		return this;
	}
    /**
     * 写入到配置文件中
     * 
     * 添加(修改)人:zhuyuping
     */
    public void buildToFile(){
    	
    	String path=Thread.currentThread().getContextClassLoader().getResource("")+configFile;
    	try {
			Writer out = new BufferedWriter(new OutputStreamWriter(
					new FileOutputStream(path), "UTF8"));
			StringBuilder sb=new StringBuilder("server{");
			sb.append("\n\t").append("hosts=[");
			int i=0;
			int size=address.size();
			for (InetSocketAddress host : address) {
				sb.append("\t {");
				sb.append("name=").append(alias.inverse().get(host)).append("\n\t");
				sb.append("remoteHost=").append(host.getHostString()).append("\n\t");
				sb.append("remotePort=").append(host.getPort()).append("\n\t");
				 sb.append("\t }");
				    if(i!=size){
				    	sb.append(",");
				    }
			}
			sb.append("]").append("\n\t").append("}");	
			  //继续 保存client
			  
			   sb.append("\n\t").append("client{").append("\n\t").append("currentHost=").append(currentHost.getHostString()).append("\n\t");
			   sb.append("\n\t").append("currentPort=").append(currentHost.getPort()).append("\n\t");
			   sb.append("}");
			   out.write(sb.toString());
		} catch (Exception e) {
		
		}
    	
    	
    }
    
    public IConfig bulid(Context context){
    	
    	
    	return null;
    }
	
    
	
	@Override
	public String toString() {
		return "ConfigBuilder [address=" + address + ", currentHost="
				+ currentHost + ", services=" + services + ", alias=" + alias
				+ ", configFile=" + configFile + "]";
	}

	public static void main(String[] args) {
		ConfigBuilder build=ConfigBuilder.getInstance();
		build.addHost("192.168.8.8", 1234).addHost("192.168.9.9", 2222, "test");
		System.out.println(build);
	}
}

  这样 后面用户 就很容易的建立 配置 文件了 ,本周末 我会添加这样的方式到 distributeTemplate上 ,这样既可以用query对象 也可以传递sql 进行相应的增删改查与 同步。

   ok 我们看看 我们网络通讯怎么实现的 

  首先配置  上下文 都好,我们想想 网络通讯 是不是核心 贯穿 到处用到,既然这样 我们何不做成 service 服务呢

 所以我定义了一个服务

 
/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 下午12:56:04 
 * @function:把 一些 常用的网络通讯作为基础 通过其他客服端 驱动 来进行基本访问的 功能  封装成服务 交由 服务来解析 
 */
public interface Service {
    /**
     * 启动服务
     * 
     * 添加(修改)人:zhuyuping
     */
void start();
/**
 * 关闭服务 
 * 
 * 添加(修改)人:zhuyuping
 */
void stop(); 
/**
 * 服务的名称
 * @return
 * 添加(修改)人:zhuyuping
 */
String getName();
void setName(String name);
/**
 * 获得配置 
 * @return
 * 添加(修改)人:zhuyuping
 */
IConfig getConfig();
/**
 * 注入配置
 */
void setConfig(IConfig config);
    /**
     * 往主机中 新增文件
     * @param address
     * @param build
     * 添加(修改)人:zhuyuping
     */
void insert(List<InetSocketAddress> address,
ImmutableMap<String, Object> build);
    /**
     * 删除文件 或文件夹
     * @param address
     * @param build
     * 添加(修改)人:zhuyuping
     */
void delete(List<InetSocketAddress> address,
ImmutableMap<String, Object> build);
    /**
     * 同步文件或者文件夹 如果用户没有传递条件 那么默认为同步区域
     * @param address
     * @param build
     * 添加(修改)人:zhuyuping
     */
void sync(List<InetSocketAddress> address,
ImmutableMap<String, Object> build);
}

 然后 这个服务 交给 我们要用的时候 注入 进去 就可以,这样就是 我要网络 那么网络服务来了,我要女朋友 然后女朋友没来  我要 xxx服务 服务就能获取,而这个服务 只在初始框架时候 初始化进去,这样的情况,我们于是想到了一种模式

 想到了 吧 就是 

Flyweight 模式

就是这样,因为服务跟节点依赖了,所以 我就节点这样采用了,后面我还要想想要不要脱离

public IConfig getConfig() {
return config;
}
public void setConfig(IConfig config) {
this.config = config;
}
    
public NodeFactory(IConfig config) {
super();
this.config = config;
}
private IConfig config;
private static Map<String,Node> caches=Maps.newHashMap();
public  Node createNode(Class<? extends Node> clazz){
Node nd=null;
if(clazz.isAssignableFrom(FileSystem.class)){
nd=caches.get(FileSystem.NAME);
if(nd==null){
nd=new FileSystem(config);
caches.put(FileSystem.NAME, nd);
}
}//....
return nd;
}
}

 也就是说 用Map 做缓存 避免 创建过多对象 浪费内存,这里 这样的情况很多场合 都用到了,特别是连接池 

  下面进入正题我们看看怎么实现 通讯 ,  事件驱动 监听到时间后,在队列轮训 然后交给服务处理

 


  然后这部分交给服务 管理 服务 只提供访问 发送数据 而已

  所以服务里面就需要一些访问通讯 需要的 如果你采用Tcp BIO 就只需要简单的创建 socket 以及serverSocket就可以了 下面就用它 来发送 对象文件了 了,关于netty 怎么创建基本使用 可以参照 我的博文 spring 与 netty简单整合,哪里有一个问题 就是eventLoop没有分离开来 知道的朋友,记得分开来,我就懒得重更新了,

 我们来看服务里方法 怎么发送的

 

这里 检查是否连接上,然后发送 没连接 发送到死信队列里面,由于死信队列是弱引用 然后在交给虚幻引用,最后回到道虚幻引用的队列里面,最后持久化记录下来,这类似于mysql的日志方式,用户如果若引用能够遍历到就去若引用找,不能遍历到就去虚幻引用的队列里面持久化回收的找,备份还原 同步 都用到他了,这里还有个就是网络连接 为什么我不建立从新连接机制呢,其实netty建立重新连接很容易 只需要在连接 时候加个监听 重新连接就可以,但是现在还不需要,大家理解就好,

关于tcp udp 服务端客服端 见我代码 里面server client包文件

   

小文件 发送 我们自定义一个协议 编码 就好 ,

 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-15 下午7:00:49 
 * @function:将文件转为字节码传输
 */
public class FileEncoder extends MessageToByteEncoder<FileDataMessage>{
@Override
protected void encode(ChannelHandlerContext ctx, FileDataMessage msg,
ByteBuf out) throws Exception {
//协议 
//---head开始
// 1.head 2位 short int  2
//1.sesionid 4位 int     4
//2.hash  4为 int        4
//3.pathlenth 为 多少位? 4
//4.blenth 为多少位 ?     4
//8. start 4字节                4
//9. end 4字节                   4
//----body开始
//5.command 1个字节          1
//6. path byte[]        ?
//7. byte[]             ?
   
msg.toBuffer(out);
//out.writeBytes(msg.toBuffer());
}
}

  小文件解码

这里 发送文件 我采用的是netty官方 提供的 那个实例的2个文件,但是我修改了下,

  大文件就不行了 因为udp tcp 字节数的限制,虽然有个FixLenthXXX....可以 拼接 但是 他依然不能上传大于2G的文件,

看看我是怎么实现的  我覆盖了netty的源码类  然后 实现了了流上传的方法

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-28 
 * @function:覆盖掉netty的源文件,实现大文件分割后的 上传
 */
public class MyHttpPostRequestEncoder implements ChunkedInput<HttpContent>{

	 /**
     * Different modes to use to encode form data.
     */
    public enum EncoderMode {
        /**
         *  Legacy mode which should work for most. It is known to not work with OAUTH. For OAUTH use
         *  {@link EncoderMode#RFC3986}. The W3C form recommentations this for submitting post form data.
         */
        RFC1738,

        /**
         * Mode which is more new and is used for OAUTH
         */
        RFC3986
    }

    private static final Map<Pattern, String> percentEncodings = new HashMap<Pattern, String>();

    static {
        percentEncodings.put(Pattern.compile("\\*"), "%2A");
        percentEncodings.put(Pattern.compile("\\+"), "%20");
        percentEncodings.put(Pattern.compile("%7E"), "~");
    }

    /**
     * Factory used to create InterfaceHttpData
     */
    private final HttpDataFactory factory;

    /**
     * Request to encode
     */
    private final HttpRequest request;

    /**
     * Default charset to use
     */
    private final Charset charset;

    /**

然后 我们只要发送文件时候开来,这样 原来2G就分开成 加入我们设置500M 就是就是5分同时并发传输了 然后在系统后台 拼接组装

上面 这里 我使用了MessageDigest生成了一个checksum 用来后面检查文件快的完整性 的,后台业务逻辑还没完全写好

 我们后面使用的文件内存块隐射 加锁形式 ,为什么用它呢,因为 文件太大 如果传统时候 需要在内存中创建 这很需要大量的内存 所以 我们采用这种方式 , 核心代码 在这里 ,这里没添加chesum检查完整性代码 后面会添加上去的

 


ok 然后就这样了,后面添加RPC调用也是 一样很简单,就是系列化  网络传输 事件驱动异步机制 我将会采用probuffer来做系列化 。 下周可能要编代码了 所以不一定有时间更新sql 解析 这一章 所以抱歉了 ,由于文字大小限制

 所以一些是图片 抱歉,大家可以 进 http://git.oschina.net/zhuyuping/distributeTemplate

 或者 https://github.com/zhuyuping/distributeTemplate