当前位置: 首页 > news >正文

手机免费建立网站济南seo优化公司

手机免费建立网站,济南seo优化公司,wordpress 头像加速,上海嘉定区网站建设公司RabbitMQ 消息可靠保障 消息的可靠性保证生产者重连生产者确认解决思路A-确认机制解决思路B-备份交换机 MQ 服务器宕机导致消息丢失消费端消息的可靠性保障 消费端限流给消息生成唯一id 消息的可靠性保证 实际项目中 MQ 的流程一般是#xff1a;生产端把消息路由到交换机生产端把消息路由到交换机然后由交换机把消息发送到队列接着就是消费端拿到消息进行消费这三个过程都有可能造成消息的不稳定导致不可靠 生产者重连 Spring Boot 提供了一种重试机制只需要通过配置即可实现消息重试从而确保消息的可靠性这里简单介绍一下 spring:rabbitmq:listener:simple:acknowledge-mode: auto # 开启自动确认消费机制retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长为5秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * 上次等待时间max-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态如果业务中包含事务这里改为false通过配置在消费者的方法上如果执行失败或执行异常只需要抛出异常一定要出现异常才会触发重试注意不要捕获异常 即可实现消息重试这样也可以保证消息的可靠性。 失败重试机制https://www.bilibili.com/video/BV1mN4y1Z7t9?p24spm_id_frompageDrivervd_sourcec3fc4867486b99c0b85501121f575279 生产者确认 生产者确认机制分为两部分生产者到交换机和交换机到队列的可靠性保障 解决思路A-确认机制 在生产端进行确认具体操作中会分别针对交换机和队列来确认如果没有成功发送的队列服务器上那就可以尝试重新发送 首先需要增加下列配置 spring.rabbitmq.host192.168.133.128 spring.rabbitmq.port5672 spring.rabbitmq.passwordadmin spring.rabbitmq.usernameadmin spring.rabbitmq.virtual-host/ spring.rabbitmq.listener.typesimple spring.rabbitmq.publisher-confirm-typeCORRELATED #交换机的确认 spring.rabbitmq.publisher-returnstrue #队列的确认这里 publisher-confirm-type 有三种模式可选 none表示禁用发送方确认机制correlated表示开启发送方确认机制simple表示开启发送方确认机制并支持 waitForConfirms() 和 waitForConfirmsOrDie() 的调用。 接下来增加一个Rabbit 配置 Configuration Slf4j public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {Resourceprivate RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 消息发送到交换机成功或失败时调用此方法log.info(confirm函数打印correlationData correlationData);log.info(confirm函数打印ack ack);log.info(confirm函数打印cause cause);}Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 发送队列失败时才调用此方法log.info(消息主体 new String(returnedMessage.getMessage().getBody()));log.info(应答码 returnedMessage.getReplyCode());log.info(描述 returnedMessage.getReplyText());log.info(交换机 returnedMessage.getExchange());log.info(路由key returnedMessage.getRoutingKey());} }为了方便测试定义一个交换机和队列吧并声明它们的绑定关系 public class RabbitMQConfig {//定义一个交换机已及 routingkey用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE normal.demo.exchange;public static final String NORMAL_ROUTING_KEY normal.demo.routingkey;public static final String NORMAL_QUEUE normal.demo.queue;Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}Beanpublic Queue normalQueue() {return new Queue(NORMAL_QUEUE);}Beanpublic Binding normalBinding(Qualifier(normalQueue) Queue queue,Qualifier(normalExchange) DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY);} }在 controller 层测试 RestController RequestMapping(/rabbit) public class RabbitController {Resourceprivate RabbitTemplate rabbitTemplate;GetMapping(/test)public String test(){rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, Message Test Confirm~~);return success.;} }消息成功路由到交换机然后交换机发送到队列正常情况下是下面的输出 2024-08-17T17:08:35.10508:00 INFO 2484 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672] 2024-08-17T17:08:35.14908:00 INFO 2484 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection3542faa [delegateamqp://admin192.168.133.128:5672/, localPort63927] 2024-08-17T17:08:35.20408:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T17:08:35.20408:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印acktrue 2024-08-17T17:08:35.20508:00 INFO 2484 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印causenull现在改一下模拟路由交换机失败的场景注意没有 RabbitMQConfig.NORMAL_EXCHANGE~,这个交换机 rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE~, RabbitMQConfig.NORMAL_ROUTING_KEY, Message Test Confirm~~);重新测试此时输出如下错误提示的还是很明显的 2024-08-17T17:11:39.98108:00 INFO 15864 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672] 2024-08-17T17:11:40.03708:00 INFO 15864 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection568384f7 [delegateamqp://admin192.168.133.128:5672/, localPort64266] 2024-08-17T17:11:40.07408:00 ERROR 15864 --- [68.133.128:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #methodchannel.close(reply-code404, reply-textNOT_FOUND - no exchange normal.demo.exchange~ in vhost /, class-id60, method-id40) 2024-08-17T17:11:40.07608:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T17:11:40.07608:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印ackfalse 2024-08-17T17:11:40.07708:00 INFO 15864 --- [nectionFactory3] com.example.demo.config.RabbitConfig : confirm函数打印causechannel error; protocol method: #methodchannel.close(reply-code404, reply-textNOT_FOUND - no exchange normal.demo.exchange~ in vhost /, class-id60, method-id40)现在再模拟交换机发送消息到队列失败的场景改一下 routingKey rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY~, Message Test Confirm~~); 重新测试此时输出如下returnedMessage 方法执行了 2024-08-17T17:13:49.83508:00 INFO 12892 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.133.128:5672] 2024-08-17T17:13:49.88908:00 INFO 12892 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#59f45950:0/SimpleConnection572e41be [delegateamqp://admin192.168.133.128:5672/, localPort64519] 2024-08-17T17:13:49.93308:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 消息主体Message Test Confirm~~ 2024-08-17T17:13:49.93408:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 应答码312 2024-08-17T17:13:49.93408:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 描述NO_ROUTE 2024-08-17T17:13:49.93408:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 交换机normal.demo.exchange 2024-08-17T17:13:49.93408:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : 路由keynormal.demo.routingkey~ 2024-08-17T17:13:49.93508:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T17:13:49.93608:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印acktrue 2024-08-17T17:13:49.93608:00 INFO 12892 --- [nectionFactory1] com.example.demo.config.RabbitConfig : confirm函数打印causenull顺便了解一下spring.rabbitmq.template.mandatory 配置 当使用 RabbitMQ 作为消息队列时spring.rabbitmq.template.mandatory 配置项用于控制 MQ 消息模板RabbitTemplate的发送行为特别是在消息路由时找不到队列queue的情况下的处理方式。 2、spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true, 3、spring.rabbitmq.template.mandatory 结果为 false 时会忽略掉spring.rabbitmq.publisher-returns 属性的值也就是 returnedMessage 方法不会执行结果为 truereturnedMessage 可以得到执行 4、spring.rabbitmq.template.mandatory 结果为null即不配置时结果由spring.rabbitmq.publisher-returns 确定 解决思路B-备份交换机 上面思路A对于消息是否成功发送到队列是通过重写 RabbitTemplate.ReturnsCallback 接口来操作的备份交换机是对这一操作的替换二选一如果即配置了 ReturnsCallback 回调 又配置了备份交换机实际运行发现备份交换机优先级更高 为目标交换机指定备份交换机当目标交换机投递失败时把消息投递至备份交换机(实际项目中很少用一般用思路A) 在上面代码的基础上改一下RabbitMQConfig 就单纯的声明队列和交换机 public class RabbitMQConfig {//定义一个正常交换机以及 routingkey用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE normal.demo.exchange;public static final String NORMAL_ROUTING_KEY normal.demo.routingkey;public static final String NORMAL_QUEUE normal.demo.queue;//定义一个备份交换机以及 队列用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE_BACKUP normal.demo.exchange.backup;public static final String NORMAL_QUEUE_BACKUP normal.demo.queue.backup; }定义一个消费者监听队列实际项目中都会有的有如下注意点 备份交换机必须定义成 FANOUT 类型声明正常交换机的时候指定一个备份交换机通过 alternate-exchange 参数代码有体现 Slf4j Component public class RabbitConsumer {RabbitListener(bindings QueueBinding(value Queue(value RabbitMQConfig.NORMAL_QUEUE),exchange Exchange(value RabbitMQConfig.NORMAL_EXCHANGE, type ExchangeTypes.DIRECT, arguments {Argument(name alternate-exchange, value RabbitMQConfig.NORMAL_EXCHANGE_BACKUP)}),key RabbitMQConfig.NORMAL_ROUTING_KEY))public void receiveA(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(正常队列{}收到消息{}, RabbitMQConfig.NORMAL_QUEUE, msg);}RabbitListener(bindings QueueBinding(value Queue(value RabbitMQConfig.NORMAL_QUEUE_BACKUP),exchange Exchange(value RabbitMQConfig.NORMAL_EXCHANGE_BACKUP, type ExchangeTypes.FANOUT)))public void receiveB(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(备份队列{}收到消息{}, RabbitMQConfig.NORMAL_QUEUE_BACKUP, msg);} }注意RabbitListener 必须先创建队列不然报错 有三个 属性 queues()、bindings()、queuesToDeclare()它们之间是互斥的。设定了queues()就不能再设定 bindings() 和 queuesToDeclare()了具体用法可以看这篇文章 接下来模拟无法到达队列(如果无法到达交换机直接报错没有后面什么事了…)测试能不能正常进入备份交换机 rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY~, Message Test Confirm~~);运行结果如下 2024-08-17T18:25:32.25308:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T18:25:32.25408:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印acktrue 2024-08-17T18:25:32.25408:00 INFO 22476 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印causenull 2024-08-17T18:25:32.25708:00 INFO 22476 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 备份队列normal.demo.queue.backup收到消息Message Test Confirm~~MQ 服务器宕机导致消息丢失 这一点官方早就考虑到了现在无论是交换机还是队列消息都是默认持久化到硬盘上哪怕服务器重启也不会导致消息丢失 消息持久化 Override public void sendMessage() {// 构造消息将消息持久化Message message MessageBuilder.withBody(单程车票.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 向MQ发送消息消息内容都为消息表记录的idrabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message); }队列和交换机的持久化默认就是 true无需显示指定 Bean public Queue queue() {// 四个参数name队列名、durable持久化、 exclusive独占、autoDelete自动删除return new Queue(MESSAGE_QUEUE, true); } Bean public DirectExchange exchange() {// 四个参数name交换机名、durable持久化、autoDelete自动删除、arguments额外参数return new DirectExchange(Direct_Exchange, true, false); }消费端消息的可靠性保障 配置文件增加 spring.rabbitmq.listener.simple.acknowledge-modemanual 配置也就是下面这样 spring.rabbitmq.host192.168.133.128 spring.rabbitmq.port5672 spring.rabbitmq.passwordadmin spring.rabbitmq.usernameadmin spring.rabbitmq.virtual-host/ spring.rabbitmq.listener.typesimple spring.rabbitmq.publisher-confirm-typeCORRELATED spring.rabbitmq.publisher-returnstrue spring.rabbitmq.listener.simple.acknowledge-modemanual #把消息确认模式改成手动确认因为 RabbitMQ 客户端默认是 自动返回 ACK 确认的也就是不管是处理成功还是失败默认都按成功来处理这样就不太好所以这个配置要改成手动确认 需要提前知道个东西啥是 deliveryTag 队列的定义这里把备份交换机队列都删了哈 public class RabbitMQConfig {//定义一个正常交换机以及 routingkey用来测试消息的可靠传递测试public static final String NORMAL_EXCHANGE normal.demo.exchange;public static final String NORMAL_ROUTING_KEY normal.demo.routingkey;public static final String NORMAL_QUEUE normal.demo.queue; }消费者逻辑如下 Slf4j Component public class RabbitConsumer {RabbitListener(bindings QueueBinding(value Queue(value RabbitMQConfig.NORMAL_QUEUE),exchange Exchange(value RabbitMQConfig.NORMAL_EXCHANGE, type ExchangeTypes.DIRECT, arguments {Argument(name alternate-exchange, value RabbitMQConfig.NORMAL_EXCHANGE_BACKUP)}),key RabbitMQConfig.NORMAL_ROUTING_KEY))public void receiveA(Message message, Channel channel) throws IOException {// 消息内容String msg new String(message.getBody());// 消息的 deliveryTaglong deliveryTag message.getMessageProperties().getDeliveryTag();try {// 操作成功返回 ack 信息int i 1/0; // 模拟消息处理异常channel.basicAck(deliveryTag, false);log.info(正常队列{}收到消息{}, RabbitMQConfig.NORMAL_QUEUE, msg);} catch (Exception ex) {// 获取当前消息是否是重复投递的// true-说明消息已经重复投递过一次了false-说明当前消息是第一次投递Boolean redelivered message.getMessageProperties().getRedelivered();// 操作失败返回 Nack 信息if (redelivered) {// requeue 参数控制消息是否重新放入队列 true-重放队列broker会重新投递消息; false-不重放broker会丢弃消息channel.basicNack(deliveryTag, false, false); // basicNack(long deliveryTag, boolean multiple, boolean requeue)} else {channel.basicNack(deliveryTag, false, true);}log.info(正常队列{}收到消息,redelivered: {}但是处理异常{}, RabbitMQConfig.NORMAL_QUEUE, redelivered, msg);}// basicReject 和 basicNack 区别- 唯一的区别就是 basicNack 有批量操作控制就是 multiple 参数是否批量处理true 表示将一次性 ack 所有小于 deliveryTag 的消息// channel.basicReject(deliveryTag, false); //basicReject(long deliveryTag, boolean requeue)} }2024-08-17T19:48:38.13108:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印correlationDatanull 2024-08-17T19:48:38.13208:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印acktrue 2024-08-17T19:48:38.13208:00 INFO 18192 --- [nectionFactory2] com.example.demo.config.RabbitConfig : confirm函数打印causenull 2024-08-17T19:48:55.20508:00 INFO 18192 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 正常队列normal.demo.queue收到消息,redelivered: false但是处理异常Message Test Confirm~~ 2024-08-17T19:49:12.00708:00 INFO 18192 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 正常队列normal.demo.queue收到消息,redelivered: true但是处理异常Message Test Confirm~~具体效果可以本地 debug 下哈消费完并且都 ack 后这里都是0了 消费端限流 我们都知道 MQ 有削峰填谷的效果假设有下面的场景消息队列里面有10000条消息但是消费端的并发能力只有 1000为了避免一次性把这1万条消息全部取出导致消费端压力太大我们可以做个设置每次最多取1000条消息对消费端也是一种保护。 从操作层面来说也比较简单配置spring.rabbitmq.listener.simple.prefetch 属性即可 做个小实验 当不设置 prefetch 时生产者一次性向交换机中投递100条消息消费者确认消息的方式设置为手动确认在确认消息之前线程休眠 5 秒因为 rabbitmq 管理控制台数据更新是 5 秒更新一次这里设置为 5 秒比较方便观察可以观察到队列是一次性将100条数据全部发送给了消费者然后消费者再进行处理 生产者一次发送10条消息到交换机 GetMapping(/test)public String test(){for (int i 0; i 100; i) {rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE, RabbitMQConfig.NORMAL_ROUTING_KEY, Message Test Confirmi);}return success.;}消费者在确认之前休眠5秒 RabbitListener(bindings QueueBinding(value Queue(value RabbitMQConfig.NORMAL_QUEUE_BACKUP),exchange Exchange(value RabbitMQConfig.NORMAL_EXCHANGE_BACKUP, type ExchangeTypes.FANOUT))) public void receiveB(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(备份队列{}收到消息{}, RabbitMQConfig.NORMAL_QUEUE_BACKUP, msg); }现在增加spring.rabbitmq.listener.simple.prefetch2配置就是一次从队列中取两条数据再观察结果 会发现消费端不再是一次性全部把消息取出而是每次只取 2 个这就是 prefetch 配置作用 给消息生成唯一id 可以在消费端打印消息 id此时消息Id是 null 2024-08-18T20:43:46.29508:00 INFO 24028 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 消息id:null消息内容:Message Test Confirm重写 MessageConverter Bean public MessageConverter messageConverter() {// 定义消息转换器Jackson2JsonMessageConverter jjmc new Jackson2JsonMessageConverter();// 配置自动创建消息 id, 用于识别不同消息也可以在业务中基于ID判断是否重复消息jjmc.setCreateMessageIds(true);return jjmc; }再次打印 2024-08-18T21:01:20.75208:00 INFO 24268 --- [ntContainer#0-1] c.example.demo.rabbitmq.RabbitConsumer : 消息id:ae1a208d-8045-4cb1-a4d9-6d8aabc6b8c8消息内容:Message Test Confirm
http://www.hkea.cn/news/14415807/

