当前位置: 首页 > news >正文

优购物官方网站app全国最好的广告公司加盟

优购物官方网站app,全国最好的广告公司加盟,外贸网站和内贸,精品源码分享免费下载ThreadPoolExecutor 状态 ThreadPoolExecutor 继承了 AbstractExecutorService,并实现了 ExecutorService 接口,用于管理线程。内部使用了原子整型 AtomicInteger ctl 来表示线程池状态和 Worker 数量。前 3 位表示线程池状态,后 29 位表示 …

ThreadPoolExecutor 状态

ThreadPoolExecutor 继承了 AbstractExecutorService,并实现了 ExecutorService 接口,用于管理线程。内部使用了原子整型 AtomicInteger ctl 来表示线程池状态和 Worker 数量。前 3 位表示线程池状态,后 29 位表示 Worker 数量。

public class ThreadPoolExecutor extends `AbstractExecutorService` {private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;private static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}......}

由源码中的常量定义可知,ThreadPoolExecutor 有 5 种线程池状态:

  • RUNNING:线程池接收新任务,会执行任务阻塞队列中的任务,ctl 前三位为 111
  • SHUTDOWN:线程池拒绝接收新任务,会执行任务阻塞队列中的任务,ctl 前三位为 000
  • STOP:线程池拒绝接收新任务,不会执行任务阻塞队列中的任务,尝试中断正在执行的任务,ctl 前三位为 001
  • TIDYING:所有任务被关闭,Worker 数量为 0,ctl 前三位为 010
  • TERMINATED:方法 terminated() 执行完毕,ctl 前三位为 011

通过 ctl 的结构可知,其前三位取值对应的线程池状态满足以下关系:

RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

因此 runStateLessThan()runStateAtLeast()isRunning() 方法可以很方便的对线程池状态进行判断,不需要考虑当前 Worker 的具体数量。

img

执行任务源码分析

执行任务的方法为 execute(),其源码如下:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1.如果正在运行的线程数少于核心线程数,则尝试以给定的操作作为其第一个任务以启动一个新线程。addWorker()的调用以原子方式检查了 runState 和 workerCount,从而防止由于误报 false 而导致的线程增加if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 2.如果一个任务成功加入了阻塞队列,那么我们仍然需要仔细检查是否应该添加一个线程(因为自上次检查以来原有的线程可能已死亡)或自进入此方法以来,线程池已关闭。所以如果重新检查状态已停止,有必要则直接回滚入队操作,或者如果没有正在运行的线程,则启动一个新线程。if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3.如果无法将任务加入阻塞队列,那么我们尝试添加一个新的线程。 如果失败了,那我们就知道任务线程池已经关闭或饱和,因此拒绝任务。else if (!addWorker(command, false))reject(command);
}

execute() 方法根据 Worker 的数量和线程池状态来决定是新建 Worker 来执行任务,还是将任务添加到任务阻塞队列中。任务无法成功添加到阻塞队列或者新建 Worker 来执行任务失败,则执行任务拒绝策略。

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// 线程池不为 RUNNING 状态已停止工作,或为 SHUTDOWN 同时阻塞队列为空,则 Worker 创建失败if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {// 当前 Worker 数量大于 size,则拒绝创建 Worker。其中 core 为 true 对比核心线程数,core 为 false 则比对最大线程数。if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;// 以 CAS 方式将 Worker 数量加1,成功则跳出 retry 循环if (compareAndIncrementWorkerCount(c))break retry;// CAS 追加 Worker 数量失败,重新获取 ctl。c = ctl.get();  // 线程池状态改变,则需要基于新的线程池状态,重新执行外层循环来判断是否允许创建 Worker// 线程池状态不变,则继续执行内层循环,再次尝试以 CAS 方式增加 Worker 数量if (runStateAtLeast(c, SHUTDOWN))continue retry;}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 存储 Worker 的 HashSet 需要获取全局锁来保证添加 Worker 时的线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 在获取全局锁之后,再次获取线程池最新的状态int c = ctl.get();// 线程池状态为 RUNNING 或 SHUTDOWN 同时创建 Worker 的初始任务为 nullif (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {// 避免 Worker 的线程被重复启动(处于活动状态的线程无法再次启动)if (t.isAlive())throw new IllegalThreadStateException();workers.add(w);// 记录线程池存在过的最大 Worker 数int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

addWorker() 方法中只有以下两种情况下可以创建 Worker:

  • 线程池状态为 RUNNIGNG
  • 线程池状态为 SHUTDOWN,且任务阻塞队列不为空,可以创建初始任务为 null 的 Worker

当 Worker 创建成功后,其线程就会启动,如果 Worker 创建失败或者线程启动失败,则会调用回滚方法 addWorkerFailed(),源码实现如下:

private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);decrementWorkerCount();tryTerminate();} finally {mainLock.unlock();}
}

