My logo
Published on

并发编程-线程的基础-01

小知识:顺序,并行与并发

  • 顺序(sequential)用于表示多个操作"依次处理"。比如把十个操作交给一个人处理时,这个人要一个一个地按顺序来处理。
  • 并行(parallel)用于表示多个操作“同时处理”。比如十个操作分给两个人处理时,这两个人就会并行来处理。
  • 并发(concurrent)相对于顺序和并行来说比较抽象,用于表示“将一个操作分割成多个部分并且允许无序处理”。比如将十个操作分成相对独立的两类,这样便能够开始并发处理了。如果一个人来处理,这个人就是顺序处理分开的并发操作,而如果是两个人,这两个人就可以并行处理同一个操作。

多线程程序的评价标准:

  • 安全性-不损坏对象

所谓安全性(safety)就是不损坏对象。这是程序正常运行的必要条件之一。
对象损坏只是一种比喻,实际上,对象是内存上的一种虚拟事物,并不会实际损坏。对象损坏是指对象的状态和设计者的原意不一致,通常是指对象的字段的值并非预期值。
以序章1中介绍的银行账户为例,假设银行账户的可用余额变为了负数,而设计者此前并没有设想它会变为负数。这时就可以说,表示银行账户的对象"损坏"了。
如果一个类即使被多个线程同时使用,也可确保安全性,那么这个类就称为线程安全(thread-safe)类。由于类库中还存在着非线程安全的类,所以在多线程程序中使用类时一定要特别注意。例如,java.util.Vector类是线程安全的类,而java.util.ArrayList则是非线程安全的类。一般在 API文档中能够查到各个类是否是线程安全的。

  • 生存性-必要的处理能够被执行

生存性(liveness)是指无论是什么时候,必要的处理都一定能够被执行。这也是程序正常运行的必要条件之一(也有人也将liveness 翻译为"活性")。
即使对象没有损坏,也不代表程序就一定好。极端一点说,假如程序在运行过程中突然停止了,这时,由于处理已经停止,对象的状态就不会发生变化了,所以对象状态也就不会异常。这虽然符合前面讲的"安全性"条件,但无法运行的程序根本没有任何意义。无论是什么时候,必要的处理都一定能够被执行——这就是生存性。
有时候安全性和生存性会相互制约。例如,有时只重视安全性,生存性就会下降。最典型的事例就是死锁(deadlock),即多个线程互相等待对方释放锁的情形。关于死锁的详细内容将在第 1章"Single Threaded Execution模式"中讲解。

  • 可复用性-类可重复利用

可复用性(reusability)是指类能够重复利用。这虽然不是程序正常运行的必要条件,但却是提高程序质量的必要条件。
类如果能够作为组件从正常运行的软件中分割出来,那么就说明这个类有很高的可复用性。在编写多线程程序时,如果能够巧妙地将线程的互斥机制和方针隐藏到类中,那这就是一个可复用性高的程序。J2SE 5.0中引入的java.uti1.concurrent 包中就提供了便于多线程编程的可复用性高的类。

  • 性能-能快速,大批地执行处理

性能(performance)是指能快速、大批量地执行处理。这也不是程序正常运行的必要条件,但却是提高程序质量时应该考虑的条件。
影响性能的因素有好多种。下面是从 Doug Lea 的性能分类中摘录出的主要部分。
吞吐量(throughput)是指单位时间内完成的处理数量。能完成的处理越多,则表示吞吐量越大。
响应性(responsiveness)是指从发出请求到收到响应的时间。时间越短,响应性也就越好。在GUI程序中,相比于到处理"结束"时的时间,到处理"开始"时的时间更为重要。前者是指实际处理所花费的时间,而后者是到程序开始响应用户所花费的时间。相比于按下按钮后无任何反应,10秒后才提示"处理完毕"这种方式,在按下按钮时立刻提示"处理开始"这种方式的响应性更高,即便到处理结束花费的时间稍多一点也没关系。响应性好也称为等待时间(latency)短。
容量(capacity)是指可同时进行的处理数量。例如,服务器能同时处理的客户端数或文件数等。其他的诸如效率(efficiency)、可伸缩性(scalability)、降级(degradation)等,也可作为性能的评价标准。

总结:
安全性(safety)和生存性(liveness)是必须遵守的标准。既不能损坏对象,也一定要执行必要的处理。设计多线程系统时,请务必遵守安全性和生存性这两个标准。
重要的是,还要在满足这两个必要条件的基础上,考虑如何提高可复用性(reusability)和性能(performance)。
从下列评价标准来分析各个模式。
●安全性和生存性∶必要条件
● 可复用性和性能∶提高质量

1-线程的生命周期

Java中线程的生命周期分为6种状态:

thread01-1

thread01-1

public static enum State {   
    NEW,       //新建
    RUNNABLE, //可执行:包含操作系统的就绪、运行两种状态
    BLOCKED,  //阻塞
    WAITING,  //等待
    TIMED_WAITING,//限时等待
    TERMINATED; //终止
}

