- Published on
CompletableFuture异步回调-09
9.1 CompletableFuture的UML类关系
Future
接口大家已经非常熟悉了,接下来介绍一下ompletionStage
接口。CompletionStage
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。
9.2 CompletionStage接口
每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个常用的函数式接口的特点如下:
(1)Function
Function接口的特点是:有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步。
(2)Runnable
Runnable接口的特点是:无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。
(3)Consumer
Consumer接口的特点是:有输入、无输出。包装了Consumer实例的CompletionStage子任务需要一个输入参数,但不会产生任何输出。
9.3 线程池的使用
默认情况下,通过静态方法runAsync()、supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池,默认的线程数是CPU的核数。当然,它的线程数可以通过以下JVM参数设置:
option:-Djava.util.concurrent.ForkJoinPool.common.parallelism
问题是,如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。
第1章为大家介绍了三种线程池:IO密集型任务线程池、CPU密集型任务线程池和混合型任务线程池,大家可以根据不同的任务类型确定线程池的类型和线程数。
9.4 异步任务的串行执行
如果两个异步任务需要串行(一个任务依赖另一个任务)执行,可以通过CompletionStage接口的thenApply()、thenAccept()、thenRun()和thenCompose()四个方法来实现。
1.thenApply()
thenApply的三个重载版本有一个共同的参数fn,该参数表示要串行执行的第二个异步任务,它的类型为Function。fn的类型声明涉及两个泛型参数,具体如下:
·泛型参数 T:上一个任务所 返回结果的类型。
·泛型参数 U:当前任务的返回值类型。
示例,调用thenApply分两步计算(10+10)*2,代码如下:
@Test
public void thenApplyDemo() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long firstStep = 10L + 10L;
Print.tco("firstStep outcome is " + firstStep);
return firstStep;
}
}).thenApplyAsync(new Function<Long, Long>() {
@Override
public Long apply(Long firstStepOutCome) {
long secondStep = firstStepOutCome * 2;
Print.tco("secondStep outcome is " + secondStep);
return secondStep;
}
});
long result = future.get();
Print.tco(" future is " + future);
Print.tco(" outcome is " + result);
}
}
Function<T,R>
接口既能接收参数又支持返回值,所以thenApply
可以将前一个任务的结果通过Function
的R apply(T t)
方法传递到第二个任务,并且能输出第二个任务的执行结果。
2.thenRun() 方法
thenRun()方法与thenApply()方法不同的是,不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。
从方法的声明可以看出,thenRun()方法同thenApply()方法类似,不同的是前一个任务处理完成后,thenRun()并不会把计算的结果传给后一个任务,而且后一个任务也没有结果输出。
thenRun系列方法中的action参数是Runnable类型的,所以thenRun()既不能接收参数又不支持返回值。
3.thenAccept() 方法
Consumer接口的accept()方法可以接收一个参数,但是不支持返回值,所以thenAccept()可以将前一个任务的结果及该阶段性的结果通过void accept(T t)方法传递到下一个任务。但是Consumer接口的accept()方法没有返回值,所以thenAccept()方法也不能提供第二个任务的执行结果。
4.thenCompose()方法
thenCompose()方法在功能上与thenApply()、thenAccept()、thenRun()一样,可以对两个任务进行串行的调度操作,第一个任务操作完成时,将它的结果作为参数传递给第二个任务。
thenCompose()方法要求第二个任务的返回值是一个CompletionStage异步实例。因此,可以调用CompletableFuture.supplyAsync()方法将第二个任务所要调用的普通异步方法包装成一个CompletionStage异步实例。 作为演示,调用thenCompose分两步计算(10+10)*2,代码如下:
@Test
public void thenComposeDemo() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long firstStep = 10L + 10L;
Print.tco("firstStep outcome is " + firstStep);
return firstStep;
}
}).thenCompose(new Function<Long, CompletionStage<Long>>() {
@Override
public CompletionStage<Long> apply(Long firstStepOutCome) {
return CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long secondStep = firstStepOutCome * 2;
Print.tco("secondStep outcome is " + secondStep);
return secondStep;
}
});
}
});
long result = future.get();
Print.tco(" outcome is " + result);
}
}
这段程序的执行结果与调用thenApply()分两步计算(10+10)*2的结果是一样的。但是,thenCompose()所返回的不是第二个任务所要执行的普通异步方法Supplier.get()的直接计算结果,而是调用CompletableFuture.supplyAsync()方法将普通异步方法Supplier.get()包装成一个CompletionStage异步实例并返回。
4个任务串行方法的区别:
thenApply()、thenRun()、thenAccept()
这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>、Runnable、Consumer<? super T>
类型。
但是,thenCompose()方法与thenApply()方法有本质的不同:
(1)thenCompose()的返回值是一个新的CompletionStage实例,可以持续用来进行下一轮CompletionStage任务的调度。
具体来说,thenCompose()返回的是包装了普通异步方法的CompletionStage任务实例,通过该实例还可以进行下一轮CompletionStage任务的调度和执行,比如可以持续进行CompletionStage链式(或者流式)调用。
(2)thenApply()的返回值则简单多了,直接就是第二个任务的普通异步方法的执行结果,它的返回类型与第二步执行的普通异步方法的返回类型相同,通过thenApply()所返回的值不能进行下一轮CompletionStage链式(或者流式)调用。
9.5 异步任务的合并执行
如果某个任务同时依赖另外两个异步任务的执行结果,就需要对另外两个异步任务进行合并。以泡茶喝为例,“泡茶喝”任务需要对“烧水”任务与“清洗”任务进行合并 。
对两个异步任务的合并可以通过CompletionStage接口的thenCombine()、runAfterBoth()、thenAcceptBoth()
三个方法来实现。这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>、Runnable、Consumer<? super T>
类型。
1.thenCombine()方法
thenCombine()方法的调用者为第一步的CompletionStage实例,该方法的第一个参数为第二步的CompletionStage实例,该方法的返回值为第三步的CompletionStage实例。在逻辑上,thenCombine()方法的功能是将第一步、第二步的结果合并到第三步上。
thenCombine系列方法有两个核心参数:
·other参数:表示待合并的第二步任务的CompletionStage实例。
·fn参数:表示第一个任务和第二个任务执行完成后,第三步需要执行的逻辑。
fn参数的类型为BiFunction<? super T,? super U,? extends V>
,该类型的声明涉及三个泛型参数,具体如下:
·泛型参数 T:表示第一个任务所返回结果的类型。
·泛型参数 U:表示第二个任务所返回结果的类型。
·泛型参数 V:表示第三个任务所返回结果的类型。
作为示例,接下来调用thenCombine分三步计算(10+10)*(10+10),代码如下:
@Test
public void thenCombineDemo() throws Exception {
CompletableFuture<Integer> future1 =
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer firstStep = 10 + 10;
Print.tco("firstStep outcome is " + firstStep);
return firstStep;
}
});
CompletableFuture<Integer> future2 =
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer secondStep = 10 + 10;
Print.tco("secondStep outcome is " + secondStep);
return secondStep;
}
});
CompletableFuture<Integer> future3 = future1.thenCombine(future2,
new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer step1OutCome, Integer step2OutCome) {
return step1OutCome * step2OutCome;
}
});
Integer result = future3.get();
Print.tco(" outcome is " + result);
}
2.runAfterBoth()方法
runAfterBoth()方法跟thenCombine()方法不一样的是,runAfterBoth()方法不关心每一步任务的输入参数和处理结果。runAfterBoth()方法有三个重载版本,声明如下:
在逻辑上,第一步任务和第二步任务是并行执行的,thenCombine()方法的功能是将第一步、第二步的结果合并到第三步任务上。
与thenCombine系列方法不同,runAfterBoth系列方法的第二个参数action为Runnable类型,表示它的第一步任务、第二步任务、第三步任务既没有输入值,又没有输出值。
3.thenAcceptBoth()方法
thenAcceptBoth()方法对runAfterBoth()方法和thenCombine()方法的特点进行了折中,调用该方法,第三个任务可以接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)却不能返回结果。
thenAcceptBoth()方法有三个重载版本,三个版本的声明如下:
4.allOf()等待所有的任务结束
CompletionStage接口的allOf()会等待所有的任务结束,以合并所有的任务。thenCombine()只能合并两个任务,如果需要合并多个异步任务,那么可以调用allOf()。
@Test
public void allOfDemo() throws Exception
{
CompletableFuture<Void> future1 =
CompletableFuture.runAsync(() -> Print.tco("模拟异步任务1"));
CompletableFuture<Void> future2 =
CompletableFuture.runAsync(() -> Print.tco("模拟异步任务2"));
CompletableFuture<Void> future3 =
CompletableFuture.runAsync(() -> Print.tco("模拟异步任务3"));
CompletableFuture<Void> future4 =
CompletableFuture.runAsync(() -> Print.tco("模拟异步任务4"));
CompletableFuture<Void> all =
CompletableFuture.allOf(future1, future2, future3, future4);
all.join();
}
9.6 异步任务的选择执行
CompletableFuture对异步任务的选择执行不是按照某种条件进行选择的,而是按照执行速度进行选择的:前面两个并行任务,谁的结果返回速度快,谁的结果将作为第三步任务的输入。
对两个异步任务的选择可以通过CompletionStage接口的applyToEither()、runAfterEither()和acceptEither()三个方法来实现。这三个方法的不同之处在于它的核心参数fn、action、consumer的类型不同,分别为Function<T,R>、Runnable、Consumer<? super T>
类型。
1.applyToEither()方法
两个CompletionStage谁返回结果的速度快,applyToEither()方法就用这个最快的CompletionStage的结果进行下一步(第三步)的回调操作。
@Test
public void applyToEitherDemo() throws Exception {
CompletableFuture<Integer> future1 =
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer firstStep = 10 + 10;
Print.tco("firstStep outcome is " + firstStep);
return firstStep;
}
});
CompletableFuture<Integer> future2 =
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer secondStep = 100 + 100;
Print.tco("secondStep outcome is " + secondStep);
return secondStep;
}
});
CompletableFuture<Integer> future3 = future1.applyToEither(future2,
new Function<Integer, Integer>() {
@Override
public Integer apply(Integer eitherOutCome) {
return eitherOutCome;
}
});
Integer result = future3.get();
Print.tco(" outcome is " + result);
}
2.runAfterEither()方法
runAfterEither()方法的功能为:前面两个CompletionStage实例,任何一个完成了都会执行第三步回调操作。三个任务的回调函数都是Runnable类型的。
runAfterEither()方法的调用者为第一步任务的CompletionStage实例,runAfterEither()方法的第一个参数为第二步任务的CompletionStage实例,runAfterEither()方法的返回值为第三步任务的CompletionStage实例。
调用runAfterEither()方法,只要前面两个CompletionStage实例其中一个执行完成,就开始执行第三步的CompletionStage实例。
3.acceptEither()方法
acceptEither()方法对applyToEither()方法和runAfterEither()方法的特点进行了折中,两个CompletionStage谁返回结果的速度快,acceptEither()就用那个最快的CompletionStage的结果作为下一步(第三步)的输入,但是第三步没有输出。
acceptEither()方法有三个重载版本,声明如下:
9.7 CompletableFuture的综合案例
基于CompletableFuture来实现前面介绍的泡茶喝实例和RPC异步调用实例。
,在下面的程序中,我们分3个任务:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯和拿茶叶,任务3负责泡茶。其中任务3要等待任务1和任务2都完成后才能开始。
public class DrinkTea {
private static final int SLEEP_GAP = 3;//等待3秒
public static void main(String[] args) {
// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Boolean> hotJob =
CompletableFuture.supplyAsync(() ->
{
Print.tcfo("洗好水壶");
Print.tcfo("烧开水");
//线程睡眠一段时间,代表烧水中
sleepSeconds(SLEEP_GAP);
Print.tcfo("水开了");
return true;
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<Boolean> washJob =
CompletableFuture.supplyAsync(() ->
{
Print.tcfo("洗茶杯");
//线程睡眠一段时间,代表清洗中
sleepSeconds(SLEEP_GAP);
Print.tcfo("洗完了");
return true;
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> drinkJob =
hotJob.thenCombine(washJob, (hotOk, washOK) ->
{
if (hotOk && washOK) {
Print.tcfo("泡茶喝,茶喝完");
return "茶喝完了";
}
return "没有喝到茶";
});
// 等待任务 3 执行结果
Print.tco(drinkJob.join());
}
}
9.8 使用CompletableFuture进行多个RPC调用
参考代码如下:
public class IntegrityDemo {
/**
* 模拟模拟RPC调用1
*/
public String rpc1() {
//睡眠400ms,模拟执行耗时
sleepMilliSeconds(600);
Print.tcfo("模拟RPC调用:服务器 server 1");
return "sth. from server 1";
}
/**
* 模拟模拟RPC调用2
*/
public String rpc2() {
//睡眠400ms,模拟执行耗时
sleepMilliSeconds(600);
Print.tcfo("模拟RPC调用:服务器 server 2");
return "sth. from server 2";
}
@Test
public void rpcDemo() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() ->
{
return rpc1();
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> rpc2());
CompletableFuture<String> future3 = future1.thenCombine(future2,
(out1, out2) ->
{
return out1 + " & " + out2;
});
String result = future3.get();
Print.tco("客户端合并最终的结果:" + result);
}
}
9.9 使用RxJava模拟RPC异步回调
类似的代码如下:
@Test
public void rxJavaDemo() throws Exception {
Observable<String> observable1 = Observable.fromCallable(() ->
{
return rpc1();
}).subscribeOn(Schedulers.newThread());
Observable<String> observable2 = Observable
.fromCallable(() -> rpc2()).subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.observeOn(Schedulers.newThread())
.toList()
.subscribe((result) -> Print.tco("客户端合并最终的结果:" + result));
sleepSeconds(Integer.MAX_VALUE);
}