惠州惠阳网站建设,微信怎么建设网站,wordpress自豪地,旅游交友的网站建设目录
1.消息可靠性
1.1.生产者消息确认
1.1.1.修改配置
1.1.2.定义Return回调
1.1.3.定义ConfirmCallback
1.2.消息持久化
1.2.1.交换机持久化
1.2.2.队列持久化
1.2.3.消息持久化
1.3.消费者消息确认
1.3.1.演示none模式
1.3.2.演示auto模式
1.4.消费失败重试机制…目录
1.消息可靠性
1.1.生产者消息确认
1.1.1.修改配置
1.1.2.定义Return回调
1.1.3.定义ConfirmCallback
1.2.消息持久化
1.2.1.交换机持久化
1.2.2.队列持久化
1.2.3.消息持久化
1.3.消费者消息确认
1.3.1.演示none模式
1.3.2.演示auto模式
1.4.消费失败重试机制
1.4.1.本地重试
1.4.2.失败策略
1.5.总结
1.消息可靠性
消息从发送到消费者接收会经理多个过程 其中的每一步都可能导致消息丢失常见的丢失原因包括 发送时丢失 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机queue将消息丢失 consumer接收到消息后未消费就宕机
针对这些问题RabbitMQ分别给出了解决方案 生产者确认机制 mq持久化 消费者确认机制 失败重试机制
下面我们就通过案例来演示每一个步骤。
首先导入课前资料提供的demo工程 项目结构如下 1.1.生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后会返回一个结果给发送者表示消息是否处理成功。
返回结果有两种方式 publisher-confirm发送者确认 消息成功投递到交换机返回ack 消息未投递到交换机返回nack publisher-return发送者回执 消息投递到交换机了但是没有路由到队列。返回ACK及路由失败原因。 注意 1.1.1.修改配置
首先修改publisher服务中的application.yml文件添加下面的内容
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true
说明 publish-confirm-type开启publisher-confirm这里支持两种类型 simple同步等待confirm结果直到超时 correlated异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback publish-returns开启publish-return功能同样是基于callback机制不过是定义ReturnCallback template.mandatory定义消息路由失败时的策略。true则调用ReturnCallbackfalse则直接丢弃消息
1.1.2.定义Return回调
每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目加载时配置
修改publisher服务添加一个
package cn.itcast.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;Slf4j
Configuration
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 投递失败记录日志log.info(消息发送失败应答码{}原因{}交换机{}路由键{},消息{},replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要可以重发消息});}
}
1.1.3.定义ConfirmCallback
ConfirmCallback可以在发送消息时指定因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中定义一个单元测试方法
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message hello, spring amqp!;// 2.全局唯一的消息ID需要封装到CorrelationData中CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result - {if(result.isAck()){// 3.1.ack消息成功log.debug(消息发送成功, ID:{}, correlationData.getId());}else{// 3.2.nack消息失败log.error(消息发送失败, ID:{}, 原因{},correlationData.getId(), result.getReason());}},ex - log.error(消息发送异常, ID:{}, 原因{},correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend(task.direct, task, message, correlationData);// 休眠一会儿等待ack回执Thread.sleep(2000);
}
1.2.消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中但是消息发送到RabbitMQ以后如果突然宕机也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存必须开启消息持久化机制。 交换机持久化 队列持久化 消息持久化
1.2.1.交换机持久化
RabbitMQ中交换机默认是非持久化的mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化
Bean
public DirectExchange simpleExchange(){// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange(simple.direct, true, false);
}
事实上默认情况下由SpringAMQP声明的交换机都是持久化的。
可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示 1.2.2.队列持久化
RabbitMQ中队列默认是非持久化的mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化
Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列durable就是持久化的return QueueBuilder.durable(simple.queue).build();
}
事实上默认情况下由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D的标示 1.2.3.消息持久化
利用SpringAMQP发送消息时可以设置消息的属性MessageProperties指定delivery-mode 1非持久化 2持久化
用java代码指定 默认情况下SpringAMQP发出的任何消息都是持久化的不用特意指定。
1.3.消费者消息确认
RabbitMQ是阅后即焚机制RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的消费者获取消息后应该向RabbitMQ发送ACK回执表明自己已经处理消息。
设想这样的场景 1RabbitMQ投递消息给消费者 2消费者获取消息后返回ACK给RabbitMQ 3RabbitMQ删除消息 4消费者宕机消息尚未处理
这样消息就丢失了。因此消费者返回ACK的时机非常重要。
而SpringAMQP则允许配置三种确认模式
•manual手动ack需要在业务代码结束后调用api发送ack。
•auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack
•none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除
由此可知 none模式下消息投递是不可靠的可能丢失 auto模式类似事务机制出现异常时返回nack消息回滚到mq没有异常返回ack manual自己根据业务情况判断什么时候该ack
一般我们都是使用默认的auto即可。
1.3.1.演示none模式
修改consumer服务的application.yml文件添加下面内容
spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack
修改consumer服务的SpringRabbitListener类中的方法模拟一个消息处理异常
RabbitListener(queues simple.queue)
public void listenSimpleQueue(String msg) {log.info(消费者接收到simple.queue的消息【{}】, msg);// 模拟异常System.out.println(1 / 0);log.debug(消息处理完成);
}
测试可以发现当消息处理抛异常时消息依然被RabbitMQ删除了。
1.3.2.演示auto模式
再次把确认机制修改为auto:
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack
在异常位置打断点再次发送消息程序卡在断点时可以发现此时消息状态为unack未确定状态
抛出异常后因为Spring会自动返回nack所以消息恢复至Ready状态并且没有被RabbitMQ删除 1.4.消费失败重试机制
当消费者出现异常后消息会不断requeue重入队到队列再重新发送给消费者然后再次异常再次requeue无限循环导致mq的消息处理飙升带来不必要的压力 怎么办呢
1.4.1.本地重试
我们可以利用Spring的retry机制在消费者出现异常时利用本地重试而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件添加内容
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false
重启consumer服务重复之前的测试。可以发现 在重试3次后SpringAMQP会抛出异常AmqpRejectAndDontRequeueException说明本地重试触发了 查看RabbitMQ控制台发现消息被删除了说明最后SpringAMQP返回的是ackmq删除消息了
结论 开启本地重试时消息处理过程中抛出异常不会requeue到队列而是在消费者本地重试 重试达到最大次数后Spring会返回ack消息会被丢弃
1.4.2.失败策略
在之前的测试中达到最大重试次数后消息会被丢弃这是由Spring内部机制决定的。
在开启重试模式后重试次数耗尽如果消息依然失败则需要有MessageRecovery接口来处理它包含三种不同的实现 RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息。默认就是这种方式 ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队 RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列后续由人工集中处理。 1在consumer服务中定义处理失败消息的交换机和队列
Bean
public DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);
}
Bean
public Queue errorQueue(){return new Queue(error.queue, true);
}
Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);
}
2定义一个RepublishMessageRecoverer关联队列和交换机
Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);
}
完整代码
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;Configuration
public class ErrorMessageConfig {Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}
1.5.总结
如何确保RabbitMQ消息的可靠性 开启生产者确认机制确保生产者的消息能到达队列 开启持久化功能确保消息未消费前在队列中不会丢失 开启消费者确认机制为auto由spring确认消息处理成功后完成ack 开启消费者失败重试机制并设置MessageRecoverer多次重试失败后将消息投递到异常交换机交由人工处理