首页 > 代码库 > CompletableFuture

CompletableFuture

  若你的意图是并发,而非并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么其实真正想做的避免因为等待远程服务的返回,或对数据库的查询而阻塞线程的执行,浪费计算资源。

  

  Future接口在Java 5中引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行计算结果的引用。使用Future只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService。可以调用get方法获取操作的结果。若操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回响应的结果。Future还提供了一个参数设置超时时间,通过它可以定义线程等待Future结果的最长时间。

  Future接口的局限:  

    将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果

    等待Future集合中的所有任务都完成

    仅等待Future集合中最快结束的任务完成

    通过编程方式完成一个Future任务的执行

    应对Future的完成事件

  

  同步API和异步API

    同步API是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还需要等待被调用方结束运行,这就是阻塞时调用这个名词的由来。异步API会直接返回,至少在被调用方计算完成之前,将它的剩余的计算任务交给另一个线程去做,该线程和调用方是异步的,这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回方式要么是通过回调函数,要么是调用方再次执行一个“等待,直到计算完成”的方法调用。

 

  若调用的方法出现了错误,用于提示错误会被限制在当前异步的线程内,从而导致get方法返回结果的客户端永久的被阻塞。客户端可以使用重载版本的get方法,使用一个超时参数来避免发生这样的情况。当超时发生时,程序会得到通知发生了TimeoutException。也可以使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。

    public Future<Double> getPrice(final String product){
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() ->{
            try{
                double price = calculatePrice(product);
                futurePrice.complete(price);
            }catch (Exception e){
                futurePrice.completeExceptionally(e);
            }

        }).start();
        return futurePrice;
    }

 

  CompletableFuture的supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程运行,但也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。

  因此上段代码可以简写成:

    public Future<Double> getPrice(final String product){

            return CompletableFuture.supplyAsync(() -> calculatePrice(product)); 

     }

 

  并行流及CompletableFuture

    并行流和CompletableFuture内部采用的都是同样的线程池,默认都是使用固定数目的线程,具体线程取决于Runtime.getRuntime().availableProcessors()的返回值。而CompletableFuture具有一定的优势,因为它允许你对执行器进行配置,尤其是线程池的大小,让它以更适应需求的方式进配置,满足程序的要求,这时并行流API无法提供的。

  若线程池中的线程数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换。若线程的数目过少,线程池大小与处理器的利用率之比可用下面的公式进行估算:

  N(threads) = N(cpu) * U(cps) * (1 + W/C);

    N(cpu)是处理器的核的数目,U(cpu)是期望的CPU的利用率,W/C是等待时间与计算时间的比值

  对集合进行并行计算有两种方式:一种是将其转化成并行流,利用map开展工作,另一种是枚举集合中的每个元素,创建新的线程,在CompletableFuture内进行操作。若进行的是计算密集型的操作,并且没有IO,推荐使用Stream接口。若涉及IO,则使用CompletableFuture接口。

 

  CompletableFuture.thenCompose()方法允许对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。thenComposeAsync()方法会将后续的任务提交到另个线程中运行。

  CompletableFuture.thenCombine()方法,它接受名为BiFunction的第二个参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。thenCombineAsync()方法会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。

  Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(produce))

                            .thenCombine(Completable.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate);

 

  CompletableFuture.thenAccept()执行完毕后的返回值做参数。异步版本的方法对处理结果的消费者进行调度,从线程池中选择一个新的线程继续执行,不再由同一个线程完成CompletableFuture的所有任务。一旦CompletableFuture计算到结果,它就返回一个CompletableFuture<Void>。

  CompletableFuture.allOff工厂方法接收由一个CompletableFuture构成的数组,数组中的所有CompletableFuture对象执行完成之后,它返回一个CompletableFuture<Void>对象。若需要等待最初的Stream中所有的CompletableFuture对象执行完毕,可以对allOf方法返回的CompletableFuture执行join操作。

  CompletableFuture.anyOf方法接受一个CompletableFuture对象构成的数组。返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture<Object>。

  long start = System.nanoTime();

  CompletableFuture[] futures = findPriceStream("product").map(f -> f.thenAccept(s -> System.out.println(s + " done in " + ((System.nanoTime() - start)/ 1000000) +" msecs"))).toArray(size -> new CompletableFuture[size]);

  CompletableFuture.allOf(futures).join();

  System.out.println("All shops have now responsed in " + ((System.nanoTime() - start )/ 100000) + " msces");

CompletableFuture