在Thread.State定义的6种状态中,有4种是比较常见的状态,它们是:NEW(新建)状态、RUNNABLE(可执行)状态、TERMINATED(终止)状态、TIMED_WAITING(限时等待)状态。
1.NEW状态
Java源码对NEW状态的说明是:创建成功但是没有调用start()方法启动的Thread线程实例都处于NEW状态。
当然,并不是Thread线程实例的start()方法一经调用,其状态就从NEW状态到RUNNABLE状态,此时并不意味着线程立即获取CPU时间片并且立即执行,中间需要一系列操作系统的内部操作。
2.RUNNABLE状态
JVM的幕后工作和操作系统的线程调度有关。Java中的线程管理是通过JNI本地调用的方式委托操作系统的线程管理API完成的。当Java线程的Thread实例的start()方法被调用后,操作系统中的对应线程进入的并不是运行状态,而是就绪状态,而Java线程并没有这个就绪状态。操作系统中线程的就绪状态是什么状态的呢?
JVM的线程状态与其幕后的操作系统线程状态之间的转换关系简化后如图1-11所示。

thread01-1

就绪状态和运行状态都是操作系统中的线程状态。在Java语言中,并没有细分这两种状态,而是将这两种状态合并成同一种状态——RUNNABLE状态。因此,在Thread.State枚举类中,没有定义线程的就绪状态和运行状态,只是定义了RUNNABLE状态。这就是Java线程状态和操作系统中线程状态不同的地方。
总之,NEW状态的Thread实例调用了start()方法后,线程的状态将变成RUNNABLE状态。尽管如此,线程的run()方法不一定会马上被并发执行,需要在线程获取了CPU时间片之后才真正启动并发执行。
3.TERMINATED状态
处于RUNNABLE状态的线程在run()方法执行完成之后就变成终止状态TERMINATED了。当然,如果在run()方法执行过程中发生了运行时异常而没有被捕获,run()方法将被异常终止,线程也会变成TERMINATED状态。
4.TIMED_WAITING状态
线程处于一种特殊的等待状态,准确地说,线程处于限时等待状态。能让线程处于限时等待状态的操作大致有以下几种:
(1)Thread.sleep(int n):使得当前线程进入限时等待状态,等待时间为n毫秒。
(2)Object.wait():带时限的抢占对象的monitor锁。
(3)Thread.join():带时限的线程合并。
(4)LockSupport.parkNanos():让线程等待,时间以纳秒为单位。
(5)LockSupport.parkUntil():让线程等待,时间可以灵活设置。
状态变化图:

thread01-1

1.2 线程的interrupt 操作

Java语言提供了stop()方法终止正在运行的线程,但是Java将Thread的stop()方法设置为过时,不建议大家使用。为什么呢?因为使用stop()方法是很危险的,就像突然关闭计算机电源,而不是按正常程序关机。在程序中,我们是不能随便中断一个线程的,我们无法知道这个线程正运行在什么状态,它可能持有某把锁,强行中断线程可能导致锁不能释放的问题;或者线程可能在操作数据库,强行中断线程可能导致数据不一致的问题。正是由于调用stop()方法来终止线程可能会产生不可预料的结果,因此不推荐调用stop()方法。
一个线程什么时候可以退出呢?当然只有线程自己才能知道。所以,这里介绍一下Thread的interrupt()方法,此方法本质不是用来中断一个线程,而是将线程设置为中断状态。
当我们调用线程的interrupt()方法时,它有两个作用:
(1)如果此线程处于阻塞状态(如调用了Object.wait()方法),就会立马退出阻塞,并抛出InterruptedException异常,线程就可以通过捕获InterruptedException来做一定的处理,然后让线程退出。更确切地说,如果线程被Object.wait()、Thread.join()和Thread.sleep()三种方法之一阻塞,此时调用该线程的interrupt()方法,该线程将抛出一个InterruptedException中断异常(该线程必须事先预备好处理此异常),从而提早终结被阻塞状态。
(2)如果此线程正处于运行之中,线程就不受任何影响,继续运行,仅仅是线程的中断标记被设置为true。所以,程序可以在适当的位置通过调用isInterrupted()方法来查看自己是否被中断,并执行退出操作。

1.3 线程的join操作

线程的合并是一个比较难以说清楚的概念,什么是线程的合并呢?举一个例子,假设有两个线程A和B。现在线程A在执行过程中对另一个线程B的执行有依赖,具体的依赖为:线程A需要将线程B的执行流程合并到自己的执行流程中(至少表面如此),这就是线程合并,被动方线程B可以叫作被合并线程。
调用join()方法的要点:
(1)join()方法是实例方法,需要使用被合并线程的句柄(或者指针、变量)去调用,如threadb.join()。执行threadb.join()这行代码的当前线程为合并线程(甲方),进入TIMED_WAITING等待状态,让出CPU。
(2)如果设置了被合并线程的执行时间millis(或者millis+nanos),并不能保证当前线程一定会在millis时间后变为RUNNABLE。
(3)如果主动方合并线程在等待时被中断,就会抛出InterruptedException受检异常。
调用join()方法的语句可以理解为合并点,合并的本质是:线程A需要在合并点等待,一直等到线程B执行完成,或者等待超时。

thread01-1

如果乙方线程无限制长时间地执行,甲方线程可以进行限时等待:甲方线程等待乙方线程执行一定时间后,如果乙方还没有完成,甲方线程再继续执行。
调用join()方法的优势是比较简单,劣势是join()方法没有办法直接取得乙方线程的执行结果

1.4 线程的yield操作

