当前位置: 首页 > news >正文

规划馆网站建设搜索引擎优化百度

规划馆网站建设,搜索引擎优化百度,wordpress 后台忘了,h5网站开发用什么软件制作redis stream是redis5引入的特性#xff0c;一定程度上借鉴了kafka等MQ的设计#xff0c;部署的redis版本必须 5 本文主要讲的是思路#xff0c;结合简单的源码分析#xff08;放心#xff0c;无需深入大量源码#xff09;#xff1b;讲述在redis stream文档缺乏一定程度上借鉴了kafka等MQ的设计部署的redis版本必须 5 本文主要讲的是思路结合简单的源码分析放心无需深入大量源码讲述在redis stream文档缺乏网上资料欠缺gpt回答不上来的情况下博主是如何用两三天的时间 从没接触过redis stream 到分析完成了redis stream mq功能 。博主始终认为 有明确的思路 才能知道什么代码是正确的 能复制拿来用什么代码只是单纯跑起来demo的 绝对达不到生产级别。 本文源自csdn博主孟秋与你 博主虽才疏学浅 却也是在资料极少的情况下 辛苦研究源码、整理思路 撰写的本文转载请声明出处。 文章目录 redisTemplate API的熟悉配置redis mq config监听器定时器 优化方向 (本文基于springboot3.3 jdk17 redis6环境, 理论上springboot2 redis5也是通用教程 可能会有细微的api差异 稍微分析一下源码方法都能处理) redisTemplate API的熟悉 我们在操作redis的时候 通常是使用spring-data-redis提供的redisTemplate或者jedis 本文以redisTemplate为例。 实际业务场景可能需要考虑用jedis替换 因为mq通常在数据量、并发量都大的场景redisTemplate的优势在于和springboot的完美集成且不需要考虑通过连接池来管理线程安全问题 用过redisTemplate的同学应该都会自己封装一下工具类因为redisTemplate封装的不够好不管怎么样 我们都需要先看看这个类 redisTemplate.opsForHash()redisTemplate.opsForValue() 各位应该很熟悉了, stream是一种新引入的格式那么我们直接在RedisTemplate类里面搜stream就好了,正常都会有对应API 没对应API那就是spring版本太老了 spring那个老版本出来的时候 redis还没出到5 搜到了opsForStream()方法 继续查看方法 如下图 这里说明一下redis的streamKey就类似mq的topic, group是消费者组,cousumer是消费者acknowledge即ack 应答机制 告诉mq已经成功消费了claim是强制将消息转至其它消费者 通常用于消费失败/多次消费失败的场景pending存放的是未ack的消息 就比如消费某个消息时 出现了异常 没能执行到ack 这些消息就会放在pending list 确保消息不丢失。 通过api加上我们掌握的mq基本知识大概就能理解是怎么一回事了。demo搭建不难但是代码要上生产我们就必须考虑消息消费失败了怎么办 该如何重试也就是说重点的api在acknowledge和pending上面。 一个简单的封装 Componentpublic class RedisStreamUtil {Autowiredprivate RedisTemplateString, Object redisTemplate;/*** 创建消费组** param key 键名称* param group 组名称* return {link String}*/public String createGroup(String key, String group) {return redisTemplate.opsForStream().createGroup(key, group);}/*** 获取消费者信息** param key 键名称* param group 组名称* return {link StreamInfo.XInfoConsumers}*/public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {return redisTemplate.opsForStream().consumers(key, group);}/*** 查询组信息** param key 键名称* return*/public StreamInfo.XInfoGroups queryGroups(String key) {return redisTemplate.opsForStream().groups(key);}/*** 添加Map消息* param key* param value*/public String addMap(String key, MapString, Object value) {return redisTemplate.opsForStream().add(key, value).getValue();}/*** 读取消息* param key*/public ListMapRecordString, Object, Object read(String key) {return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));}/*** 确认消费* param key* param group* param recordIds*/public Long ack(String key, String group, String... recordIds) {return redisTemplate.opsForStream().acknowledge(key, group, recordIds);}/*** 删除消息* 当一个节点的所有消息都被删除那么该节点会自动销毁* param key* param recordIds*/public Long del(String key, String... recordIds) {return redisTemplate.opsForStream().delete(key, recordIds);}/*** 判断是否存在key* param key*/public boolean hasKey(String key) {Boolean flag redisTemplate.hasKey(key);return flag ! null flag;}} 注意会有循环依赖的问题如果没有那就是springboot版本太低低版本默认是开启允许循环依赖的高版本默认不允许(2.7已经不允许了 具体版本不记得了) 解决方法1 在yml配置里面允许循环依赖 server:port: 8586spring:application:name: springboot3-demodata:redis:port: 6579host: 192.168.1.1password: xxxxxxxdatabase: 1lettuce:pool:max-wait: 5000msmax-active: 1000datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?characterEncodingutf8serverTimezoneUTCrewriteBatchedStatementstruetype: com.alibaba.druid.pool.DruidDataSourceusername: rootpassword: root # 允许循环依赖main:allow-circular-references: true 解决方法2该工具类不交给spring托管 代码如下图所示 在spring bean初始化的时候 把redisTemplate bean赋值到工具类即可工具类方法变成静态方法 配置 redis mq config 以下代码展示了如何配置多个生产者也是这个代码最难写, 尤其是Subscription的创建 不能用spring官方文档里面提供的demo package com.qiuhuanhen.springboot3demo.redis.config;import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.qiuhuanhen.springboot3demo.redis.RedisStreamUtil; import com.qiuhuanhen.springboot3demo.redis.consumer.RedisConsumer; import com.qiuhuanhen.springboot3demo.redis.consumer.listener.RedisConsumersListener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisServerCommands; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.Subscription;import javax.annotation.Resource; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ThreadPoolExecutor;Configuration Slf4j public class RedisConfig{Autowiredprivate RedisStreamUtil redisStreamUtil;Autowiredprivate ThreadPoolExecutor threadPoolExecutor;Autowiredprivate MapString, RedisConsumer redisConsumer;/*** redis序列化** param redisConnectionFactory* return {code RedisTemplateString, Object}*/Beanpublic RedisTemplateString, Object redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplateString, Object template new RedisTemplate();template.setConnectionFactory(redisConnectionFactory);ObjectMapper om new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);Jackson2JsonRedisSerializer jackson2JsonRedisSerializer new Jackson2JsonRedisSerializer(om,Object.class);StringRedisSerializer stringRedisSerializer new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}Beanpublic ListSubscription subscriptions(RedisConnectionFactory factory) {ListSubscription subscriptions new ArrayList();subscriptions.add( createSubscription(factory, orderStream, orderGroup, orderConsumer));subscriptions.add( createSubscription(factory, productStream, productGroup, productConsumer));return subscriptions;}/*** param factory* param streamName 类似 topic* param groupName 消费组是 Redis Streams 中的一个重要特性它允许多个消费者协作消费同一个流中的消息。每个消费组可以有多个消费者。* param consumerName 这是消费组中的具体消费者名称。每个消费者会从消费组中领取消息进行处理。* return*/private Subscription createSubscription(RedisConnectionFactory factory, String streamName, String groupName, String consumerName) {initStream(streamName, groupName);StreamMessageListenerContainer.StreamMessageListenerContainerOptionsString, MapRecordString, String, String options StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 每次从Redis Stream中读取消息的最大条数 (32为rocketmq的pullBatchSize默认数量).batchSize(32).executor(threadPoolExecutor)// 轮询拉取消息的时间 (如果流中没有消息它会等待这么久的时间然后再次检查。).pollTimeout(Duration.ofSeconds(1)).errorHandler(throwable - {log.error([redis MQ handler exception], throwable);throwable.printStackTrace();}).build();var listenerContainer StreamMessageListenerContainer.create(factory, options);// 手动ask消息 // Subscription subscription listenerContainer.receive(Consumer.from(groupName, consumerName), // // 创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。 // StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);// 自动ask消息 // Subscription subscription listenerContainer.receiveAutoAck(Consumer.from(groupName, consumerName), // StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);// 手动创建 核心在于 cancelOnError(t - false) 出现异常不退出StreamMessageListenerContainer.ConsumerStreamReadRequestString build StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).autoAcknowledge(false)// 重要.cancelOnError(t - false).build();Subscription subscription listenerContainer.register(build, new RedisConsumersListener(redisStreamUtil));listenerContainer.start();return subscription;}/*** 初始化流 保证stream流程是正常的** param key* param group*/private void initStream(String key, String group) {boolean hasKey redisStreamUtil.hasKey(key);if (!hasKey) {MapString, Object map new HashMap(1);map.put(author, mengQiu);//创建主题String result redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.createGroup(key, group);//将初始化的值删除掉redisStreamUtil.del(key, result);log.info(stream:{}-group:{} initialize success, key, group);}}/*** 校验 Redis 版本号是否满足最低的版本号要求 可自行使用*/private static void checkRedisVersion(RedisTemplateString, ? redisTemplate) {// 获得 Redis 版本Properties info redisTemplate.execute((RedisCallbackProperties) RedisServerCommands::info);String version MapUtil.getStr(info, redis_version);// 校验最低版本必须大于等于 5.0.0int majorVersion Integer.parseInt(StrUtil.subBefore(version, ., false));if (majorVersion 5) {throw new IllegalStateException(StrUtil.format(您当前的 Redis 版本为 {}小于最低要求的 5.0.0 版本, version));}} } 我们简单阐述一下上面代码中的initStream方法 private void initStream(String key, String group) {boolean hasKey redisStreamUtil.hasKey(key);if (!hasKey) {MapString, Object map new HashMap(1);// 先创建一个keymap.put(author, mengQiu);//创建主题String result redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.createGroup(key, group);//再将初始化的值删除掉redisStreamUtil.del(key, result);log.info(stream:{}-group:{} initialize success, key, group);}} 先创建了一对K-V 接着创建了一个消费组再把K-V删除剩下的就是消费组了。因为我们在createSubscription的时候声明了消费组redis stream mq机制如此 如果redis里面没有消费组会直接报错消费组不存在 而不会自动创建 与rocketMq类似 那么有同学可能会问 直接createGroup不行吗第一次创建当然是没问题的但是后面项目再启动时 就会报错group已存在 聪明的你可能会有疑惑那先查询组是否存在 再创建不行吗 我们来看看redisTemplate的api: redisTemplate.opsForStream().groups(key) 这个是查询消费组信息的api, 如果消费组不存在会直接报错该消费组不存在。 所以initSream方法是一个小技巧有点类似于卡bug。 当然 如果硬要只使用createGroup方法也不是不可以加个try catch就好了但这就相当于除了第一次初始化之外之后每次启动项目 其实都会发生一次异常。 监听器 核心是实现StreamListener接口 Slf4j public class RedisConsumersListener implements StreamListenerString, MapRecordString, String, String {private RedisStreamUtil redisStreamUtil;public RedisConsumersListener(RedisStreamUtil redisStreamUtil) {this.redisStreamUtil redisStreamUtil;}/*** 监听器** param message*/Overridepublic void onMessage(MapRecordString, String, String message) {// stream的key值String streamName message.getStream();//消息IDRecordId recordId message.getId();//消息内容MapString, String msg message.getValue();// do something 处理 (这里一般是通过设计模式获取实现类方法 统一处理)//逻辑处理完成后ack消息删除消息group为消费组名称StreamInfo.XInfoGroups xInfoGroups redisStreamUtil.queryGroups(streamName);xInfoGroups.forEach(xInfoGroup - redisStreamUtil.ack(streamName, xInfoGroup.groupName(), recordId.getValue()));redisStreamUtil.del(streamName, recordId.getValue());}log.info(【streamName】 streamName ,【recordId】 recordId ,【msg】 msg);} } 感兴趣可以看博主踩到的坑, 看完思路才能自行判断 代码是否能直接复制使用 个人感觉这才是分析技术最精彩的地方 有正确的思路才能在使用新技术时披荆斩棘; 不感兴趣可以直接跳到下一目录 踩坑start 一开始使用的是receive方法 被注释的部分 // 手动ask消息 // Subscription subscription listenerContainer.receive(Consumer.from(groupName, consumerName), // // 创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。 // StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);这也是网上使用最多的方法因为spring给的文档demo就是这么创建的,但是它非常坑 通过方法名我们可以判断出 receiveAutoAck是会自动ack的不出异常还好那如果出现异常呢 如何ack 所以我们肯定是要手动控制的。 我们可以看看源码 它们的差异 是的就是一个是否自动ack的差别。 既然引入了消息队列那说明数据量是比较大的所以肯定是需要考虑异常情况下 消息不能丢失的于是博主在消费时故意编写了异常模拟不触发ack的场景. 结果发现 一旦消费出现异常 没有ack时pending list不再新增数据在项目重启后数据又增加了但是再次消息异常时 pending list又阻塞了这种现象非常奇怪 难道一个消息没ack redis stream就阻塞吗这显然不符合设计。 反复思考后看起来像是出现异常后就停止了轮询这个mq就像极了是一次性的。 但是和轮询相关的 也就一个pollTimeout参数它能掀起多大的火花呢 于是继续看代码 配置redis mq时都有哪些api. 使用receive方法后 返回的是一个Subscription Subscription类有isActive()方法 于是在定时器中打印subsciption.isActive() 发现它竟然为false 于是我们追踪这个方法 追踪到了StreamPollTask类 如果是task类 那么应该会有run方法 我们直接在里面搜run() run方法里面主要就这两个方法 this.pollState.running(); this.doLoop(); 第一个running方法 一眼看到头没什么东西 我们看doLoop() 这个方法看起来是循环执行如果任务中断了 说明是loop出问题了 里面有行代码 if (this.cancelSubscriptionOnError.test(ex)) {this.cancel();}也就是说在cancelSubscriptionOnError.test为true的时候 会取消执行 还记得isActive()方法吗 它正是去判断该状态的. 通过构造方法 可以看出 该参数是StreamMessageListenerContainer.StreamReadRequest streamRequest 传进来的 StreamMessageListenerContainer.StreamReadRequest在我们查看listenerContainer.receive源码时 有过一面之缘 我们再看看StreamReadRequest.builder出来的StreamReadRequestBuilder类 至此分析完成了闭环因为receive方法创建出来 默认是遇到异常就取消执行 这明显不符合实际使用这个设计个人感觉非常欠佳。 这便是为什么使用以下代码来创建的原因 StreamMessageListenerContainer.ConsumerStreamReadRequestString build StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).autoAcknowledge(false)// 重要.cancelOnError(t - false).build();踩坑end 定时器 代码比较乱 注释代码比较多的原因 不是因为瞎写而是那些api 在实际业务中可能会使用到所以特地写在下面了 // 定期处理 pending list 中的消息Scheduled(cron 0/20 * * * * ?)public void processPendingMessages() {String streamKey orderStream; // Redis Stream 的键String groupName orderGroup; // 消费者组的名称String consumerName orderConsumer; // 当前消费者的名称for (Subscription each : subscription) {System.out.println(each.isActive());}StreamOperationsString, String, String streamOps redisTemplate.opsForStream();// 获取 pending list 中未确认的消息概要PendingMessagesSummary pendingSummary streamOps.pending(streamKey, groupName);// 所有pending消息的数量long totalPendingMessages pendingSummary.getTotalPendingMessages();if (pendingSummary.getTotalPendingMessages() 0L) {return;}// 消费组名称String groupName1 pendingSummary.getGroupName();// pending队列中的最小IDString minMessageId pendingSummary.minMessageId();// pending队列中的最大IDString maxMessageId pendingSummary.maxMessageId();if (pendingSummary.getTotalPendingMessages() 0) {// 读取消费者pending队列的前10条记录从ID0的记录开始一直到ID最大值 // PendingMessages pendingMessages streamOps.pending(streamKey, Consumer.from(groupName, consumerName), Range.closed(0, ), 10);// 获取 pending list 中具体的消息PendingMessages pendingMessages streamOps.pending(streamKey, groupName, Range.unbounded(), 10000);int size pendingMessages.size();// 获取当前批次的消息PendingMessage currentBatchMin pendingMessages.get(0);PendingMessage currentBatchMax pendingMessages.get(size-1);pendingMessages.forEach(pendingMessage -{// 消息被获取的次数 可以根据次数做不同业务 超过一定次数未消费 考虑是否要ack并dellong deliveryCount pendingMessage.getTotalDeliveryCount();// 读取每个未确认的消息 // ListMapRecordString,String,String messages streamOps.read( // StreamReadOptions.empty(), // StreamOffset.create(streamKey,ReadOffset.lastConsumed())StreamOffset.create(streamKey,ReadOffset.from(0)) // );ListMapRecordString, String, String messages streamOps.range(streamKey, Range.closed(currentBatchMin.getId().toString(), currentBatchMax.getId().toString()), Limit.limit().count(10000));for (MapRecordString, String, String message : messages) {try {// 处理消息processMessage(message);// 成功处理后确认消息streamOps.acknowledge(streamKey, groupName, message.getId());streamOps.delete(streamKey, message.getId());} catch (Exception e) {// 处理异常情况e.printStackTrace();}}});}}至于如何触发就比较简单了往redis添加一个streamKey即可 GetMapping(/stream)public String testStream() {String mystream ;for (int i 0; i 10; i) {Oper oper new Oper();oper.setTestId(11111111L);oper.setTestDesc(订单消息队列);oper.setVersion(i);oper.setTestXxx(LocalDateTime.now().toString());MapString, Object map new HashMap();map.put(oper, oper);try {Thread.sleep(10);mystream redisStreamUtil.addMap(orderStream, map);} catch (InterruptedException e) {throw new RuntimeException(e);}}return String.valueOf(mystream);} 优化方向 建立一个消费者抽象类定义消费方法 建议一个降级处理抽象类定义补偿方法即消费失败时的处理 定义spring的properties类 把生产者消费者字段写到里面 redis需要部署集群可在博主的主页搜索哨兵有哨兵架构教程。 实际业务中消费消息很可能是存入数据库在入库完成之后 redis ack完成之前如果这一瞬间突然宕机了而数据量又非常大可能会导致消费重复的情况因为没有完成ack 下次还是会把该数据从pending list里面取出来。 解决方案1 考虑是加redisson锁 解决方案2数据库存入消息id字段并建立唯一索引 (唯一索引的魅力体现出来了) 至此一份生产级别的redis stream mq架构成立。
http://www.hkea.cn/news/14416808/

