自己如何建设网站,最近韩国电影片免费韩国在线观看,成全视频免费观看在线看古装电视剧,网页游戏服务端背景前段时间有个小项目需要使用延迟任务#xff0c;谈到延迟任务#xff0c;我脑子第一时间一闪而过的就是使用消息队列来做#xff0c;比如RabbitMQ的死信队列又或者RocketMQ的延迟队列#xff0c;但是奈何这是一个小项目#xff0c;并没有引入MQ#xff0c;我也不太想…背景前段时间有个小项目需要使用延迟任务谈到延迟任务我脑子第一时间一闪而过的就是使用消息队列来做比如RabbitMQ的死信队列又或者RocketMQ的延迟队列但是奈何这是一个小项目并没有引入MQ我也不太想因为一个延迟任务就引入MQ增加系统复杂度所以这个方案直接就被pass了。虽然基于MQ这个方式走不通了但是这个项目中使用到Redis所以我就想是否能够使用Redis来代替MQ实现延迟队列的功能于是我就查了一下有没有现成可用的方案别说还真给我查到了两种方案并且我还仔细研究对比了这两个方案发现要想很好的实现延迟队列并不简单。监听过期key基于监听过期key的方式来实现延迟队列是我查到的第一个方案为了弄懂这个方案实现的细节我还特地去扒了扒官网还真有所收获1、Redis发布订阅模式一谈到发布订阅模式其实一想到的就是MQ只不过Redis也实现了一套并且跟MQ贼像如图图中的channel的概念跟MQ中的topic的概念差不多你可以把channel理解成MQ中的topic。生产者在消息发送时需要到指定发送到哪个channel上消费者订阅这个channel就能获取到消息。2、keyspace notifications在Redis中有很多默认的channel只不过向这些channel发送消息的生产者不是我们写的代码而是Redis本身。当消费者监听这些channel时就可以感知到Redis中数据的变化。这个功能Redis官方称为keyspace notifications字面意思就是键空间通知。这些默认的channel被分为两类以__keyspacedb__:为前缀后面跟的是key的名称表示监听跟这个key有关的事件。举个例子现在有个消费者监听了__keyspace0__:sanyou这个channelsanyou就是Redis中的一个普通key那么当sanyou这个key被删除或者发生了其它事件那么消费者就会收到sanyou这个key删除或者其它事件的消息以__keyeventdb__:为前缀后面跟的是消息事件类型表示监听某个事件同样举个例子现在有个消费者监听了__keyevent0__:expired这个channel代表了监听key的过期事件。那么当某个Redis的key过期了expired那么消费者就能收到这个key过期的消息。如果把expired换成del那么监听的就是删除事件。具体支持哪些事件可从官网查。上述db是指具体的数据库Redis不是默认分为16个库么序号从0-15所以db就是0到15的数字示例中的0就是指0对应的数据库。3、延迟队列实现原理通过对上面的两个概念了解之后应该就对监听过期key的实现原理一目了然了其实就是当这个key过期之后Redis会发布一个key过期的事件到__keyeventdb__:expired这个channel只要我们的服务监听这个channel那么就能知道过期的Key从而就算实现了延迟队列功能。所以这种方式实现延迟队列就只需要两步发送延迟任务key是延迟消息本身过期时间就是延迟时间监听__keyeventdb__:expired这个channel处理延迟任务4、demo好了基本概念和核心原理都说完了之后又到了show me the code环节。好巧不巧Spring已经实现了监听__keyevent*__:expired这个channel这个功能__keyevent*__:expired中的*代表通配符的意思监听所有的数据库。所以demo写起来就很简单了只需3步即可引入pomdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactIdversion2.2.5.RELEASE/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactIdversion2.2.5.RELEASE/version/dependency配置类Configurationpublic class RedisConfiguration {Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer redisMessageListenerContainer new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(connectionFactory);return redisMessageListenerContainer;}Beanpublic KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {return new KeyExpirationEventMessageListener(redisMessageListenerContainer);}}KeyExpirationEventMessageListener实现了对__keyevent*__:expiredchannel的监听当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候会发布RedisKeyExpiredEvent事件所以我们只需要监听RedisKeyExpiredEvent事件就可以拿到过期消息的Key也就是延迟消息。对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListenerComponent
public class MyRedisKeyExpiredEventListener implements ApplicationListenerRedisKeyExpiredEvent {Overridepublic void onApplicationEvent(RedisKeyExpiredEvent event) {byte[] body event.getSource();System.out.println(获取到延迟消息 new String(body));}}整个工程目录也简单代码写好启动应用之后我直接通过Redis命令设置消息就没通过代码发送消息了消息的key为sanyou值为task值不重要过期时间为5sset sanyou task
expire sanyou 5
如果上面都理论都正确不出意外的话5s后MyRedisKeyExpiredEventListener应该可以监听到sanyou这个key过期的消息也就相当于拿到了延迟任务控制台会打印出获取到延迟消息sanyou。于是我满怀希望静静地等待了5s。。5、4、3、2、1时间一到我查看控制台但是控制台并没有按照预期打印出上面那句话。为什么会没打印出难道是代码写错了正当我准备检查代码的时候官网的一段话道出了真实原因。我给大家翻译一下上面这段话讲的内容。上面这段话主要讨论的是key过期事件的时效性问题首先提到了Redis过期key的两种清除策略就是面试八股文常背的两种惰性清除。当这个key过期之后访问时这个Key才会被清除定时清除。后台会定期检查一部分key如果有key过期了就会被清除再后面那段话是核心意思是说key的过期事件发布时机并不是当这个key的过期时间到了之后就发布而是这个key在Redis中被清理之后也就是真正被删除之后才会发布。到这我终于明白了上面的例子中即使我设置了5s的过期时间但是当5s过去之后只要两种清除策略都不满足没人访问sanyou这个key后台的定时清理的任务也没扫描到sanyou这个key那么就不会发布key过期的事件自然而然也就监听不到了。至于后台的定时清理的任务什么时候能扫到这个没有固定时间可能一到过期时间就被扫到也可能等一定时间才会被扫到这就可能会造成了客户端从发布到监听到的消息时间差会大于等于过期时间从而造成一定时间消息的延迟这就着实有点坑了。。5、坑除了上面测试demo的时候遇到的坑之外在我深入研究之后还发现了一些更离谱的坑。丢消息太频繁Redis的丢消息跟MQ不一样因为MQ都会有消息的持久化机制可能只有当机器宕机了才会丢点消息但是Redis丢消息就很离谱比如说你的服务在重启的时候就消息会丢消息。Redis实现的发布订阅模式消息是没有持久化机制当消息发布到某个channel之后如果没有客户端订阅这个channel那么这个消息就丢了并不会像MQ一样进行持久化等有消费者订阅的时候再给消费者消费。所以说假设服务重启期间某个生产者或者是Redis本身发布了一条消息到某个channel由于服务重启没有监听这个channel那么这个消息自然就丢了。消息消费只有广播模式Redis的发布订阅模式消息消费只有广播模式一种。所谓的广播模式就是多个消费者订阅同一个channel那么每个消费者都能消费到发布到这个channel的所有消息。如图生产者发布了一条消息内容为sanyou那么两个消费者都可以同时收到sanyou这条消息。所以如果通过监听channel来获取延迟任务那么一旦服务实例有多个的话还得保证消息不能重复处理额外地增加了代码开发量。接收到所有key的某个事件这个不属于Redis发布订阅模式的问题而是Redis本身事件通知的问题。当消费者监听了以__keyeventdb__:开头的消息那么会导致所有的key发生了事件都会被通知给消费者。举个例子某个消费者监听了__keyevent*__:expired这个channel那么只要key过期了不管这个key是张三还会李四消费者都能收到。所以如果你只想消费某一类消息的key那么还得自行加一些标记比如消息的key加个前缀消费的时候判断一下带前缀的key就是需要消费的任务。所以综上能够得出一个非常重要的结论那就是监听Redis过期Key这种方式实现延迟队列不稳定坑贼多那有没有比较靠谱的延迟队列的实现方案呢这就不得不提到我研究的第二种方案了。Redisson实现延迟队列Redisson他是Redis的儿子Redis son基于Redis实现了非常多的功能其中最常使用的就是Redis分布式锁的实现但是除了实现Redis分布式锁之外它还实现了延迟队列的功能。先来个demo后面再来说说这种实现的原理。1、demo引入pomdependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.13.1/version/dependency封装了一个RedissonDelayQueue类ComponentSlf4jpublic class RedissonDelayQueue {private RedissonClient redissonClient;private RDelayedQueueString delayQueue;private RBlockingQueueString blockingQueue;PostConstructpublic void init() {initDelayQueue();startDelayQueueConsumer();}private void initDelayQueue() {Config config new Config();SingleServerConfig serverConfig config.useSingleServer();serverConfig.setAddress(redis://localhost:6379);redissonClient Redisson.create(config);blockingQueue redissonClient.getBlockingQueue(SANYOU);delayQueue redissonClient.getDelayedQueue(blockingQueue);}private void startDelayQueueConsumer() {new Thread(() - {while (true) {try {String task blockingQueue.take();log.info(接收到延迟任务:{}, task);} catch (Exception e) {e.printStackTrace();}}}, SANYOU-Consumer).start();}public void offerTask(String task, long seconds) {log.info(添加延迟任务:{} 延迟时间:{}s, task, seconds);delayQueue.offer(task, seconds, TimeUnit.SECONDS);}}这个类在创建的时候会去初始化延迟队列创建一个RedissonClient对象之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象传入的队列名字叫SANYOU这个名字无所谓。当延迟队列创建之后会开启一个延迟任务的消费线程这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务。添加任务的时候是通过RDelayedQueue的offer方法添加的。controller类通过接口添加任务延迟时间为5sRestController
public class RedissonDelayQueueController {Resourceprivate RedissonDelayQueue redissonDelayQueue;GetMapping(/add)public void addTask(RequestParam(task) String task) {redissonDelayQueue.offerTask(task, 5);}}启动项目在浏览器输入如下连接添加任务http://localhost:8080/add?tasksanyou静静等待5s成功获取到任务。2、实现原理如下图就是上面demo中一个延迟队列会在Redis内部使用到的channel和数据类型SANYOU前面的前缀都是固定的Redisson创建的时候会拼上前缀。redisson_delay_queue_timeout:SANYOUsorted set数据类型存放所有延迟任务按照延迟任务的到期时间戳提交任务时的时间戳 延迟时间来排序的所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务这个概念很重要redisson_delay_queue:SANYOUlist数据类型也是存放所有的任务但是研究下来发现好像没什么用。。SANYOUlist数据类型被称为目标队列这个里面存放的任务都是已经到了延迟时间的可以被消费者获取的任务所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的redisson_delay_queue_channel:SANYOU是一个channel用来通知客户端开启一个延迟任务有了这些概念之后再来看看整体的运行原理图生产者在提交任务的时候将任务放到redisson_delay_queue_timeout:SANYOU中分数就是提交任务的时间戳延迟时间就是延迟任务的到期时间戳客户端会有一个延迟任务为了区分后面我都说是客户端延迟任务。这个延迟任务会向Redis Server发送一段lua脚本Redis执行lua脚本中的命令并且是原子性的这段lua脚本主要干了两件事将到了延迟时间的任务从redisson_delay_queue_timeout:SANYOU中移除存到SANYOU这个目标队列获取到redisson_delay_queue_timeout:SANYOU中目前最早到过期时间的延迟任务的到期时间戳然后发布到redisson_delay_queue_channel:SANYOU这个channel中当客户端监听到redisson_delay_queue_channel:SANYOU这个channel的消息时会再次提交一个客户端延迟任务延迟时间就是消息最早到过期时间的延迟任务的到期时间戳- 当前时间戳这个时间其实也就是redisson_delay_queue_channel:SANYOU中最早到过期时间的任务还剩余的延迟时间。此处可以等待10s好好想想。。这样一旦时间来到了上面说的最早到过期时间任务的到期时间戳redisson_delay_queue_timeout:SANYOU中上面说的最早到过期时间的任务已经到期了客户端的延迟任务也同时到期于是开始执行lua脚本操作及时将到了延迟时间的任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期的任务到期时间戳到channe中如此循环往复一直运行下去保证redisson_delay_queue_timeout:SANYOU中到期的数据能及时放到目标队列中。所以上述说了一大堆的主要的作用就是保证到了延迟时间的任务能够及时被放到目标队列。这里再补充两个特殊情况图中没有画出第一个就是如果redisson_delay_queue_timeout:SANYOU是新添加的任务队列之前有或者没有任务是队列中最早需要被执行的也会发布消息到channel之后就按时上面说的流程走了。添加任务代码如下也是通过lua脚本来的第二种特殊情况就是项目启动的时候会执行一次客户端延迟任务。项目在重启时由于没有客户端延迟任务的执行可能会出现redisson_delay_queue_timeout:SANYOU队列中有到期但是没有被放到目标队列的可能重启就执行一次就是为了保证到期的数据能被及时放到目标队列中。3、与第一种方案比较现在来比较一下第一种方案和Redisson的这种方案看看有没有第一种方案的那些坑。第一个任务延迟的问题Redisson方案理论上是没有延迟的但是当消息数量增加消费者消费缓慢这个情况下可能会导致延迟任务消费的延迟。第二个丢消息的问题Redisson方案很大程度上减轻了丢消息的可能性因为所有的任务都是存在list和sorted set两种数据类型中Redis有持久化机制就算Redis宕机了也就可能会丢一点点数据。第三个广播消费任务的问题这个是不会出现的因为每个客户端都是从同一个目标队列中获取任务的。第四个问题是Redis内部channel发布事件的问题跟这种方案不沾边就更不可能存在了。所以通过上面的对比可以看出Redisson这种实现方案就显得更加的靠谱了。