**线程的yield(让步)**操作的作用是让目前正在执行的线程放弃当前的执行,让出CPU的执行权限,使得CPU去执行其他的线程。处于让步状态的JVM层面的线程状态仍然是RUNNABLE状态,但是该线程所对应的操作系统层面的线程从状态上来说会从执行状态变成就绪状态。线程在yield时,线程放弃和重占CPU的时间是不确定的,可能是刚刚放弃CPU,马上又获得CPU执行权限,重新开始执行。
yield()方法是Thread类提供的一个静态方法,它可以让当前正在执行的线程暂停,但它不会阻塞该线程,只是让线程转入就绪状态。yield只是让当前线程暂停一下,让系统的线程调度器重新调度一次。
Thread.yeid()方法有以下特点:
(1)yield仅能使一个线程从运行状态转到就绪状态,而不是阻塞状态。
(2)yield不能保证使得当前正在运行的线程迅速转换到就绪状态。
(3)即使完成了迅速切换,系统通过线程调度机制从所有就绪线程中挑选下一个执行线程时,就绪的线程有可能被选中,也有可能不被选中,其调度的过程受到其他因素(如优先级)的影响。

1.5 Executors的4种快捷创建线程池的方法

thread01-1

1.newSingleThreadExecutor创建“单线程化线程池”

该方法用于创建一个“单线程化线程池”,也就是只有一个线程的线程池,所创建的线程池用唯一的工作线程来执行任务,使用此方法创建的线程池能保证所有任务按照指定顺序(如FIFO)执行。

/**
 * @author: wangyj
 * @create: 2022-04-02
 * @version: 1.0.0
 * <p>
 * newSingleThreadExecutor创建“单线程化线程池”
 * 该方法用于创建一个“单线程化线程池”,也就是只有一个线程的线程池,所创建的线程池用唯一的工作线程来执行任务,
 * 使用此方法创建的线程池能保证所有任务按照指定顺序(如FIFO)执行。
 **/
public class CreateThreadPoolDemo {

    public static final int SLEEP_GAP = 500;
    public static final int MAX_TURN = 5;

    //异步的执行目标类
    public static class TargetTask implements Runnable {
        static AtomicInteger taskNo = new AtomicInteger(1);
        protected String taskName;

        public TargetTask() {
            taskName = "task-" + taskNo.get();
            taskNo.incrementAndGet();
        }

        public void run() {

            Print.tco("任务:" + taskName + " doing");
            // 线程睡眠一会
            sleepMilliSeconds(SLEEP_GAP);
            Print.tco(taskName + " 运行结束.");
        }

        @Override
        public String toString() {
            return "TargetTask{" + taskName + '}';
        }
    }

    //测试用例:只有一条线程的线程池
    @Test
    public void testSingleThreadExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            pool.execute(new TargetTask());
            pool.submit(new TargetTask());
        }
        sleepSeconds(1000);
        //关闭线程池
        pool.shutdown();
    }
}

从以上输出中可以看出,该线程池有以下特点: (1)单线程化的线程池中的任务是按照提交的次序顺序执行的。 (2)池中的唯一线程的存活时间是无限的。 (3)当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的。

总体来说,单线程化的线程池所适用的场景是:任务按照提交次序,一个任务一个任务地逐个执行的场景。
以上用例在最后调用shutdown()方法来关闭线程池。执行shutdown()方法后,线程池状态变为SHUTDOWN,此时线程池将拒绝新任务,不能再往线程池中添加新任务,否则会抛出RejectedExecutionException异常。此时,线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成才会退出。还有一个与shutdown()类似的方法,叫作shutdownNow(),执行shutdownNow()方法后,线程池状态会立刻变成STOP,并试图停止所有正在执行的线程,并且不再处理还在阻塞队列中等待的任务,会返回那些未执行的任务。

2.newFixedThreadPool创建“固定数量的线程池”

该方法用于创建一个“固定数量的线程池”,其唯一的参数用于设置池中线程的“固定数量”。

public class CreateThreadPoolDemo {

    public static final int SLEEP_GAP = 500;
    public static final int MAX_TURN = 5;

    //异步的执行目标类
    public static class TargetTask implements Runnable {
        static AtomicInteger taskNo = new AtomicInteger(1);
        protected String taskName;

        public TargetTask() {
            taskName = "task-" + taskNo.get();
            taskNo.incrementAndGet();
        }

        public void run() {

            Print.tco("任务:" + taskName + " doing");
            // 线程睡眠一会
            sleepMilliSeconds(SLEEP_GAP);
            Print.tco(taskName + " 运行结束.");
        }

        @Override
        public String toString() {
            return "TargetTask{" + taskName + '}';
        }
    }

    //2.测试用例:只有3条线程固定大小的线程池
    @Test
    public void testNewFixedThreadPool() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            pool.execute(new TargetTask());
            pool.submit(new TargetTask());
        }
        sleepSeconds(1000);
        //关闭线程池
        pool.shutdown();
    }
}

在测试用例中,创建了一个线程数为3的“固定数量线程池”,然后向其中提交了10个任务。从输出结果可以看到,该线程池同时只能执行3个任务,剩余的任务会排队等待。
“固定数量的线程池”的特点大致如下:
(1)如果线程数没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量。
(2)线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
(3)在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)。
“固定数量的线程池”的适用场景:需要任务长期执行的场景。“固定数量的线程池”的线程数能够比较稳定地保证一个数,能够避免频繁回收线程和创建线程,故适用于处理CPU密集型的任务,在CPU被工作线程长时间占用的情况下,能确保尽可能少地分配线程。
“固定数量的线程池”的弊端:内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无限增大,使服务器资源迅速耗尽。

3.newCachedThreadPool创建“可缓存线程池”

