首页 > 代码库 > Java ExecutorService 多线程实践(一)

Java ExecutorService 多线程实践(一)

需要实现一个多线程并发的业务场景,启动若干子线程,最后要所有子线程运行结束才结束。(类似 .NET 里的 Task WaitAll )

Java 中的 ExecutorService 多线程编程模型提供这样一个机制,通过代码来介绍一下。

方法一:ExecutorService#awaitTermination

/**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return <tt>true</tt> if this executor terminated and
     *         <tt>false</tt> if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

通过 ExecutorService#submit(或者execute) 加入子线程并启动,awaitTermination 阻塞等待所有子线程执行完毕。

sample:

ExecutorService es = Executors.newCachedThreadPool();
		List<Future<String>> futureList = Lists.newArrayList();

		futureList.add(es.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading1 is running");				Thread.sleep(1000);
				return "1";
			}
		}));

		futureList.add(es.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading2 is running");
				Thread.sleep(3000);
				return "2";
			}
		}));

		futureList.add(es.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading3 is running");
				Thread.sleep(2000);
				return "3";
			}
		}));

		es.shutdown();

		try {
			boolean completed = es.awaitTermination(2000, TimeUnit.MILLISECONDS);
			System.out.println("all tasks completed: " + completed);

			//es.wait(3000);

			for(Future future : futureList) {
				try {
					if (future.isDone()) {
						System.out.println("result: " + future.get());
					}
				} catch (CancellationException ex) {
					ex.printStackTrace();
				} finally {
					future.cancel(true);
				}
			}


		} catch (Exception e) {
			e.printStackTrace();
		}

输出结果:

Threading 1 is runningThreading 2 is runningThreading 3 is runningall tasks completed: falseresult: 1result: 3


只有1和3输出了结果,因为 awaitTermination 超时设置了 2000 ms,Threading2 模拟了 3000 ms 因此被超时取消了。通过 Future#isDone() 可以判断对应线程是否处理完毕,
这个场景里如果 isDone() == false 那么就可以认为是被超时干掉了。

需要注意的是,在添加完所有的子线程并启动后调用了 ExecutorService#shutdown 方法:一方面是不再接受新的子线程"提交",另一方面ExecutorService 其实自己也是一个work线程,如果不shutdown 其实当前线程并不会结束。调用shutdown 和不调用shutdown 从main运行后控制台状态就可以看出差异。

最后在 finally 里调用了 Future#cancel() 主要是当await之后,被超时处理的线程可能还在运行,直接取消掉。


方法二:ExecutorService#invokeAll

invokeAll 很像上述实现的整体包装,但细节略有不同。首先将 Callable 统一创建好放在List里交给 invokeAll 方法执行并设置超时时间。

ExecutorService es = Executors.newCachedThreadPool();
		List<Callable<String>> tasks = Lists.newArrayList();

		tasks.add(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading 1 is running");
				Thread.sleep(1000);
				return "1";
			}
		});

		tasks.add(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading 2 is running");
				Thread.sleep(3000);
				return "2";
			}
		});

		tasks.add(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading 3 is running");
				Thread.sleep(2000);
				return "3";
			}
		});


		try {
			List<Future<String>> futureList = es.invokeAll(tasks, 2000, TimeUnit.MILLISECONDS);

			es.shutdown();

			for(Future future : futureList) {
				try {
					if (future.isDone()) {
						System.out.println("result: " + future.get());
					}
				} catch (CancellationException ex) {
					ex.printStackTrace();
				}
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
输出结果:

Threading 1 is running
Threading 2 is running
Threading 3 is running
result: 1
result: 3
java.util.concurrent.CancellationException

ExecutorService#invokeAll() 执行后所有 future.isDone() 都是 true,在 future.get() 拿结果的时候,被超时的Future会抛出 CancellationException 。因为在 invokeAll 内部调用了Future#cancel() 方法。

源码如下:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn‘t have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0)
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

当然在 ExecutorService 编程模型外,自己定义Threading,通过CountDownLatch 控制也是可以实现的。




Java ExecutorService 多线程实践(一)