图书类网站建设策划书,英文商城网站模板,ghost vs wordpress,网络怎么设计作者#xff1a;京东科技 李玉亮
目录指引 限流场景
软件系统中一般有两种场景会用到限流#xff1a;
•场景一、高并发的用户端场景。 尤其是C端系统#xff0c;经常面对海量用户请求#xff0c;如不做限流#xff0c;遇到瞬间高并发的场景#xff0c;则可能压垮系统…作者京东科技 李玉亮
目录指引 限流场景
软件系统中一般有两种场景会用到限流
•场景一、高并发的用户端场景。 尤其是C端系统经常面对海量用户请求如不做限流遇到瞬间高并发的场景则可能压垮系统。
•场景二、内部交易处理场景。 如某类交易任务处理时有速率要求再如上下游调用时下游对上游有速率要求。
•无论哪种场景都需要对请求处理的速率进行限制或者单个请求处理的速率相对固定或者批量请求的处理速率相对固定见下图 常用的限流算法有如下几种
•算法一、信号量算法。 维护最大的并发请求数如连接数当并发请求数达到阈值时报错或等待如线程池。
•算法二、漏桶算法。 模拟一个按固定速率漏出的桶当流入的请求量大于桶的容量时溢出。
•算法三、令牌桶算法。 以固定速率向桶内发放令牌。请求处理时,先从桶里获取令牌只服务有令牌的请求。
本次要介绍的RateLimiter使用的是令牌桶算法。RateLimiter是google的guava包中的一个轻巧限流组件它主要有两个java类文件RateLimiter.java和SmoothRateLimiter.java。两个类文件共有java代码301行、注释420行注释比java代码还要多写的非常详细后面的介绍也有相关内容是翻译自其注释有些描述英文原版更加准确清晰有兴趣的也可以结合原版注释进行更详细的了解。
使用介绍
RateLimiter使用时只需引入guava jar便可最新的版本是31.1-jre, 本文介绍的源码也是此版本。 dependencygroupIdcom.google.guava/groupIdartifactIdguava/artifactIdversion31.1-jre/version/dependency
源码中提供了两个直观的使用示例。
示例一、有一系列任务列表要提交执行控制提交速率不超过每秒2个。 final RateLimiter rateLimiter RateLimiter.create(2.0); // 创建一个每秒2个许可的RateLimiter对象.void submitTasks(ListRunnable tasks, Executor executor) {for (Runnable task : tasks) {rateLimiter.acquire(); // 此处可能有等待executor.execute(task);}}
示例二、以不超过5kb/s的速率产生数据流。 final RateLimiter rateLimiter RateLimiter.create(5000.0); // 创建一个每秒5k个许可的RateLimiter对象void submitPacket(byte[] packet) {rateLimiter.acquire(packet.length);networkService.send(packet);}
可以看出RateLimiter的使用非常简单只需要构造限速器调用获取许可方法便可不需要释放许可.
算法介绍
在介绍之前先说一下RateLimiter中的几个名词
•许可( permit ) 代表一个令牌获取到许可的请求才能放行。
•资源利用不足( underunilization ): 许可的发放一般是匀速的但请求未必是匀速的有时会有无请求资源利用不足的场景令牌桶会有贮存机制。
•贮存许可( storedPermit ): 令牌桶支持对空闲资源进行许可贮存许可请求时优先使用贮存许可。
•新鲜许可( freshPermit ): 当贮存许可为空时采用透支方式下发新鲜许可同时设置下次许可生效时间为本次新鲜许可的结束时间。
•如下为一个许可发放示例矩形代表整个令牌桶许可产生速度为1个/秒令牌桶里有一个贮存桶容量为2。 以上示例中在T1贮存容量为0许可请求时直接返回1个新鲜许可贮存容量随着时间推移增长至最大值2在T2时收到3个许可的请求此时会先从贮存桶中取出2个然后再产生1个新鲜许可0.5s后在T3时刻又来了1个许可请求由于最近的许可0.5s后才会下发因此先sleep0.5s再下发。
RateLimiter的核心功能是限速我们首先想到的限速方案是记住最后一次下发令牌许可(permit)时间下次许可请求时如果与最后一次下发许可时间的间隔小于1/QPS则进行sleep至1/QPS否则直接发放但该方法不能感知到资源利用不足的场景。一方面隔了很长一段再来请求许可则可能系统此时相对空闲可下发更多的许可以充分利用资源另一方面隔了很长一段时间再来请求许可也可能意味着处理请求的资源变冷如缓存失效处理效率会下降。因此在RateLimiter中增加了资源利用不足underutilization的管理在代码中体现为贮存许可(storedPermits)贮存许可值最开始为0随着时间的增加一直增长为最大贮存许可数。许可获取时首先从贮存许可中获取然后再根据下次新鲜许可获取时间来进行新鲜许可获取。这里要说的是RateLimiter是记住了下次令牌发放的时间类似于透支的功能当前许可获取时立刻返回同时记录下次获取许可的时间。
代码结构和主体流程
代码结构
整体类图如下 RateLimiter类
RateLimiter类是顶级类也是唯一暴露给使用者的类它提供了工厂方法来创建RateLimiter方法。 create(double permitsPerSecond) 方法创建的是突发限速器create(double permitsPerSecond, Duration warmupPeriod)方法创建的是预热限速器。同时它提供了acquire方法用于获取令牌提供了tryAcquire方法用于尝试获取令牌。该类的内部实现上一方面有一个SleepingStopWatch 用于sleep操作另一方面有一个mutexDoNotUseDirectly变量和mutex方法进行互斥加锁。
SmoothRateLimiter类
该类继承了RateLimiter类是一个抽象类含义为平滑限速器限制速率是平滑的maxPermits和storedPermits维护了最大存储许可数量和当前存储许可数量stableIntervalMicros指规定的稳定许可发放间隔nextFreeTicketMicros指下一个空闲许可时间。
SmoothBursty类
平滑突发限速器该类继承了SmoothRateLimiter它存储许可的发放频率同设置的stableIntervalMicros有一个成员变量maxBurstSeconds代表最多存储多长时间的令牌许可。
SmoothWarmingUp类
平滑预热限速器继承了SmoothRateLimiter与SmoothBursty平级它的预热算法需要一定的理解成本。
主体流程
获取许可的主体流程如下 主体流程主要是对贮存许可数量和新鲜许可数量进行计算和更新得到当前许可请求的等待时间。SmoothBursty算法和SmoothWarmingUp算法共用这一套主体流程差异主要是贮存许可的管理策略两种算法的不同策略在两个子类中各自实现SmoothBursty算法相对简单一些下面先介绍该算法然后再介绍SmoothWarmingUp算法。
SmoothBursty算法
限速器创建
采用的是工厂模式创建源码如下 public static RateLimiter create(double permitsPerSecond) {// permitsPerSecond指每秒允许的许可数. 该方法调用了下面的方法return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());}// 创建SmoothBursty(固定贮存1s的贮存许可), 然后设置速率static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {RateLimiter rateLimiter new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);rateLimiter.setRate(permitsPerSecond);return rateLimiter;}
1、SmoothBursty的构造方法相对简单 SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {super(stopwatch);this.maxBurstSeconds maxBurstSeconds;}
2、rateLimiter.setRate的定义在父类RateLimiter中 public final void setRate(double permitsPerSecond) {checkArgument(permitsPerSecond 0.0 !Double.isNaN(permitsPerSecond), rate must be positive);synchronized (mutex()) {doSetRate(permitsPerSecond, stopwatch.readMicros());}}
该方法使用synchronized(mutex())方法对互斥锁进行同步以保证多线程调用的安全然后调用子类的doSetRate方法。 第二个参数nowMicros传的值是调用了stopwatch的方法将限速器创建的时间定义为0然后计算了当前时间和创建时间的时间差因此采用的是相对时间。
2.1 mutex方法的实现如下 // Cant be initialized in the constructor because mocks dont call the constructor.// 从上行注释可看出这是因为mock才用了懒加载, 实际上即时加载代码更简洁CheckForNull private volatile Object mutexDoNotUseDirectly;// 双重检查锁的懒加载模式private Object mutex() {Object mutex mutexDoNotUseDirectly;if (mutex null) {synchronized (this) {mutex mutexDoNotUseDirectly;if (mutex null) {mutexDoNotUseDirectly mutex new Object();}}}return mutex;}
该方法使用了双重检查锁来对锁对象mutexDoNotUseDirectly进行懒加载另外该方法通过mutex临时变量来解决了双重检查锁失效的问题。
2.2 doSetRate方法的主体实现在SmoothRateLimiter类中 final void doSetRate(double permitsPerSecond, long nowMicros) {// 同步贮存许可和时间resync(nowMicros);double stableIntervalMicros SECONDS.toMicros(1L) / permitsPerSecond;this.stableIntervalMicros stableIntervalMicros;doSetRate(permitsPerSecond, stableIntervalMicros);}
该方法在限速器创建时会调用创建后调用限速器的setRate重置速率时也会调用。
2.2.1 resync方法用于基于当前时间刷新计算最新的storedPermis和nextFreeTicketMicros. /** Updates {code storedPermits} and {code nextFreeTicketMicros} based on the current time. */void resync(long nowMicros) {// if nextFreeTicket is in the past, resync to nowif (nowMicros nextFreeTicketMicros) {double newPermits (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();storedPermits min(maxPermits, storedPermits newPermits);nextFreeTicketMicros nowMicros;}}
该方法从现实场景上讲代表的是随着时间的流逝贮存许可不断增加但从技术实现的角度并不是真正的持续刷新而是仅在需要时调用刷新。该方法如果当前时间小于等于下次许可时间则贮存许可数量和下次许可时间不需要刷新否则通过 (当前时间-下次许可时间)/贮存许可的发放间隔计算出的值域最大贮存数量取小则为已贮存的许可数量需要注意的是贮存许可数量是double类型的。
限速器使用
限速器常用的方法主要有accquire和tryAccquire。
先说一下accquire方法 共有两个共有方法一个是无参的每次获取1个许可再一个是整数参数的每次调用获取多个许可。 // 获取1个许可public double acquire() {return acquire(1);}// 获取多个许可public double acquire(int permits) {// 留出permits个许可得到需要sleep的微秒数.long microsToWait reserve(permits);// 该方法如果小于等于零则直接返回否则sleepstopwatch.sleepMicrosUninterruptibly(microsToWait);// 返回休眠的秒数.return 1.0 * microsToWait / SECONDS.toMicros(1L);}
从以上源码可看出获取许可的逻辑很简单留出permits个许可根据返回值决定是否sleep等待。留出许可的方法实现如下 // 预留出permits个许可final long reserve(int permits) {checkPermits(permits);synchronized (mutex()) {return reserveAndGetWaitLength(permits, stopwatch.readMicros());}}// 预留出permits个需求得到需要等待的时间final long reserveAndGetWaitLength(int permits, long nowMicros) {long momentAvailable reserveEarliestAvailable(permits, nowMicros);return max(momentAvailable - nowMicros, 0);}abstract long reserveEarliestAvailable(int permits, long nowMicros);
reserveEarliestAvailable为抽象方法实现在SmoothRateLimiter类中该方法是核心主链路方法该方法先从贮存许可中获取如果数量足够则直接返回否则先将全部贮存许可取出再计算还需要的等待时间逻辑如下 final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {// 刷新贮存许可和下个令牌时间resync(nowMicros);// 返回值为当前的下次空闲时间long returnValue nextFreeTicketMicros;// 要消耗的贮存数量为需要的贮存数量double storedPermitsToSpend min(requiredPermits, this.storedPermits);// 新鲜许可数需要的许可数-使用的贮存许可double freshPermits requiredPermits - storedPermitsToSpend;// 等待时间贮存许可等待时间(实现方决定)新鲜许可等待时间(数量*固定速率)long waitMicros storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) (long) (freshPermits * stableIntervalMicros);// 透支后的下次许可可用时间当前时间(nextFreeTicketMicros)等待时间(waitMicros)this.nextFreeTicketMicros LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);// 贮存许可数量减少this.storedPermits - storedPermitsToSpend;return returnValue;}
该方法有两点说明1、returnValue为之前计算的下次空闲时间前面有说RateLimiter采用预支的模式本次直接返回同时计算下次的最早空闲时间 2、贮存许可的等待时间不同的实现方逻辑不同SmoothBursty算法认为贮存许可直接可用所以返回0, 后面的SmoothWarmingUp算法认为贮存许可需要消耗比正常速率更多的预热时间有一定算法逻辑.
至此整个accquire方法的调用链路分析结束下面再看tryAccquire方法就比较简单了tryAccquire比accquire差异的逻辑在于tryAccquire方法会判断下次许可时间-当前时间是否大于超时时间如果是则直接返回false否则进行sleep并返回true. 方法源码如下 public boolean tryAcquire(Duration timeout) {return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);}public boolean tryAcquire(long timeout, TimeUnit unit) {return tryAcquire(1, timeout, unit);}public boolean tryAcquire(int permits) {return tryAcquire(permits, 0, MICROSECONDS);}public boolean tryAcquire() {return tryAcquire(1, 0, MICROSECONDS);}public boolean tryAcquire(int permits, Duration timeout) {return tryAcquire(permits, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);}public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {long timeoutMicros max(unit.toMicros(timeout), 0);checkPermits(permits);long microsToWait;synchronized (mutex()) {long nowMicros stopwatch.readMicros();// 判断超时微秒数是否可等到下个许可时间if (!canAcquire(nowMicros, timeoutMicros)) {return false;} else {microsToWait reserveAndGetWaitLength(permits, nowMicros);}}// 休眠等待stopwatch.sleepMicrosUninterruptibly(microsToWait);return true;}// 下次许可时间-超时时间当前时间private boolean canAcquire(long nowMicros, long timeoutMicros) {return queryEarliestAvailable(nowMicros) - timeoutMicros nowMicros;}
SmoothWarmingUp算法
SmoothWarmingUp算法的主体处理流程同SmoothBurstry算法主要在贮存许可时间计算上的两个方法进行了新实现该算法不像SmoothBurstry算法那么直观好理解需要先了解算法逻辑再看源码。
算法说明
该算法在源码注释中已经描述的比较清晰了主要思想是限流器的初始贮存许可数量便是最大贮存许可值 贮存许可执行时按一定算法由慢到快的产生直至设定的固定速率以此来达到预热过程。该算法涉及到一些数学知识如果不是很感兴趣则了解其主要思想便可。下面详细说一下该算法。
说到该算法前我们再回头看一下SmoothRateLimiter的贮存许可贮存许可有当前数量和最大数量另外还有两个算法逻辑一个是贮存许可生产的速率控制再一个是贮存许可消费速率的控制在Bursty算法中生产的速率同设定的固定速率而消费的速率为无穷大(立刻消费不占用时间在WarmingUp算法中需对照下图进行分析 该图可这样理解每个贮存许可的消费耗时为右侧梯形面积梯形面积(上边长下边长)/2 * 高. 可以看到每个贮存许可的面积越来越小直到固定速率的长方形面积。
在限速器初始化时输入的变量有固定速率和预热时间另外冷却因子是固定值3在作者算法中首先计算的是阈值许可数 0.5 * 预热周期 / 固定速率. 然后计算的是最大许可数我们知道了梯形的面积、上边(大速率)、下边(小速率)便能推到出高最大许可阀值许可数 高。
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {double oldMaxPermits maxPermits;double coldIntervalMicros stableIntervalMicros * coldFactor;thresholdPermits 0.5 * warmupPeriodMicros / stableIntervalMicros;maxPermits thresholdPermits 2.0 * warmupPeriodMicros / (stableIntervalMicros coldIntervalMicros);slope (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);if (oldMaxPermits Double.POSITIVE_INFINITY) {// if we dont special-case this, we would get storedPermits NaN, belowstoredPermits 0.0;} else {storedPermits (oldMaxPermits 0.0)? maxPermits // initial state is cold: storedPermits * maxPermits / oldMaxPermits;}}
在具体使用中一个是生产的速率固定为预热时间/最大许可数源码如下 double coolDownIntervalMicros() {return warmupPeriodMicros / maxPermits;}
再一个是消费的速率按如上曲线从右至左的面积梯形面积长方形面积梯形面积(上边下边) /2 * 高 源码如下 long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {double availablePermitsAboveThreshold storedPermits - thresholdPermits;long micros 0;// measuring the integral on the right part of the function (the climbing line)if (availablePermitsAboveThreshold 0.0) {double permitsAboveThresholdToTake min(availablePermitsAboveThreshold, permitsToTake);// TODO(cpovirk): Figure out a good name for this variable.double length permitsToTime(availablePermitsAboveThreshold) permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);micros (long) (permitsAboveThresholdToTake * length / 2.0);permitsToTake - permitsAboveThresholdToTake;}// measuring the integral on the left part of the function (the horizontal line)micros (long) (stableIntervalMicros * permitsToTake);return micros;}
源码分析
了解了以上算法后再看下面的源码就相对简单了。 static final class SmoothWarmingUp extends SmoothRateLimiter {// 预热时间private final long warmupPeriodMicros;//斜率private double slope;//阈值许可private double thresholdPermits;//冷却因子private double coldFactor;SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {super(stopwatch);this.warmupPeriodMicros timeUnit.toMicros(warmupPeriod);this.coldFactor coldFactor;}// 参数初始化Overridevoid doSetRate(double permitsPerSecond, double stableIntervalMicros) {double oldMaxPermits maxPermits;double coldIntervalMicros stableIntervalMicros * coldFactor;thresholdPermits 0.5 * warmupPeriodMicros / stableIntervalMicros;maxPermits thresholdPermits 2.0 * warmupPeriodMicros / (stableIntervalMicros coldIntervalMicros);slope (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);if (oldMaxPermits Double.POSITIVE_INFINITY) {// if we dont special-case this, we would get storedPermits NaN, belowstoredPermits 0.0;} else {storedPermits (oldMaxPermits 0.0)? maxPermits // initial state is cold: storedPermits * maxPermits / oldMaxPermits;}}// 有storedPermits个贮存许可要使用permitsToTake个时的等待时间计算Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {double availablePermitsAboveThreshold storedPermits - thresholdPermits;long micros 0;// measuring the integral on the right part of the function (the climbing line)if (availablePermitsAboveThreshold 0.0) {double permitsAboveThresholdToTake min(availablePermitsAboveThreshold, permitsToTake);// TODO(cpovirk): Figure out a good name for this variable.double length permitsToTime(availablePermitsAboveThreshold) permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);micros (long) (permitsAboveThresholdToTake * length / 2.0);permitsToTake - permitsAboveThresholdToTake;}// measuring the integral on the left part of the function (the horizontal line)micros (long) (stableIntervalMicros * permitsToTake);return micros;}// 许可耗时固定速率许可值*斜率private double permitsToTime(double permits) {return stableIntervalMicros permits * slope;}// 冷却间隔固定为预热时间/最大许可数.Overridedouble coolDownIntervalMicros() {return warmupPeriodMicros / maxPermits;}}
思考总结
sleep说明和相对时间
RateLimiter内部使用类StopWatch进行了一个相对时间的度量RateLimiter创建时时间为0然后向后累计sleep时不受interrupt异常影响。
double浮点数
RateLimiter暴露的API的许可数量入参为整数类型但内部计算时实际是浮点double类型支持小数许可数量一方面浮点存在丢失精度另一方面也不便于理解是否可以使用整数值得考虑。
只支持单机
RateLimiter的这几种算法只支持单机限流如要支持集群限流一种方式是先根据负载均衡的权重计算出单机的限速值再进行单节点限速另一种方式是参考该组件使用redis等中心化数量管理的中间件但性能和稳定性会降低一些。
扩展性
RateLimiter提供了有限的扩展能力自带的SmoothBursty和SmoothWarmingUp类不是公开类不能直接创建或调整参数如关闭贮存功能或调整预热系数等。这种场景需要继承SmoothRateLimiter进行重写贮存许可的生产和消费算法是容易变化和重写的点将整个源码拷贝出来进行二次修改也是一种方案。