该方法用于创建一个“可缓存线程池”,如果线程池内的某些线程无事可干成为空闲线程,“可缓存线程池”可灵活回收这些空闲线程。

public class CreateThreadPoolDemo {

    public static final int SLEEP_GAP = 500;
    public static final int MAX_TURN = 5;

    //异步的执行目标类
    public static class TargetTask implements Runnable {
        static AtomicInteger taskNo = new AtomicInteger(1);
        protected String taskName;

        public TargetTask() {
            taskName = "task-" + taskNo.get();
            taskNo.incrementAndGet();
        }

        public void run() {

            Print.tco("任务:" + taskName + " doing");
            // 线程睡眠一会
            sleepMilliSeconds(SLEEP_GAP);
            Print.tco(taskName + " 运行结束.");
        }

        @Override
        public String toString() {
            return "TargetTask{" + taskName + '}';
        }
    }

    //测试用例:“可缓存线程池”
    @Test
    public void testNewCacheThreadPool() {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            pool.execute(new TargetTask());
            pool.submit(new TargetTask());
        }
        sleepSeconds(1000);
        //关闭线程池
        pool.shutdown();
    }
}

“可缓存线程池”的特点大致如下:
(1)在接收新的异步任务target执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务。
(2)此线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
(3)如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程。
**“可缓存线程池”的适用场景:**需要快速处理突发性强、耗时较短的任务场景,如Netty的NIO处理场景、REST API接口的瞬时削峰场景。“可缓存线程池”的线程数量不固定,只要有空闲线程就会被回收;接收到的新异步任务执行目标,查看是否有线程处于空闲状态,如果没有就直接创建新的线程。
**“可缓存线程池”的弊端:**线程池没有最大线程数量限制,如果大量的异步任务执行目标实例同时提交,可能会因创建线程过多而导致资源耗尽。

4.newScheduledThreadPool创建“可调度线程池”

该方法用于创建一个“可调度线程池”,即一个提供**“延时”和“周期性”任务调度功能的ScheduledExecutorService类型的线程池。

//方法一:创建一个可调度线程池,池内仅含有一个线程
public static ScheduledExecutorService newSingleThreadScheduledExecutor();
//方法二:创建一个可调度线程池,池内含有N个线程,N的值为输入参数corePoolSize
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) ;

**newSingleThreadScheduledExecutor 工厂方法所创建的仅含有一个线程的可调度线程池适用于调度串行化任务,也就是一个任务一个任务地串行化调度执行。

public class CreateThreadPoolDemo {

    public static final int SLEEP_GAP = 500;
    public static final int MAX_TURN = 5;

    //异步的执行目标类
    public static class TargetTask implements Runnable {
        static AtomicInteger taskNo = new AtomicInteger(1);
        protected String taskName;

        public TargetTask() {
            taskName = "task-" + taskNo.get();
            taskNo.incrementAndGet();
        }
        public void run() {

            Print.tco("任务:" + taskName + " doing");
            // 线程睡眠一会
            sleepMilliSeconds(SLEEP_GAP);
            Print.tco(taskName + " 运行结束.");
        }
        @Override
        public String toString() {
            return "TargetTask{" + taskName + '}';
        }
    }

    //测试用例:“可调度线程池”
    @Test
    public void testNewScheduledThreadPool() {
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
        for (int i = 0; i < 2; i++) {
            scheduled.scheduleAtFixedRate(new TargetTask(),0,500, TimeUnit.MILLISECONDS);
            //以上的参数中:
            // 0表示首次执行任务的延迟时间,500表示每次执行任务的间隔时间
            //TimeUnit.MILLISECONDS所设置的时间的计时单位为毫秒
        }
        sleepSeconds(1000);
        //关闭线程池
        scheduled.shutdown();
    }
}

1.6 线程池的任务调度流程

线程池的任务调度流程(包含接收新任务和执行下一个任务)大致如下:
(1)如果当前工作线程数量小于核心线程数量,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程。
(2)如果线程池中总的任务数量大于核心线程池数量,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程池数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程。
(3)当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
(4)在核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务。
(5)在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略

thread01-1

1.任务阻塞队列

Java中的阻塞队列(BlockingQueue)与普通队列相比有一个重要的特点:在阻塞队列为空时会阻塞当前线程的元素获取操作。具体来说,在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒(唤醒过程不需要用户程序干预)。
Java线程池使用BlockingQueue实例暂时接收到的异步任务,BlockingQueue是JUC包的一个超级接口,比较常用的实现类有:
(1)ArrayBlockingQueu****e:是一个数组实现的有界阻塞队列(有界队列),队列中的元素按FIFO排序。ArrayBlockingQueue在创建时必须设置大小,接收的任务超出corePoolSize数量时,任务被缓存到该阻塞队列中,任务缓存的数量只能为创建时设置的大小,若该阻塞队列已满,则会为新的任务创建线程,直到线程池中的线程总数大于maximumPoolSize。
(2)LinkedBlockingQueue:是一个基于链表实现的阻塞队列,按FIFO排序任务,可以设置容量(有界队列),不设置容量则默认使用Integer.Max_VALUE作为容量(无界队列)。该队列的吞吐量高于ArrayBlockingQueue。
如果不设置LinkedBlockingQueue的容量(无界队列),当接收的任务数量超出corePoolSize时,则新任务可以被无限制地缓存到该阻塞队列中,直到资源耗尽。有两个快捷创建线程池的工厂方法Executors.newSingleThreadExecutor和Executors.newFixedThreadPool使用了这个队列,并且都没有设置容量(无界队列)。
(3)PriorityBlockingQueue:是具有优先级的无界队列。
(4)DelayQueue:这是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,队列头部的元素是过期最快的元素。快捷工厂方法Executors.newScheduledThreadPool所创建的线程池使用此队列。
(5)SynchronousQueue:(同步队列)是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程的调用移除操作,否则插入操作一直处于阻塞状态,其吞吐量通常高于LinkedBlockingQueue。快捷工厂方法Executors.newCachedThreadPool所创建的线程池使用此队列。与前面的队列相比,这个队列比较特殊,它不会保存提交的任务,而是直接新建一个线程来执行新来的任务。

