首页 > 代码库 > JAVA进阶-多线程(3)
JAVA进阶-多线程(3)
1.以前使用线程API并没有返回结果,Callable/Future/FutureTask正是解决了此问题,并在调用过程中
作出对异常的捕获
-Callable执行call()方法返回Object对象,也可抛出异常;调用Callable并不像Thread,而是调用<T> Future ExecutorService.submit(Callable<T> task);
-Future 返回值,调用该接口的get()方法,可返回对应的对象
------------------
SalesCalculateSample.java>通过多线程计算矩阵每行结果并叠加;
/** * * @author Lean @date:2014-9-30 */ public class SalesCalculateSample { private static final int NUMBER_OF_MONTH=12; private static final int NUMBER_OF_CUSTOMER=100; private static int[][] cells; static class Summer implements Callable<Integer>{ public int customerID; public Summer(int companyId){ this.customerID=companyId; } @Override public Integer call() throws Exception { int sum=0; for (int i = 0; i < NUMBER_OF_MONTH; i++) { sum+=cells[customerID][i]; } System.out.printf("customerID:%d ,sum:%d\n",customerID,sum); return sum; } } public static void main(String[] args) { generateMatrix(); ExecutorService executor=Executors.newFixedThreadPool(10); Set<Future<Integer>> futures=new HashSet<Future<Integer>>(); for (int i = 0; i < NUMBER_OF_CUSTOMER; i++) { Callable<Integer> caller=new Summer(i); futures.add(executor.submit(caller)); } // caculate the sum int sum=0; for (Future<Integer> future : futures) { try { sum+=future.get(); } catch (Exception e) { e.printStackTrace(); } } System.out.println("sum is>>"+sum); executor.shutdown(); } private static void generateMatrix() { cells=new int[NUMBER_OF_CUSTOMER][NUMBER_OF_MONTH]; for (int i = 0; i < NUMBER_OF_CUSTOMER; i++) { for (int j = 0; j < NUMBER_OF_MONTH; j++) { cells[i][j]=(int)(Math.random()*100); } } } }
------------------
CanCancelProcessors>随机取消提交的订单(Future提供了可取消的结果执行)
/** * * @author Lean @date:2014-10-7 */ public class CanCancelProcessors { private static ExecutorService service=Executors.newFixedThreadPool(100); private static final int ORDERS_COUNT=2000; private static ArrayList<Future<Integer>> futures=new ArrayList<Future<Integer>>(); public static void main(String[] args) { for (int i = 0; i <ORDERS_COUNT; i++) { futures.add(service.submit(new OrderExcutor(i))); } new Thread(new EvilThread(futures)).start(); try { service.awaitTermination(6,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } int count=0; for (Future<Integer> future : futures) { if (future.isCancelled()) { count++; } } System.out.println("----------"+count+" orders canceled!---------"); service.shutdown(); } static class OrderExcutor implements Callable<Integer>{ private int mId; public OrderExcutor(int id){ this.mId=id; } @Override public Integer call() throws Exception { try { Thread.sleep(1000); } catch (InterruptedException e) { } System.out.println("successfully execute orderid : "+mId); return mId; } } static class EvilThread implements Runnable{ private ArrayList<Future<Integer>> futures; public EvilThread(ArrayList<Future<Integer>> futures){ this.futures=futures; } @Override public void run() { for (int i = 0; i < ORDERS_COUNT; i++) { try { Thread.sleep(200); boolean flag=futures.get(i).cancel(true); System.out.println("cancel order >"+flag +" by id>> "+i); } catch (InterruptedException e) { } } } } }------------------
-FutureTask 集成了Runnable与Future接口的功能,因此拥有异步,返回数据的功能;
------------------
FutureTaskSample
/** * * @author Lean @date:2014-9-30 */ public class FutureTaskSample { private static final int NUMBER_OF_MONTH=12; private static final int NUMBER_OF_CUSTOMER=100; private static int[][] cells; static class Summer implements Callable<Integer>{ public int customerID; public Summer(int companyId){ this.customerID=companyId; } @Override public Integer call() throws Exception { int sum=0; for (int i = 0; i < NUMBER_OF_MONTH; i++) { sum+=cells[customerID][i]; } System.out.printf("customerID:%d ,sum:%d\n",customerID,sum); return sum; } } public static void main(String[] args) { generateMatrix(); Set<FutureTask<Integer>> futures=new HashSet<FutureTask<Integer>>(); for (int i = 0; i < NUMBER_OF_CUSTOMER; i++) { Callable<Integer> caller=new Summer(i); FutureTask<Integer> futureTask=new FutureTask<Integer>(caller); futureTask.run(); futures.add(futureTask); } // caculate the sum int sum=0; for (FutureTask<Integer> future : futures) { try { sum+=future.get(); } catch (Exception e) { e.printStackTrace(); } } System.out.println("sum is>>"+sum); } private static void generateMatrix() { cells=new int[NUMBER_OF_CUSTOMER][NUMBER_OF_MONTH]; for (int i = 0; i < NUMBER_OF_CUSTOMER; i++) { for (int j = 0; j < NUMBER_OF_MONTH; j++) { cells[i][j]=(int)(Math.random()*100); } } } }------------------
2.Executors
.newFixedThreadPool()创建固定线程池
.newCachedThreadPool()创建固定线程池,每个线程在60秒内不再创建
.newSingleThreadExecutor()创建单线程 该线程不会被销毁
.newScheduledExecutorService()创建可定时,延时执行的线程
------------------
ScheduledCountSample>定时,延时度取数字的例子
/** * ScheduledExecutorServiceSample * * @author Lean @date:2014-10-7 */ public class ScheduledCountSample { private static ScheduledExecutorService mService=Executors.newScheduledThreadPool(10); private static final int AVG=4; private static final int mAllCount=400; private static int mCurrenCount=0; public static void main(String[] args) { //delay counting ! // int times = mAllCount/AVG; // for (int i = 0; i < times; i++) { // mService.schedule(new tryCount(i), i*1,TimeUnit.SECONDS); // } // mService.scheduleAtFixedRate(new EveryCount(), 0, 1, TimeUnit.SECONDS); mService.scheduleWithFixedDelay(new EveryCount(), 0, 1, TimeUnit.SECONDS); } static class EveryCount implements Runnable{ @Override public void run() { if (mAllCount>mCurrenCount) { System.out.println("ThreadId>>"+Thread.currentThread().getId()+" and count >>"+mCurrenCount++); } } } static class tryCount implements Callable<Integer>{ private int Index; public tryCount(int index) { this.Index=index; } @Override public Integer call() throws Exception { for (int i = Index*AVG+0; i < (AVG+Index*AVG); i++) { System.out.println("count >>"+i); } System.out.println("thread count end! "); return Index; } } }------------------
-ExecutorCompletionService
通常获取结果用get(),该方法形成阻塞.而通过take()获取已经结束的任务的结果
------------------
GetResultRightNow
/** * 获取非阻塞型结果 * * @author Lean @date:2014-10-7 */ public class GetResultRightNow { public static void main(String[] args) { int[] printNum={1000,200,200,30000,5000}; ArrayList<Future<Integer>> fetures=new ArrayList<Future<Integer>>(); ExecutorService executors = Executors.newFixedThreadPool(2); ExecutorCompletionService<Integer> service=new ExecutorCompletionService<Integer>(executors); for (int i : printNum) { fetures.add(service.submit(new getCurrnNum(i))); } try { executors.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < printNum.length; i++) { int num=0; try { //take it when there has a result; num = service.take().get(); //stop when there has no result no matter others having; // num=fetures.get(i).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("num is:"+num); } executors.shutdown(); } static class getCurrnNum implements Callable<Integer>{ private int printNum; public getCurrnNum(int i) { printNum=i; } @Override public Integer call() throws Exception { try { Thread.sleep(printNum); } catch (Exception e) { } return printNum; } } }------------------
JAVA进阶-多线程(3)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。