参考文档:Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com)

线程池的好处

1.降低资源消耗

2.提高响应速度

3.提高线程的可管理性:线程无限创建而带来的不合理分布的问题导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

4.提供更多更强大的功能:线程池有可扩展性,允许开发人员向其中增加更多的功能。

线程池解决的问题

解决的核心问题:资源管理问题。

线程池核心设计与实现

Java中使用的是ThreadPoolExecutor类

总体设计

基于JDK 1.8源码分析。下面是ThradPoolExecutor的UML类图。

Executor接口(顶层):将任务提交和任务执行进行解耦。用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法。(2)提供管理线程池的方法,如停止线程池的运行。AbstractExecutorService是上层的抽象类,将执行任务的流程串流,保证下层的实现只需要关注一个执行任务的方法。实现类ThreadPoolExecutor实现最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor运行机制:

线程池运行分为两部分:任务管理(生产者)、线程管理(消费者)。

任务管理:任务提交后,线程池会判断该任务后续的流转:1.直接申请线程执行任务2.缓冲到队列中等待线程执行3.拒绝该任务。

线程管理:被统一维护在线程池内,根据任务请求进行线程分配,当线程执行完成后会继续获取新的任务去执行,最终当线程获取不了任务时被回收。

生命周期管理

线程池的运行状态是由内部来维护。线程池内用1个变量维护两个值:运行状态(runState)和线程数量(workerCount)。在具体实现中,这两个关键参数的维护放在一起:

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段。高3位保存runState,低29位保存workerCount,两变量互不干扰,可避免出现不一致的情况,节省锁资源。

获取生命周期状态、获取线程池线程数量方法:

1
2
3
private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl

ThreadPoolExecutor的5种运行状态:

img

其生命周期转换如下所示:

图3 线程池生命周期

任务执行机制

任务调度

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务如何执行是由这个阶段决定的。

首先,所有任务调度是由execute方法完成,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程。执行过程如下:

图4 任务调度流程

其中,线程数为workerCount,核心(线程)数为corePoolSize,最大线程数为maximumPoolSize。

任务缓冲

任务缓冲模块是线程池能够管理任务的核心部分。通过阻塞队列实现任务和线程两者的解耦。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

线程队列(BlockingQueue)是支持两个附加操作的队列:在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,储存元素的线程会等待队列可用。生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。

下面线程1往阻塞队列中添加元素,线程2从阻塞队列中语出元素:

图5 阻塞队列

下面是阻塞队列的成员:

img

任务申请

从队列中获取任务的流程(getTask):

图6 获取任务流程图

工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

任务拒绝

当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时拒绝任务。

拒绝策略是预购接口

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

用户可以通过实现该接口定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:

img

Worker线程管理

Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。

1
2
3
4
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}

Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

图7 Worker执行任务

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

1.lock方法一旦获取了独占锁,表示当前线程正在执行任务中。 2.如果正在执行任务,则不应该中断线程。 3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。 4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

图8 线程池回收过程

Worker线程增加

通过线程池中addWorker()方法增加。该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。

addWorker两个参数:firstTask用于指定新增的线程执行的第一个任务,可为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。

图9 申请线程执行流程图

Worker线程回收

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

1
2
3
4
5
6
7
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}

线程回收的工作是在processWorkerExit方法完成的。

图10 线程销毁流程

Worker线程执行任务

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

1.while循环不断地通过getTask()方法获取任务。 2.getTask()方法从阻塞队列中取任务。 3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。 4.执行任务。 5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

图11 执行任务流程