相关文章:

  • 做室内3d设计的网站福州网吧
  • 上海市区网站设计制作公司注册公司流程图
  • 做网站就是做服务wordpress如何用
  • 怎么上线网站小说网站架构
  • 科技公司网站系统手机怎么自己设计图片
  • 慈溪建设银行支行网站做任务赚佣金的网站
  • 网站规划详细设计怎么写linux服务器wordpress建站教程视频
  • 长沙交互网站设计服务商网站开发去哪里培训
  • 网站开发工作好吗课程设计代做网站
  • 个人网站备案费用php网站开发试题及答案
  • 电子元器件网站怎么做对seo的理解
  • 电商免费网站入口网络广告推广方案
  • 怎么看网站是谁做的开发一套软件需要多少钱
  • 做网站有什么好处团购网站制作
  • 免费发布信息网有哪些网站广州互联网网站建设
  • 做做同城网站好还是做垂直网站好网站建设导入视频
  • vps主机可以做几个网站做网站的网站赚钱吗
  • 网站建设销售开场网站开发最强工具
  • 南通百度网站快速优化深圳市坪山新区建设局网站
  • 哪个网站做娱乐新手怎样做网络营销推广
  • 五华建设银行网站佛山网页设计培训
  • 怀化网站优化哪个好百度网盘资源搜索
  • 响应式网站制作流程图青岛关键词优化排名
  • 梧州网站建设公司东莞正规网页设计培训学费
  • 网站源码之家网络设计报告3000字
  • jquery 选择 网站iis7站长工具
  • 温州网页网站制作阿里云服务器一年多少钱
  • 智能手机网站模板linux tomcat 网站目录
  • 做网站推广要会什么湖北高端网站建设价格
  • 找网站建设公司需要注意什么怎么做动态的实时更新的网站