首页 > 代码库 > Java多线程系列七——ExecutorService

Java多线程系列七——ExecutorService

java.util.concurrent.ExecutorService接口提供了许多线程管理的方法

Method 说明
shutdown 拒绝接收新的任务,待已提交的任务执行后关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
shutdownNow 停止所有正在执行的任务,挂起未执行的任务并关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
awaitTermination 当发生shutdown时,阻塞宿主线程直到约定的时间已过或者所有任务完成
submit 提交任务Callable/Runnable,可利用Future的get()方法使宿主线程阻塞直到任务结束后返回结果

有了以上方法,便可以基于此接口实现线程池的各种功能(例如java.util.concurrent.ThreadPoolExecutor/java.util.concurrent.ScheduledThreadPoolExecutor),以java.util.concurrent.ThreadPoolExecutor为例,其参数的详解

Name Type 说明
corePoolSize int 线程池中最小的线程数
maximumPoolSize int 线程池中最大的线程数
keepAliveTime long 线程空闲时间,若线程数大于corePoolSize,空闲时间超过该值的线程将被终止回收
unit TimeUnit keepAliveTime的时间单位
workQueue BlockingQueue<Runnable> 已提交但未执行的任务队列
threadFactory ThreadFactory 创建新线程的工厂
handler RejectedExecutionHandler 当线程池或队列达到上限拒绝新任务抛出异常时的处理类

同时,java.util.concurrent.Executors类提供了基于java.util.concurrent.ThreadPoolExecutor类的工具方法,常用方法有

Method 说明
newFixedThreadPool 线程池中含固定数量的线程
newSingleThreadExecutor 线程池中仅含一个工作线程
newCachedThreadPool 按需创建线程,若线程池中无可用线程,则创建新的线程并加入,直到线程数达到上限值(Integer.MAX_VALUE)

测试代码如下

import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;

/**
 * @Description: 测试ExecutorService
 */
public class ThreadExecutorServiceTest {
    private static final String THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION = "This is shutdownWithAwaitTermination";
    private static final int RESULT = 111;

    private static boolean submitRunnable() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> future = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("This is submitRunnable");
            }
        });
        return future.get() == null;
    }

    private static Integer submitRunnableWithResult() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("This is submitRunnableWithResult");
            }
        }, RESULT);
        return future.get();
    }

    private static Integer submitBlockCallable() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future<Integer> future = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("This is submitBlockCallable");
                return RESULT;
            }
        });
        return future.get();// 阻塞
    }

    private static boolean submitNonBlockCallable() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Future<Integer> future = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("This is submitNonBlockCallable");
                return RESULT;
            }
        });
        while (!future.isDone()) {// 非阻塞
            System.out.println(new Date());
        }
        return future.isDone();
    }

    private static String shutdown() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        final StringBuilder sb = new StringBuilder();
        executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(10000);
                sb.append("This is shutdown");
                return RESULT;
            }
        });
        executorService.shutdown();
        return sb.toString();
    }

    private static String shutdownWithAwaitTermination() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        final StringBuilder sb = new StringBuilder();
        executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(10000);
                sb.append(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION);
                return RESULT;
            }
        });
        executorService.shutdown();
        executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
        return sb.toString();
    }

    @Test
    public void test() throws InterruptedException, ExecutionException {
        Assert.assertTrue(submitRunnable());
        Assert.assertEquals(RESULT, submitRunnableWithResult().intValue());
        Assert.assertEquals(RESULT, submitBlockCallable().intValue());
        Assert.assertTrue(submitNonBlockCallable());
        Assert.assertTrue(shutdown().isEmpty());
        Assert.assertEquals(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION, shutdownWithAwaitTermination());
    }

}

 

Java多线程系列七——ExecutorService