My logo
Published on

CompletableFuture异步回调-09

9.1 CompletableFuture的UML类关系

Future接口大家已经非常熟悉了,接下来介绍一下ompletionStage接口。CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。

06-thread-37

9.2 CompletionStage接口

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个常用的函数式接口的特点如下:

(1)Function
Function接口的特点是:有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步。
(2)Runnable
Runnable接口的特点是:无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。
(3)Consumer
Consumer接口的特点是:有输入、无输出。包装了Consumer实例的CompletionStage子任务需要一个输入参数,但不会产生任何输出。

9.3 线程池的使用

默认情况下,通过静态方法runAsync()、supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池,默认的线程数是CPU的核数。当然,它的线程数可以通过以下JVM参数设置:

option:-Djava.util.concurrent.ForkJoinPool.common.parallelism

问题是,如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。
第1章为大家介绍了三
种线程池:IO密集型任务线程池、CPU密集型任务线程池和混合型任务线程池
,大家可以根据不同的任务类型确定线程池的类型和线程数。

9.4 异步任务的串行执行

如果两个异步任务需要串行(一个任务依赖另一个任务)执行,可以通过CompletionStage接口的thenApply()、thenAccept()、thenRun()和thenCompose()四个方法来实现。

1.thenApply()

06-thread-37

thenApply的三个重载版本有一个共同的参数fn,该参数表示要串行执行的第二个异步任务,它的类型为Function。fn的类型声明涉及两个泛型参数,具体如下:
·泛型参数 T:上一个任务所返回结果的类型。
·泛型参数 U:当前任务的返回值类型。

示例,调用thenApply分两步计算(10+10)*2,代码如下:

@Test
public void thenApplyDemo() throws Exception {
    CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
        @Override
        public Long get() {
            long firstStep = 10L + 10L;
            Print.tco("firstStep outcome is " + firstStep);
            return firstStep;
        }
    }).thenApplyAsync(new Function<Long, Long>() {
        @Override
        public Long apply(Long firstStepOutCome) {
            long secondStep = firstStepOutCome * 2;
            Print.tco("secondStep outcome is " + secondStep);
            return secondStep;
        }
    });
    long result = future.get();
    Print.tco(" future is " + future);
    Print.tco(" outcome is " + result);
}
}

Function<T,R>接口既能接收参数又支持返回值,所以thenApply可以将前一个任务的结果通过FunctionR apply(T t)方法传递到第二个任务,并且能输出第二个任务的执行结果。

2.thenRun() 方法

thenRun()方法与thenApply()方法不同的是,不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。

06-thread-37

从方法的声明可以看出,thenRun()方法同thenApply()方法类似,不同的是前一个任务处理完成后,thenRun()并不会把计算的结果传给后一个任务,而且后一个任务也没有结果输出。
thenRun系列方法中的action参数是Runnable类型的,所以thenRun()既不能接收参数又不支持返回值。

3.thenAccept() 方法

06-thread-37

Consumer接口的accept()方法可以接收一个参数,但是不支持返回值,所以thenAccept()可以将前一个任务的结果及该阶段性的结果通过void accept(T t)方法传递到下一个任务。但是Consumer接口的accept()方法没有返回值,所以thenAccept()方法也不能提供第二个任务的执行结果。

4.thenCompose()方法

thenCompose()方法在功能上与thenApply()、thenAccept()、thenRun()一样,可以对两个任务进行串行的调度操作,第一个任务操作完成时,将它的结果作为参数传递给第二个任务。

06-thread-37

thenCompose()方法要求第二个任务的返回值是一个CompletionStage异步实例。因此,可以调用CompletableFuture.supplyAsync()方法将第二个任务所要调用的普通异步方法包装成一个CompletionStage异步实例。 作为演示,调用thenCompose分两步计算(10+10)*2,代码如下:

    @Test
    public void thenComposeDemo() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                long firstStep = 10L + 10L;
                Print.tco("firstStep outcome is " + firstStep);

                return firstStep;
            }
        }).thenCompose(new Function<Long, CompletionStage<Long>>() {
            @Override
            public CompletionStage<Long> apply(Long firstStepOutCome) {
                return CompletableFuture.supplyAsync(new Supplier<Long>() {
                    @Override
                    public Long get() {
                        long secondStep = firstStepOutCome * 2;
                        Print.tco("secondStep outcome is " + secondStep);
                        return secondStep;
                    }
                });
            }
        });
        long result = future.get();
        Print.tco(" outcome is " + result);
    }
}

这段程序的执行结果与调用thenApply()分两步计算(10+10)*2的结果是一样的。但是,thenCompose()所返回的不是第二个任务所要执行的普通异步方法Supplier.get()的直接计算结果,而是调用CompletableFuture.supplyAsync()方法将普通异步方法Supplier.get()包装成一个CompletionStage异步实例并返回。

4个任务串行方法的区别:

