首页 > 代码库 > 任务执行

任务执行

  大多数并发应用程序都是围绕“任务执行(Task  Execution)”来构造的:任务通常是一些抽象的且离散的工作单元。

  在生产环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创建大量线程时:

  • 线程生命周期的开销非常高。线程的创建与销毁并不是没有代价的。
  • 资源消耗。活跃的线程会消耗系统资源,尤其是内存。  
  • 稳定性。在可创建线程的数量上存在一个限制。

  在一定范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,再创建更多的线程只会降低程序的执行速度,并且如果过多地创建一个线程,那么整个应用程序将崩溃,要想避免这种危险,就应该对应用程序可以创建的线程数量进行限制,并且全面的测试应用程序,从而确保在线程数量达到限制时,程序也不会耗尽资源。

  在Java类库中,任务执行的主要抽象不是Thread,而是Executor。虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

  Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生产待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式通常就是使用Executor。

  

public class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException{
        ServerSocket serverSocket = new ServerSocket(80);
        while (true){
            final Socket connection = serverSocket.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

  线程池,从字面啊啊含义来看,是指管理一组同构工作线程的资源池。线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

  为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。ExecutorService的生命周期有3种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括哪些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

  CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。

public class Renderer {
    private final ExecutorService executorService;

    Renderer(ExecutorService executorService){
        this.executorService = executorService;
    }

    void renderPage(CharSequence source){
        List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService =
                new ExecutorCompletionService<ImageData>(executorService);
        for (final ImageInfo imageInfo:info){
            completionService.submit(new Callable<ImageData>() {
                @Override
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }
        renderText(source);

        try{
            for (int i=0;i<info.size();i++){
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e){
            e.printStackTrace();
        } catch (ExecutionException e){
            e.printStackTrace();
        }

    }
}

  有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。例如,某个web应用程序从外部的广告服务器上获取广告信息,但如果该应用程序在两秒内得不到响应,那么将显示一个默认的广告,这样即使不能获得广告信息,也不会降低站点的响应性能。在支持时间限制的Future.get中支持这种需求:当结果可用时,它将立即返回,如果在执行时间内没有计算出结果,那么将抛出TimeoutException。

  

public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo,
                                                  Set<TravelCompany> companies,
                                                  Comparator<TravelQuote> ranking,long time,
                                                  TimeUnit unit)throws InterruptedException{
       List<QuoteTask> tasks = new ArrayList<QuoteTask>();
       for (TravelCompany company:companies){
           tasks.add(new QuoteTask(company,travelInfo));
       }

       List<Future<TravelQuote>> futures = executorService.invokeAll(tasks,time,unit);

       List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
       Iterator<QuoteTask> taskIterator = tasks.iterator();

       for (Future<TravelQuote> f:futures){
           QuoteTask task = taskIterator.next();
           try {
               quotes.add(f.get());
           } catch (ExecutionException e){
               quotes.add(task.getFailureQuote(e.getCause()));
           }catch (CancellationException e){
               quotes.add(task.getTimeoutQuote(e));
           }
       }

       Collections.sort(quotes,ranking);
       return quotes;
   }