在这个过程中所添加的 Worker 实体,实现了 Runnable 接口,因此它实际上就是一个任务,其线程执行的任务就是它本身。所以在 addWorker() 方法中启动线程时,会调用 Worker 的 run() 方法,实际上内部就是调用了 runWorker() 方法,源码实现如下:

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 释放了锁,以支持外部中断任务w.unlock();// 是否属于被中断的状态标识boolean completedAbruptly = true;try {// 获取 Worker 当前任务,或者从阻塞队列中取出下一个任务while (task != null || (task = getTask()) != null) {w.lock();// 当线程池为 STOPPING 状态时,要确保当前线程是被中断的(为了确保 shutdownNow() 方法在中断任务时能正确处理)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

从循环的条件可以看出,Worker 启动后,会首先执行自己的初始任务,然后再去任务阻塞队列中获取任务。任务流程执行结束或被中断后,则会调用 processWorkerExit() 方法。当 Worker 任务执行异常,或获取的任务最终为 null 时,该方法会将当前 Worker 从集合中删除,并尝试终止线程池。processWorkerExit() 方法的源码实现如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 异常中断的任务,Worker 数量没有及时调整,需要修正if (completedAbruptly)decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {// 如果线程池仍在可运行状态,由异常中断的导致被移除的 Worker 需要新增一个 null 任务来替换,以保持核心线程池数目或者阻塞队列的状态一致if (!completedAbruptly) {          int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}

总结下来,ThreadPoolExecutor 执行任务主要由以下几步:

  1. 判断当前线程池状态是否允许创建 Worker

    • 线程池状态为 RUNNIGNG,可以创建 Worker
    • 线程池状态为 SHUTDOWN,且任务阻塞队列不为空,可以创建初始任务为 null 的 Worker
  2. 按照以下规则判断如何添加 Worker(执行任务):

    • 如果 Worker 数量小于核心线程数,则创建 Worker 来执行任务
    • 如果 Worker 数量大于等于核心线程数,则将任务添加到任务阻塞队列
    • 如果任务阻塞队列已满,则创建 Worker 来执行任务
    • 如果 Worker 数量已经达到最大线程数,此时执行任务拒绝策略
  3. Worker启动自身持有的线程,并执行自身实现的任务。启动后先执行自己的初始任务,然后再取任务阻塞队列中的任务。

关闭线程池源码分析

关闭 ThreadPoolExecutor 的方法主要有 shutdown()shutdownNow()

shutdown()

停止接收新任务,会把阻塞队列中的任务执行完毕。

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 检查相关的线程修改权限checkShutdownAccess();// 通过CAS将线程池状态修改为SHUTDOWNadvanceRunState(SHUTDOWN);// 中断空闲的WorkerinterruptIdleWorkers();// 为 ScheduledThreadPoolExecutor 提供的方法入口,根据指定的终止策略,中断和清除队列中所有任务onShutdown();} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();
}

空闲 Worker 的中断涉及到锁的获取,具体处理过程可以参考以下的源码:

private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 中断线程前,尝试获取Worker的锁if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}private void interruptIdleWorkers() {interruptIdleWorkers(false);
}

这里需要注意的是,Worker 除了实现了 Runnable 接口外,还继承了 AbstractQueuedSynchronizer,因此 Worker 本身还是一把锁,故有其自身实现锁的相关方法。以下是相关方法的实现:

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;
}
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;
}

从源码中可以看出,Worker 的 lock() 方法调用了 AbstractQueuedSynchronizer 抽象类的 acquire() 方法,该方法内部又调用了 Worker 实现的 tryAcquire() 方法。tryAcquire() 方法通过 CAS 将锁的状态 STATE 从 0 设置为 1,由此可知 Worker 是一把不可重入锁。

正由于 Worker 是一把不可重入锁,正在执行任务的 Worker 是无法获取到锁的,只有线程池中没有执行任务的 Worker 才能被获取到锁,所以关闭线程池时对空闲 Worker 执行中断指令,实际上就是中断没有执行任务的空闲 Worker。正在执行任务的 Worker 在 shutdown() 方法被调用时不会被中断,而是继续执行完当前任务,随后再从任务阻塞队列中获取任务来执行,直到任务阻塞队列为空,紧接着当前 Worker 也会被删除。等到线程池中所有的 Worker 都被删除,以及任务阻塞队列任务清空后,线程池才会被终止掉。

总结下来,shutdown() 方法实现的内容就是:将线程池的状态设置为 SHUTDOWN,拒绝接收新的任务,等到线程池 Worker 数为 0,且任务阻塞队列为空时,关闭线程池。

shutdownNow()

