电商自建站,上海市工程质量建设管理协会网站,开源企业网站系统,网站建设和微信小程序RabbitMQ发送者重连、发送者确认
一、发送者重连
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数#xff0c;下次等…RabbitMQ发送者重连、发送者确认
一、发送者重连
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数下次等待时长initial-interva * multipliermax-attempts: 3 #最大重试次数当网络不稳定的时候利用重试机制可以有效提高消息发送成功的的成功率不过SpringAMQP提供的重试机制是阻塞式的重试也就是说多次重试等待的过程中当前线程是被阻塞的会影响业务性能。 如果对于业务性能有要求建议禁用重试机制。如果一定要用请合理配置等待时长和重试次数当然也可考虑使用异步线程来执行发送消息的代码。 二、发送者确认 SpringAMQP提供了Publisher Confirm和 Publisher Return两种确认机制。开启确认机制以后当发送者发送消息给MQ后MQ会返回确认结果给发送者。返回的结果有以下几种情况
消息投递到MQ但是路由失败。此时会通过PublisherReturn返回路由异常原因然后返回告知投递成功。临时消息投递到了并且入队成功返回告知投递成功持久消息投递到了并且入队完成持久化返回告知投递成功其他情况都会返回告知投递失败。
.在这个微服务的中添加配置
spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制并设置confirm类型publisher-returns: true #开启publisher return机制配置说明这里有三种模式可选
关闭机制同步阻塞等待的回执消息异步回调方式返回回执消息
2.ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目启动过程中配置
Slf4j
Configuration
RequiredArgsConstructor
public class MqConfig {private final RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setReturnsCallback(returnedMessage - {log.error(监听到消息return callback);log.debug(exchange: {}, returnedMessage.getExchange());log.debug(routingKey: {}, returnedMessage.getRoutingKey());log.debug(message: {}, returnedMessage.getMessage());log.debug(replyCode: {}, returnedMessage.getReplyCode());log.debug(replyText: {}, returnedMessage.getReplyText());});}
上面简化的是这部分内容rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {//TODO}});
3.ConfirmCallback发送消息指定消息ID消息ConfirmCallback
Testpublic void testConfirmCallback() throws InterruptedException {//创建correlationDataCorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());correlationData.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {Overridepublic void onFailure(Throwable ex) {log.error(spring amqp 处理确认结果异常, ex);}Overridepublic void onSuccess(CorrelationData.Confirm result) {if (result.isAck()) {log.debug(收到ConfirmCallback ack,消息发送成功!!!);} else {log.error(收到ConfirmCallback nack,消息发送失败reason{}, result.getReason());}}});//交换机名称String exChangeNange hmall.topic;//消息String message TopicTopicTopic ;//发送消息rabbitTemplate.convertAndSend(exChangeNange, hina.new, message, correlationData);Thread.sleep(3000);}