首页 > 代码库 > ThreadPoolExecutor并行编程Demo

ThreadPoolExecutor并行编程Demo

    今天要把线上600M的日志进行信息抽取,提取出被多次编码的中文账号。日志文件有200W行,用单线程的程序跑,花了20分钟。感觉速度不够快,而且CPU的使用率一直是25%, 根本没有有效的利用硬件资源。于是尝试用多线程的程序进行提速,CPU耗到了接近100%, 程序内存耗到了200M,最终花了14分钟把数据跑完,提速了6分钟,感觉还不错。因为对JDK的多线程框架还不熟悉,多线程的代码改写了好几版,中间遇到了各种各样的错误。把最好的一版拿出来,分享下,同时也做个笔记。
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.URLDecoder;
import java.util.HashSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Test {
	static HashSet<String>	<span style="white-space:pre">	</span>set	= new HashSet<String>();
	static PrintWriter		pw	= null;
	static Pattern			p	= Pattern.compile("\".+dd_sdk_apk[^%]+pin\":\"([^%]*%25.+)\"");

	public static void main(String[] args) {

		ThreadPoolExecutor es = new ThreadPoolExecutor(4, 8, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(100000),
				new ThreadPoolExecutor.CallerRunsPolicy());

		try {
			String log = null;
			pw = new PrintWriter(new File("E:/chinese_user6.txt"));
			InputStreamReader fis = new InputStreamReader(new FileInputStream("E:\\data\\export\\Logs\\im.jd.com\\tracker.log"));
			BufferedReader br = new BufferedReader(fis);

			while ((log = br.readLine()) != null) {
				es.execute(new Task(log));
			}

			es.awaitTermination(5, TimeUnit.MINUTES);

			fis.close();
			br.close();
			pw.close();
			System.out.println("--------------------------finish!-------------------------------------");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static String decode(String codedName) {
		String lastName = null;
		try {
			for (int idx = 0;; idx++) {
				lastName = codedName;
				codedName = URLDecoder.decode(codedName, "UTF-8");
				if (codedName.equals(lastName)) {
					return codedName;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return codedName;
	}

	private static class Task implements Runnable {
		private String	log	= null;

		public Task(String value) {
			this.log = value;
		}

		@Override
		public void run() {
			try {
				Matcher matcher = p.matcher(log);
				if (matcher.find()) {
					String codedName = matcher.group(1);
					if (codedName.length() < 200) {
						String trueName = decode(codedName);
						if (!set.contains(trueName)) {
							set.add(trueName);
							pw.println(trueName);
						}
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}

		}
	}
}

    简单的注释下,LinkedBlockingQueue,是链式阻塞队列。一旦队列装满了,就没法再向里面放任务了。这时候采取ThreadPoolExecutor.CallerRunsPolicy的策略,从字面上理解就是,装不下的任务交由线程的调用者(caller)来运行(run)。这样可以避免装不下的任务被扔掉!

    new LinkedBlockingQueue<Runnable>(100000),这个参数的泛型类型一定要是Runnable,不是你自己实现Runnable的那个类 。

    es.execute(new Task(log));这行代码大家可能认为每读取一行,java进程就会创建一个新的线程。非也!ThreadPoolExecutor是一个线程池,每次扔进去的是一个任务,这个任务是拿线程池里的线程来跑,跑完之后,线程还在那里,任务被扔掉。

ThreadPoolExecutor并行编程Demo