相关文章:

  • 做淘宝推广开网站合适全屋定制品牌推荐
  • 建材公司网站建设方案我爱777在线观看
  • p2p网站建设后期维护私人pk赛车网站怎么做
  • 如何做网站豆瓣厦门seo网络推广
  • 免费建站系统下载乐平网站
  • dw做的网站能搜到吗wordpress主题柚子皮zip
  • 兔展在线制作网站html网页制作总结
  • 榆林尚呈高端网站建设学院评估 网站建设整改
  • 品牌企业网站建设公司价格网站开发端口查询
  • 网站建设有哪些问题视频网站建站程序
  • 做照片视频的网站资讯型电商网站优缺点
  • 襄阳手机网站建设合肥房产网365
  • 织梦网站栏目江苏省建设执业资格中心网站
  • 网站开发 视频播放器wordpress 好用插件
  • 学校网站设计思路系统集成项目管理
  • windows 2003做网站如何知道wordpress
  • html网站免费模板开发公司移交给物业资料说明
  • 做英文网站怎么赚钱网站做流量的论坛贴吧
  • 建站公司网站 phpwind网站建议反馈应该怎么做
  • 廊坊哪里能够做网站微信公众号做电影网站要域名吗
  • 网站建设目的与作用织梦系统做的网站打开慢
  • 龙岗网站建设工程17网站一起做网店质量怎么样
  • 营销型网站的建设重点是什么做爰视频网站在线看
  • 福永网站优化网站开发策划
  • 电子商务网站建设 教案北京如何优化网站
  • 天津建设网站公司高港区拖拽式网页制作平台
  • 四川微信网站建设蓝海电商怎么做
  • 购物帮做特惠的导购网站成都公司注册流程及费用
  • 工程网站建设最新国际新闻热点
  • 网站建设伍际网络平板网站建设