My logo
Published on

高并发核心模式之异步回调模式-08

在Netty源码中大量使用了异步回调技术,并且基于Java的异步回调设计了自己的一整套异步回调接口和实现。
这里从java future异步回调技术入手,然后介绍比较常用的第三方异步回调技术——谷歌的guava future相关技术,最后介绍Netty的异步回调技术

8.1 从泡茶的案例讲起

为了异步执行整个泡茶流程,分别设计三个线程:泡茶线程(MainThread,主线程)、烧水线程(HotWaterThread)和清洗线程(WashThread)。泡茶线程的工作是:启动清洗线程、启动烧水线程,等清洗、烧水的工作完成后,泡茶喝;清洗线程的工作是:洗茶壶、洗茶杯;烧水线程的工作是:洗好水壶,灌上凉水,放在火上,一直等水烧开。
阻塞模式实现泡茶实例首先从基础的多线程join合并实验入手。join操作的原理是阻塞当前的线程,直到待合并的目标线程执行完成。
Java中线程的合并流程是:假设线程A调用线程B的join()方法去合并B线程,那么线程A进入阻塞状态,直到线程B执行完成。
在泡茶的例子中,主线程通过分别调用烧水线程和清洗线程的join()方法,等待烧水线程和清洗线程执行完成,然后执行主线程自己的泡茶操作。具体的执行流程如图:

06-thread-37

调用join()实现异步泡茶喝

8.2 调用join()实现泡茶喝是一个异步阻塞版本

public class JoinDemo {

    public static final int SLEEP_GAP = 500;

    public static String getCurThreadName() {
        return Thread.currentThread().getName();
    }

    static class HotWaterThread extends Thread {
        public HotWaterThread() {
            super("** 烧水-Thread");
        }

        public void run() {
            try {
                Print.tcfo("洗好水壶");
                Print.tcfo("灌上凉水");
                Print.tcfo("放在火上");

                //线程睡眠一段时间,代表烧水中
                Thread.sleep(SLEEP_GAP);
                Print.tcfo("水开了");

            } catch (InterruptedException e) {
                Print.tcfo(" 发生异常被中断.");
            }
            Print.tcfo(" 运行结束.");
        }
    }

    static class WashThread extends Thread {
        public WashThread() {
            super("$$ 清洗-Thread");
        }

        public void run() {

            try {
                Print.tcfo("洗茶壶");
                Print.tcfo("洗茶杯");
                Print.tcfo("拿茶叶");
                //线程睡眠一段时间,代表清洗中
                Thread.sleep(SLEEP_GAP);
                Print.tcfo("洗完了");

            } catch (InterruptedException e) {
                Print.tcfo(" 发生异常被中断.");
            }
            Print.tcfo(" 运行结束.");
        }
    }

    public static void main(String[] args) {
        Thread hThread = new HotWaterThread();
        Thread wThread = new WashThread();

        hThread.start();
        wThread.start();

        try {
            // 合并烧水-线程
            hThread.join();
            // 合并清洗-线程
            wThread.join();

            Thread.currentThread().setName("主线程");
        } catch (InterruptedException e) {
            Print.tcfo(getCurThreadName() + "发生异常被中断.");
        }
        Print.tcfo(getCurThreadName() + " 运行结束.");
    }
}

join()方法详解
join()方法的应用场景如下:
A线程调用B线程的join()方法,等待B线程执行完成,在B线程没有完成前,A线程阻塞。
Join()方法有三个重载版本:
(1)void join():A线程等待B线程执行结束后,A线程重启执行。
(2)void join(long millis):A线程等待B线程执行一段时间,最长等待时间为millis毫秒。超过millis毫秒后,不论B线程是否结束,A线程重启执行。
(3)void join(long millis,int nanos):等待乙方线程执行一段时间,最长等待时间为millis毫秒加nanos纳秒。超过时间后,不论乙方是否结束,甲方线程都重启执行。
强调一下容易混淆的几点:
(1)join()是实例方法不是静态方法,需要使用线程对象去调用,如thread.join()。
(2)调用join()时,不是thread所指向的目标线程阻塞,而是当前线程阻塞。
(3)只有等到thread所指向的线程执行完成或者超时,当前线程才能启动执行。
join()有一个问题被合并线程没有返回值。比如,在烧水的实例中,如果烧水线程执行结束,main线程是没有办法知道结果的。同样,清洗线程的执行结果,main线程(泡茶线程)也是没有办法知道的。形象地说,join线程合并就像一个闷葫芦,只能发起合并线程,不能取到执行结果。
调用join()缺少很多灵活性,比如实际项目中很少自己单独创建线程,而是使用Executor,这进一步减少了join()的使用场景,所以join()的使用多数停留在Demo演示上。

