首页 > 代码库 > ThreadPoolExecutor并行编程Demo
ThreadPoolExecutor并行编程Demo
今天要把线上600M的日志进行信息抽取,提取出被多次编码的中文账号。日志文件有200W行,用单线程的程序跑,花了20分钟。感觉速度不够快,而且CPU的使用率一直是25%, 根本没有有效的利用硬件资源。于是尝试用多线程的程序进行提速,CPU耗到了接近100%, 程序内存耗到了200M,最终花了14分钟把数据跑完,提速了6分钟,感觉还不错。因为对JDK的多线程框架还不熟悉,多线程的代码改写了好几版,中间遇到了各种各样的错误。把最好的一版拿出来,分享下,同时也做个笔记。
import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.URLDecoder; import java.util.HashSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; public class Test { static HashSet<String> <span style="white-space:pre"> </span>set = new HashSet<String>(); static PrintWriter pw = null; static Pattern p = Pattern.compile("\".+dd_sdk_apk[^%]+pin\":\"([^%]*%25.+)\""); public static void main(String[] args) { ThreadPoolExecutor es = new ThreadPoolExecutor(4, 8, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(100000), new ThreadPoolExecutor.CallerRunsPolicy()); try { String log = null; pw = new PrintWriter(new File("E:/chinese_user6.txt")); InputStreamReader fis = new InputStreamReader(new FileInputStream("E:\\data\\export\\Logs\\im.jd.com\\tracker.log")); BufferedReader br = new BufferedReader(fis); while ((log = br.readLine()) != null) { es.execute(new Task(log)); } es.awaitTermination(5, TimeUnit.MINUTES); fis.close(); br.close(); pw.close(); System.out.println("--------------------------finish!-------------------------------------"); } catch (Exception e) { e.printStackTrace(); } } public static String decode(String codedName) { String lastName = null; try { for (int idx = 0;; idx++) { lastName = codedName; codedName = URLDecoder.decode(codedName, "UTF-8"); if (codedName.equals(lastName)) { return codedName; } } } catch (Exception e) { e.printStackTrace(); } return codedName; } private static class Task implements Runnable { private String log = null; public Task(String value) { this.log = value; } @Override public void run() { try { Matcher matcher = p.matcher(log); if (matcher.find()) { String codedName = matcher.group(1); if (codedName.length() < 200) { String trueName = decode(codedName); if (!set.contains(trueName)) { set.add(trueName); pw.println(trueName); } } } } catch (Exception e) { e.printStackTrace(); } } } }
简单的注释下,LinkedBlockingQueue,是链式阻塞队列。一旦队列装满了,就没法再向里面放任务了。这时候采取ThreadPoolExecutor.CallerRunsPolicy的策略,从字面上理解就是,装不下的任务交由线程的调用者(caller)来运行(run)。这样可以避免装不下的任务被扔掉!
new LinkedBlockingQueue<Runnable>(100000),这个参数的泛型类型一定要是Runnable,不是你自己实现Runnable的那个类 。
es.execute(new Task(log));这行代码大家可能认为每读取一行,java进程就会创建一个新的线程。非也!ThreadPoolExecutor是一个线程池,每次扔进去的是一个任务,这个任务是拿线程池里的线程来跑,跑完之后,线程还在那里,任务被扔掉。
ThreadPoolExecutor并行编程Demo
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。