2.线程池的拒绝策略

在线程池的任务缓存队列为有界队列(有容量限制的队列)的时候,如果队列满了,提交任务到线程池的时候就会被拒绝。总体来说,任务被拒绝有两种情况:
(1)线程池已经被关闭。
(2)工作队列已满且maximumPoolSize已满。
无论以上哪种情况任务被拒绝,线程池都会调用RejectedExecutionHandler实例的rejectedExecution方法。RejectedExecutionHandler是拒绝策略的接口,JUC为该接口提供了以下几种实现:
·AbortPolicy:拒绝策略。
使用该策略时,如果线程池队列满了,新任务就会被拒绝,并且抛出RejectedExecutionException异常。该策略是线程池默认的拒绝策略。
·DiscardPolicy:抛弃策略。
该策略是AbortPolicy的Silent(安静)版本,如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出。
·DiscardOldestPolicy:抛弃最老任务策略。
抛弃最老任务策略,也就是说如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除队头元素后再尝试入队。
·CallerRunsPolicy:调用者执行策略。
调用者执行策略。在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。
·自定义策略。

thread01-1

3.线程池的优雅关闭

优雅地关闭线程池主要涉及的方法有3个:
(1)shutdown:是JUC提供的一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务。

thread01-1

(2)shutdownNow:是JUC提供的一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务。
(3)awaitTermination:等待线程池完成关闭。在调用线程池的shutdown()与shutdownNow()方法时,当前线程会立即返回,不会一直等待直到线程池完成关闭。如果需要等到线程池关闭完成,可以调用awaitTermination()方法。

**大家可以结合shutdown()、shutdownNow()、awaitTermination()**三个方法优雅地关闭一个线程池,大致分为以下几步:
(1)执行shutdown()方法,拒绝新任务的提交,并等待所有任务有序地执行完毕。
(2)执行awaitTermination(long timeout,TimeUnit unit)方法,指定超时时间,判断是否已经关闭所有任务,线程池关闭完成。
(3)如果awaitTermination()方法返回false,或者被中断,就调用shutDownNow()方法立即关闭线程池所有任务。
(4)补充执行awaitTermination(long timeout,TimeUnit unit)方法,判断线程池是否关闭完成。如果超时,就可以进入循环关闭,循环一定的次数(如1000次),不断关闭线程池,直到其关闭或者循环结束。

1.7 Executors快捷创建线程池的潜在问题

在很多公司(如阿里、华为等)的编程规范中,非常明确地禁止使用Executors快捷创建线程池,为什么呢?这里从源码讲起,介绍使用Executors工厂方法快捷创建线程池将会面临的潜在问题。

1.使用Executors创建“固定数量的线程池”的潜在问题

使用newFixedThreadPool工厂方法创建“固定数量的线程池”的源码如下:

thread01-1

newFixedThreadPool工厂方法返回一个ThreadPoolExecutor实例,该线程池实例的corePoolSize数量为参数nThread,其maximumPoolSize数量也为参数nThread,其workQueue属性的值为LinkedBlockingQueue()无界阻塞队列。
使用Executors创建“固定数量的线程池”的潜在问题主要存在于其workQueue上,其值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列中大量的任务等待。如果队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽。

2.使用Executors创建“单线程化线程池”的潜在问题

使用newSingleThreadExecutor工厂方法创建“单线程化线程池”的源码如下:

thread01-1

以上代码首先通过调用工厂方法newFixedThreadPool(1)创建一个数量为1的“固定大小的线程池”,然后使用FinalizableDelegatedExecutorService对该“固定大小的线程池”进行包装,这一层包装的作用是防止线程池的corePoolSize被动态地修改。

    @org.junit.Test
    public void testNewFixedThreadPool2() {
        //创建一个固定大小线程池
        ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
        Print.tco(threadPoolExecutor.getMaximumPoolSize());
        //设置核心线程数
        threadPoolExecutor.setCorePoolSize(8);

        //创建一个单线程化的线程池
        ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
        //转换成普通线程池, 会抛出运行时异常 java.lang.ClassCastException
        ((ThreadPoolExecutor) singleExecutorService).setCorePoolSize(8);
    }

以上代码在运行时会抛出异常。观察所抛出的异常,可以知道FinalizableDelegatedExecutorService实例无法被转型为ThreadPoolExecutor类型,所以也就无法修改其corePoolSize属性,从而确保“单线程化线程池”在运行过程中corePoolSize不会被调整,其线程数始终唯一,做到了真正的Single。反过来说,如果没有被FinalizableDelegatedExecutorService包装,原始的ThreadPoolExecutor实例是可以动态调整corePoolSize属性的。
使用Executors创建的“单线程化线程池”与“固定大小的线程池”一样,其潜在问题仍然存在于其workQueue属性上,该属性的值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列大量阻塞。如果队列很大,很有可能导致JVM的OOM异常,甚至造成内存资源耗尽。

