首页 > 代码库 > JAVA NIO服务器间连续发送文件(本地测试版)

JAVA NIO服务器间连续发送文件(本地测试版)

说在前面:给我的需求是实现从服务器A将大量文件(大概几十TB)发送到服务器B,在A服务器生成文件的MD5码,并在服务器B进行md5验证,验证通过保存。

我的实现思路:

  • 将待上传的所有文件目录生成一个txt文件,格式如下。前缀中,当后面的数字等于9999的时候,前面的数字会自行相加。(此处加前缀的目的是为了整齐,而且失败了便于查询。)

AAA0000:D:\upload\addChannel.html

AAA0001:D:\upload\addChannel2.html

AAA0002:D:\upload\addContactPerson.html

AAA0003:D:\upload\admin.html

AAA0004:D:\upload\businessOfChannel.html

....

AAA9999:D:\upload\admin1.html

AAB0000:D:\upload\businessOfChannel1.html

...

  • 每次读取一条目录,进行上传。

  • 本地测试版未去实现的部分,没有把成功和失败的目录写到文件中,也没有添加日志。


第一部分:将文件目录存储到文本中,文件夹不进行存储。

import java.io.File;
import java.io.FileOutputStream;

public class ReadAllPaths {
	private static final String rootPath="D:/upload/"; //the root path of the files which will be copied 
	private static final String filePath="G:/temp/unUploadedFilePath.txt";//the record of all files path
	/*
	 * the items of prefix and num construct the path prefix,for example AAA0001
	 * and it‘s mainly  convenient for searching
	 */
	private String prefix="AAA"; 
	private int num=0;
	/**
	 * main
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		ReadAllPaths  paths=new ReadAllPaths();
		File file=new File(filePath);
		if(file.exists()){
			file.delete();
		}
		FileOutputStream out=new FileOutputStream(file,true);
		paths.getAllPaths(rootPath, out);
		out.close();
	}
	
	/**
	 * get all path out
	 * @param root
	 * @param out
	 * @throws Exception
	 */
	private  void getAllPaths(String root,FileOutputStream out) throws Exception{
		File file=new File(root);
		
		if(file.isDirectory()){
			try{if(file.list().length==0){
				return;
			}else{
				String[] files=file.list();
				for(String f:files){
					getAllPaths(root+f+File.separator, out);
				}
			}
			}catch(NullPointerException npe){
				return;
			}
		}else{
			String pathNum=getPathNum();
			String path=file.getAbsolutePath();
			out.write((pathNum+":"+path+"\n").getBytes());
		}
	}
	/**
	 * get the path prefix
	 * @return
	 */
	private String getPathNum(){
		StringBuilder sb=new StringBuilder();
		sb.append(getPrefix()).append(getNum());
		setNum();
		return sb.toString();
	}
	
	/**
	 * get the String prefix of path prefix
	 * @return
	 */
	private String getPrefix() {
		return prefix;
	}
	/**
	 * set the String prefix of path prefix
	 * for example:AAA AAB AAC....AAZ ABA....AZZ BAA...
	 */
	private void setPrefix() {
		char[] ch=new char[3];
		ch=getPrefix().toCharArray();
		if(ch[2]!=‘Z‘){
			ch[2]++;
		}else{
			ch[2]=‘A‘;
			if(ch[1]!=‘Z‘){
				ch[1]++;
			}else{
				ch[1]=‘A‘;
				ch[0]++;
			}
		}
		prefix=new String(ch);
	}
	/**
	 * get the int prefix of path prefix
	 * @return
	 */
	private String getNum() {
		StringBuffer sb=new StringBuffer();
		if(num<10){
			sb.append("000").append(num);
		}else if(num<100){
			sb.append("00").append(num);
		}else if(num<1000){
			sb.append("0").append(num);
		}else{
			sb.append(num);
		}
		return sb.toString();
	}
	/**
	 * set the int prefix of path prefix
	 * and the max num is 9999 and the min is 0000
	 */
	private void setNum() {
		if(num<9999){			
			num++;
		}else{
			num=0;
			setPrefix();
		}
	}
}

第二部分,服务器端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

public class Server {
	Selector selector = null;
	ServerSocketChannel serverSocketChannel = null;
	private NioserverHandler2 handler;

	public Server() throws IOException {
		selector = Selector.open();
		// 打开服务器套接字通道
		serverSocketChannel = ServerSocketChannel.open();

		// 调整通道的阻塞模式非阻塞
		serverSocketChannel.configureBlocking(false);
		//serverSocketChannel.socket().setReuseAddress(true);
		serverSocketChannel.socket().bind(new InetSocketAddress(9999));

		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
	}

	public Server(NioserverHandler2 handler) throws IOException {

		this();

		this.handler = handler;
		while (selector.select() > 0) {
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();
			while (it.hasNext()) {
				SelectionKey s = it.next();
				it.remove();
				this.handler.excute((ServerSocketChannel) s.channel());
			}
		}
	}

