2小时wordpress建站,wordpress 云备份,天眼查公司注册查询官网,上海网站备案信息注意#xff1a;轻量级队列可以使用工具类#xff0c;重量级数据量 请使用 MQ
本文章基于redis使用redisson客户端实现轻量级队列#xff0c;以及代码、执行结果演示
一、常见队列了解 普通队列#xff1a;先进先出#xff08;FIFO#xff09;#xff0c;只能在一端添…注意轻量级队列可以使用工具类重量级数据量 请使用 MQ
本文章基于redis使用redisson客户端实现轻量级队列以及代码、执行结果演示
一、常见队列了解 普通队列先进先出FIFO只能在一端添加元素在另一端移除元素。循环队列利用数组和取模运算实现队尾连接队首。双端队列两端都可以添加和移除元素。优先级队列根据元素的优先级顺序处理元素。阻塞队列在多线程中使用队空时取元素会等待队满时加元素会等待。有限队列队列长度固定队满时新元素加入会导致队头元素自动移除。 二、工具类 基于redisson 实现的分布式工具类copy即用 NoArgsConstructor(access AccessLevel.PRIVATE)
public class QueueUtils {private static final RedissonClient CLIENT SpringUtils.getBean(RedissonClient.class);/*** 获取客户端实例*/public static RedissonClient getClient() {return CLIENT;}/*** 添加普通队列数据** param queueName 队列名* param data 数据*/public static T boolean addQueueObject(String queueName, T data) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);return queue.offer(data);}/*** 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)** param queueName 队列名*/public static T T getQueueObject(String queueName) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);return queue.poll();}/*** 通用删除队列数据(不支持延迟队列)*/public static T boolean removeQueueObject(String queueName, T data) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);return queue.remove(data);}/*** 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static T boolean destroyQueue(String queueName) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);return queue.delete();}/*** 添加延迟队列数据 默认毫秒** param queueName 队列名* param data 数据* param time 延迟时间*/public static T void addDelayedQueueObject(String queueName, T data, long time) {addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);}/*** 添加延迟队列数据** param queueName 队列名* param data 数据* param time 延迟时间* param timeUnit 单位*/public static T void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);RDelayedQueueT delayedQueue CLIENT.getDelayedQueue(queue);delayedQueue.offer(data, time, timeUnit);}/*** 获取一个延迟队列数据 没有数据返回 null** param queueName 队列名*/public static T T getDelayedQueueObject(String queueName) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);RDelayedQueueT delayedQueue CLIENT.getDelayedQueue(queue);return delayedQueue.poll();}/*** 删除延迟队列数据*/public static T boolean removeDelayedQueueObject(String queueName, T data) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);RDelayedQueueT delayedQueue CLIENT.getDelayedQueue(queue);return delayedQueue.remove(data);}/*** 销毁延迟队列 所有阻塞监听 报错*/public static T void destroyDelayedQueue(String queueName) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);RDelayedQueueT delayedQueue CLIENT.getDelayedQueue(queue);delayedQueue.destroy();}/*** 添加优先队列数据** param queueName 队列名* param data 数据*/public static T boolean addPriorityQueueObject(String queueName, T data) {RPriorityBlockingQueueT priorityBlockingQueue CLIENT.getPriorityBlockingQueue(queueName);return priorityBlockingQueue.offer(data);}/*** 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)** param queueName 队列名*/public static T T getPriorityQueueObject(String queueName) {RPriorityBlockingQueueT queue CLIENT.getPriorityBlockingQueue(queueName);return queue.poll();}/*** 优先队列删除队列数据(不支持延迟队列)*/public static T boolean removePriorityQueueObject(String queueName, T data) {RPriorityBlockingQueueT queue CLIENT.getPriorityBlockingQueue(queueName);return queue.remove(data);}/*** 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static T boolean destroyPriorityQueue(String queueName) {RPriorityBlockingQueueT queue CLIENT.getPriorityBlockingQueue(queueName);return queue.delete();}/*** 尝试设置 有界队列 容量 用于限制数量** param queueName 队列名* param capacity 容量*/public static T boolean trySetBoundedQueueCapacity(String queueName, int capacity) {RBoundedBlockingQueueT boundedBlockingQueue CLIENT.getBoundedBlockingQueue(queueName);return boundedBlockingQueue.trySetCapacity(capacity);}/*** 尝试设置 有界队列 容量 用于限制数量** param queueName 队列名* param capacity 容量* param destroy 已存在是否销毁*/public static T boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {RBoundedBlockingQueueT boundedBlockingQueue CLIENT.getBoundedBlockingQueue(queueName);if (boundedBlockingQueue.isExists() destroy) {destroyQueue(queueName);}return boundedBlockingQueue.trySetCapacity(capacity);}/*** 添加有界队列数据** param queueName 队列名* param data 数据* return 添加成功 true 已达到界限 false*/public static T boolean addBoundedQueueObject(String queueName, T data) {RBoundedBlockingQueueT boundedBlockingQueue CLIENT.getBoundedBlockingQueue(queueName);return boundedBlockingQueue.offer(data);}/*** 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)** param queueName 队列名*/public static T T getBoundedQueueObject(String queueName) {RBoundedBlockingQueueT queue CLIENT.getBoundedBlockingQueue(queueName);return queue.poll();}/*** 有界队列删除队列数据(不支持延迟队列)*/public static T boolean removeBoundedQueueObject(String queueName, T data) {RBoundedBlockingQueueT queue CLIENT.getBoundedBlockingQueue(queueName);return queue.remove(data);}/*** 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)*/public static T boolean destroyBoundedQueue(String queueName) {RBoundedBlockingQueueT queue CLIENT.getBoundedBlockingQueue(queueName);return queue.delete();}/*** 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)*/public static T void subscribeBlockingQueue(String queueName, ConsumerT consumer, boolean isDelayed) {RBlockingQueueT queue CLIENT.getBlockingQueue(queueName);if (isDelayed) {// 订阅延迟队列CLIENT.getDelayedQueue(queue);}queue.subscribeOnElements(consumer);}}
三、普通队列代码测试
3.1 添加进入队列 QueueUtils.addQueueObject 方法添加数据进入队列 test RestController
SaIgnore
public class QueueTestController {GetMapping(addQueue)public void addQueue() {TestDemo testDemo new TestDemo();testDemo.setTestKey(testKey);testDemo.setCreateTime(new Date());QueueUtils.addQueueObject(test, testDemo);}
} redis中查询加入队列数据 3.2 获取队列 获取上面添加的 test队列 数据 GetMapping(getQueue)public void getQueue() {TestDemo testDemo new TestDemo();testDemo.setTestKey(testKey);testDemo.setCreateTime(new Date());Object test QueueUtils.getQueueObject(test);Console.log(test-{}, test);} 按照先进先出的规则创建时间最早一条被获取剩下2条为后添加数据 3.3 删除数据 删除test队列数据 GetMapping(removeQueue)public RVoid removeQueue() throws ParseException {TestDemo testDemo new TestDemo();SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);testDemo.setCreateTime(simpleDateFormat.parse(2024-09-04 10:40:53));testDemo.setTestKey(testKey);boolean test QueueUtils.removeQueueObject(test, testDemo);return R.ok(test ? 成功:失败);} 如上代码删除时间为2024-09-04 10:40:53 这条数据剩下一条 3.4 销毁队列 GetMapping(destoryQueue)public RVoid destoryQueue() throws ParseException {boolean test QueueUtils.destroyQueue(test);return R.ok(test ? 成功:失败);} 如图销毁队列后刷新则提示键不存在 3.5 订阅队列 开启订阅队列 一般是在程序启动时候开启比如使用 PostConstruct 注解 或者实现 ApplicationRunner 接口来实现 Component
public class RuoYiSubcribeInitializer implements ApplicationRunner {Overridepublic void run(ApplicationArguments args) throws Exception {QueueUtils.subscribeBlockingQueue(test,(ConsumerTestDemo) testDemo -{Console.log(testDemo-{}, testDemo);},false);}
} PostConstructpublic void subscribeQueue() {QueueUtils.subscribeBlockingQueue(test,(ConsumerTestDemo) testDemo -{Console.log(testDemo-{}, testDemo);},false);} 每次执行添加操作时候订阅队列都会获取到数据 订阅队列会监听队列数据知道队列数据为空 2024-09-04 11:37:49 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]开始请求 URL[GET /addQueue],无参数
testDemo-TestDemo(idnull, deptIdnull, userIdnull, orderNumnull, testKeytestKey, valuenull, versionnull, delFlagnull)
2024-09-04 11:37:49 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 URL[GET /addQueue],耗时:[15]毫秒
2024-09-04 11:38:12 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]开始请求 URL[GET /addQueue],无参数
2024-09-04 11:38:12 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 URL[GET /addQueue],耗时:[13]毫秒
testDemo-TestDemo(idnull, deptIdnull, userIdnull, orderNumnull, testKeytestKey, valuenull, versionnull, delFlagnull)
四、有界队列代码测试
4.1设置队列容量 QueueUtils.trySetBoundedQueueCapacity(test, 5); 设置队列容量 GetMapping(addBoundedQueue)public void addBoundedQueue() {//销毁队列QueueUtils.destroyQueue(test);//设置队列容量QueueUtils.trySetBoundedQueueCapacity(test, 5);} 如下图bps则是记录容量 4.2 添加队列 GetMapping(addBounded)public RVoid addBounded() {//设置队列容量boolean test QueueUtils.addBoundedQueueObject(test, vlue111);return R.ok(test ? 成功:失败);} 如果未设置容量添加失败超出容量添加也会失败 4.3获取数据
获取队列数据会同时改变容量大小 getBoundedQueueObject会正确计算容量的大小。getQueueObject 获取导数据容量会为0.后面无法添加 GetMapping(getBounded)public RVoid getBounded() {//设置队列容量Object test QueueUtils.getBoundedQueueObject(test);return R.ok(test.toString());} 底层逻辑如果取出一个数据容量则会加 1 {code: 200,msg: vlue111,data: null
}
Response body is emptyResponse code: 200 (OK); Time: 26ms (26 ms); Content length: 40 bytes (40 B)
五、延时队列代码测试
5.1 延时队列数据流转流程 延时队列数据到期后会存入到普通队列如下图流程 -------------------
| 添加任务到 |
| 延时队列 |---------------------------------------
------------------- | |v v
-------------------
| 定时检查到期 |
| 任务 | 获取数据
-------------------| v |
-------------------
| 延时队列 |---------------------------------------
| - 普通队列 |
-------------------所以拿数据是从延时队列拿数据还是从普通队列拿数据考虑下业务场景 5.2 脚本的实现过程
简单了解地底层 struct.pack(dLc0, tonumber(ARGV[1]), string.len(ARGV[2]), ARGV[2])将过期时间、对象长度和对象本身打包成一个二进制字符串便于在 Redis 中存储。redis.call(zadd, KEYS[2], ARGV[1], value)将打包后的值 value 添加到有序集合延时队列中其中 ARGV[1] 是过期时间。redis.call(rpush, KEYS[3], value)将打包后的值 value 添加到列表待处理队列中。local v redis.call(zrange, KEYS[2], 0, 0)获取有序集合的第一个元素。if v[1] value then redis.call(publish, KEYS[4], ARGV[1]) end如果添加的新元素是有序集合的第一个元素则通过 Redis 的发布订阅机制通知其他消费者。 Lua 脚本local value struct.pack(dLc0, tonumber(ARGV[1]), string.len(ARGV[2]), ARGV[2]);
redis.call(zadd, KEYS[2], ARGV[1], value);
redis.call(rpush, KEYS[3], value);
local v redis.call(zrange, KEYS[2], 0, 0);
if v[1] value thenredis.call(publish, KEYS[4], ARGV[1]);
end;5.3 测试延时队列 场景 用于不是立即执行的任务场景比如用户创建订单但是不付款时间到后取消订单 如图先订阅队列 test手动开启
/*** 简述订阅延时队列* 详细描述* author syf* date 2024/9/6 14:53*/GetMapping(subscribeDelayQueue)public RVoid subscribeDelayQueue() {Console.log(开启监听。。。。。。。。。。。。。。。。。。。。。。。。。。。);QueueUtils.subscribeBlockingQueue(test,(ConsumerTestDemo) testDemo -{Console.log(接受到订单-{}, testDemo);Console.log(关闭订单);},false);return R.ok();} 添加延时队列到test GetMapping(addDelayQueue)public void addDelayQueue() throws ParseException {Console.log(创建订单。。。。。。。。。。。。。。。。。。。);TestDemo testDemo new TestDemo();testDemo.setValue(订单编号);QueueUtils.addDelayedQueueObject(test, testDemo, 10, TimeUnit.SECONDS);Console.log(等待10秒。。。。。。。。。。。。。。。。。。。);} 如图 10秒后订阅队列监听到订单并关闭 开启监听。。。。。。。。。。。。。。。。。。。。。。。。。。。
2024-09-05 19:47:51 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 URL[GET /subscribeDelayQueue],耗时:[51]毫秒
2024-09-05 19:47:54 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]开始请求 URL[GET /addDelayQueue],无参数
创建订单。。。。。。。。。。。。。。。。。。。
等待10秒。。。。。。。。。。。。。。。。。。。
2024-09-05 19:47:54 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 URL[GET /addDelayQueue],耗时:[57]毫秒
接受到订单-TestDemo(idnull, deptIdnull, userIdnull, orderNumnull, testKeynull, value订单编号, versionnull, delFlagnull)
关闭订单
六、优先队列代码测试 场景 vip 用户按照OrderNum随机生成等级进行排队 添加vip用户进入队列 插入数据时候会按照OrderNum 大小找到位置就像list索引一样 /*** 添加队列数据** param queueName 队列名*/GetMapping(/add)public RVoid add(String queueName) {// 用完了一定要销毁 否则会一直存在boolean b QueueUtils.destroyPriorityQueue(queueName);log.info(通道: {} , 删除: {}, queueName, b);for (int i 0; i 10; i) {int randomNum RandomUtil.randomInt(10);PriorityDemo data new PriorityDemo();data.setName(data- i);data.setOrderNum(randomNum);if (QueueUtils.addPriorityQueueObject(queueName, data)) {log.info(通道: {} , 发送数据: {}, queueName, data);} else {log.info(通道: {} , 发送数据: {}, 发送失败, queueName, data);}}return R.ok(操作成功);}
按照等级获取vip用户
GetMapping(/get)public RVoid get(String queueName) {PriorityDemo data;do {data QueueUtils.getPriorityQueueObject(queueName);log.info(通道: {} , 获取数据: {}, queueName, data);} while (data ! null);return R.ok(操作成功);} 如图orderNum从 0 到7依次被打印 2024-09-06 11:06:57 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 URL[GET /demo/queue/priority/get],耗时:[11]毫秒
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]开始请求 URL[GET /demo/queue/priority/get],参数类型[param],参数:[{queueName:[test]}]
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-9, orderNum0)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-1, orderNum2)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-2, orderNum2)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-3, orderNum3)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-4, orderNum3)
2024-09-06 11:07:50 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-8, orderNum3)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-0, orderNum5)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-7, orderNum6)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-5, orderNum7)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: PriorityDemo(namedata-6, orderNum7)
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.d.c.q.PriorityQueueController- 通道: test , 获取数据: null
2024-09-06 11:07:51 [XNIO-1 task-1] INFO c.r.f.i.PlusWebInvokeTimeInterceptor- [PLUS]结束请求 URL[GET /demo/queue/priority/get],耗时:[488]毫秒博主精心整理专栏CV大法即可用感谢您小手点一点 手动跪拜 1- SpringBoot框架常用配置若依代码解读 http://t.csdnimg.cn/jpsSN 2- java常用工具类整理示例演示 http://t.csdnimg.cn/gmCfJ 3- CompletableFuture 异步编排与实际代码展示 http://t.csdnimg.cn/ZuC0N 4- XXL-JOB 详细学习手把手带入门 http://t.csdnimg.cn/lyR7Y