3.使用Executors创建“可缓存线程池”的潜在问题

使用newCachedThreadPool工厂方法创建“可缓存线程池”的源码如下:

thread01-1

以上代码通过调用ThreadPoolExecutor标准构造器创建一个核心线程数为0、最大线程数不设限制的线程池。所以,理论上“可缓存线程池”可以拥有无数个工作线程,即线程数量几乎无限制。“可缓存线程池”的workQueue为SynchronousQueue同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,正因为“可缓存线程池”可以无限制地创建线程,不会有任务等待,所以才使用SynchronousQueue。
当“可缓存线程池”有新任务到来时,新任务会被插入SynchronousQueue实例中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可用线程则执行,若没有可用线程,则线程池会创建一个线程来执行该任务。
SynchronousQueue是一个比较特殊的阻塞队列实现类,SynchronousQueue没有容量,每一个插入操作都要等待对应的删除操作,反之每个删除操作都要等待对应的插入操作。也就是说,如果使用SynchronousQueue,提交的任务不会被真实地保存,而是将新任务交给空闲线程执行,如果没有空闲线程,就创建线程,如果线程数都已经大于最大线程数,就执行拒绝策略。使用这种队列需要将maximumPoolSize设置得非常大,从而使得新任务不会被拒绝。
使用Executors创建的“可缓存线程池”的潜在问题存在于其最大线程数量不设限上。由于其maximumPoolSize的值为Integer.MAX_VALUE(非常大),可以认为可以无限创建线程,如果任务提交较多,就会造成大量的线程被启动,很有可能造成OOM异常,甚至导致CPU线程资源耗尽。

4.使用Executors创建“可调度线程池”的潜在问题

使用newScheduledThreadPool工厂方法创建“可调度线程池”的源码如下:

thread01-1

Executors的newScheduledThreadPool工厂方法调用了ScheduledThreadPoolExecutor实现类的构造器,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor的普通线程池类,在其构造器内部进一步调用了该父类的构造器,具体的代码如下:

thread01-1

以上代码创建了一个ThreadPoolExecutor实例,其corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE,表示线程数不设上限,其workQueue为一个DelayedWorkQueue实例,这是一个按到期时间升序排序的阻塞队列。
使用Executors创建的“可缓存线程池”的潜在问题存在于其最大线程数量不设限上。由于其线程数量不设限,如果到期任务太多,就会导致CPU的线程资源耗尽。

5.总结起来,使用Executors创建线程池主要的弊端如下:

(1)FixedThreadPool和SingleThreadPool
这两个工厂方法所创建的线程池,工作队列(任务排队的队列)的长度都为Integer.MAX_VALUE,可能会堆积大量的任务,从而导致OOM(即耗尽内存资源)。
(2)CachedThreadPool和ScheduledThreadPool
这两个工厂方法所创建的线程池允许创建的线程数量为Integer.MAX_VALUE,可能会导致创建大量的线程,从而导致OOM。
虽然Executors工厂类提供了构造线程池的便捷方法,但是对于服务器程序而言,大家应该杜绝使用这些便捷方法,而是直接使用线程池ThreadPoolExecutor的构造器,从而有效避免由于使用无界队列可能导致的内存资源耗尽,或者由于对线程个数不做限制而导致的CPU资源耗尽等问题。
所以,大厂的编程规范都不允许使用Executors创建线程池,而是要求使用标准构造器ThreadPoolExecutor创建线程池。

1.8 按照任务类型对线程池进行分类

使用标准构造器ThreadPoolExecutor创建线程池时,会涉及线程数的配置,而线程数的配置与异步任务类型是分不开的。这里将线程池的异步任务大致分为以下三类:
(1)IO密集型任务
此类任务主要是执行IO操作。由于执行IO操作的时间较长,导致CPU的利用率不高,这类任务CPU常处于空闲状态。Netty的IO读写操作为此类任务的典型例子。
(2)CPU密集型任务
此类任务主要是执行计算任务。由于响应时间很快,CPU一直在运行,这种任务CPU的利用率很高。
(3)混合型任务
此类任务既要执行逻辑计算,又要进行IO操作(如RPC调用、数据库访问)。相对来说,由于执行IO操作的耗时较长(一次网络往返往往在数百毫秒级别),这类任务的CPU利用率也不是太高。Web服务器的HTTP请求处理操作为此类任务的典型例子。

1.为IO密集型任务确定线程数

由于IO密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开CPU核心数两倍的线程。当IO线程空闲时,可以启用其他线程继续使用CPU,以提高CPU的使用率。
Netty的IO处理任务就是典型的IO密集型任务。所以,Netty的Reactor(反应器)实现类(定制版的线程池)的IO处理线程数默认正好为CPU核数的两倍,以下是其相关的代码:

thread01-1

2.为CPU密集型任务确定线程数

CPU密集型任务也叫计算密集型任务,其特点是要进行大量计算而需要消耗CPU资源,比如计算圆周率、对视频进行高清解码等。CPU密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以要最高效地利用CPU,CPU密集型任务并行执行的数量应当等于CPU的核心数。
比如4个核心的CPU,通过4个线程并行地执行4个CPU密集型任务,此时的效率是最高的。但是如果线程数远远超出CPU核心数量,就需要频繁地切换线程,线程上下文切换时需要消耗时间,反而会使得任务效率下降。因此,对于CPU密集型的任务来说,线程数等于CPU数就行。