8.3 使用FutureTask实现异步泡茶喝

前面的join版本泡茶示例中有一个很大的问题,就是主线程获取不到异步线程的返回值。打个比方,如果烧水线程出了问题,或者清洗线程出了问题,main线程(泡茶线程)没有办法知道。哪怕不具备泡茶条件,main线程(泡茶线程)也只能继续泡茶喝。
使用FutureTask实现异步泡茶喝,main线程可以获取烧水线程、清洗线程的执行结果,然后根据结果判断是否具备泡茶条件,如果具备泡茶条件再泡茶。
使用FutureTask实现异步泡茶喝的执行流程具体如图:

06-thread-37

1.通过FutureTask获取异步执行结果的步骤:
通过FutureTask类和Callable接口的联合使用可以创建能获取异步执行结果的线程。具体的步骤重复介绍如下:
(1)创建一个Callable接口的实现类,并实现它的call()方法,编写好异步执行的具体逻辑,并且可以有返回值。
(2)使用Callable实现类的实例构造一个FutureTask实例。
(3)使用FutureTask实例作为Thread构造器的target入参,构造新的Thread线程实例。
(4)调用Thread实例的start()方法启动新线程,启动新线程的run()方法并发执行。其内部的执行过程为:启动Thread实例的run()方法并发执行后,会执行FutureTask实例的run()方法,最终会并发执行Callable实现类的call()方法。
(5)调用FutureTask对象的get()方法阻塞性地获得并发线程的执行结果。
** 2.使用FutureTask类和Callable接口进行泡茶喝的实战,代码如下:**

public class JavaFutureDemo {

    public static final int SLEEP_GAP = 500;

    static class HotWaterJob implements Callable<Boolean> {  //①
        @Override
        public Boolean call() throws Exception {    //②
            try {
                Print.tcfo("洗好水壶");
                Print.tcfo("灌上凉水");
                Print.tcfo("放在火上");

                //线程睡眠一段时间,代表烧水中
                Thread.sleep(SLEEP_GAP);
                Print.tcfo("水开了");

            } catch (InterruptedException e) {
                Print.tcfo(" 发生异常被中断.");
                return false;
            }
            Print.tcfo(" 运行结束.");

            return true;
        }
    }

    static class WashJob implements Callable<Boolean> {
        @Override
        public Boolean call() throws Exception {
            try {
                Print.tcfo("洗茶壶");
                Print.tcfo("洗茶杯");
                Print.tcfo("拿茶叶");
                //线程睡眠一段时间,代表清洗中
                Thread.sleep(SLEEP_GAP);
                Print.tcfo("洗完了");

            } catch (InterruptedException e) {
                Print.tcfo(" 清洗工作 发生异常被中断.");
                return false;
            }
            Print.tcfo(" 清洗工作  运行结束.");
            return true;
        }
    }

