北京seo服务,温州网站建设优化,招远网站定制,wordpress 安全密钥#x1f9d1; 博主简介#xff1a;CSDN博客专家#xff0c;历代文学网#xff08;PC端可以访问#xff1a;https://literature.sinhy.com/#/literature?__c1000#xff0c;移动端可微信小程序搜索“历代文学”#xff09;总架构师#xff0c;15年工作经验#xff0c;… 博主简介CSDN博客专家历代文学网PC端可以访问https://literature.sinhy.com/#/literature?__c1000移动端可微信小程序搜索“历代文学”总架构师15年工作经验精通Java编程高并发设计Springboot和微服务熟悉LinuxESXI虚拟化以及云原生Docker和K8s热衷于探索科技的边界并将理论知识转化为实际应用。保持对新技术的好奇心乐于分享所学希望通过我的实践经历和见解启发他人的创新思维。在这里我希望能与志同道合的朋友交流探讨共同进步一起在技术的世界里不断学习成长。 技术合作请加本人wx注明来自csdnforeast_sea Java CountDownLatch 用法和源码解析
CountDownLatch 用法和源码解析 认识 CountDownLatchCountDownLatch 的使用 CountDownLatch 应用场景CountDownLatch 用法 CountDownLatch 源码分析 Sync 内部类await 方法countDown 方法 总结
CountDownLatch 是 Java 并发包中的一个同步辅助类用于协调多个线程之间的同步操作。是多线程控制的一种工具它被称为 门阀、 计数器或者 闭锁。
它内部有一个计数器这个计数器在构造 CountDownLatch 时被初始化其值代表需要等待的事件数量。例如当你创建一个 CountDownLatch 对象并传入数字 5就表示需要等待 5 个事件完成。 工作原理如下主线程或者任何等待的线程会调用 await 方法此时线程会被阻塞。在其他的线程通常是执行任务的工作线程完成自己的任务后会调用 countDown 方法。每调用一次 countDown 方法计数器的值就会减 1。
当计数器的值减到 0 时那些因调用 await 方法而被阻塞的线程就会被唤醒继续执行后续的操作。这就像是在赛跑比赛中所有选手工作线程完成比赛调用 countDown后裁判调用 await 的线程才宣布比赛结束并进行后续流程。
举个例子假设有一个程序需要等待多个文件下载完成后才能进行下一步处理。可以使用 CountDownLatch每个文件下载线程在完成下载后调用 countDown而负责后续处理的主线程调用 await 等待当所有文件下载完成主线程就可以开始进行下一步操作从而有效地实现了多线程之间的协调同步。
认识 CountDownLatch
CountDownLatch 能够使一个线程在等待另外一些线程完成各自工作之后再继续执行。它相当于是一个计数器这个计数器的初始值就是线程的数量每当一个任务完成后计数器的值就会减一当计数器的值为 0 时表示所有的线程都已经任务了然后在 CountDownLatch 上等待的线程就可以恢复执行接下来的任务。
CountDownLatch 的使用
CountDownLatch 提供了一个构造方法你必须指定其初始值还指定了 countDown 方法这个方法的作用主要用来减小计数器的值当计数器变为 0 时在 CountDownLatch 上 await 的线程就会被唤醒继续执行其他任务。当然也可以延迟唤醒给 CountDownLatch 加一个延迟时间就可以实现。 其主要方法如下 CountDownLatch 主要有下面这几个应用场景
CountDownLatch 应用场景
典型的应用场景就是当一个服务启动时同时会加载很多组件和服务这时候主线程会等待组件和服务的加载。当所有的组件和服务都加载完毕后主线程和其他线程在一起完成某个任务。
CountDownLatch 还可以实现学生一起比赛跑步的程序CountDownLatch 初始化为学生数量的线程鸣枪后每个学生就是一条线程来完成各自的任务当第一个学生跑完全程后CountDownLatch 就会减一直到所有的学生完成后CountDownLatch 会变为 0 接下来再一起宣布跑步成绩。
顺着这个场景你自己就可以延伸、拓展出来很多其他任务场景。
CountDownLatch 用法
下面我们通过一个简单的计数器来演示一下 CountDownLatch 的用法
public class TCountDownLatch {public static void main(String[] args) {CountDownLatch latch new CountDownLatch(5);Increment increment new Increment(latch);Decrement decrement new Decrement(latch);new Thread(increment).start();new Thread(decrement).start();try {Thread.sleep(6000);} catch (InterruptedException e) {e.printStackTrace();}}
}class Decrement implements Runnable {CountDownLatch countDownLatch;public Decrement(CountDownLatch countDownLatch){this.countDownLatch countDownLatch;}Overridepublic void run() {try {for(long i countDownLatch.getCount();i 0;i--){Thread.sleep(1000);System.out.println(countdown);this.countDownLatch.countDown();}} catch (InterruptedException e) {e.printStackTrace();}}
}class Increment implements Runnable {CountDownLatch countDownLatch;public Increment(CountDownLatch countDownLatch){this.countDownLatch countDownLatch;}Overridepublic void run() {try {System.out.println(await);countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Waiter Released);}
}在 main 方法中我们初始化了一个计数器为 5 的 CountDownLatch在 Decrement 方法中我们使用 countDown 执行减一操作然后睡眠一段时间同时在 Increment 类中进行等待直到 Decrement 中的线程完成计数减一的操作后唤醒 Increment 类中的 run 方法使其继续执行。
下面我们再来通过学生赛跑这个例子来演示一下 CountDownLatch 的具体用法
public class StudentRunRace {CountDownLatch stopLatch new CountDownLatch(1);CountDownLatch runLatch new CountDownLatch(10);public void waitSignal() throws Exception{System.out.println(选手 Thread.currentThread().getName() 正在等待裁判发布口令);stopLatch.await();System.out.println(选手 Thread.currentThread().getName() 已接受裁判口令);Thread.sleep((long) (Math.random() * 10000));System.out.println(选手 Thread.currentThread().getName() 到达终点);runLatch.countDown();}public void waitStop() throws Exception{Thread.sleep((long) (Math.random() * 10000));System.out.println(裁判Thread.currentThread().getName()即将发布口令);stopLatch.countDown();System.out.println(裁判Thread.currentThread().getName()已发送口令正在等待所有选手到达终点);runLatch.await();System.out.println(所有选手都到达终点);System.out.println(裁判Thread.currentThread().getName()汇总成绩排名);}public static void main(String[] args) {ExecutorService service Executors.newCachedThreadPool();StudentRunRace studentRunRace new StudentRunRace();for (int i 0; i 10; i) {Runnable runnable () - {try {studentRunRace.waitSignal();} catch (Exception e) {e.printStackTrace();}};service.execute(runnable);}try {studentRunRace.waitStop();} catch (Exception e) {e.printStackTrace();}service.shutdown();}
}下面我们就来一起分析一下 CountDownLatch 的源码
CountDownLatch 源码分析
CountDownLatch 使用起来比较简单但是却非常有用现在你可以在你的工具箱中加上 CountDownLatch 这个工具类了。下面我们就来深入认识一下 CountDownLatch。
CountDownLatch 的底层是由 AbstractQueuedSynchronizer 支持而 AQS 的数据结构的核心就是两个队列一个是 同步队列(sync queue)一个是条件队列(condition queue)。
Sync 内部类
CountDownLatch 在其内部是一个 Sync 它继承了 AQS 抽象类。
private static final class Sync extends AbstractQueuedSynchronizer {...}CountDownLatch 其实其内部只有一个 sync 属性并且是 final 的
private final Sync sync;CountDownLatch 只有一个带参数的构造方法
public CountDownLatch(int count) {if (count 0) throw new IllegalArgumentException(count 0);this.sync new Sync(count);
}也就是说初始化的时候必须指定计数器的数量如果数量为负会直接抛出异常。
然后把 count 初始化为 Sync 内部的 count也就是
Sync(int count) {setState(count);
}注意这里有一个 setState(count)这是什么意思呢见闻知意这只是一个设置状态的操作但是实际上不单单是还有一层意思是 state 的值代表着待达到条件的线程数。这个我们在聊 countDown 方法的时候再讨论。 getCount() 方法的返回值是 getState() 方法它是 AbstractQueuedSynchronizer 中的方法这个方法会返回当前线程计数具有 volatile 读取的内存语义。
// ---- CountDownLatch ----int getCount() {return getState();
}// ---- AbstractQueuedSynchronizer ----protected final int getState() {return state;
}tryAcquireShared() 方法用于获取·共享状态下对象的状态判断对象是否为 0 如果为 0 返回 1 表示能够尝试获取如果不为 0那么返回 -1表示无法获取。
protected int tryAcquireShared(int acquires) {return (getState() 0) ? 1 : -1;
}// ---- getState() 方法和上面的方法相同 ----这个 共享状态 属于 AQS 中的概念在 AQS 中分为两种模式一种是 独占模式一种是 共享模式。
tryAcquire 独占模式尝试获取资源成功则返回 true失败则返回 false。tryAcquireShared 共享方式尝试获取资源。负数表示失败0 表示成功但没有剩余可用资源正数表示成功且有剩余资源。
tryReleaseShared() 方法用于共享模式下的释放
protected boolean tryReleaseShared(int releases) {// 减小数量变为 0 的时候进行通知。for (;;) {int c getState();if (c 0)return false;int nextc c-1;if (compareAndSetState(c, nextc))return nextc 0;}
}这个方法是一个无限循环获取线程状态如果线程状态是 0 则表示没有被线程占有没有占有的话那么直接返回 false 表示已经释放然后下一个状态进行 - 1 使用 compareAndSetState CAS 方法进行和内存值的比较如果内存值也是 1 的话就会更新内存值为 0 判断 nextc 是否为 0 如果 CAS 比较不成功的话会再次进行循环判断。 如果 CAS 用法不清楚的话读者朋友们可以参考这篇文章 告诉你一个 AtomicInteger 的惊天大秘密 await 方法
await() 方法是 CountDownLatch 一个非常重要的方法基本上可以说只有 countDown 和 await 方法才是 CountDownLatch 的精髓所在这个方法将会使当前线程在 CountDownLatch 计数减至零之前一直等待除非线程被中断。
CountDownLatch 中的 await 方法有两种一种是不带任何参数的 await()一种是可以等待一段时间的await(long timeout, TimeUnit unit)。下面我们先来看一下 await() 方法。
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}await 方法内部会调用 acquireSharedInterruptibly 方法这个 acquireSharedInterruptibly 是 AQS 中的方法以共享模式进行中断。
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) 0)doAcquireSharedInterruptibly(arg);
}可以看到acquireSharedInterruptibly 方法的内部会首先判断线程是否中断如果线程中断则直接抛出线程中断异常。如果没有中断那么会以共享的方式获取。如果能够在共享的方式下不能获取锁那么就会以共享的方式断开链接。
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node addWaiter(Node.SHARED);boolean failed true;try {for (;;) {final Node p node.predecessor();if (p head) {int r tryAcquireShared(arg);if (r 0) {setHeadAndPropagate(node, r);p.next null; // help GCfailed false;return;}}if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}这个方法有些长我们分开来看
首先会先构造一个共享模式的 Node 入队然后使用无限循环判断新构造 node 的前驱节点如果 node 节点的前驱节点是头节点那么就会判断线程的状态这里调用了一个 setHeadAndPropagate ,其源码如下
private void setHeadAndPropagate(Node node, int propagate) {Node h head; setHead(node);if (propagate 0 || h null || h.waitStatus 0 ||(h head) null || h.waitStatus 0) {Node s node.next;if (s null || s.isShared())doReleaseShared();}
}首先会设置头节点然后进行一系列的判断获取节点的获取节点的后继以共享模式进行释放就会调用 doReleaseShared 方法我们再来看一下 doReleaseShared 方法
private void doReleaseShared() {for (;;) {Node h head;if (h ! null h ! tail) {int ws h.waitStatus;if (ws Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h head) // loop if head changedbreak;}
}这个方法会以无限循环的方式首先判断头节点是否等于尾节点如果头节点等于尾节点的话就会直接退出。如果头节点不等于尾节点会判断状态是否为 SIGNAL不是的话就继续循环 compareAndSetWaitStatus然后断开后继节点。如果状态不是 SIGNAL也会调用 compareAndSetWaitStatus 设置状态为 PROPAGATE状态为 0 并且不成功就会继续循环。
也就是说 setHeadAndPropagate 就是设置头节点并且释放后继节点的一系列过程。
我们来看下面的 if 判断也就是 shouldParkAfterFailedAcquire(p, node) 这里
if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())throw new InterruptedException();如果上面 Node p node.predecessor() 获取前驱节点不是头节点就会进行 park 断开操作判断此时是否能够断开判断的标准如下
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws pred.waitStatus;if (ws Node.SIGNAL)return true;if (ws 0) {do {node.prev pred pred.prev;} while (pred.waitStatus 0);pred.next node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}这个方法会判断 Node p 的前驱节点的结点状态(waitStatus)节点状态一共有五种分别是 CANCELLED(1)表示当前结点已取消调度。当超时或被中断响应中断的情况下会触发变更为此状态进入该状态后的结点将不会再变化。 SIGNAL(-1)表示后继结点在等待当前结点唤醒。后继结点入队时会将前继结点的状态更新为 SIGNAL。 CONDITION(-2)表示结点等待在 Condition 上当其他线程调用了 Condition 的 signal() 方法后CONDITION状态的结点将从等待队列转移到同步队列中等待获取同步锁。 PROPAGATE(-3)共享模式下前继结点不仅会唤醒其后继结点同时也可能会唤醒后继的后继结点。 0新结点入队时的默认状态。
如果前驱节点是 SIGNAL 就会返回 true 表示可以断开如果前驱节点的状态大于 0 (此时为什么不用 ws Node.CANCELLED ) 呢因为 ws 大于 0 的条件只有 CANCELLED 状态了。然后就是一系列的查找遍历操作直到前驱节点的 waitStatus 0。如果 ws 0 而且还不是 SIGNAL 状态的话就会使用 CAS 替换前驱节点的 ws 为 SIGNAL 状态。
如果检查判断是中断状态的话就会返回 false。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}这个方法使用 LockSupport.park 断开连接然后返回线程是否中断的标志。
cancelAcquire() 用于取消等待队列如果等待过程中没有成功获取资源如timeout或者可中断的情况下被中断了那么取消结点在队列中的等待。
private void cancelAcquire(Node node) {if (node null)return;node.thread null;Node pred node.prev;while (pred.waitStatus 0)node.prev pred pred.prev;Node predNext pred.next;node.waitStatus Node.CANCELLED;if (node tail compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {int ws;if (pred ! head ((ws pred.waitStatus) Node.SIGNAL ||(ws 0 compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) pred.thread ! null) {Node next node.next;if (next ! null next.waitStatus 0)compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next node; // help GC}
}所以对 CountDownLatch 的 await 调用大致会有如下的调用过程。 一个和 await 重载的方法是 await(long timeout, TimeUnit unit)这个方法和 await 最主要的区别就是这个方法能够可以等待计数器一段时间再执行后续操作。
countDown 方法
countDown 是和 await 同等重要的方法countDown 用于减少计数器的数量如果计数减为 0 的话就会释放所有的线程。
public void countDown() {sync.releaseShared(1);
}这个方法会调用 releaseShared 方法此方法用于共享模式下的释放操作首先会判断是否能够进行释放判断的方法就是 CountDownLatch 内部类 Sync 的 tryReleaseShared 方法
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}// ---- CountDownLatch ----protected boolean tryReleaseShared(int releases) {for (;;) {int c getState();if (c 0)return false;int nextc c-1;if (compareAndSetState(c, nextc))return nextc 0;}
}tryReleaseShared 会进行 for 循环判断线程状态值使用 CAS 不断尝试进行替换。
如果能够释放就会调用 doReleaseShared 方法
private void doReleaseShared() {for (;;) {Node h head;if (h ! null h ! tail) {int ws h.waitStatus;if (ws Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h head) // loop if head changedbreak;}
}可以看到doReleaseShared 其实也是一个无限循环不断使用 CAS 尝试替换的操作。
总结
本文是 CountDownLatch 的基本使用和源码分析CountDownLatch 就是一个基于 AQS 的计数器它内部的方法都是围绕 AQS 框架来谈的除此之外还有其他比如 ReentrantLock、Semaphore 等都是 AQS 的实现所以要研究并发的话离不开对 AQS 的探讨。CountDownLatch 的源码看起来很少比较简单但是其内部比如 await 方法的调用链路却很长也值得花费时间深入研究。