thenApply()、thenRun()、thenAccept()这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>、Runnable、Consumer<? super T>类型。
但是,thenCompose()方法与thenApply()方法有本质的不同:
(1)thenCompose()的返回值是一个新的CompletionStage实例,可以持续用来进行下一轮CompletionStage任务的调度。
具体来说,thenCompose()返回的是包装了普通异步方法的CompletionStage任务实例,通过该实例还可以进行下一轮CompletionStage任务的调度和执行,比如可以持续进行CompletionStage链式(或者流式)调用。
(2)thenApply()的返回值则简单多了,直接就是第二个任务的普通异步方法的执行结果,它的返回类型与第二步执行的普通异步方法的返回类型相同,通过thenApply()所返回的值不能进行下一轮CompletionStage链式(或者流式)调用。

9.5 异步任务的合并执行

如果某个任务同时依赖另外两个异步任务的执行结果,就需要对另外两个异步任务进行合并。以泡茶喝为例,“泡茶喝”任务需要对“烧水”任务与“清洗”任务进行合并。
对两个异步任务的合并可以通过CompletionStage接口的thenCombine()、runAfterBoth()、thenAcceptBoth()三个方法来实现。这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>、Runnable、Consumer<? super T>类型。

1.thenCombine()方法

06-thread-37

thenCombine()方法的调用者为第一步的CompletionStage实例,该方法的第一个参数为第二步的CompletionStage实例,该方法的返回值为第三步的CompletionStage实例。在逻辑上,thenCombine()方法的功能是将第一步、第二步的结果合并到第三步上。
thenCombine系列方法有两个核心参数:
·other参数:表示待合并的第二步任务的CompletionStage实例。
·fn参数:表示第一个任务和第二个任务执行完成后,第三步需要执行的逻辑。
fn参数的类型为BiFunction<? super T,? super U,? extends V>,该类型的声明涉及三个泛型参数,具体如下:
·泛型参数 T:表示第一个任务所返回结果的类型。
·泛型参数 U:表示第二个任务所返回结果的类型。
·泛型参数 V:表示第三个任务所返回结果的类型。

作为示例,接下来调用thenCombine分三步计算(10+10)*(10+10),代码如下:

    @Test
    public void thenCombineDemo() throws Exception {
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer firstStep = 10 + 10;
                        Print.tco("firstStep outcome is " + firstStep);
                        return firstStep;
                    }
                });
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer secondStep = 10 + 10;
                        Print.tco("secondStep outcome is " + secondStep);
                        return secondStep;
                    }
                });

        CompletableFuture<Integer> future3 = future1.thenCombine(future2,
                new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer step1OutCome, Integer step2OutCome) {
                        return step1OutCome * step2OutCome;
                    }
                });
        Integer result = future3.get();
        Print.tco(" outcome is " + result);
    }

2.runAfterBoth()方法

runAfterBoth()方法跟thenCombine()方法不一样的是,runAfterBoth()方法不关心每一步任务的输入参数和处理结果。runAfterBoth()方法有三个重载版本,声明如下:

06-thread-37

在逻辑上,第一步任务和第二步任务是并行执行的,thenCombine()方法的功能是将第一步、第二步的结果合并到第三步任务上。
与thenCombine系列方法不同,runAfterBoth系列方法的第二个参数action为Runnable类型,表示它的第一步任务、第二步任务、第三步任务既没有输入值,又没有输出值。

3.thenAcceptBoth()方法

thenAcceptBoth()方法对runAfterBoth()方法和thenCombine()方法的特点进行了折中,调用该方法,第三个任务可以接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)却不能返回结果。
thenAcceptBoth()方法有三个重载版本,三个版本的声明如下:

06-thread-37

4.allOf()等待所有的任务结束

CompletionStage接口的allOf()会等待所有的任务结束,以合并所有的任务。thenCombine()只能合并两个任务,如果需要合并多个异步任务,那么可以调用allOf()。

    @Test
    public void allOfDemo() throws Exception
    {
        CompletableFuture<Void> future1 =
                CompletableFuture.runAsync(() -> Print.tco("模拟异步任务1"));

        CompletableFuture<Void> future2 =
                CompletableFuture.runAsync(() -> Print.tco("模拟异步任务2"));
        CompletableFuture<Void> future3 =
                CompletableFuture.runAsync(() -> Print.tco("模拟异步任务3"));
        CompletableFuture<Void> future4 =
                CompletableFuture.runAsync(() -> Print.tco("模拟异步任务4"));

        CompletableFuture<Void> all =
                CompletableFuture.allOf(future1, future2, future3, future4);
        all.join();
    }

9.6 异步任务的选择执行

CompletableFuture对异步任务的选择执行不是按照某种条件进行选择的,而是按照执行速度进行选择的:前面两个并行任务,谁的结果返回速度快,谁的结果将作为第三步任务的输入。
对两个异步任务的选择可以通过CompletionStage接口的applyToEither()、runAfterEither()和acceptEither()三个方法来实现。这三个方法的不同之处在于它的核心参数fn、action、consumer的类型不同,分别为Function<T,R>、Runnable、Consumer<? super T>类型。

