蓝色 宽屏 网站 模板,模板网站建设制作,中国免费网站服务器主机域名,美的集团网站建设RabbitMQ 消息确认机制
本文总结了RabbitMQ消息发送过程中的一些代码片段#xff0c;详细分析了回调函数和发布确认机制的实现#xff0c;以提高消息传递的可靠性。 返回回调机制的代码分析
主要用途
这个代码主要用于设置RabbitMQ消息发送过程中的回调函数#xff0c;即…RabbitMQ 消息确认机制
本文总结了RabbitMQ消息发送过程中的一些代码片段详细分析了回调函数和发布确认机制的实现以提高消息传递的可靠性。 返回回调机制的代码分析
主要用途
这个代码主要用于设置RabbitMQ消息发送过程中的回调函数即在消息不能被成功投递到目标队列时的返回回调处理。具体来说它使用RabbitTemplate对象来设置ReturnsCallback回调函数这个回调函数可以帮助开发者在消息无法正确路由到队列时获得相关的反馈信息。
代码详细分析
类与注解
Slf4j用来生成一个用于日志记录的log对象方便日志输出。AllArgsConstructor自动生成全参构造器使得可以方便地初始化类的所有字段。Configuration表明这是一个Spring配置类用于定义一些Bean。
RabbitTemplate的初始化
private final RabbitTemplate rabbitTemplate;这是一个RabbitTemplate对象用于与RabbitMQ进行交互操作。PostConstruct标注的方法将在所有依赖项注入完成后自动调用通常用于初始化逻辑。
init() 方法
rabbitTemplate.setReturnsCallback(...)设置一个回调函数当消息不能正确路由时会触发这个回调。new RabbitTemplate.ReturnsCallback()匿名内部类用于定义ReturnsCallback的逻辑。public void returnedMessage(ReturnedMessage returned)这是实现ReturnsCallback接口中的方法它负责处理返回的消息。
returnedMessage() 方法中的具体逻辑
log.error(触发return callback,)表示触发了返回回调记录一条错误日志。log.debug(...)输出了消息在返回时的各种属性包括 exchange消息发送到的交换机名称。routingKey用于路由消息的路由键。message具体的消息内容。replyCode 和 replyText返回码及返回的文本表明返回的原因。
方法执行的时机
当RabbitMQ消息没有成功被路由到指定的队列时这个回调函数将被调用。典型的场景是队列不存在、交换机配置错误或路由键不匹配等情况。这种情况下RabbitMQ会触发返回回调通过returnedMessage()方法通知应用程序该消息未能被成功投递。
代码示例
Slf4j
AllArgsConstructor
Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {Overridepublic void returnedMessage(ReturnedMessage returned) {log.error(触发return callback,);log.debug(exchange: {}, returned.getExchange());log.debug(routingKey: {}, returned.getRoutingKey());log.debug(message: {}, returned.getMessage());log.debug(replyCode: {}, returned.getReplyCode());log.debug(replyText: {}, returned.getReplyText());}});}
}代码的用途
当RabbitMQ消息没有成功被路由到指定的队列时这个回调函数将被调用。典型的场景是队列不存在、交换机配置错误或路由键不匹配等情况。通过这些日志输出开发人员可以方便地定位问题所在了解为什么消息未能成功发送到队列中。这个回调可以提高系统的健壮性确保消息发送的可靠性如果消息丢失或出现其他问题可以通过日志及时发现并进行处理。
总结这个代码主要用于设置RabbitMQ的返回回调机制以便在消息发送失败时可以及时进行处理和日志记录。它是消息可靠传输的一部分尤其适用于需要高可靠性的消息传递场景。 发布确认机制的代码分析
主要用途
这个代码演示了如何在RabbitMQ中使用**发布确认Publisher Confirm**的机制确保消息成功到达交换机并获得确认ack。通过这种机制应用程序可以在消息发送到交换机后获得反馈确认是否发送成功从而提高消息传输的可靠性。
代码详细分析
方法注解与声明
Test表示这是一个测试方法通常用JUnit等测试框架来执行测试消息发送的过程。void testPublisherConfirm()方法名表明该方法用于测试发布者的确认Publisher Confirm。
创建CorrelationData对象
CorrelationData cd new CorrelationData(); CorrelationData对象用于追踪每个消息的唯一标识。在发布消息时相关的确认信息与该对象进行关联。通过该对象可以区分出每条消息的确认状态。
设置ConfirmCallback回调 cd.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {...}) 使用了一个ListenableFutureCallback来监听消息的确认结果。addCallback()方法用于指定回调逻辑当消息被确认或失败时执行对应的逻辑。 onFailure(Throwable ex) 如果消息确认过程中发生异常则会进入onFailure()方法。log.error(handle message ack fail, ex)记录一条错误日志表明消息的确认失败并输出具体异常信息。 onSuccess(CorrelationData.Confirm result) 如果消息成功被确认则会进入onSuccess()方法。result.isAck()判断确认结果是否为ack即成功确认。如果isAck()返回true则记录日志表示消息成功接收。如果返回false则表示消息未被确认即nack需要处理相应的失败情况记录错误日志并输出原因。
发送消息
rabbitTemplate.convertAndSend(hmall.direct, red1, hello, cd) rabbitTemplate是用于发送消息的模板对象。convertAndSend()方法用于发送消息参数分别为 hmall.direct交换机名称。red1路由键。hello消息内容。cd之前创建的CorrelationData对象用于追踪该消息的状态。
方法执行的时机
onFailure(Throwable ex)方法会在消息确认过程中出现异常时被调用例如由于网络中断或RabbitMQ服务器不可用等情况导致消息无法被正确发送或确认。onSuccess(CorrelationData.Confirm result)方法会在RabbitMQ成功处理消息时被调用返回确认结果ack或者未确认结果nack。如果交换机成功接收了消息但队列出现问题则可能返回nack。
代码示例
Test
void testPublisherConfirm() throws InterruptedException {// 1. 创建CorrelationDataCorrelationData cd new CorrelationData();// 2. 给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {Overridepublic void onFailure(Throwable ex) {// 2.1. Future发生异常时的处理逻辑基本不会触发log.error(handle message ack fail, ex);}Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2. Future接收到确认的处理逻辑参数中的result就是回执内容if(result.isAck()){ // result.isAck()boolean类型true代表ack回执false 代表 nack回执log.debug(发送消息成功, 收到 ack!);} else {log.error(发送消息失败, 收到 nack, reason: {}, result.getReason());}}});// 3. 发送消息rabbitTemplate.convertAndSend(hmall.direct, red1, hello, cd);
}发布确认机制在MQ中的作用
消息的可靠性传输
发布者确认Publisher Confirm机制确保消息从生产者成功发送到交换机并获得交换机的确认。通过这种方式可以保证消息不会因为网络故障或者交换机问题而丢失。
失败处理
通过在onFailure()方法中记录异常可以帮助开发者了解在发送过程中出现的错误。在onSuccess()中处理ack和nack使得开发者可以及时知道消息是否被交换机接收并正确处理。
提高系统健壮性
确保消息传递的可靠性当消息未被正确确认例如nack时可以及时记录日志或者进行补救措施例如重新发送消息。特别适用于金融系统、电商订单等需要高可靠性的场景。
总结这个代码演示了在RabbitMQ中使用发布者确认机制以提高消息传递的可靠性。通过监听消息的确认结果可以确保消息是否成功到达交换机并在失败的情况下做出适当的处理。这种机制非常适用于需要高可靠性消息传递的系统以确保消息不会丢失。 发布者确认和返回机制总结 Spring AMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后当发送者发送消息给MQ后MQ会返回确认结果给发送者。返回的结果有以下几种情况
消息投递到了MQ但路由失败。此时会通过PublisherReturn返回路由异常原因然后返回ACK告知投递成功。临时消息投递到了MQ并且入队成功返回ACK告知投递成功。持久消息投递到了MQ并且入队完成持久化返回ACK告知投递成功。其它情况都会返回NACK告知投递失败。
通过以上机制消息的可靠性得到进一步保证。结合代码和确认机制开发者可以更加精细地控制消息的生命周期和状态以确保业务逻辑的健壮性和可靠性。