网站直接登陆wordpress,快手推广网站,扬州网站建设制作,长垣县建站塔山双喜1.绪论
本文主要讲解常见的几种延迟队列的实现方式#xff0c;以及其原理。
2.延迟队列的使用场景
延迟队列主要用于解决每个被调度的任务开始执行的时间不一致的场景#xff0c;主要包含如下场景:
1.比如订单超过15分钟后#xff0c;关闭未关闭的订单。
2.比如用户可以…1.绪论
本文主要讲解常见的几种延迟队列的实现方式以及其原理。
2.延迟队列的使用场景
延迟队列主要用于解决每个被调度的任务开始执行的时间不一致的场景主要包含如下场景:
1.比如订单超过15分钟后关闭未关闭的订单。
2.比如用户可以下发任务并且可以自定义任务的开始时间。
3.延迟队列的几大要素
延迟队列主要包含如下几个要素
1.延迟队列里面存储的其实就是需要调度的任务所以我们需要一个存储任务的容器这个容器可以是的数据库redis或者内存中队列包括链表优先队列等
2.一个线程来轮询的存储任务的容器判断任务是否已经到达执行时间
4.延迟队列的实现方式
上面我们说了定时任务其实就是由两个部分组成分别是存储任务的容器和轮询线程接下来我们根据这两个组件来分析各种延迟队列的实现。
4.1 定时任务扫表
4.1.1 组成组件
1.调度线程一般采用分布式的定时任务如果xxljob等。
2.存储任务的容器:数据库
4.1.2 实现方式
启动定时任务每隔一段时间轮询数据库找出已经到达任务开始时间的任务查询出后执行业务逻辑。
4.1.3 优缺点
1.频繁的对数据库进行全表扫描数据库压力大。
2.可能有时间延迟问题延迟大小取决于轮询间隔。
3.定时轮询也会增加自身服务器开销。
4.2 基于内存队列的实现方式
4.2.1 实现原理
1.基本实现
如图所示可以将需要调度的任务存储到一个链表里面然后开启一个线程轮询该链表如果如果某个任务的执行时间已到便执行该任务。 但是上述场景存在一个问题就是每个需要遍历整个链表时间复杂度为o(n)。在这个定时任务重过期时间小的任务一定会先被执行所以我们可以考虑将时间最小的任务放到队首这样就以o(1)的时间复杂度取出下一个需要执行的任务。
2.基于优先队列的优化
在jdk中可以采用优先队列来实现PriorityQueue它其实就是一个小顶堆每次插入元素的时候可以以O(nlogn)的时间复杂度来维持堆首元素最小的特征。 4.2.2 jdk自带的延迟队列DelayQueue实现方式
我们可以看一下jdk自带的延迟队列DelayQueue的实现方式。
1.组成组件
1.存储任务的容器:数据库
2.调度线程:可能有同学好奇DelayQueue的调度线程是哪一个其实是我们在使用DelayQqueue时间会启动一个线程循环轮询DelayQueue这线程就是DelayQueue的调度线程。
new Thread(() - {while(true) {delayQueue.offer();}
}).start();
2.添加元素 public boolean offer(E e) {final ReentrantLock lock this.lock;lock.lock();try {q.offer(e);if (q.peek() e) {leader null;available.signal();}return true;} finally {lock.unlock();}}
添加元素其实就是往优先队列里面写入一个任务优先队列会自动的将过期时间最小的任务放在队首。
3.取出元素 public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly();try { //一直循环for (;;) {//取出队首元素即下一个过期的元素E first q.peek();if (first null)available.await();else {//获取随手元素时间long delay first.getDelay(NANOSECONDS);//如果已经到期返回队首元素if (delay 0)return q.poll();first null; // dont retain ref while waitingif (leader ! null)available.await();else {Thread thisThread Thread.currentThread();leader thisThread;try {//如果没到期等阻塞线程到队首元素的开始时间available.awaitNanos(delay);} finally {if (leader thisThread)leader null;}}}}} finally {if (leader null q.peek() ! null)available.signal();lock.unlock();}}
其实就是轮询整个优先队列优先队列的队首元素就是下一个需要调度的任务如果队首元素的直线时间小于当前时间返回该任务否者阻塞当前任务到下一个任务的执行时间再返回当前任务。
4.2.3 优缺点
1.被调度任务存储在内存中如果重启服务需要调度的任务会丢失。
2.基于优先队列实现插入调度任务时间复杂度为o(nlogn)如果是数据量庞大插入性能可能会被影响并且上一个任务的执行时间可能会影响到下一个任务的执行。
4.3 基于时间轮的实现
4.3.1 什么是时间轮
时间轮其实就是利用一个环形队列来表示时间队列上的每个元素挂载了在这个时间刻度上需要执行的任务。
1.单层时间轮
如图所示就是一个时间轮分成了6个刻度假设每个刻度代表1秒假设当前时间为0秒则第一秒执行的任务放在刻度1第2秒执行的任务放在刻度2。如果任务的执行时间超过了刻度6比如第8秒需要执行的任务放在哪儿呢。我们可以将其对6求余放在刻度2的位置然后用ticket来表示还差几轮才会轮到自己执行。
所以时间轮的执行步骤为通过一个线程轮询环形队列找到当前刻度取出当前刻度上任务链表如果任务链表中的任务的ticket为1立刻执行该任务如果大于1便将ticket减1说明是后面轮次的任务。 2.多层时间轮
单层时间轮一旦时间跨度过大就会导致时间轮的轮数过多每个刻度上挂载的链表过长所以引入多层时间轮。
多层时间轮其实就是有多个不同刻度的单层时间轮组成的一种结构以一天为例子可以用一个3层时间轮来表示。其中一个时间轮刻度为1秒一个时间轮刻度为1分钟一个时间轮刻度为1小时。如果秒时间轮已经转完60个刻度即1分钟。则分时间轮需要向下转动一个刻度将任务取出分散到秒时间轮上。这样便实现了任务的分散。 4.3.2 Netty中的时间轮
1.组成组件
1.存储任务的容器:任务数组
2.调度线程netty启动的推动时间轮的线程。
2.添加元素 //向时间轮中添加定时任务的方法但该方法实际上只会把定时任务存放到timeouts队列中public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task null) {throw new NullPointerException(task);}if (unit null) {throw new NullPointerException(unit);}//启动工作线程并且确保只启动一次这里面会涉及线程的等待和唤醒start();//计算该定时任务的执行时间startTime是worker线程的开始时间。以后所有添加进来的任务的执行时间都是根据这个开始时间做的对比long deadline System.nanoTime() unit.toNanos(delay) - startTime;HashedWheelTimeout timeout new HashedWheelTimeout(task, deadline);//将定时任务和任务的执行时间添加到普通任务队列中timeouts.add(timeout);return timeout;}
注意:netty实现的方式是业务线程会执行任务加入到HashedWheelTimeout 这个普通队列中然后再由推动时间轮的线程来将HashedWheelTimeout中的任务移到时间轮中。其实这一步也可以省略直接在业务线程添加调度的任务的时候将执行任务写入到时间轮询中。增加HashedWheelTimeout的原因应该是为了减少并发读写。
3.执行定时任务 //时间轮线程一直执行的private final class Worker implements Runnable {//这个属性代表当前时间轮的指针移动了几个刻度private long tick;Overridepublic void run() {//给starttime赋值这里要等待该值复制成功后另一个线程才能继续向下执行startTime System.nanoTime();//这里是不是就串联起来了通知之前的线程可以继续向下运行了startTimeInitialized.countDown();do {//返回的是时间轮线程从开始工作到现在执行了多少时间了final long deadline waitForNextTick();if (deadline 0) {//获取要执行的定时任务的那个数组下标。就是让指针当前的刻度和掩码做位运算int idx (int) (tick mask);//上面已经得到了要执行的定时任务的数组下标这里就可以得到该bucket而这个bucket就是定时任务的一个双向链表//链表中的每个节点都是一个定时任务HashedWheelBucket bucket wheel[idx];//在真正执行定时任务之前把即将被执行的任务从普通任务队列中放到时间轮的数组当中transferTimeoutsToBuckets();//执行定时任务bucket.expireTimeouts(deadline);//指针已经移动了所以加1tick;}//暂且让时间轮线程一直循环} while (true);}
}
这里可以看出会有一个线程来一直推动时间轮向前。并执行任务。
4.4 基于redis的实现方式
redis实现延迟队列有两种方式分别是监听key过期和通过zset来存储调度任务。
4.4.1 监听key过期
1.实现原理
即业务系统将调度任务数据存储到redis作为key过期时间设置为任务执行时间。并监听这些key当key过期被删除的时候redis回给业务系统发送通知。
2.优缺点
1.redis采用定期删除惰性删除的方式所以一个key计算过期也可能不会被立即删除掉而是等待下一次访问该key或者被redis的定时任务扫到才会删除key导致任务执行时间不精准。
4.4.2 基于zset存储key和执行时间实现
1.实现原理
实现原理和jdk自带的延迟队列实现原理一样只是存储任务的数据采用Redis中的zset实现下次需要执行的任务放在zset的首部只需要获取首部任务元素然后获取到该元素的过期时间redission启动一个定时任务阻塞线程至首部元素的执行时间才开始执行任务,并且将其加入到一个阻塞队列中。业务系统会启动一个线程一直监听阻塞队列如果有数据证明有任务到达执行时间了便取出数据开始执行任务。 2.Redisson的实现
1.组成组件
1.存储任务的容器:
一个普通的list主要是为了保存执行任务的插入顺序方便执行增删改操作一个zset:key为执行任务score为任务执行时间利用zset的排序功能zrange,可以取出执行时间最小的任务;blist:阻塞队列如果执行任务到期便会被转移到阻塞队列中业务线程会轮询阻塞队列取出里面执行任务完成消费逻辑。
2.执行线程:其实是redisson客户端开启的一个线程。
2.源码分析
redisson的执行逻辑其实可以分成两个层面:
1.就是上面的普通逻辑redisson客户端会启动一个线程一直轮询zset取出里面的过期任务转移到阻塞队列中。但是这里轮询并不是定时扫描而是每次取出到期任务过后会返回最近的下一次任务的到达时间然后启动一个定时器等到下一个任务执行时间到期后才再次从redis中拉取数据大大的减少了io操作。这一操作其实和jdk的延迟队列是一样的。
2.还有就是处理特殊场景一是在初始化的时候如何判断下一个任务到达时间是多少二是在redis中已经拉取到最新的一条任务的过期时间后有新的任务添加到redis中而且这个新的任务的过期时间是小于以前的最近的一条任务的过期时间的。针对这两种情况redisson采用发布订阅的思想。redisson在构造延迟队列的时候会订阅redisson_delay_queue_channel这个channel。
每次添加任务的时候会判断被添加任务的过期时间是不是超过zset中所有任务的过期时间如果是便会向redisson_delay_queue_channel发布消息消息体包含了最近的这条任务的时间。redisson收到消息过后会更新定时器的执行时间为最新的一条执行任务的时间。
a)构造器 protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {super(codec, commandExecutor, name);//创建定时任务QueueTransferTask task new QueueTransferTask(commandExecutor.getServiceManager()) { //这个逻辑是核心的转移逻辑就是前面说的取出zset前面100条数据并且返回下一次任务//的执行时间protected RFutureLong pushTaskAsync() {return commandExecutor.evalWriteAsync(RedissonDelayedQueue.this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, local expiredValues redis.call(zrangebyscore, KEYS[2], 0, ARGV[1], limit, 0, ARGV[2]); if #expiredValues 0 then for i, v in ipairs(expiredValues) do local randomId, value struct.unpack(Bc0Lc0, v);redis.call(rpush, KEYS[1], value);redis.call(lrem, KEYS[3], 1, v);end; redis.call(zrem, KEYS[2], unpack(expiredValues));end; local v redis.call(zrange, KEYS[2], 0, 0, WITHSCORES); if v[1] ~ nil then return v[2]; end return nil;, Arrays.asList(RedissonDelayedQueue.this.getRawName(), RedissonDelayedQueue.this.timeoutSetName, RedissonDelayedQueue.this.queueName), new Object[]{System.currentTimeMillis(), 100});}//创建redisson_delay_queue_channel这个channelprotected RTopic getTopic() {return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, RedissonDelayedQueue.this.channelName);}};//真正的定时任务调度转移zset任务至阻塞队列的逻辑queueTransferService.schedule(this.queueName, task);this.queueTransferService queueTransferService;}
//可以看出在启动这个线程的时候会订阅redisson_delay_queue_channel这个topic
public void start() {RTopic schedulerTopic this.getTopic();this.statusListenerId schedulerTopic.addListener(new BaseStatusListener() {public void onSubscribe(String channel) {QueueTransferTask.this.pushTask();}});this.messageListenerId schedulerTopic.addListener(Long.class, new MessageListenerLong() {//当有消息到达的时候证明此时有新的执行任务的过期时间小于zset中任务最小的过期时间public void onMessage(CharSequence channel, Long startTime) {//所以需要更新定时器中定时时间QueueTransferTask.this.scheduleTask(startTime);}});}
可以看出在初始化的时候redisson就订阅了redisson_delay_queue_channel这个channel其他都是回调方法。
b)添加任务 public RFutureVoid offerAsync(V e, long delay, TimeUnit timeUnit) {if (delay 0L) {throw new IllegalArgumentException(Delay cant be negative);} else {long delayInMs timeUnit.toMillis(delay);long timeout System.currentTimeMillis() delayInMs;byte[] random this.getServiceManager().generateIdArray(8);//这是添加任务的核心方法其实就是将任务添加到zset中如果当前任务的过期时间小于zset中所有任务的过期时间便会执行发布一条消息。return this.commandExecutor.evalWriteNoRetryAsync(this.getRawName(), this.codec, RedisCommands.EVAL_VOID, local value struct.pack(Bc0Lc0, string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);redis.call(zadd, KEYS[2], ARGV[1], value);redis.call(rpush, KEYS[3], value);local v redis.call(zrange, KEYS[2], 0, 0); if v[1] value then redis.call(publish, KEYS[4], ARGV[1]); end;, Arrays.asList(this.getRawName(), this.timeoutSetName, this.queueName, this.channelName), new Object[]{timeout, random, this.encode(e)});}}
可以看出添加任务其实就是将任务添加到zset中如果当前任务的过期时间小于zset中所有任务的过期时间便会执行发布一条消息到redisson_delay_queue_channel中触发上面回调方法QueueTransferTask.this.scheduleTask(startTime)。
c)转移任务至阻塞队列 private void scheduleTask(Long startTime) {QueueTransferTask.TimeoutTask oldTimeout (QueueTransferTask.TimeoutTask)this.lastTimeout.get();if (startTime ! null) {if (oldTimeout ! null) {oldTimeout.getTask().cancel();}long delay startTime - System.currentTimeMillis();if (delay 10L) {//创建一个定时器其实是由java中的timer实现的Timeout timeout this.serviceManager.newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {QueueTransferTask.this.pushTask();//定时器时间为当前zset中的最小时间QueueTransferTask.TimeoutTask currentTimeout (QueueTransferTask.TimeoutTask)QueueTransferTask.this.lastTimeout.get();if (currentTimeout.getTask() timeout) {QueueTransferTask.this.lastTimeout.compareAndSet(currentTimeout, (Object)null);}}}, delay, TimeUnit.MILLISECONDS);if (!this.lastTimeout.compareAndSet(oldTimeout, new QueueTransferTask.TimeoutTask(startTime, timeout))) {timeout.cancel();}} else {this.pushTask();}}}
其实就是创建一个定时器定时器为当前zset中的最小时间当定时任务到达时执行 QueueTransferTask.this.pushTask()方法。
最终执行的其实就是前面构造函数中的pushTaskAsync方法里面其实就是一段lua脚本
local expiredValues redis.call(zrangebyscore, KEYS[2], 0, ARGV[1], limit, 0, ARGV[2]); if #expiredValues 0 then for i, v in ipairs(expiredValues) do local randomId, value struct.unpack(Bc0Lc0, v);redis.call(rpush, KEYS[1], value);redis.call(lrem, KEYS[3], 1, v);end; redis.call(zrem, KEYS[2], unpack(expiredValues));end; local v redis.call(zrange, KEYS[2], 0, 0, WITHSCORES); if v[1] ~ nil then return v[2]; end return nil;
这个逻辑是核心的转移逻辑就是前面说的取出zset前面100条数据,如果任务到期便转移到阻塞队列中并且返回下一次任务的执行时间。
4.5 基于RocketMq的实现方式
RocketMQ 本身不直接支持延时消息队列但是可以通过特定的设置来实现类似的功能。在 RocketMQ 中消息的延时级别可以在发送消息时通过设置 delayLevel来实现delayLevel 是一个整数表示消息延时级别级别越高延时越大。RocketMQ 默认定义了 18 个延时级别级别 1 表示 1s 延时级别 2 表示 5s 延时依此类推级别 18 表示 18levels 延时level 是自定义的延时系数默认是 1000 毫秒。在rocketmq5.0中也支持了自定义任务执行时间的延迟队列。它本质上还是通过时间轮来实现的。
5.总结
可以看出延迟队列主要包括两个部分分别是存储任务的数据结构(可以是内存队列redis数据库mq等)还有就是需要线程来推送扫描队列中的任务。万变不离其宗。