My logo
Published on

高并发设计模式-07

7.1 线程安全的单例模式

单例模式是常见的一种设计模式,一般用于全局对象管理,比如XML读写实例、系统配置实例、任务调度实例、数据库连接池实例等。

7.1.1 饿汉式单例

public class Singleton1 {

    // 私有构造区
    private Singleton1() {
    }

    // 静态成员
    private static final Singleton1 single = new Singleton1();

    public static Singleton1 getInstance() {
        return single;
    }
}

饿汉单例模式的优点是足够简单、安全。**其缺点是:单例对象在类被加载时,实例就直接被初始化了。**很多时候,在类被加载时并不需要进行单例初始化,所以需要对单例的初始化予以延迟,一直到实例使用的时候初始化。

7.1.2 懒汉式单例

/**
 * 简单的懒汉单例模式
 *
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
public class ASingleton {

    // 静态成员
    static ASingleton instance;

    // 私有构造区
    private ASingleton() {
    }

    // 获取单例的方法
    static ASingleton getInstance() {
        if (instance == null) {
            instance = new ASingleton();
        }
        return instance;
    }
}

以上懒汉单例模式的实现大家都很熟悉,估计也编写过类似的代码。以上参考实现在单线程场景中是合理的、安全的。在第一次被调用时,getInstance()方法会新建一个ASingleton实例,但之后访问时返回的是第一次新建的ASingleton实例。
多线程并发访问getInstance()方法时,问题就出来了:不同的线程有可能同时进入代码①处的条件判断,多次执行代码②,从而新建多个ASingleton对象。
假设Thread A、B两个线程并发通过getInstance()方法获取ASingleton的单例,可能出现一种执行次序,具体如表:

06-thread-37

通过表8-1可以看到,instance被实例化了两次,违背了单例模式的初衷。也就是说,以上单例模式的实现在并发执行场景存在着单例被多次创建的问题。

使用内置锁保护懒汉式单例

如何确保单例只创建一次,可以使用synchronized内置锁进行单例获取同步,确保同时只能有一个线程进入临界区执行。

/**
 * 简单的懒汉单例模式
 * 使用内置锁 synchronized 进行单例获取同步
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
public class BSingleton {

    // 静态成员
    static BSingleton instance;

    // 私有构造区
    private BSingleton() {
    }

    // 获取单例的方法
    static synchronized BSingleton getInstance() {
        if (instance == null) {
            instance = new BSingleton();
        }
        return instance;
    }
}

getInstance()方法加synchronized关键字之后,可以保证在并发执行时不出错。问题是:每次执行getInstance()方法都要用到同步,在争用激烈的场景下,内置锁会升级为重量级锁,开销大、性能差,所以不推荐高并发线程使用这种方式的单例模式。

7.1.3 双重检查锁单例模式

实际上,单例模式的加锁操作只有单例在第一次创建的时候才需要用到,之后的单例获取操作都没必要再加锁。所以,可以先判断单例对象是否已经被初始化,如果没有,加锁后再初始化,这种模式被叫作双重检查锁(Double Checked Locking)单例模式。示例代码如下:

/**
 * 简单的懒汉单例模式
 * 使用内置锁 synchronized 进行单例获取同步
 *
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
public class ESingleton {

    // 静态成员
    static ESingleton instance;

    // 私有构造区
    private ESingleton() {
    }

    // 获取单例的方法
    static synchronized ESingleton getInstance() {
        if (instance == null) { // 1。检查
            synchronized (ESingleton.class) {    // 加锁
                if (instance == null) {      // 2。检查
                    instance = new ESingleton();
                }
            }

        }
        return instance;
    }
}

双重检查锁单例模式主要包括以下三步:
(1)检查单例对象是否被初始化,如果已被初始化,就立即返回单例对象。这是第一次检查,对应示例代码中的检查①,此次检查不需要使用锁进行线程同步,用于提高获取单例对象的性能。
(2)如果单例没有被初始化,就试图进入临界区进行初始化操作,此时才去获取锁。
(3)进入临界区之后,再次检查单例对象是否已经被初始化,如果还没被初始化,就初始化一个实例。这是第二次检查,对应代码中的检查②,此次检查在临界区内进行。
**为什么在临界区内还需要执行一次检查呢?**答案是:在多个线程竞争的场景下,可能同时不止一个线程通过了第一次检查(检查①),此时第一个通过“检查①”的线程将首先进入临界区,而其他通过“检查①”的线程将被阻塞,在第一个线程实例化单例对象释放锁之后,其他线程可能获取到锁进入临界区,实际上单例已经被初始化了,所以哪怕进入了临界区,其他线程并没有办法通过“检查②”的条件判断,无法执行重复的初始化。
双重检查不仅避免了单例对象在多线程场景中的反复初始化,而且除了初始化的时候需要现加锁外,后续的所有调用都不需要加锁而直接返回单例,从而提升了获取单例时的性能。

7.1.4 使用双重检查锁+volatile

表面上,使用双重检查锁机制的单例模式一切看上去都很完美,其实并不是这样的。那么问题出现在哪里呢?下面这行代码实际大有玄机:

//初始化单例
instance = new Singleton();

这行初始化单例代码转换成汇编指令(具有原子性的指令)后,大致会细分成三个:
(1)分配一块内存M。
(2)在内存M上初始化Singleton对象。
(3)M的地址赋值给instance变量。
编译器、CPU都可能对没有内存屏障、数据依赖关系的操作进行重排序,上述的三个指令优化后可能就变成了这样:
(1)分配一块内存M。
(2)将M的地址赋值给instance变量。
(3)在内存M上初始化Singleton对象。
指令重排之后,获取单例可能导致问题的发生,这里假设两个线程以下面的次序执行:
(1)线程A先执行getInstance()方法,当执行到分配一块内存并将地址赋值给M后,恰好发生了线程切换。此时,线程A还没来得及将M指向的内存初始化。
(2)线程B刚进入getInstance()方法,判断if语句instance是否为空,此时的instance不为空,线程B直接获取到了未初始化的instance变量。
由于线程B得到的是一个未初始化完全的对象,因此访问instance成员变量的时候可能发生异常。如何确保线程B获取的是一个完成初始化的单例呢?可以通过volatile禁止指令重排。双重检查锁+volatile相结合的单例模式实现大致的代码如下

/**
 * 双重检查锁 + volatile相结合的单例模式的实现
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
public class FSingleton {

    // 静态成员
    // 保持单例的静态成员具有内存可见性
    static volatile FSingleton instance;

    // 私有构造区
    private FSingleton() {
    }

    // 获取单例的方法
    static synchronized FSingleton getInstance() {
        if (instance == null) { // 1。检查
            synchronized (FSingleton.class) {    // 加锁
                if (instance == null) {      // 2。检查
                    instance = new FSingleton();
                }
            }

        }
        return instance;
    }
}

7.1.5 使用静态内部类实现懒汉式单例模式

虽然通过双重检查锁+volatile相结合的方式能实现高性能、线程安全的单例模式,但是该实现的底层原理比较复杂,写法烦琐。另一种易于理解、编程简单的单例模式的实现为使用静态内部类实例懒汉式单例模式,参考代码如下:

/**
 * 使用静态内部类实现懒汉式单例模式
 *
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
public class Singleton {

    // 静态内部类
    private static class LazyHolder {
        // 通过final 保障初始化时的线程安全
        public static final Singleton INSTANCE = new Singleton();
        ;
    }

    // 私有构造区
    private Singleton() {
    }

    //获取单例的方法
    public static final Singleton getInstance() {
        // 返回内部类的静态,最终成员
        return LazyHolder.INSTANCE;
    }
}

使用静态内部类实现懒汉式单例模式只有在getInstance()被调用时才去加载内部类并且初始化单例,该方式既解决了线程安全问题,又解决了写法烦琐问题。

7.2 Master-Worker模式

Master-Worker模式是一种常见的高并发模式,它的核心思想是任务的调度和执行分离,调度任务的角色为Master,执行任务的角色为Worker,Master负责接收、分配任务和合并(Merge)任务结果,Worker负责执行任务。Master-Worker模式是一种归并类型的模式。
举一个例子,在TCP服务端的请求处理过程中,大量的客户端连接相当于大量的任务,Master需要将这些任务存储在一个任务队列中,然后分发给各个Worker,每个Worker是一个工作线程,负责完成连接的传输处理。
Master-Worker模式的整体结构如图:

06-thread-37

7.2.1 Master-Worker模式的参考实现

假设一个场景,需要执行N个任务,将这些任务的结果进行累加求和,如果任务太多,就可以采用Master-Worker模式来实现。Master持有workerCount个Worker,并且负责接收任务,然后分发给Worker,最后在回调函数中对Worker的结果进行归并求和。
Master的参考代码:

/**
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
public class Master<T extends Task, R> {
    // 所有work的集合
    private HashMap<String, Worker<T, R>> workers = new HashMap<>();

    // 任务的集合
    private LinkedBlockingQueue<T> taskQueue = new LinkedBlockingQueue<>();

    // 任务处理结果集
    protected Map<String, R> resultMap = new ConcurrentHashMap<>();

    // Master 的任务调度线程
    private Thread thread = null;

    // 保持最终的和
    private AtomicInteger sum = new AtomicInteger(0);

    public Master(int workerCount) {
        // 每个worker 对象都需要持有queue的引用,用于领任务与提交任务
        for (int i = 0; i < workerCount; i++) {
            Worker<T, R> worker = new Worker<>();
            workers.put("子节点:" + i, worker);
        }
        thread = new Thread(() -> this.execute());
        thread.start();
    }

    // 提交任务
    public void submit(T task) {
        taskQueue.add(task);
    }

    // 启动所有的子任务
    public void execute() {
        for (; ; ) {
            // 从任务队列中获取任务,然后worker节点轮询,轮流分配任务
            for (Map.Entry<String, Worker<T, R>> entry : workers.entrySet()) {
                T task = null;
                try {
                    task = this.taskQueue.take();
                    Worker worker = entry.getValue();
                    worker.submit(task, this::resultCallBack);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // 结果处理的回调函数
    private void resultCallBack(Object o) {
        Task<R> task = (Task<R>) o;
        String taskName = "Worker:" + task.getWorkId() + "-" + "Task:" + task.getId();
        //        Print.tco(taskName + ":" + task.getResult());
        R result = task.getResult();
        resultMap.put(taskName, result);

        sum.getAndAdd((Integer) result);
    }

    // 获取最终的结果
    public void printResult() {
        Print.tco("----------sum is :" + sum.get());
        for (Map.Entry<String, R> entry : resultMap.entrySet()) {
            String taskName = entry.getKey();
            Print.fo(taskName + ":" + entry.getValue());
        }
    }
}

Worker的参考代码:

/**
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
public class Worker<T extends Task, R> {

    // 接收任务的阻塞队列
    private LinkedBlockingQueue<T> taskQueue = new LinkedBlockingQueue<>();
    // worker 的编号
    static AtomicInteger index = new AtomicInteger(1);
    private int workerId;
    // 执行任务的线程
    private Thread thread = null;

    public Worker() {
        this.workerId = index.getAndIncrement();
        thread = new Thread(() -> this.run());
        thread.start();
    }

    /**
     * 轮询执行任务
     */
    public void run() {
        // 轮询启动所有的子任务
        for (; ; ) {
            try {
                // 从阻塞队列中提取任务
                T task = this.taskQueue.take();
                task.setWorkId(workerId);
                task.execute();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // 接收任务到异步队列
    public void submit(T task, Consumer<R> action) {
        task.resultAction = action;
        try {
            this.taskQueue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Worker接收Master分配的任务,同样也通过阻塞队列对局部任务进行缓存。Worker所拥有的线程作为局部任务的阻塞队列的消费者,不断从阻塞队列获取任务并执行,执行完成后回调Master传递过来的回调函数。
异步任务类:

/**
 * 异步任务类在执行子类任务的doExecute()方法之后,回调一下Master传递过来的回调函数,将执行完成后的任务进行回填。
 *
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
@Data
public class Task<R> {

    static AtomicInteger index = new AtomicInteger(1);
    // 任务的回调函数
    public Consumer<Task<R>> resultAction;
    // 任务的id
    private int id;

    // worker ID
    private int workId;

    // 计算结果
    R result = null;

    public Task() {
        this.id = index.getAndIncrement();
    }

    public void execute() {
        this.result = this.doExecute();
        // 执行回调函数
        resultAction.accept(this);
    }

    // 由子类实现
    protected R doExecute() {
        return null;
    }
}

测试用例:

/**
 * @author: wangyj
 * @create: 2022-04-21
 * @version: 1.0.0
 **/
public class MasterWorkerTest {

    // 简单任务
    static class SimpleTask extends Task<Integer> {

        @Override
        protected Integer doExecute() {
            Print.tcfo("task " + getId() + " is done ");
            return getId();
        }
    }

    public static void main(String[] args) {
        //创建Master ,包含四个worker,并启动master的执行线程
        Master<SimpleTask, Integer> master = new Master<>(4);

        //定期向master提交任务
        ThreadUtil.scheduleAtFixedRate(() -> master.submit(
                new SimpleTask()),
                2, TimeUnit.SECONDS);

        //定期从master提取结果
        ThreadUtil.scheduleAtFixedRate(
                () -> master.printResult(),
                5, TimeUnit.SECONDS);
    }
}

7.2.2 Netty中Master-Worker模式的实现

Netty服务器程序中需要设置两个EventLoopGroup轮询组,一个组负责新连接的监听和接收,另一个组负责IO传输事件的轮询与分发,两个轮询组的职责具体如下:
(1)负责新连接的监听和接收的EventLoopGroup轮询组中的反应器完成查询通道的新连接IO事件查询,这些反应器有点像负责招工的包工头,因此该轮询组可以形象地称为“包工头”(Boss)轮询组。
(2)另一个轮询组中的反应器完成查询所有子通道的IO事件,并且执行对应的Handler处理程序完成IO处理,例如数据的输入和输出(有点像搬砖),这个轮询组可以形象地称为“工人”(Worker)轮询组。
Netty中的Reactor模式如图8-3所示。
Netty是基于Reactor模式的具体实现,体现了Master-Worker模式的思想。Netty的EventLoop(Reactor角色)可以对应到Master-Worker模式的Worker角色,而Netty的EventLoopGroup轮询组则可以对应到Master-Worker模式的Master角色。

06-thread-37

7.2.3 Nginx中Master-Worker模式的实现

Nginx的Master进程主要负责调度Worker进程,比如加载配置、启动工作进程、接收来自外界的信号、向各Worker进程发送信号、监控Worker进程的运行状态等。Master进程负责创建监听套接口,交由Worker进程进行连接监听。Worker进程主要用来处理网络事件,当一个Worker进程在接收一条连接通道之后,就开始读取请求、解析请求、处理请求,处理完成产生的数据后,再返回给客户端,最后断开连接通道。
Nginx的架构非常直观地体现了Master-Worker模式的思想。Nginx的Master进程可以对应到Master-Worker模式的Master角色,Nginx的Worker进程可以对应到Master-Worker模式的Worker角色。

06-thread-37

7.3 ForkJoin模式

“分而治之”是一种思想,所谓“分而治之”,就是把一个复杂的算法问题按一定的“分解”方法分为规模较小的若干部分,然后逐个解决,分别找出各部分的解,最后把各部分的解组成整个问题的解。“分而治之”思想在软件体系结构设计、模块化设计、基础算法中得到了非常广泛的应用。许多基础算法都运用了“分而治之”的思想,比如二分查找、快速排序等。
Master-Worker模式是“分而治之”思想的一种应用,本节所介绍的ForkJoin模式则是“分而治之”思想的另一种应用。与Master-Worker模式不同,ForkJoin模式没有Master角色,其所有的角色都是Worker,ForkJoin模式中的Worker将大的任务分割成小的任务,一直到任务的规模足够小,可以使用很简单、直接的方式来完成。

7.3.1 ForkJoin模式的原理

ForkJoin模式先把一个大任务分解成许多个独立的子任务,然后开启多个线程并行去处理这些子任务。有可能子任务还是很大而需要进一步分解,最终得到足够小的任务。ForkJoin模式的任务分解和执行过程大致如图:

06-thread-37

7.3.2 ForkJoin框架

JUC包提供了一套ForkJoin框架的实现,具体以ForkJoinPool线程池的形式提供,并且该线程池在Java 8的Lambda并行流框架中充当着底层框架的角色。JUC包的ForkJoin框架包含如下组件:
(1)ForkJoinPool:执行任务的线程池,继承了AbstractExecutorService类。
(2)ForkJoinWorkerThread:执行任务的工作线程(ForkJoinPool线程池中的线程)。每个线程都维护着一个内部队列,用于存放“内部任务”该类继承了Thread类。
(3)ForkJoinTask:用于ForkJoinPool的任务抽象类,实现了Future接口。
(4)RecursiveTask:带返回结果的递归执行任务,是ForkJoinTask的子类,在子任务带返回结果时使用。
(5)RecursiveAction:不返回结果的递归执行任务,是ForkJoinTask的子类,在子任务不带返回结果时使用。
因为ForkJoinTask比较复杂,并且其抽象方法比较多,故在日常使用时一般不会直接继承ForkJoinTask来实现自定义的任务类,而是通过继承ForkJoinTask两个子类RecursiveTask或者RecursiveAction之一去实现自定义任务类,自定义任务类需要实现这些子类的compute()方法,该方法的执行流程一般如下:

06-thread-37

7.3.3 ForkJoin框架使用实战

假设需要计算0~100的累加求和,可以使用ForkJoin框架完成。首先需要设计一个可以递归执行的异步任务子类。
1.可递归执行的异步任务类AccumulateTask

public class AccumulateTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 10;
    //累加的起始编号
    private int start;
    //累加的结束编号
    private int end;

    public AccumulateTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        //判断任务的规模: 规模小则可以直接计算
        boolean canCompute = (end - start) <= THRESHOLD;
        //任务已经足够小,则可以直接计算
        if (canCompute) {
            //直接计算并返回结果,Recursive结束
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            Print.tco("执行任务,计算" + start + "到" + end + "的和,结果是:" + sum);
        } else {
            //任务过大,需要切割,Recursive 递归计算
            Print.tco("切割任务:将" + start + "到" + end + "的和一分为二");
            int middle = (start + end) / 2;
            //切割成两个子任务
            AccumulateTask lTask = new AccumulateTask(start, middle);
            AccumulateTask rTask = new AccumulateTask(middle + 1, end);
            // 依次调用每个子任务的fork方法执行子任务
            lTask.fork();
            rTask.fork();
            //等待子任务的完成,依次调用每个子任务的join方法合并执行结果
            Integer leftResult = lTask.join();
            Integer rightResult = rTask.join();
            //合并子任务执行结果
            sum = leftResult + rightResult;
        }
        return sum;
    }
}

2.使用ForkJoinPool调度AccumulateTask()

public class ForkJoinTest {

    @Test
    public void testAccumulateTask() throws ExecutionException, InterruptedException, TimeoutException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //创建一个累加任务,计算 由1加到10
        AccumulateTask countTask = new AccumulateTask(1, 100);
        ForkJoinTask<Integer> future = forkJoinPool.submit(countTask);
        Integer sum = future.get(1, TimeUnit.SECONDS);
        Print.tcfo("最终的计算结果:" + sum);
        //预期的结果为5050
        Assert.assertTrue(sum == 5050);
    }
}

7.3.4 ForkJoin框架的核心API

ForkJoin框架的核心是ForkJoinPool线程池。该线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,则可能被挂起,而挂起的线程将被压入由ForkJoinPool维护的栈中,待有新任务到来时,再从栈中唤醒这些线程。
1.ForkJoinPool的构造器

06-thread-37

对以上构造器的4个参数具体介绍如下:
(1)parallelism:可并行级别
_ForkJoin框架将依据parallelism设定的级别决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理,但parallelism属性并不是ForkJoin框架中最大的线程数量,该属性和ThreadPoolExecutor线程池中的corePoolSize、maximumPoolSize属性有区别,因为ForkJoinPool的结构和工作方式与ThreadPoolExecutor完全不一样。_ForkJoin框架中可存在的线程数量和parallelism参数值并不是绝对关联的。
(2)factory:线程创建工厂
当ForkJoin框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread()的方法。在ForkJoin框架中有一个默认的ForkJoinWorkerThreadFactory接口实现DefaultForkJoinWorkerThreadFactory。
(3)handler:异常捕获处理程序
当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获。
(4)asyncMode:异步模式
_asyncMode参数表示任务是否为异步模式,其默认值为false。如果asyncMode为true,就表示子任务的执行遵循FIFO(先进先出)顺序,并且子任务不能被合并;如果asyncMode为false,就表示子任务的执行遵循LIFO(后进先出)顺序,并且子任务可以被合并。
虽然从字面意思来看asyncMode是指异步模式,它并不是指ForkJoin框架的调度模式采用是同步模式还是异步模式工作,仅仅指任务的调度方式。_ForkJoin框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。asyncMode模式的主要意思指的是待执行任务可以使用FIFO(先进先出)的工作模式,也可以使用LIFO(后进先出)的工作模式,工作模式为FIFO(先进先出)的任务适用于工作线程只负责运行异步事件,不需要合并结果的异步任务。
2.向ForkJoinPool线程池提交任务的方式
(1)外部任务(External/Submissions Task)提交
向ForkJoinPool提交外部任务有三种方式:方式一是调用invoke()方法,该方法提交任务后线程会等待,等到任务计算完毕返回结果;方式二是调用execute()方法提交一个任务来异步执行,无返回结果;方式三是调用submit()方法提交一个任务,并且会返回一个ForkJoinTask实例,之后适当的时候可通过ForkJoinTask实例获取执行结果。
(2)子任务(Worker Task)提交
向ForkJoinPool提交子任务的方法相对比较简单,由任务实例的fork()方法完成。当任务被分割之后,内部会调用ForkJoinPool.WorkQueue.push()方法直接把任务放到内部队列中等待被执行。

7.3.4 ForkJoin 工作窃取算法

ForkJoinPool线程池的任务分为“外部任务”和“内部任务”,两种任务的存放位置不同:
(1)外部任务存放在ForkJoinPool的全局队列中。
(2)子任务会作为“内部任务”放到内部队列中,ForkJoinPool池中的每个线程都维护着一个内部队列,用于存放这些“内部任务”。
由于ForkJoinPool线程池通常有多个工作线程,与之相对应的就会有多个任务队列,这就会出现任务分配不均衡的问题:有的队列任务多,忙得不停,有的队列没有任务,一直空闲。那么有没有一种机制帮忙将任务从繁忙的线程分摊给空闲的线程呢?答案是使用工作窃取算法。
**工作窃取算法的核心思想是:**工作线程自己的活干完了之后,会去看看别人有没有没干完的活,如果有就拿过来帮忙干。工作窃取算法的主要逻辑:每个线程拥有一个双端队列(本地队列),用于存放需要执行的任务,当自己的队列没有任务时,可以从其他线程的任务队列中获得一个任务继续执行,如图:

06-thread-37

在实际进行任务窃取操作的时候,操作线程会进行其他线程的任务队列的扫描和任务的出队尝试。为什么说是尝试?因为完全有可能操作失败,主要原因是并行执行肯定涉及线程安全的问题,假如在窃取过程中该任务已经开始执行,那么任务的窃取操作就会失败。
如何尽量避免在任务窃取中发生的线程安全问题呢?一种简单的优化方法是:在线程自己的本地队列采取LIFO(后进先出)策略,窃取其他任务队列的任务时采用FIFO(先进先出)策略。简单来说,获取自己队列的任务时从头开始,窃取其他队列的任务时从尾开始。由于窃取的动作十分快速,会大量降低这种冲突,也是一种优化方式,如图:

06-thread-37

7.3.5 ForkJoin 框架原理总结

ForkJoin框架的核心原理大致如下:
(1)ForkJoin框架的线程池ForkJoinPool的任务分为“外部任务”和“内部任务”。
(2)“外部任务”放在ForkJoinPool的全局队列中。
(3)ForkJoinPool池中的每个线程都维护着一个任务队列,用于存放“内部任务”,线程切割任务得到的子任务会作为“内部任务”放到内部队列中。
(4)当工作线程想要拿到子任务的计算结果时,先判断子任务有没有完成,如果没有完成,再判断子任务有没有被其他线程“窃取”,如果子任务没有被窃取,就由本线程来完成;一旦子任务被窃取了,就去执行本线程“内部队列”的其他任务,或者扫描其他的任务队列并窃取任务。
(5)当工作线程完成其“内部任务”,处于空闲状态时,就会扫描其他的任务队列窃取任务,尽可能不会阻塞等待。
总之,ForkJoin线程在等待一个任务完成时,要么自己来完成这个任务,要么在其他线程窃取了这个任务的情况下,去执行其他任务,是不会阻塞等待的,从而避免资源浪费,除非所有任务队列都为空。
工作窃取算法的优点如下:
(1)线程是不会因为等待某个子任务的执行或者没有内部任务要执行而被阻塞等待、挂起的,而是会扫描所有的队列窃取任务,直到所有队列都为空时才会被挂起。
(2)ForkJoin框架为每个线程维护着一个内部任务队列以及一个全局的任务队列,而且任务队列都是双向队列,可从首尾两端来获取任务,极大地减少了竞争的可能性,提高并行的性能。
ForkJoinPool适合需要“分而治之”的场景,特别是分治之后递归调用的函数,例如快速排序、二分搜索、大整数乘法、矩阵乘法、棋盘覆盖、归并排序、线性时间选择、汉诺塔问题等。ForkJoinPool适合调度的任务为CPU密集型任务,如果任务存在I/O操作、线程同步操作、sleep()睡眠等较长时间阻塞的情况,最好配合使用ManagedBlocker进行阻塞管理。总体来说,ForkJoinPool不适合进行IO密集型、混合型的任务调度。

7.4 生产者-消费者模式

生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案,是高并发编程过程中常用的一种设计模式。
在实际的软件开发过程中,经常会碰到如下场景:某些模块负责产生数据,另一些模块负责消费数据(此处的模块可以是类、函数、线程、进程等)。产生数据的模块可以形象地称为生产者,而消费数据的模块可以称为消费者。然而,仅仅抽象出来生产者和消费者还不够,该模式还需要有一个数据缓冲区作为生产者和消费者之间的中介:生产者把数据放入缓冲区,而消费者从缓冲区取出数据。生产者-消费者模式的结构大概如图:

06-thread-37

前面已经编写了多个不同版本的生产者-消费者模式的实现,这里对该模式的实现不做赘述。

7.5 Future模式

06-thread-37

假设一次远程调用的时间为500毫秒,则一个Client异步并发对三个Server分别进行一次RPC调用的总时间只要耗费500毫秒。使用Future模式异步并发地进行RPC调用,客户端在得到一个RPC的返回结果前并不急于获取该结果,而是充分利用等待时间去执行其他的耗时操作(如其他RPC调用),这就是Future模式的核心所在。
Future模式的核心思想是异步调用,有点类似于异步的Ajax请求。当调用某个耗时方法时,可以不急于立刻获取结果,而是让被调用者立刻返回一个契约(或异步任务),并且将耗时的方法放到另外的线程中执行,后续凭契约再去获取异步执行的结果。
在具体的实现上,Future模式和异步回调模式既有区别,又有联系。Java的Future实现类并没有支持异步回调,仍然需要主动获取耗时任务的结果;而Java 8中的CompletableFuture组件实现了异步回调模式。