3.为混合型任务确定线程数

混合型任务既要执行逻辑计算,又要进行大量非CPU耗时操作(如RPC调用、数据库访问、网络通信等),所以混合型任务CPU的利用率不是太高,非CPU耗时往往是CPU耗时的数倍。比如在Web应用中处理HTTP请求时,一次请求处理会包括DB操作、RPC操作、缓存操作等多种耗时操作。一般来说,一次Web请求的CPU计算耗时往往较少,大致在100~500毫秒,而其他耗时操作会占用500~1000毫秒,甚至更多的时间。
在为混合型任务创建线程池时,如何确定线程数呢?业界有一个比较成熟的估算公式,具体如下:
**最佳线程数 = ((线程等待时间+线程CPU时间) / 线程CPU时间) _ CPU核数_
经过简单的换算,以上公式可进一步转换为:
**最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1) _ CPU核数_
通过公式可以看出:等待时间所占的比例越高,需要的线程就越多;CPU耗时所占的比例越高,需要的线程就越少。下面举一个例子:比如在Web服务器处理HTTP请求时,假设平均线程CPU运行时间为100毫秒,而线程等待时间(比如包括DB操作、RPC操作、缓存操作等)为900毫秒,如果CPU核数为8,那么根据上面这个公式,估算如下:
(900毫秒 + 100毫秒) / 100毫秒 _ 8 = 10 _ 8 = 80

2-并发控制工具类

2.1 Synchronized 使用

synchronized方法的同步锁实质上使用了this对象锁,这样就免去了手工设置同步锁的工作。而使用synchronized代码块需要手工设置同步锁。

  • 静态的同步方法

thread01-1

静态方法属于Class实例而不是单个Object实例,在静态方法内部是不可以访问Object实例的this引用(也叫指针、句柄)的。所以,修饰static方法的synchronized关键字就没有办法获得Object实例的this对象的监视锁。
实际上,使用synchronized关键字修饰static方法时,synchronized的同步锁并不是普通Object对象的监视锁,而是类所对应的Class对象的监视锁。
为了以示区分,这里将Object对象的监视锁叫作对象锁,将Class对象的监视锁叫作类锁。当synchronized关键字修饰static方法时,同步锁为类锁;当synchronized关键字修饰普通的成员方法(非静态方法)时,同步锁为类锁。由于类的对象实例可以有很多,但是每个类只有一个Class实例,因此使用类锁作为synchronized的同步锁时会造成同一个JVM内的所有线程只能互斥地进入临界区段。

thread01-1

所以,使用synchronized关键字修饰static方法是非常粗粒度的同步机制。
通过synchronized关键字所抢占的同步锁什么时候释放呢?一种场景是synchronized块(代码块或者方法)正确执行完毕,监视锁自动释放;另一种场景是程序出现异常,非正常退出synchronized块,监视锁也会自动释放。所以,使用synchronized块时不必担心监视锁的释放问题。

2.1.1生产者消费者的实现

2.1.2 无锁、偏向锁、轻量级锁和重量级锁

1.无锁状态
Java对象刚创建时还没有任何线程来竞争,说明该对象处于无锁状态(无线程竞争它),这时偏向锁标识位是0,锁状态是01。无锁状态下对象的Mark Word如图:

thread01-1

2.偏向锁状态
偏向锁是指一段同步代码一直被同一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。如果内置锁处于偏向状态,当有一个线程来竞争锁时,先用偏向锁,表示内置锁偏爱这个线程,这个线程要执行该锁关联的同步代码时,不需要再做任何检查和切换。偏向锁在竞争不激烈的情况下效率非常高。
偏向锁状态的Mark Word会记录内置锁自己偏爱的线程ID,内置锁会将该线程当作自己的熟人。偏向锁状态下对象的Mark Word如图:

thread01-1

3.轻量级锁状态
当有两个线程开始竞争这个锁对象时,情况就发生变化了,不再是偏向(独占)锁了,锁会升级为轻量级锁,两个线程公平竞争,哪个线程先占有锁对象,锁对象的Mark Word就指向哪个线程的栈帧中的锁记录。轻量级锁状态下对象的Mark Word如图:

thread01-1

当锁处于偏向锁,又被另一个线程企图抢占时,偏向锁就会升级为轻量级锁。企图抢占的线程会通过自旋的形式尝试获取锁,不会阻塞抢锁线程,以便提高性能。
自旋原理非常简单,如果持有锁的线程能在很短时间内释放锁资源,那么那些等待竞争锁的线程就不需要进行内核态和用户态之间的切换来进入阻塞挂起状态,它们只需要等一等(自旋),等持有锁的线程释放锁后即可立即获取锁,这样就避免了用户线程和内核切换的消耗。
但是,线程自旋是需要消耗CPU的,如果一直获取不到锁,那么线程也不能一直占用CPU自旋做无用功,所以需要设定一个自旋等待的最大时间。JVM对于自旋周期的选择,JDK 1.6之后引入了适应性自旋锁,适应性自旋锁意味着自旋的时间不是固定的,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定的。线程如果自旋成功了,下次自旋的次数就会更多,如果自旋失败了,自旋的次数就会减少。
如果持有锁的线程执行的时间超过自旋等待的最大时间仍没有释放锁,就会导致其他争用锁的线程在最大等待时间内还是获取不到锁,自旋不会一直持续下去,这时争用线程会停止自旋进入阻塞状态,该锁膨胀为重量级锁。