	public static void main(String[] args) throws IOException {
		new Server(new NioserverHandler2());
	}
}
public class NioserverHandler2 {


	private final static String DIRECTORY = "G:\\NioRequest\\";

	/**
	 * 这里边我们处理接收和发送
	 * 
	 * @param serverSocketChannel
	 */
	public void excute(ServerSocketChannel serverSocketChannel) {
		SocketChannel socketChannel = null;
		try {
			socketChannel = serverSocketChannel.accept(); // 等待客户端连接

			RequestObject2 requestObject = receiveData(socketChannel);// 接数据
			// logger.log(Level.INFO,requestObject.toString());
			String md5 = DigestUtils.md5Hex(requestObject.getContents());
			String response = "";
			if (md5.equals(requestObject.getMd5())) {
				response = (new ResponseObject("succeed", requestObject.getAbsolutePath(), "")).toString();
				File file = new File(DIRECTORY + requestObject.getRelativePath());
				if (!file.exists()) {
					file.mkdirs();
				}
				File file1 = new File(DIRECTORY + requestObject.getRelativePath() + requestObject.getFilename());
				if (!file1.exists()) {
					file1.createNewFile();
				}
				FileOutputStream fos = new FileOutputStream(file1);
				fos.write(requestObject.getContents());
				fos.close();
			} else {
				response = (new ResponseObject("failed", requestObject.getAbsolutePath(), "md5验证失败")).toString();

			}
			System.out.println(response);
			responseData(socketChannel, response);
			// logger.log(Level.INFO, response);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * <p>
	 * 读取通道中的数据到Object里去
	 * </p>
	 * 
	 * @param socketChannel
	 * @return
	 * @throws IOException
	 */
	public RequestObject2 receiveData(SocketChannel socketChannel) throws IOException {

		// 文件名
		String fileName = null;
		String relativePath = null;
		String absolutePath = null;
		String md5 = null;
		// 文件长度
		int contentLength = 0;
		// 文件内容
		byte[] contents = null;
		// 由于我们解析时前4个字节是文件名长度
		int capacity = 4;
		ByteBuffer buf = ByteBuffer.allocate(capacity);
		int size = 0;
		byte[] bytes = null;
		// 拿到文件名的长度
		size = socketChannel.read(buf);
		if (size >= 0) {
			buf.flip();
			capacity = buf.getInt();
			buf.clear();
		}

		// 拿文件名,相信文件名一次能够读完,如果你文件名超过1K 你有病了
		buf = ByteBuffer.allocate(capacity);
		size = socketChannel.read(buf);
		if (size >= 0) {
			buf.flip();
			bytes = new byte[size];
			buf.get(bytes);
			buf.clear();
		}
		String fileInfo = new String(bytes);
		System.out.println(fileInfo);
		fileName = fileInfo.split(";")[0];
		relativePath = fileInfo.split(";")[1];
		absolutePath = fileInfo.split(";")[2];
		md5 = fileInfo.split(";")[3];
		// 拿到文件长度
		capacity = 4;
		buf = ByteBuffer.allocate(capacity);
		size = socketChannel.read(buf);
		if (size >= 0) {
			buf.flip();
			// 文件长度是可要可不要的,如果你要做校验可以留下
			capacity = buf.getInt();
			buf.clear();
		}
		if (capacity == 0) {
			contents = new byte[] {};
		} else {
			// 用于接收buffer中的字节数组
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			// 文件可能会很大
			// capacity = 1024;
			buf = ByteBuffer.allocate(capacity);
			while ((size = socketChannel.read(buf)) >= 0) {
				buf.flip();
				bytes = new byte[size];
				buf.get(bytes);
				baos.write(bytes);
				buf.clear();
			}
			contents = baos.toByteArray();
		}

		RequestObject2 requestObject = new RequestObject2(fileName, relativePath, absolutePath, md5, contents);

		return requestObject;
	}


	private void responseData(SocketChannel socketChannel, String response) {
		ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
		try {
			socketChannel.write(buffer);
			buffer.clear();
			// 确认要发送的东西发送完了关闭output 不然它端接收时socketChannel.read(Buffer)
			// 很可能造成阻塞 ,可以把这个(L)注释掉,会发现客户端一直等待接收数据
			socketChannel.socket().shutdownOutput();// (L)
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}

import java.io.Serializable;

public class RequestObject2 implements Serializable {
	private static final long serialVersionUID = 1L;
	private String filename;
	private String relativePath;
	private String absolutePath;
	private String md5;
	private byte[] contents;
	public RequestObject2(String filename, String relativePath, String absolutePath, String md5, byte[] contents) {
		this.filename = filename;
		this.relativePath = relativePath;
		this.absolutePath = absolutePath;
		this.md5 = md5;
		this.contents = contents;
	}
	public String getFilename() {
		return filename;
	}
	public String getRelativePath() {
		return relativePath;
	}
	public String getAbsolutePath() {
		return absolutePath;
	}
	public String getMd5() {
		return md5;
	}
	public byte[] getContents() {
		return contents;
	}

第三部分 客户端代码

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Client2 {
	private static final String unpath = "G:\\temp\\unUploadedFilePath.txt";
	private static final String pathPre = "D:\\upload\\";
	private static final String IPADDR = "127.0.0.1";
	private static final int PORT = 9999;
	Selector selector;

	public Client2() throws IOException {
		selector = Selector.open();
		new Thread(new SendDataRunnable()).start();
	}

	private class SendDataRunnable implements Runnable {
		private ClientHandler handler;

		public SendDataRunnable() {
			handler = new ClientHandler();
		}

		@Override
		public void run() {
			try {
				BufferedReader reader = new BufferedReader(new FileReader(new File(unpath)));
				String path = "";
				while ((path = reader.readLine()) != null && path.length() != 0) {
					SocketChannel socketChannel;
					socketChannel = SocketChannel.open();
					socketChannel.connect(new InetSocketAddress(IPADDR, PORT));
					socketChannel.configureBlocking(false);
					socketChannel.register(selector, SelectionKey.OP_READ);

					handler.sendData(socketChannel, path, pathPre);
					String response = handler.receiveData(socketChannel);
					System.out.println(response);
					socketChannel.close();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

	public static void main(String[] args) throws IOException {
		Client2 client = new Client2();
	}

}


import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import org.apache.commons.codec.digest.DigestUtils;


public class ClientHandler {
	
	public void sendData(SocketChannel socketChannel,String path,String pathPre)throws Exception{
			System.out.println(path);
			String absoluteFilePath=getAbsoluteFilePath(path);
			String fileName=getFileName(absoluteFilePath);
			String relativeFilePath=getRelativeFilePath(absoluteFilePath, pathPre,fileName);
			System.out.println(absoluteFilePath);
			byte[] bytes=makeFileToBytes(absoluteFilePath);
			System.out.println(bytes.length);
			String md5=DigestUtils.md5Hex(bytes);
			String fileInfo=new StringBuffer()
					.append(fileName)
					.append(";")
					.append(relativeFilePath)
					.append(";")
					.append(path)
					.append(";")
					.append(md5)
					.toString();
			System.out.println(fileInfo);
			ByteBuffer buffer = ByteBuffer.allocate(8 +fileInfo.getBytes().length+bytes.length);
			buffer.putInt(fileInfo.getBytes().length);
			buffer.put(fileInfo.getBytes());
			buffer.putInt(bytes.length);
			buffer.put(ByteBuffer.wrap(bytes));
			buffer.flip();
			socketChannel.write(buffer);
			buffer.clear();
		// 关闭输出流防止接受时阻塞,就是告诉接收方本次的内容已经发完了,你不用等了
		socketChannel.socket().shutdownOutput();
	}

	private String getAbsoluteFilePath(String path){
		return path.substring(8);
	}
	private String getRelativeFilePath(String absoluteFilePath,String pathPre,String fileName){
		return absoluteFilePath.substring(pathPre.length(),absoluteFilePath.length()-fileName.length());
	}
	private String getFileName(String path){
		return new File(path).getName();
	}
	
	
	private byte[] makeFileToBytes(String filePath){
		File file=new File(filePath);
		byte[] ret = null;  
        try {    
            FileInputStream in = new FileInputStream(file);  
            ByteArrayOutputStream out = new ByteArrayOutputStream(4096);  
            byte[] b = new byte[4096];  
            int n;  
            while ((n = in.read(b)) != -1) {  
                out.write(b, 0, n);  
            }  
            in.close();  
            out.close();  
            ret = out.toByteArray();  
        } catch (IOException e) {  
            // log.error("helper:get bytes from file process error!");  
            e.printStackTrace();  
        }  
        return ret;  
	}
	
	public String receiveData(SocketChannel socketChannel) throws IOException {
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		String response = "";
		try {
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			byte[] bytes;
			int count = 0;
			while ((count = socketChannel.read(buffer)) >= 0) {
				buffer.flip();
				bytes = new byte[count];
				buffer.get(bytes);
				baos.write(bytes);
				buffer.clear();
			}

			bytes = baos.toByteArray();
			response = new String(bytes, "UTF-8");
//			socketChannel.socket().shutdownInput();
		} finally {
			try {
				baos.close();
			} catch (Exception ex) {
			}
		}
		return response;
	}

}

/*至此全部完成,注释不够多,部分代码是从网上找的。后期有时间会补全注释的,或者下次直接上最终使用的代码*/

本文出自 “枫叶还没红” 博客,请务必保留此出处http://itlearninger.blog.51cto.com/12572641/1945045

JAVA NIO服务器间连续发送文件(本地测试版)