    public static void drinkTea(boolean waterOk, boolean cupOk) {
        if (waterOk && cupOk) {
            Print.tcfo("泡茶喝");
        } else if (!waterOk) {
            Print.tcfo("烧水失败,没有茶喝了");
        } else if (!cupOk) {
            Print.tcfo("杯子洗不了,没有茶喝了");
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("主线程");
        Callable<Boolean> hJob = new HotWaterJob();	// //异步逻辑
        FutureTask<Boolean> hTask = new FutureTask<>(hJob);	//包装异步逻辑的异步任务实例
        Thread hotThread = new Thread(hTask, "** 烧水-Thread");//异步线程

        Callable<Boolean> wJob = new WashJob();//③
        FutureTask<Boolean> wTask =
                new FutureTask<>(wJob);//④
        Thread washThread = new Thread(wTask, "$$ 清洗-Thread");//⑤
        hotThread.start();
        washThread.start();

        try {

            boolean waterOk = hTask.get();
            boolean cupOk = wTask.get();
            drinkTea(waterOk, cupOk);
        } catch (InterruptedException e) {
            Print.tcfo(getCurThreadName() + "发生异常被中断.");
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        Print.tcfo(getCurThreadName() + " 运行结束.");
    }
}

FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。所以,在使用时它们两个实例的泛型参数需要保持一致。
最后,通过FutureTask实例取得异步线程的执行结果。一般来说,通过FutureTask实例的get方法可以获取线程的执行结果。
总之,FutureTask比join线程合并操作更加高明,能取得异步线程的结果。但是,也未必高明到哪里去。为什么呢?
因为通过FutureTask的get()方法获取异步结果时,主线程也会被阻塞。这一点FutureTask和join是一致的,它们都是异步阻塞模式。
异步阻塞的效率往往比较低,被阻塞的主线程不能干任何事情,唯一能干的就是傻傻等待。原生Java API除了阻塞模式的获取结果外,并没有实现非阻塞的异步结果获取方法。如果需要用到获取的异步结果,得引入一些额外的框架,接下来将会介绍谷歌的Guava框架。

8.4 Guava框架-异步回调与主动调用

在前面的泡茶喝实例中,不论主线程调用join()进行闷葫芦式线程同步,还是使用Future.get()获取异步线程的执行结果,都属于主动模式的调用。
主动调用是一种阻塞式调用,它是一种单向调用,“调用方”要等待“被调用方”执行完毕才返回。如果“被调用方”的执行时间很长,那么“调用方”线程需要阻塞很长一段时间。
如何将主动调用的方向进行反转呢?这就是异步回调。回调是一种反向的调用模式,也就是说,被调用方在执行完成后,会反向执行“调用方”所设置的钩子方法。
使用回调模式将泡茶线程和烧水(或者清洗)线程之间的“主动”关系进行反转,具体如图:

06-thread-37

实质上,在回调模式中负责执行回调方法的具体线程已经不再是调用方的线程(如示例中的泡茶线程),而是变成了异步的被调用方的线程(如烧水线程)。
Java中回调模式的标准实现类为CompletableFuture,由于该类出现的时间比较晚,因此很多著名的中间件如Guava、Netty等都提供了自己的异步回调模式API供开发者使用。开发者还可以使用RxJava响应式编程组件进行异步回调的开发
Guava是Google提供的Java扩展包,它提供了一种异步回调的解决方案。Guava中与异步回调相关的源码处于com.google.common.util.concurrent包中。包中的很多类都用于对java.util.concurrent的能力扩展和能力增强。比如,Guava的异步任务接口ListenableFuture扩展了Java的Future接口,实现了异步回调的能力。

详解FutureCallback
总体来说,Guava主要增强了Java而不是另起炉灶。为了实现异步回调方式获取异步线程的结果,Guava做了以下增强:
·引入了一个新的接口ListenableFuture,继承了Java的Future接口,使得Java的Future异步任务在Guava中能被监控和非阻塞获取异步结果。
·引入了一个新的接口FutureCallback,这是一个独立的新接口。该接口的目的是在异步任务执行完成后,根据异步结果完成不同的回调处理,并且可以处理异步结果。
FutureCallback是一个新增的接口,用来填写异步任务执行完后的监听逻辑。FutureCallback拥有两个回调方法:
·onSuccess()方法,在异步任务执行成功后被回调。调用时,异步任务的执行结果作为onSuccess方法的参数被传入。
·onFailure()方法,在异步任务执行过程中抛出异常时被回调。调用时,异步任务所抛出的异常作为onFailure方法的参数被传入。
注意,Guava的FutureCallback与Java的Callable名字相近,实质不同,存在本质的区别:
(1)Java的Callable接口代表的是异步执行的逻辑。
(2)Guava的FutureCallback接口代表的是Callable异步逻辑执行完成之后,根据成功或者异常两种情形所需要执行的善后工作。
Guava是对Java Future异步回调的增强,使用Guava异步回调也需要用到Java的Callable接口。简单地说,只有在Java的Callable任务执行结果出来后,才可能执行Guava中的FutureCallback结果回调。
Guava如何实现异步任务Callable和结果回调FutureCallback之间的监控关系呢?Guava引入了一个新接口ListenableFuture,它继承了Java的Future接口,增强了被监控的能力。

8.5 Guava 实现泡茶喝的实例

基于Guava异步回调模式的泡茶喝程序的执行流程如图:

代码如下:

public class GuavaFutureDemo {

    public static final int SLEEP_GAP = 3000;

    static class HotWaterJob implements Callable<Boolean> //①
    {
        @Override
        public Boolean call() throws Exception //②
        {
            try {
                Print.tcfo("洗好水壶");
                Print.tcfo("烧开水");

                //线程睡眠一段时间,代表烧水中
                Thread.sleep(SLEEP_GAP);
                Print.tcfo("水开了");

            } catch (InterruptedException e) {
                Print.tcfo(" 发生异常被中断.");
                return false;
            }
            Print.tcfo(" 烧水工作,运行结束.");

            return true;
        }
    }

    static class WashJob implements Callable<Boolean> {

        @Override
        public Boolean call() throws Exception {
            try {
                Print.tcfo("洗茶杯");
                //线程睡眠一段时间,代表清洗中
                Thread.sleep(SLEEP_GAP);
                Print.tcfo("洗完了");

            } catch (InterruptedException e) {
                Print.tcfo(" 清洗工作 发生异常被中断.");
                return false;
            }
            Print.tcfo(" 清洗工作  运行结束.");
            return true;
        }

        //泡茶喝的工作
        static class DrinkJob {

            boolean waterOk = false;
            boolean cupOk = false;

            public void drinkTea() {
                if (waterOk && cupOk) {
                    Print.tcfo("泡茶喝,茶喝完");
                    this.waterOk = false;
                }
            }
        }

        public static void main(String[] args) {
            Thread.currentThread().setName("泡茶喝线程");
            //新起一个线程,作为泡茶主线程
            DrinkJob drinkJob = new DrinkJob();

            // 烧水的业务逻辑
            Callable<Boolean> hotJob = new HotWaterJob();
            // 清洗的业务逻辑
            Callable<Boolean> washJob = new WashJob();
            // 创建java 线程池
            ExecutorService jPool = Executors.newFixedThreadPool(10);

            // 包装Java 线程池,构造guava 线程池
            ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);

            // 烧水的回调钩子
            FutureCallback<Boolean> hotWateHook = new FutureCallback<Boolean>() {
                @Override
                public void onSuccess(Boolean r) {
                    if (r) {
                        drinkJob.waterOk = true;
                        // 执行回调方法
                        drinkJob.drinkTea();
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    Print.tcfo("烧水失败,没有茶喝了");
                }
            };

            // 启动烧水线程
            ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob);
            // 设置烧水任务的回调钩子
            Futures.addCallback(hotFuture, hotWateHook);

            //启动清洗线程
            ListenableFuture<Boolean> washFuture = gPool.submit(washJob);
            // 使用匿名实例,作为清洗之后的回调钩子
            Futures.addCallback(washFuture, new FutureCallback<Boolean>() {
                @Override
                public void onSuccess(Boolean r) {
                    if (r) {
                        drinkJob.cupOk = true;
                        //执行回调方法
                        drinkJob.drinkTea();
                    }
                }

                @Override
                public void onFailure(Throwable throwable) {
                    Print.tcfo("杯子洗不了,没有茶喝了");
                }
            });

            Print.tcfo("干点其他事情....");
            sleepSeconds(1);
            Print.tcfo("执行完成");
        }
    }
}