首页 > 代码库 > 一个简单高效的多线程解决方案

一个简单高效的多线程解决方案

import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多线程抓取数据的简单程序
 */
public class MultithreadFetcher {

	/** 阻塞队列的最大长度,防止内存溢出。  */
	public static final int MAX_QUEUE_SIZE = 100;
	/** 最大递归深度,防止递归死循环  */
	public static final int RECURSION_LEVEL = Integer.MAX_VALUE;
	/** 致命毒药,优雅关闭后续的工作线程  */
	private static final File DEADLY_POISON = new File("./deadly.tmp");
	
	/**
	 * 递归遍历文件夹,将遍历的文件放入队列中。
	 * @param folder 目标文件夹
	 * @param fileQueue 文件队列
	 * @param level 递归深度
	 */
	private static void visitFolder(File folder, BlockingQueue<File> fileQueue, int level) throws InterruptedException{
		if(level<=0){//控制递归深度,防止递归死循环。
			return;
		}
		File[] files = folder.listFiles();
		for(File file : files){
			if(file.isDirectory()){
				visitFolder(file,fileQueue,level-1);
			}else if(file.getName().toLowerCase().endsWith(".xml")){
				fileQueue.put(file);
			}else{
				//do nothing ...
			}
		}
	}
	/**
	 * 创建目标文件。通过原文件的名称创建一个新的文件。
	 * @param file 原始文件
	 * @param targetFolder 目标文件夹
	 * @return 新的文件,目标文件
	 */
	private static File createTargetFile(File file, File targetFolder){
		String targetFileName = file.getName();
		return new File(targetFolder,targetFileName);
	}
	/**
	 * 处理文件的操作,可以在这个里面读取文件数据,解析文件,抓取网页,写入备份。
	 * @param file 原始文件,待解析的文件
	 * @param target 目标文件,备份文件
	 */
	private static void travelFile(File file, File target) throws Throwable{
		//详细操作从略
	}
	
	/** 递归文件夹的线程。不支持多线程并发递归。 */
	static class VisitFolderThread extends Thread{
		private File folder;
		private BlockingQueue<File> fileQueue;
		public VisitFolderThread(File folder, BlockingQueue<File> fileQueue) {
			super("visit-folder-thread");
			this.folder = folder;
			this.fileQueue = fileQueue;
		}
		@Override
		public void run() {
			try {
				visitFolder(folder, fileQueue, RECURSION_LEVEL);
				fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭
			} catch (InterruptedException e) {
				// 在这里可以做一些异常处理
				e.printStackTrace();
			}
		}
	}
	
	/** 处理文件的线程,可以多线程并发处理,每个线程处理一个文件  */
	static class TravelFileThread extends Thread{
		private static final AtomicInteger ThreadCount = new AtomicInteger();
		private File targetFolder;
		private BlockingQueue<File> fileQueue;
		public TravelFileThread(File targetFolder, BlockingQueue<File> fileQueue) {
			super("travel-file-thread-"+ThreadCount.incrementAndGet());
			this.targetFolder = targetFolder;
			this.fileQueue = fileQueue;
		}
		@Override
		public void run() {
			File file = null;
			try {
				while((file=fileQueue.take())!=DEADLY_POISON){
					File target = createTargetFile(file, targetFolder);
					try {
						travelFile(file, target);
					} catch (Throwable e) {
						onException(e,file,target);
					}
				}
				fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭
			} catch (InterruptedException e) {
				// 在这里可以做一些异常处理
				e.printStackTrace();
			}
		}
		/** 在处理文件的过程中,如果抛出异常,则进入下面的处理程序,从略。 */
		private void onException(Throwable e, File file, File target) {
			// 如果travelFile抛出异常,则在此处进行处理。
			e.printStackTrace();
		}
	}
	
	private BlockingQueue<File> fileQueue = new LinkedBlockingQueue<File>(MAX_QUEUE_SIZE);
	private Thread visitFolderThread;
	private Thread[] travelFileThreads;
	
	public MultithreadFetcher(File sourceFolder, File targetFolder, int travelThreads) {
		super();
		visitFolderThread = new VisitFolderThread(sourceFolder, fileQueue);
		travelFileThreads = new TravelFileThread[travelThreads];
		for(int i=0;i<travelFileThreads.length;i++){
			travelFileThreads[i] = new TravelFileThread(targetFolder, fileQueue);
		}
	}
	
	/**
	 * 开始执行
	 */
	public void start(){
		visitFolderThread.start();
		for(int i=0;i<travelFileThreads.length;i++){
			travelFileThreads[i].start();
		}
	}
	/**
	 * 强行终止。请慎用。程序会自动关闭
	 */
	public void terminate(){
		visitFolderThread.interrupt();
		for(int i=0;i<travelFileThreads.length;i++){
			travelFileThreads[i].interrupt();
		}
	}
	
	/**
	 * 测试用例
	 */
	public static void main(String[] args) {
		final File sourceFolder = new File("");
		final File targetFolder = new File("");
		final int travelThreads = 20;
		MultithreadFetcher fetcher = new MultithreadFetcher(sourceFolder,targetFolder,travelThreads);
		fetcher.start();
	}

}

采取了数据分离的形式,消除了线程间互斥。效率不错。

一个简单高效的多线程解决方案