轻量级锁主要有两种:普通自旋锁和自适应自旋锁。
1.普通自旋锁
所谓普通自旋锁,就是指当有线程来竞争锁时,抢锁线程会在原地循环等待,而不是被阻塞,直到那个占有锁的线程释放锁之后,这个抢锁线程才可以获得锁。
默认情况下,自旋的次数为10次,用户可以通过**-XX:PreBlockSpin**选项来进行更改。
2.自适应自旋锁
所谓自适应自旋锁,就是等待线程空循环的自旋次数并非是固定的,而是会动态地根据实际情况来改变自旋等待的次数,自旋次数由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。自适应自旋锁的大概原理是:
(1)如果抢锁线程在同一个锁对象上之前成功获得过锁,JVM就会认为这次自旋很有可能再次成功,因此允许自旋等待持续相对更长的时间。
(2)如果对于某个锁,抢锁线程很少成功获得过,那么JVM将可能减少自旋时间甚至省略自旋过程,以避免浪费处理器资源。
自适应自旋解决的是“锁竞争时间不确定”的问题。自适应自旋假定不同线程持有同一个锁对象的时间基本相当,竞争程度趋于稳定。总的思想是:根据上一次自旋的时间与结果调整下一次自旋的时间。

4.重量级锁状态
重量级锁会让其他申请的线程之间进入阻塞,性能降低。重量级锁也叫同步锁,这个锁对象Mark Word再次发生变化,会指向一个监视器对象,该监视器对象用集合的形式来登记和管理排队的线程。重量级锁状态下对象的Mark Word如图:

thread01-1

2.1.3 偏向锁、轻量级锁与重量级锁的对比

总结一下synchronized的执行过程,大致如下:
1)线程抢锁时,JVM首先检测内置锁对象Mark Word中的biased_lock(偏向锁标识)是否设置成1,lock(锁标志位)是否为01,如果都满足,确认内置锁对象为可偏向状态。
2)在内置锁对象确认为可偏向状态之后,JVM检查Mark Word中的线程ID是否为抢锁线程ID,如果是,就表示抢锁线程处于偏向锁状态,抢锁线程快速获得锁,开始执行临界区代码。
(3)如果Mark Word中的线程ID并未指向抢锁线程,就通过CAS操作竞争锁。如果竞争成功,就将Mark Word中的线程ID设置为抢锁线程,偏向标志位设置为1,锁标志位设置为01,然后执行临界区代码,此时内置锁对象处于偏向锁状态。
(4)如果CAS操作竞争失败,就说明发生了竞争,撤销偏向锁,进而升级为轻量级锁。
(5)JVM使用CAS将锁对象的Mark Word替换为抢锁线程的锁记录指针,如果成功,抢锁线程就获得锁。如果替换失败,就表示其他线程竞争锁,JVM尝试使用CAS自旋替换抢锁线程的锁记录指针,如果自旋成功(抢锁成功),那么锁对象依然处于轻量级锁状态。
(6)如果JVM的CAS替换锁记录指针自旋失败,轻量级锁就膨胀为重量级锁,后面等待锁的线程也要进入阻塞状态。
总体来说,偏向锁是在没有发生锁争用的情况下使用的;一旦有了第二个线程争用锁,偏向锁就会升级为轻量级锁;如果锁争用很激烈,轻量级锁的CAS自旋到达阈值后,轻量级锁就会升级为重量级锁。

2.2 线程间通信

线程间通信的方式可以有很多种:等待-通知、共享内存、管道流。每种方式用不同的方法来实现。

2.2.1 等待-通知的通信方式

“等待-通知”通信方式是Java中使用普遍的线程间通信方式,其经典的案例是“生产者-消费者”模式。
Java语言中“等待-通知”方式的线程间通信使用对象的wait()、notify()两类方法来实现。每个Java对象都有wait()、notify()两类实例方法,并且wait()、notify()方法和对象的监视器是紧密相关的。
说明:Wait()、notify()两类方法在数量上不止两个。wait()、notify()两类方法不属于Thread类,而是属于Java对象实例(Object实例或者Class实例)。

2.2.1 wait方法和notify方法的原理

1.对象的wait()方法
对象的wait()方法的主要作用是让当前线程阻塞并等待被唤醒。wait()方法与对象监视器紧密相关,使用wait()方法时一定要放在同步块中。
2.对象的notify()方法
对象的notify()方法的主要作用是唤醒在等待的线程。notify()方法与对象监视器紧密相关,调用notify()方法时也需要放在同步块中。
2.对象的notify()方法和notifyAll() 方法
一般来说,使用notifyAll() 时的代码要比使用notify() 时的更为健壮。

2.2 CountDownLatch(倒数闩)使用

CountDownLatch(倒数闩)是一个非常实用的等待多线程并发的工具类。 调用线程可以在倒数闩上进行等待,一直等待倒数闩的次数减少到0,才继续往下执行。每一个被等待的线程执行完成之后进行一次倒数。所有被等待的线程执行完成之后,倒数闩的次数减少到0,调用线程可以往下执行,从而达到并发等待的效果。
在使用CountDownLatch时,先创建了一个CountDownLatch实例,设置其倒数的总数,例子中值为10,表示等待10个线程执行完成。主线程通过调用latch.await()在倒数闩实例上执行等待,等到latch实例倒数到0才能继续执行。