停止接收新任务,中断当前所有的任务,并将线程池的状态置为 STOP。

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 通过CAS方式将线程池状态设置为STOPadvanceRunState(STOP);// 中断所有WorkerinterruptWorkers();// 将任务阻塞队列中所有任务获取出来并返回tasks = drainQueue();} finally {mainLock.unlock();}// 尝试终止线程池tryTerminate();return tasks;
}private void interruptWorkers() {// 需要确保当前线程获取到了全局锁 this.mainLockfor (Worker w : workers)w.interruptIfStarted();
}

shutdownNow() 方法跟 shutdown() 方法的过程基本一致,在一些细节的具体实现上有所区别。首先会将线程池状态设置为 STOP,然后调用 interruptWorkers() 方法中断线程池中的所有 Worker,接着调用 tryTerminate() 方法尝试终止线程池,最后将任务阻塞队列中未被执行的任务返回。

shutdownNow() 方法调用后,线程池中的所有 Worker 都会被中断,包括正在执行任务的 Worker。也就是说 shutdownNow() 方法不会保证正在执行的任务会被安全的执行完毕,同时还会放弃任务阻塞队列中的所有任务。

tryTerminate()

前面多次提及的 tryTerminate() 方法可以确保线程池被正确的关闭,这里可以看看源码的具体实现:

final void tryTerminate() {for (;;) {int c = ctl.get();// 满足以下三种状态的线程池,不能被终止if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))return;// Worker数量不为0时,中断一个空闲的Worker;内部传播了shudown信号if (workerCountOf(c) != 0) {interruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将线程池状态设置为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 终止线程池terminated();} finally {// 将线程池状态设置为TERMINATEDctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}

线程池不能被终止的三种状态:

  • RUNNING - 正在执行任务
  • 大于或等于TIDYING - 正在进行终止流程或已经被终止
  • SHUTDOWN,但任务阻塞队列不为空 - 正在等待任务阻塞队列的任务被执行完

如果线程池不属于以上三种状态,此时可以通过中断一个空闲的 Worker,被中断的空闲 Worker 会在 getTask() 方法中返回 null,从而执行 processWorkerExit() 方法。在 processWorkerExit() 方法中,会删除当前的 Worker,又会再调用 tryTerminate() 方法,从而实现在所有空闲 Worker 之前传播 shutdown 信号。

以下是源码中提供的官方注释内容:

Transitions to TERMINATED state if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty). If otherwise eligible to terminate but workerCount is nonzero, interrupts an idle worker to ensure that shutdown signals propagate. This method must be called following any action that might make termination possible – reducing worker count or removing tasks from the queue during shutdown. The method is non-private to allow access from ScheduledThreadPoolExecutor.

通过官方注释可知,只有在以下两种状态下,线程池能够被终止:

  • 线程池状态为 SHUTDOWN,Worker 数量为 0,任务阻塞队列为空
  • 线程池状态为 STOP,Worker 数量为 0

同时,在所有可能导致线程池终止的操作中都应该调用 tryTerminate() 方法来尝试终止线程池,因此线程池中 Worker 被删除时和任务阻塞队列中任务被删除时会调用 tryTerminate(),以达到在线程池符合终止条件时及时终止线程池。

http://www.hkea.cn/news/812863/

相关文章:

  • 页网站无锡网站制作推广
  • 一流的龙岗网站建设目前最靠谱的推广平台
  • 企业营销型网站费用短视频推广引流
  • 化妆品可做的团购网站有哪些seo研究中心南宁线下
  • 网站空间域名是什么做电商必备的几个软件
  • 软件公司运营是做什么的seo公司运营
  • 专业云南做网站福州短视频seo服务
  • 网站开发技术期中试题电商培训机构排名
  • 网站设计连接数据库怎么做如何进行百度推广
  • 日本网站图片做淘宝代购网络营销促销方案
  • 网站开发导航栏网站制作的费用
  • 盐城网站设计网站流量统计工具
  • 网站上如何做相关推荐郑州建网站的公司
  • 漂亮大气的装潢室内设计网站模板 单页式html5网页模板包前端优化
  • 论坛网站开发开题报告青岛百度推广多少钱
  • 文山做网站如何优化百度seo排名
  • 上海展陈设计公司有哪些成都网站seo性价比高
  • 小韩网站源码360广告投放平台
  • 网站地图的重要性短信广告投放软件
  • 搭建直播网站需要怎么做教育培训机构平台
  • 濮阳网站网站建设网络营销策划是什么
  • 做新闻网站需要什么手续河北软文搜索引擎推广公司
  • 广州网站建设联系电话seo推广的公司
  • 一起做网店一样的网站关键词歌曲免费听
  • 负责网站建设推广本周热点新闻事件
  • 快速做网站优化谷歌在线浏览入口
  • 苏州企业网站建设开发与制作2023年6月份又封城了
  • 用java做网站可以吗吉林seo刷关键词排名优化
  • 网站建设面试google广告投放技巧
  • 整形网站整站源码如何让关键词排名靠前