南通建设局网站查询,石家庄百度关键词搜索,wordpress 站群管理,青州哪里做网站一、简介
随着计算机行业的飞速发展#xff0c;摩尔定律逐渐失效#xff0c;多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池#xff1a;ThreadPoolExecutor 类#xff0c;帮助开发人员管理线程并方便地执行并行任务。了…一、简介
随着计算机行业的飞速发展摩尔定律逐渐失效多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池ThreadPoolExecutor 类帮助开发人员管理线程并方便地执行并行任务。了解并合理使用线程池是一个开发人员必修的基本功。
本文开篇简述线程池概念和用途接着结合线程池的源码帮助读者领略线程池的设计思路最后回归实践通过案例讲述使用线程池遇到的问题并给出了一种动态化线程池解决方案。 1.1 线程池是什么
线程池Thread Pool是一种基于池化思想管理线程的工具经常出现在多线程服务器中如MySQL。
线程过多会带来额外的开销其中包括创建销毁线程的开销、调度线程的开销等等同时也降低了计算机的整体性能。线程池维护多个线程等待监督管理者分配可并发执行的任务。这种做法一方面避免了处理任务时创建销毁线程开销的代价另一方面避免了线程数量膨胀导致的过分调度问题保证了对内核的充分利用。
当然使用线程池可以带来一系列好处
降低资源消耗通过池化技术重复利用已创建的线程降低线程创建和销毁造成的损耗。提高响应速度任务到达时无需等待线程创建即可立即执行。提高线程的可管理性线程是稀缺资源如果无限制创建不仅会消耗系统资源还会因为线程的不合理分布导致资源调度失衡降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。提供更多更强大的功能线程池具备可拓展性允许开发人员向其中增加更多的功能。比如延时定时线程池 ScheduledThreadPoolExecutor就允许任务延期执行或定期执行。 1.2 线程池解决的问题是什么
线程池解决的核心问题就是资源管理问题。在并发环境下系统不能够确定在任意时刻中有多少任务需要执行有多少资源需要投入。这种不确定性将带来以下若干问题
频繁申请/销毁资源和调度资源将带来额外的消耗可能会非常巨大。对资源无限申请缺少抑制手段易引发系统资源耗尽的风险。系统无法合理管理内部的资源分布会降低系统的稳定性。
为解决资源分配这个问题线程池采用了“池化”Pooling思想。池化顾名思义是为了最大化收益并最小化风险而将资源统一在一起管理的一种思想。“池化”思想不仅仅能应用在计算机领域在金融、设备、人员管理、工作管理等领域也有相关的应用。
在计算机领域中的表现为统一管理IT资源包括服务器、存储、和网络资源等等。通过共享资源使用户在低投入中获益。除去线程池还有其他比较典型的几种使用策略包括
内存池(Memory Pooling)预先申请内存提升申请内存速度减少内存碎片。连接池(Connection Pooling)预先申请数据库连接提升申请连接的速度降低系统的开销。实例池(Object Pooling)循环使用对象减少资源在初始化和释放时的昂贵损耗。如 Spring的BeanFactory 二、线程池核心设计与实现
在前文中我们了解到线程池是一种通过“池化”思想帮助我们管理线程而获取并发性的工具在Java中的体现是 ThreadPoolExecutor 类。那么它的的详细设计与实现是什么样的呢我们会在本章进行详细介绍。 2.1 线程池整体设计
Java中的线程池核心实现类是 ThreadPoolExecutor本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。我们首先来看一下ThreadPoolExecutor 的UML类图了解下ThreadPoolExecutor的继承关系。 ThreadPoolExecutor 实现的顶层接口是 Executor顶层接口 Executor 提供了一种思想将任务提交和任务执行进行解耦。用户无需关注如何创建线程如何调度线程来执行任务用户只需提供Runnable对象将任务的运行逻辑提交到执行器(Executor)中由 Executor 框架完成线程的调配和任务的执行部分。 顶层接口 Executor 的子接口 ExecutorService 接口增加了一些能力
扩充执行任务的能力补充可以为一个或一批异步任务生成Future的方法提供了管控线程池的方法比如停止线程池的运行。 ExecutorService 的实现类 AbstractExecutorService 则是 ThreadPoolExecutor 上层的抽象类将执行任务的流程串联了起来保证下层的实现只需关注一个执行任务的方法即可。 最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分ThreadPoolExecutor 将会一方面维护自身的生命周期另一方面同时管理线程和任务使两者良好的结合从而执行并行任务。ThreadPoolExecutor 是如何运行如何同时维护线程和执行任务的呢其运行机制如下图所示 线程池在内部实际上构建了一个生产者消费者模型将线程和任务两者解耦并不直接关联从而良好的缓冲任务复用线程。 线程池的运行主要分成两部分
任务管理 充当生产者的角色当任务提交后线程池会判断该任务后续的流转 直接申请线程执行该任务缓冲到队列中等待线程执行拒绝该任务。线程管理 线程管理部分是消费者它们被统一维护在线程池内根据任务请求进行线程的分配当线程执行完任务后则会继续获取新的任务去执行最终当线程获取不到任务的时候线程就会被回收。
接下来我们会按照以下三个部分去详细讲解线程池运行机制
线程池如何维护自身状态。线程池如何管理任务。线程池如何管理线程。 2.1.1 ThreadPoolExecutor 关键属性
public class ThreadPoolExecutor extends AbstractExecutorService {// 控制变量-存放状态和线程数private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));// 任务队列必须是阻塞队列private final BlockingQueueRunnable workQueue;// 工作线程集合存放线程池中所有的活跃的工作线程只有在持有全局锁mainLock的前提下才能访问此集合private final HashSetWorker workers new HashSet();// 全局锁private final ReentrantLock mainLock new ReentrantLock();// awaitTermination方法使用的等待条件变量private final Condition termination mainLock.newCondition();// 线程工厂用于创建新的线程实例private volatile ThreadFactory threadFactory;// 拒绝执行处理器对应不同的拒绝策略private volatile RejectedExecutionHandler handler;// 空闲线程等待任务的时间周期单位是纳秒private volatile long keepAliveTime;// 是否允许核心线程超时如果为true则keepAliveTime对核心线程也生效private volatile boolean allowCoreThreadTimeOut;// 核心线程数private volatile int corePoolSize;// 线程池容量private volatile int maximumPoolSize;//...... 省略其他代码
} 2.1.2 ThreadPoolExcutor构造方法
构造方法如下
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize 0 ||maximumPoolSize 0 ||maximumPoolSize corePoolSize ||keepAliveTime 0)throw new IllegalArgumentException();if (workQueue null || threadFactory null || handler null)throw new NullPointerException();this.acc System.getSecurityManager() null ?null :AccessController.getContext();this.corePoolSize corePoolSize;this.maximumPoolSize maximumPoolSize;this.workQueue workQueue;this.keepAliveTime unit.toNanos(keepAliveTime);this.threadFactory threadFactory;this.handler handler;
}
corePoolSize核心线程数量会一直存在除非allowCoreThreadTimeOut设置为true;刚刚创建ThreadPoolExecutor的时候线程并不会立即创建而是要等到有任务提交时才会创建除非调用了prestartCoreThread/prestartAllCoreThreads事先创建核心线程。maximumPoolSize线程池允许的最大线程池数量。keepAliveTime线程数量超过corePoolSize空闲线程的最大超时时间。unit超时时间的单位。workQueue工作队列(阻塞队列BlockingQueue)保存未执行的Runnable 任务。推荐使用有界队列有界队列有助于避免资源耗尽的情况发生。 threadFactory创建线程的工厂类。可使用Executors.defaultThreadFactory()返回的是DefaultThreadFactory对象DefaultThreadFactory是Executors的静态内部类。handler拒绝策略当线程已满工作队列也满了的时候会被调用。 ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常。【默认】ThreadPoolExecutor.DiscardPolicy 丢弃任务但是不抛出异常。如果线程队列已满则后续提交的任务都会被丢弃且是静默丢弃。ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务然后重新提交被拒绝的任务。ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务【一般为主线程】。此时主线程将在一段时间内不能提交任何任务从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中TCP队列满将会影响客户端这是一种平缓的性能降低自定义拒绝策略 实现 RejectedExecutionHandler 接口。 2.2 生命周期管理
线程池运行的状态并不是用户显式设置的而是伴随着线程池的运行由内部来维护。在具体实现中线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起内部使用一个 AtomicInteger 类型的变量 ctl 维护两个值
运行状态(runState) 线程池的运行状态高3位保存。线程数量 (workerCount) 线程池内有效线程的数量低29位保存。
用一个变量去存储两个值可避免在做相关决策时出现不一致的情况不必为了维护两者的一致而占用锁资源。通过阅读线程池源代码也可以发现经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算方式相比于基本运算速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示
private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS Integer.SIZE - 3;//29
private static final int CAPACITY (1 COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING -1 COUNT_BITS;
private static final int SHUTDOWN 0 COUNT_BITS;
private static final int STOP 1 COUNT_BITS;
private static final int TIDYING 2 COUNT_BITS;
private static final int TERMINATED 3 COUNT_BITS;// Packing and unpacking ctl
private static int runStateOf(int c) { return c ~CAPACITY; }
private static int workerCountOf(int c) { return c CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
高位111为 RUNNING 注RUNNING为负数最小负数在计算机中以二进制补码表示高位000为SHUTDOWN高位001表示STOP高位010表示TIDYING高位100表示TERMINATEDworkerCountOf() 方法用来取低29位的数值返回线程池的线程数。runStateOf() 方法则用来取高3位的数值返回当前线程池的状态。 ThreadPoolExecutor的运行状态有5种分别为 其生命周期转换如下入所示 2.3 任务执行机制
2.3.1 任务调度——execute方法
任务调度是线程池的主要入口当用户提交了一个任务接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先所有任务的调度都是由 execute() 方法完成的这部分完成的工作是检查现在线程池的运行状态、运行线程数、运行策略决定接下来执行的流程是直接申请线程执行或是缓冲到队列中执行亦或是直接拒绝该任务。源码如下
#ThreadPoolExecutor
public void execute(Runnable command) {if (command null)throw new NullPointerException();int c ctl.get();//当前线程数是否小于核心线程数if (workerCountOf(c) corePoolSize) {//执行addWork提交为核心线程,提交成功return。提交失败重新获取ctl//注意此处addWorker(command, true)返回false的情况// 1.线程池状态判断// 2.线程数判断wc (core ? corePoolSize : maximumPoolSize))// 即当线程数核心线程数返回falseif (addWorker(command, true))return;c ctl.get();}// 上面是当线程数量小于corePoolSize的时候下面是针对线程数量大于等于corePoolSize// 线程池处于运行状态 任务添加到队列if (isRunning(c) workQueue.offer(command)) {int recheck ctl.get();//再次check一下当前线程池是否是运行状态如果不是运行时状态则把刚刚添加到workQueue中的command移除掉if (!isRunning(recheck) remove(command))reject(command); //调用拒绝策略else if (workerCountOf(recheck) 0) // 如果【当前活动的线程数】为0则执行addWork方法创建addWorker(null, false);}//线程池关闭或者往workQueue提交任务失败则尝试创建非核心线程若创建失败则reject任务//注意此处addWorker(command, false)返回false的情况// 1.线程池状态判断// 2.线程数判断wc (core ? corePoolSize : maximumPoolSize))// 即当线程数最大线程数返回falseelse if (!addWorker(command, false))reject(command);
}
其执行过程如下
如果 workerCount corePoolSize则调用 addWorker(command, true) 方法创建并启动一个核心线程来执行新提交的任务 创建成功返回true直接return 创建失败返回false继续往下执行当workerCount corePoolSizeaddWorker(firstTask, true) 方法返回false。addWorker创建失败有多种情况 线程池状态判断线程数判断wc (core ? corePoolSize : maximumPoolSize)) 即当workerCount corePoolSize返回false如果 workerCount corePoolSize 若线程池是否处于 RUNNING 状态且线程池内的阻塞队列未满则将任务添加到该阻塞队列中。加入阻塞队列后会再次检查线程池运行状态 如果不是运行状态则把刚刚添加到 workQueue 中的 command 移除掉并调用拒绝策略如果是运行状态则判断当前活动线程数是否为0为0则创建并启动一个非核心线程并且传入的任务对象为null。若线程池关闭或者往 workQueue 提交任务失败则尝试调用 addWorker(command, false) 创建非核心线程若创建失败则reject任务。addWorker创建失败有多种情况 线程池状态判断线程数判断wc (core ? corePoolSize : maximumPoolSize)) 即当workerCount maximumPoolSize返回false 2.3.2 任务缓冲——阻塞队列BlockingQueue
2.3.2.1 简单介绍 BlockingQueue
1Queue 对于Queue我们比较熟悉队列是一个先进先出的数据结构可以往队尾加入元素从队头获取或弹出一个元素常见的方法有
boolean add(E element) 往队尾插入元素如果超出capacity则抛出异常boolean offer(E element) 往队尾插入元素成功返回true失败capacity restrictions返回false。E pool() 从对头弹出一个元素如果队列为空则返回null。E peek() 获取队头元素但不弹出如果队列为空则返回null。 2BlockingQueue 对于阻塞队列 BlockingQueue因为继承了Queue接口所以具备Queue的先进先出属性但是也对一系列方法进行了增加了重写常见方法有
boolan add(E element) 往队尾插入元素如果超出capacity则抛出异常。注意此方法不会阻塞。 【ps:和Queue方法含义一致】boolean offer(E element) 往队尾插入元素成功返回true失败capacity restrictions返回false 注意此方法不会阻塞。【ps:和Queue方法含义一致】void put(E element) 往队尾插入元素阻塞到直至加入成功为止因为阻塞的语义此方法可能抛出InterruptedException。boolean offer(E elementlong timeout,TimeUnit unit) 类似于put方法但是只会阻塞指定时间到时候没插入成功则返回false。E take() 获取并弹出队头元素阻塞直到成功为止E poll(long timeout,TimeUnit uint) 获取并弹出队头元素只会阻塞指定时间如果时间到则返回null。 2.3.2.2 任务缓冲
任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理而做到这一点最关键的思想就是将任务和线程两者解耦不让两者直接关联才可以做后续的分配工作。线程池中是以生产者消费者模式通过一个阻塞队列来实现的。阻塞队列缓存任务工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是
在队列为空时获取元素的线程会等待队列变为非空。当队列满时存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景生产者是往队列里添加元素的线程消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器而消费者也只从容器里拿元素。
下图中展示了线程1往阻塞队列中添加元素而线程2从阻塞队列中移除元素 使用不同的队列可以实现不一样的任务存取策略。在这里我们可以再介绍下阻塞队列的成员 2.3.3 任务申请——getTask方法
由上文的任务分配部分可知任务的执行有两种可能
任务直接由新创建的线程执行execute(command)方法内仅出现在线程初始创建的时候。线程从阻塞队列中获取任务然后执行runWorker()方法内的while循环里执行完任务的空闲线程会再次去从队列中申请任务再去执行是线程获取任务绝大多数的情况。
线程需要从任务缓存模块中不断地取任务执行帮助线程从阻塞队列中获取任务实现线程管理模块和任务管理模块之间的通信。这部分策略由 getTask() 方法实现源码如下
private Runnable getTask() {// 表示上次从阻塞队列中获取任务是否超时boolean timedOut false;for (; ; ) {int c ctl.get();int rs runStateOf(c);// Check if queue empty only if necessary./*** 同时满足如下两点则线程池中工作线程数减1并返回null** 1 rs SHUTDOWN表示线程池不是RUNNING状态* 2 rs STOP 表示STOP、TIDYING和TERMINATED这三个状态它们共同点就是【不接收新任务】也【不处理workQueue里的线程任务】or 阻塞队列workQueue为空*/if (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) {decrementWorkerCount(); // 线程池中工作线程数减1return null;}int wc workerCountOf(c);// timed用于判断是否需要进行超时控制当allowCoreThreadTimeOut被设置为ture或者活跃线程数大于核心线程数则需要进行超时控制// allowCoreThreadTimeOut默认为false则表明核心线程不允许超时boolean timed allowCoreThreadTimeOut || wc corePoolSize;/*** 同时满足以下两种情况则线程池中工作线程数减1并返回null* case1当前活动线程数workCount大于最大线程数或者需要超时控制timedtrue并且上次从阻塞队列中获取任务发生了超时timedOuttrue* case2如果有效线程数大于1或者阻塞队列为空。*/if ((wc maximumPoolSize // 因为在执行该方法的同时被执行了setMaximumPoolSize导致最大线程数被缩小|| (timed timedOut)) (wc 1 || workQueue.isEmpty())) { //if (compareAndDecrementWorkerCount(c)) { // 线程池中工作线程数减1return null;}// 如果减1失败则循环重试continue;}try {// 如果需要超时控制则通过阻塞队列的poll方法进行超时控制// 否则直接获取如果队列为空task方法会阻塞直到队列不为空Runnable r timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // poll --若队列为空返回nullworkQueue.take(); // take -- 若队列为空发生阻塞等待元素if (r ! null) {return r;}// 如果rnull表示超时了则timeOut设置为true标记为上一次超时状态timedOut true;} catch (InterruptedException retry) {timedOut false;}}
}
其执行流程如下图所示 getTask() 这部分进行了多次判断为的是控制线程的数量使其符合线程池的状态。如果线程池现在不应该持有那么多线程则会返回null值。工作线程 Worker 会不断接收新任务去执行而当工作线程 Worker 接收不到任务的时候就会开始被回收。 2.3.4 任务拒绝
任务拒绝模块是线程池的保护部分线程池有一个最大的容量当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize 时就需要拒绝掉该任务采取任务拒绝策略保护线程池。
拒绝策略是一个接口其设计如下
public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
用户可以通过实现这个接口去定制拒绝策略也可以选择JDK提供的四种已有拒绝策略其特点如下 2.4 Worker线程管理
2.4.1 Worker线程
线程池为了掌握线程的状态并维护线程的生命周期设计了线程池内的工作线程Worker。
首先在 ThreadPoolExecutor 中定义了一个 workers 集合去管理工作线程Worker 代码如下
private final HashSetWorker workers new HashSetWorker();
我们来看一下工作线程Worker的部分代码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread;//Worker持有的线程由ThreadFactory创建Runnable firstTask;//初始化的任务可以为null// 记录每个线程完成的任务总数volatile long completedTasks;// 唯一的构造函数传入任务实例firstTask注意可以为nullWorker(Runnable firstTask) {// 禁止线程中断直到runWorker()方法执行setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;// 通过ThreadFactory创建线程实例注意一下Worker实例自身作为Runnable用于创建新的线程实例this.thread getThreadFactory().newThread(this);}// 委托到外部的runWorker()方法注意runWorker()方法是线程池的方法而不是Worker的方法public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.// 是否持有独占锁state值为1的时候表示持有锁state值为0的时候表示已经释放锁protected boolean isHeldExclusively() {return getState() ! 0;}// 独占模式下尝试获取资源这里没有判断传入的变量直接CAS判断0更新为1是否成功成功则设置独占线程为当前线程protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 独占模式下尝试释放资源这里没有判断传入的变量直接把state设置为0protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}// 加锁public void lock() { acquire(1); }// 尝试加锁public boolean tryLock() { return tryAcquire(1); }// 解锁public void unlock() { release(1); }// 是否锁定public boolean isLocked() { return isHeldExclusively(); }// 启动后进行线程中断注意这里会判断线程实例的中断标志位是否为false只有中断标志位为false才会中断void interruptIfStarted() {Thread t;if (getState() 0 (t thread) ! null !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
} Worker 这个工作线程实现了 Runnable 接口并持有一个线程 thread 和一个初始化的任务 firstTask
thread 是在调用构造方法时通过ThreadFactory 来创建的Worker所持有的线程一个thread可以用来执行任务【在addWorker()会调用thread.start()】firstTask 用它来保存传入的第一个任务一个 Runable这个任务可以有也可以为null。如果这个值是非空的那么线程就会在启动初期立即执行这个任务也就对应核心线程创建时的情况如果这个值是null那么就需要创建一个线程去执行任务列表workQueue中的任务也就是非核心线程的创建。
Worker执行任务的模型如下图所示 线程池需要管理线程的生命周期需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock而是使用AQS为的就是实现不可重入的特性去反应线程现在的执行状态。
lock() 方法一旦获取了独占锁表示当前线程正在执行任务中。如果正在执行任务则不应该中断线程。如果该线程现在不是独占锁的状态也就是空闲的状态说明它没有在处理任务这时可以对该线程进行中断。线程池在执行 shutdown() 方法或 tryTerminate() 方法时会调用 interruptIdleWorkers() 方法来中断空闲的线程interruptIdleWorkers() 方法会使用 tryLock() 方法来判断线程池中的线程是否是空闲状态如果线程是空闲状态则可以安全回收。
在线程回收过程中就使用到了这种特性回收过程如下图所示 2.4.2 Worker线程增加——addWorker方法
增加线程是通过线程池中的 ThreadPoolExecutor#addWorker() 方法该方法的功能就是增加一个线程该方法不考虑线程池是在哪个阶段增加的该线程这个分配线程的策略是在上个步骤完成的该步骤仅仅完成增加线程并使它运行最后返回是否成功这个结果。
addWorker(firstTask, core) 方法有两个参数
firstTask 参数 用于指定新增的线程执行的第一个任务该参数可以为空core 参数 为true表示在新增线程时会判断当前活动线程数是否小于 corePoolSizefalse表示新增线程前需要判断当前活动线程数是否小于maximumPoolSize。
我们看看 addWorker() 方法的源码提交任务时触发
/*** param firstTask 需要执行的Runnable线程* param core true新增线程时【当前活动的线程数】是否 corePoolSize* false新增线程时【当前活动的线程数】是否 maximumPoolSize*/
private boolean addWorker(Runnable firstTask, boolean core) {retry:/*** 步骤一试图将workerCount1*/for (;;) {int c ctl.get();// 获得运行状态runStateint rs runStateOf(c);/*** 只有如下两种情况可以新增worker继续执行下去* case one rs RUNNING* case two rs SHUTDOWN firstTask null !workQueue.isEmpty()*/if (rs SHUTDOWN // 即非RUNNING状态请查看isRunning()方法。线程池异常表示不再去接收新的线程任务了返回false/*** 当线程池是SHUTDOWN状态时表示不再接收新的任务了所以* case1如果firstTask!null表示要添加新任务则新增worker失败返回false。* case2如果firstTasknull并且workQueue为空表示队列中的任务已经处理完毕不需要添加新任务了。则新增worker失败返回false*/!(rs SHUTDOWN firstTask null !workQueue.isEmpty())) {return false;}for (; ; ) {// 获得当前线程池里的线程数int wc workerCountOf(c);/*** 满足如下任意情况则新增worker失败返回false* case1大于等于最大线程容量即int CAPACITY 00011111111111111111111111111111 536870911十进制* case2当core是true时 核心线程数* 当core是false时 最大线程数*/if (wc CAPACITY || wc (core ? corePoolSize : maximumPoolSize)) {return false;}// 当前工作线程数加1if (compareAndIncrementWorkerCount(c)) {break retry; // 成功加1则跳出retry标识的这两层for循环}// 如果线程数加1操作失败则获取当前最新的线程池运行状态c ctl.get();// 判断线程池运行状态rs是否改变如果不同则说明方法处理期间线程池运行状态发生了变化重新获取最新runStateif (runStateOf(c) ! rs) {continue retry; // 跳出内层for循环继续从第一个for循环执行}}}/*** 步骤二workerCount成功1后创建Worker加入集合workers中并启动Worker线程*/boolean workerStarted false; // 用于判断新的worker实例是否已经开始执行Thread.start()boolean workerAdded false; // 用于判断新的worker实例是否已经被添加到线程池的workers队列中Worker w null; // AQS.Workertry {// 创建Worker实例每个Worker对象都会针对入参firstTask来创建一个线程。w new Worker(firstTask);// 从Worker中获得新建的线程tfinal Thread t w.thread;if (t ! null) {// 重入锁final ReentrantLock mainLock this.mainLock;/** ----------lock() 尝试加锁操作获得锁后继续执行没获得则等待直到获得锁为止---------- */mainLock.lock();try {// 获得线程池当前的运行状态runStatusint rs runStateOf(ctl.get());/*** 满足如下任意条件即可向线程池中添加线程* case1线程池状态为RUNNING。请查看isRunning()方法* case2线程池状态为SHUTDOWN并且firstTask为null。*/if (rs SHUTDOWN || (rs SHUTDOWN firstTask null)) {// 因为t是新构建的线程还没有启动。所以如果是alive状态说明已经被启动了则抛出异常。if (t.isAlive()) {throw new IllegalThreadStateException();}// workers中保存线程池中存在的所有work实例集合workers.add(w);int s workers.size();if (s largestPoolSize) { // largestPoolSize用于记录线程池中曾经存在的最大的线程数量largestPoolSize s;}workerAdded true;}} finally {/** ----------unlock 解锁操作---------- */mainLock.unlock();}if (workerAdded) {/** 开启线程执行Worker.run() */t.start();workerStarted true;}}} finally {// 如果没有开启线程if (!workerStarted) {addWorkerFailed(w); // 往线程池中添加worker失败了}}return workerStarted;
}
主要步骤如下
试图将workerCount1 workerCount成功1后创建Worker加入集合workers中并启动Worker线程【在addWorker()会调用thread.start()】 。
其执行流程如下图所示 2.4.3 Worker线程执行任务——runWorker方法
在 Worker#run() 方法调用了 ThreadPoolExecutor#runWorker() 方法来执行任务代码如下
//ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {//......//ThreadPoolExecutor#Workerprivate final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread;Runnable firstTask;volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;this.thread getThreadFactory().newThread(this);}public void run() {runWorker(this);}}final void runWorker(Worker w) {// 获取当前线程实际上和Worker持有的线程实例是相同的Thread wt Thread.currentThread();// 获取Worker中持有的初始化时传入的任务对象这里注意存放在临时变量task中Runnable task w.firstTask;// 设置Worker中持有的初始化时传入的任务对象为nullw.firstTask null;// 由于Worker初始化时AQS中state设置为-1这里要先做一次解锁把state更新为0允许线程中断w.unlock(); // allow interrupts// 记录线程是否因为用户异常终结默认是trueboolean completedAbruptly true;try {while (task ! null || (task getTask()) ! null) { //循环获取任务w.lock();/*** 如果线程池正在停止请确保线程被中断否则请确保线程不被中断。* 这需要在第二种情况下重新检查以处理shutdownNow竞赛同时清除中断** 同时满足如下两个条件则执行wt.interrupt()* 1.线程状态为STOP、TIDYING、TERMINATED 或者 当前线程被中断清除中断标记并且线程状态为STOP、TIDYING、TERMINATED* 2.当前线程wt没有被标记中断*/if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted()) {wt.interrupt();}try {// 钩子方法任务执行前beforeExecute(wt, task);Throwable thrown null;try {task.run(); // 执行任务真正做事儿的地方} catch (RuntimeException x) {thrown x;throw x;} catch (Error x) {thrown x;throw x;} catch (Throwable x) {thrown x;throw new Error(x);} finally {// 钩子方法任务执行后afterExecute(task, thrown);}} finally {task null;// 执行完成后自增当前工作线程执行的任务数量w.completedTasks;// Worker解锁本质是AQS释放资源设置state为0w.unlock();}}// 如果线程能够执行到最后一行代表线程执行过程中没有由于发生异常导致跳出循环将 突然结束 标志改为falsecompletedAbruptly false;} finally {// 处理线程退出completedAbruptly为true说明由于用户异常导致线程非正常退出// 线程回收消除自身在线程池内的引用processWorkerExit(w, completedAbruptly);}}//......}
runWorker() 方法的执行过程如下
while 循环不断地通过 getTask() 方法获取任务。 getTask()方法从阻塞队列中取任务如果线程池正在停止请确保线程是中断状态否则请确保线程不是中断状态。执行任务。 task.run()如果 getTask() 结果为null则跳出循环执行 processWorkerExit() 方法销毁线程线程引用移出线程池。
执行流程如下图所示 2.4.4 Worker线程回收——processWorkerExit方法
线程池中线程的销毁依赖JVM自动的回收线程池做的工作是根据当前线程池的状态维护一定数量的线程引用防止这部分线程被JVM回收当线程池决定哪些线程需要回收时只需要将其引用消除即可。Worker被创建出来后就会不断地进行轮询然后获取任务去执行核心线程可以无限等待获取任务非核心线程要限时获取任务。当 Worker 无法获取到任务也就是获取的任务为空时循环会结束Worker 会主动消除自身在线程池内的引用。
我们看 Worker 的部分源码
//ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {//......//ThreadPoolExecutor#Workerprivate final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread;Runnable firstTask;volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;this.thread getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}}final void runWorker(Worker w) {//......try {while (task ! null || (task getTask()) ! null) { //循环获取任务获取不到跳出循环//......task.run(); // 执行任务真正做事儿的地方//......}completedAbruptly false;} finally {processWorkerExit(w, completedAbruptly); //线程回收消除自身在线程池内的引用}}//消除自身在线程池内的引用private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果completedAbruptlyfalse说明是由getTask返回null导致的WorkerCount递减的操作已经执行 // 如果completedAbruptlytrue说明是由执行任务的过程中发生异常导致需要进行WorkerCount递减的操作if (completedAbruptly) {decrementWorkerCount(); // 将workerCount减1}final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 计算任务的完成数量: 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数completedTaskCount w.completedTasks;// 将worker从线程池中移除workers.remove(w);} finally {mainLock.unlock();}//根据线程池状态判断是否结束线程池tryTerminate();int c ctl.get();// runState是RUNNING或SHUTDOWNif (runStateLessThan(c, STOP)) {// 1.如果线程不是由于抛出用户异常终结如果允许核心线程超时则保持线程池中至少存在一个工作线程// 2.如果线程由于抛出用户异常终结或者当前工作线程数那么直接添加一个新的非核心线程if (!completedAbruptly) {// 如果允许核心线程超时最小值为0否则为corePoolSizeint min allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果最小值为0同时任务队列不空则更新最小值为1if (min 0 !workQueue.isEmpty()) {min 1;}// 工作线程数大于等于最小值直接返回不新增非核心线程if (workerCountOf(c) min) {return; // replacement not needed}}addWorker(null, false);}}//......
}
线程回收的工作是在 processWorkerExit() 方法完成的 事实上在这个方法中将线程引用移出线程池【workers.remove(w)】就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多线程池还要判断是什么引发了这次销毁是否要改变线程池的现阶段状态是否要根据新状态重新分配线程。
processWorkerExit()执行完毕之后意味着该工作线程的生命周期已经完结 2.4.4.1 tryTerminate方法——是否结束线程池
在上文 ThreadPoolExecutor#processWorkerExit() 方法中调用了 tryTerminate() 方法
//ThreadPoolExecutor
//消除自身在线程池内的引用
private void processWorkerExit(Worker w, boolean completedAbruptly) {//将worker从线程池中移除......//根据线程池状态判断是否结束线程池tryTerminate();//根据新状态重新分配线程......
}//根据线程池状态判断是否结束线程池
final void tryTerminate() {for (;;) {int c ctl.get(); // 获取ctl// 如果线程池运行状态是RUNNING或者大于等于TIDYING或者运行状态为SHUTDOWN且队列为非空则直接return返回// 满足此条件直接return不终结线程池if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) SHUTDOWN ! workQueue.isEmpty()))return;// 如果满足终结线程池的条件且工作线程数不为0则中断一个空闲的工作线程去确保线程池关闭的信号得以传播并returnif (workerCountOf(c) ! 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 尝试将线程池状态设置为TIDYING状态if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//如果CAS成功,执行terminated()钩子方法terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
} 2.4.4.2 interruptIdleWorkers方法——中断空闲工作线程
源码如下
//ThreadPoolExecutor
private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t w.thread;//若线程没有被中断且tryLock成功空闲则中断该线程if (!t.isInterrupted() w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}
interruptIdleWorkers(onlyOne) 若传入 onlyOne 为true则在中断一个工作线程后跳出循环。若传入 onlyOne 为false则遍历中断 workers 中所有的工作线程。interruptIdleWorkers() 内部调用 interruptIdleWorkers(false) 。 2.5 线程池关闭——shutdown()方法
设置线程池的状态为 SHUTDOWN并停止接收新任务。线程池中的工作线程会继续执行已有的任务直到这些任务执行完毕。当所有任务都执行完毕后线程池中的线程才会被关闭。
源码如下
//ThreadPoolExecutor
public void shutdown() {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// 权限校验安全策略相关判断checkShutdownAccess();// 设置SHUTDOWN状态advanceRunState(SHUTDOWN);// 中断所有的空闲的工作线程interruptIdleWorkers();// 钩子方法onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 调用上面分析果敢的尝试terminate方法使状态更变为TIDYING执行钩子方法terminated()后最终状态更新为TERMINATEDtryTerminate();
} 三、JDK提供的原生线程池
在 Java 中JDK通过 Executors 类为我们提供了四种封装好的线程池类型如下所示
//Executors
//创建一个定长的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable());
}//创建一个单线程的线程池
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable()));
}//创建一个可缓存支持灵活回收的线程池
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueueRunnable());
}//创建一个支持周期执行任务的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
} 四、线程池在业务中的实践
4.1 业务背景
在当今的互联网业界为了最大程度利用CPU的多核性能并行运算的能力是不可或缺的。通过线程池管理线程获取并发性是一个非常基础的操作让我们来看两个典型的使用线程池获取并发性的场景。
场景1快速响应用户请求
描述用户发起的实时请求服务追求响应时间。比如说用户要查看一个商品的信息那么我们需要将商品维度的一系列信息如商品的价格、优惠、库存、图片等等聚合起来展示给用户。
分析从用户体验角度看这个结果响应的越快越好如果一个页面半天都刷不出用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂伴随着调用与调用之间的级联、多级级联等情况业务开发同学往往会选择使用线程池这种简单的方式将调用封装成任务并行的执行缩短总体响应时间。另外使用线程池也是有考量的这种场景最重要的就是获取最大的响应速度去满足用户所以应该不设置队列去缓冲并发任务调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。 场景2快速处理批量任务
描述离线的大量计算任务需要快速执行。比如说统计某个报表需要计算出全国各个门店中有哪些商品有某种属性用于后续营销策略的分析那么我们需要查询全国所有门店中的所有商品并且记录具有某属性的商品然后快速生成报表。
分析这种场景需要执行大量的任务我们也会希望任务执行的越快越好。这种情况下也应该使用多线程策略并行计算。但与响应速度优先的场景区别在于这类场景任务量巨大并不需要瞬时的完成而是关注如何使用有限的资源尽可能在单位时间内处理更多的任务也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务调整合适的corePoolSize去设置处理任务的线程数。在这里设置的线程数过多可能还会引发线程上下文切换频繁的问题也会降低处理任务的速度降低吞吐量。 4.2 实际问题及方案思考——参数如何配置
线程池使用面临的核心的问题在于线程池的参数并不好配置。一方面线程池的运行机制不是很好理解配置合理需要强依赖开发人员的个人经验和知识另一方面线程池执行的情况和任务类型相关性较大IO密集型和CPU密集型的任务运行起来的情况差异非常大这导致业界并没有一些成熟的经验策略帮助开发人员参考。
关于线程池配置不合理引发的故障公司内部有较多记录下面举一些例子
Case12018年XX页面展示接口大量调用降级
事故描述XX页面展示接口产生大量调用降级数量级在几十到上百。
事故原因该服务展示接口内部逻辑使用线程池做并行计算由于没有预估好调用的流量导致最大线程数和阻塞队列设置偏小大量抛出RejectedExecutionException触发接口降级条件示意图如下 Case22018年XX业务服务不可用S2级故障
事故描述XX业务提供的服务执行时间过长作为上游服务整体超时大量下游服务调用失败。
事故原因该服务处理请求内部逻辑使用线程池做资源隔离由于队列设置过长最大线程数设置失效导致请求数量增加时大量任务堆积在队列中任务执行时间过长最终导致下游服务的大量调用超时失败。示意图如下 业务中要使用线程池而使用不当又会导致故障那么我们怎样才能更好地使用线程池呢针对这个问题我们下面延展几个方向
1. 能否不用线程池?
回到最初的问题业务使用线程池是为了获取并发性对于获取并发性是否可以有什么其他的方案呢替代我们尝试进行了一些其他方案的调研 综合考虑这些新的方案都能在某种情况下提升并行任务的性能然而本次重点解决的问题是如何更简易、更安全地获得的并发性。另外Actor模型的应用实际上甚少只在Scala中使用广泛协程框架在Java中维护的也不成熟。这三者现阶段都不是足够的易用也并不能解决业务上现阶段的问题。
2. 追求参数设置合理性
有没有一种计算公式能够让开发同学很简易地计算出某种场景中的线程池应该是什么参数呢
带着这样的疑问我们调研了业界的一些线程池参数配置方案 调研了以上业界方案后我们并没有得出通用的线程池计算方式。并发任务的执行情况和任务类型相关IO密集型和CPU密集型的任务运行起来的情况差异非常大但这种占比是较难合理预估的这导致很难有一个简单有效的通用公式帮我们直接计算出结果。
3. 线程池参数动态化
尽管经过谨慎的评估仍然不能够保证一次计算出来合适的参数那么我们是否可以将修改线程池参数的成本降下来这样至少可以发生故障的时候可以快速调整从而缩短故障恢复的时间呢基于这个思考我们是否可以将线程池的参数从代码中迁移到分布式配置中心上实现线程池参数可动态配置和即时生效线程池参数动态化前后的参数修改流程对比如下 基于以上三个方向对比我们可以看出参数动态化方向简单有效。 4.3 动态化线程池
4.3.1 整体设计
动态化线程池的核心设计包括以下三个方面
简化线程池配置线程池构造参数有8个但是最核心的是3个corePoolSize、maximumPoolSizeworkQueue它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种 并行执行子任务提高响应速度。这种情况下应该使用同步队列没有什么任务应该被缓存下来而是应该立即执行。并行执行大批次任务提升吞吐量。这种情况下应该使用有界队列使用队列去缓冲大批量的任务队列容量必须声明防止任务无限制堆积。所以线程池只需要提供这三个关键参数的配置并且提供两种队列的选择就可以满足绝大多数的业务需求Less is More。参数可动态修改为了解决参数不好配修改参数成本高等问题。在Java线程池留有高扩展性的基础上封装线程池允许线程池监听同步外部的消息根据消息进行修改配置。将线程池的配置放置在平台侧允许开发同学简单的查看、修改线程池配置。增加线程池监控对某事物缺乏状态的观测就对其改进无从下手。在线程池执行任务的生命周期添加监控能力帮助开发了解线程池状态。 4.3.2 功能架构
动态化线程池提供如下功能
动态调参支持线程池参数动态调整、界面化操作包括修改线程池核心大小、最大核心大小、队列长度等参数修改后及时生效。 任务监控支持应用粒度、线程池粒度、任务粒度的Transaction监控可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、95/99线等。 负载告警线程池队列任务积压到一定值的时候会通过大象美团内部通讯工具告知应用开发负责人当线程池负载数达到一定阈值的时候会通过大象告知应用开发负责人。操作监控创建/修改和删除线程池都会通知到应用的开发负责人。 操作日志可以查看线程池参数的修改记录谁在什么时候修改了线程池参数、修改前的参数值是什么。 权限校验只有应用开发负责人才能够修改应用的线程池参数。 4.3.2.1 参数动态化
JDK原生线程池ThreadPoolExecutor提供了如下几个public的setter方法如下图所示 JDK允许线程池使用方通过 ThreadPoolExecutor 的实例来动态设置线程池的核心策略以setCorePoolSize() 方法为例在运行期线程池使用方调用此方法设置 corePoolSize 之后线程池会直接覆盖原来的 corePoolSize 值并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况说明有多余的worker线程此时会向当前idle空闲的的worker线程发起中断请求以实现回收多余的worker在下次idel的时候也会被回收对于当前值大于原始值且当前队列中有待执行任务则线程池会创建新的worker线程来执行队列任务setCorePoolSize() 具体流程如下 线程池内部会处理好当前状态做到平滑修改其他几个方法限于篇幅这里不一一介绍。重点是基于这几个public方法我们只需要维护 ThreadPoolExecutor 的实例并且在需要修改的时候拿到实例修改其参数即可。基于以上的思路我们实现了线程池参数的动态化、线程池参数在管理平台可配置可修改其效果图如下图所示 用户可以在管理平台上通过线程池的名字找到指定的线程池然后对其参数进行修改保存后会实时生效。目前支持的动态参数包括核心数、最大值、队列长度等。除此之外在界面中我们还能看到用户可以配置是否开启告警、队列等待任务告警阈值、活跃度告警等等。关于监控和告警我们下面一节会对齐进行介绍。
4.3.2.2 线程池监控
除了参数动态化之外为了更好地使用线程池我们需要对线程池的运行状况有感知比如当前线程池的负载是怎么样的分配的资源够不够用任务的执行情况是怎么样的是长任务还是短任务基于对这些问题的思考动态化线程池提供了多个维度的监控和告警能力包括线程池活跃度、任务的执行Transaction频率、耗时、Reject异常、线程池内部统计信息等等既能帮助用户从多个维度分析线程池的使用情况又能在出现问题第一时间通知到用户从而避免故障或加速故障恢复。
1. 负载监控和告警
线程池负载关注的核心问题是基于当前线程池参数分配的资源够不够。对于这个问题我们可以从事前和事中两个角度来看。事前线程池定义了“活跃度”这个概念来让用户在发生Reject异常之前能够感知线程池负载问题线程池活跃度计算公式为线程池活跃度 activeCount/maximumPoolSize。这个公式代表当活跃线程数趋向于maximumPoolSize的时候代表线程负载趋高。事中也可以从两方面来看线程池的过载判定条件一个是发生了Reject异常一个是队列中有等待任务支持定制阈值。以上两种情况发生了都会触发告警告警信息会通过大象推送给服务所关联的负责人。 2. 任务级精细化监控
在传统的线程池应用场景中线程池中的任务执行情况对于用户来说是透明的。比如在一个具体的业务场景中业务开发申请了一个线程池同时用于执行两种任务一个是发消息任务、一个是发短信任务这两类任务实际执行的频率和时长对于用户来说没有一个直观的感受很可能这两类任务不适合共享一个线程池但是由于用户无法感知因此也无从优化。动态化线程池内部实现了任务级别的埋点且允许为不同的业务任务指定具有业务含义的名称线程池内部基于这个名称做Transaction打点基于这个功能用户可以看到线程池内部任务级别的执行情况且区分业务任务监控示意图如下图所示 3. 运行时状态实时查看
用户基于JDK原生线程池 ThreadPoolExecutor 提供的几个public的getter方法可以读取到当前线程池的运行状态以及参数如下图所示 动态化线程池基于这几个接口封装了运行时状态实时查看的功能用户基于这个功能可以了解线程池的实时状态比如当前有多少个工作线程执行了多少个任务队列中等待的任务数等等。效果如下图所示