1.applyToEither()方法

两个CompletionStage谁返回结果的速度快,applyToEither()方法就用这个最快的CompletionStage的结果进行下一步(第三步)的回调操作。

06-thread-37

   @Test
    public void applyToEitherDemo() throws Exception {
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer firstStep = 10 + 10;
                        Print.tco("firstStep outcome is " + firstStep);
                        return firstStep;
                    }
                });
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer secondStep = 100 + 100;
                        Print.tco("secondStep outcome is " + secondStep);
                        return secondStep;
                    }
                });
        CompletableFuture<Integer> future3 = future1.applyToEither(future2,
                new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer eitherOutCome) {
                        return eitherOutCome;
                    }
                });
        Integer result = future3.get();
        Print.tco(" outcome is " + result);
    }

2.runAfterEither()方法

runAfterEither()方法的功能为:前面两个CompletionStage实例,任何一个完成了都会执行第三步回调操作。三个任务的回调函数都是Runnable类型的。

06-thread-37

runAfterEither()方法的调用者为第一步任务的CompletionStage实例,runAfterEither()方法的第一个参数为第二步任务的CompletionStage实例,runAfterEither()方法的返回值为第三步任务的CompletionStage实例。
调用runAfterEither()方法,只要前面两个CompletionStage实例其中一个执行完成,就开始执行第三步的CompletionStage实例。

3.acceptEither()方法

acceptEither()方法对applyToEither()方法和runAfterEither()方法的特点进行了折中,两个CompletionStage谁返回结果的速度快,acceptEither()就用那个最快的CompletionStage的结果作为下一步(第三步)的输入,但是第三步没有输出。
acceptEither()方法有三个重载版本,声明如下:

06-thread-37

9.7 CompletableFuture的综合案例

基于CompletableFuture来实现前面介绍的泡茶喝实例和RPC异步调用实例。
,在下面的程序中,我们分3个任务:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯和拿茶叶,任务3负责泡茶。其中任务3要等待任务1和任务2都完成后才能开始。

public class DrinkTea {

    private static final int SLEEP_GAP = 3;//等待3秒

    public static void main(String[] args) {
        // 任务 1:洗水壶 -> 烧开水
        CompletableFuture<Boolean> hotJob =
                CompletableFuture.supplyAsync(() ->
                {
                    Print.tcfo("洗好水壶");
                    Print.tcfo("烧开水");

                    //线程睡眠一段时间,代表烧水中
                    sleepSeconds(SLEEP_GAP);
                    Print.tcfo("水开了");
                    return true;

                });

        // 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
        CompletableFuture<Boolean> washJob =
                CompletableFuture.supplyAsync(() ->
                {
                    Print.tcfo("洗茶杯");
                    //线程睡眠一段时间,代表清洗中
                    sleepSeconds(SLEEP_GAP);
                    Print.tcfo("洗完了");

                    return true;
                });

        // 任务 3:任务 1 和任务 2 完成后执行:泡茶
        CompletableFuture<String> drinkJob =
                hotJob.thenCombine(washJob, (hotOk, washOK) ->
                {
                    if (hotOk && washOK) {
                        Print.tcfo("泡茶喝,茶喝完");
                        return "茶喝完了";
                    }
                    return "没有喝到茶";
                });

        // 等待任务 3 执行结果
        Print.tco(drinkJob.join());
    }
}

9.8 使用CompletableFuture进行多个RPC调用

参考代码如下:

public class IntegrityDemo {

    /**
     * 模拟模拟RPC调用1
     */
    public String rpc1() {
        //睡眠400ms,模拟执行耗时
        sleepMilliSeconds(600);
        Print.tcfo("模拟RPC调用:服务器 server 1");
        return "sth. from server 1";
    }

    /**
     * 模拟模拟RPC调用2
     */
    public String rpc2() {
        //睡眠400ms,模拟执行耗时
        sleepMilliSeconds(600);
        Print.tcfo("模拟RPC调用:服务器 server 2");
        return "sth. from server 2";
    }

    @Test
    public void rpcDemo() throws Exception {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() ->
        {
            return rpc1();
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> rpc2());
        CompletableFuture<String> future3 = future1.thenCombine(future2,
                (out1, out2) ->
                {
                    return out1 + " & " + out2;
                });
        String result = future3.get();
        Print.tco("客户端合并最终的结果:" + result);
    }
}

9.9 使用RxJava模拟RPC异步回调

类似的代码如下:

@Test
    public void rxJavaDemo() throws Exception {
        Observable<String> observable1 = Observable.fromCallable(() ->
        {
            return rpc1();
        }).subscribeOn(Schedulers.newThread());
        Observable<String> observable2 = Observable
                .fromCallable(() -> rpc2()).subscribeOn(Schedulers.newThread());

        Observable.merge(observable1, observable2)
                .observeOn(Schedulers.newThread())
                .toList()
                .subscribe((result) -> Print.tco("客户端合并最终的结果:" + result));

        sleepSeconds(Integer.MAX_VALUE);
    }