网站建设报价清单,网站建设栏目层级,如何本地搭建自己的网站,seo方式包括生产者没有成功把消息发送到MQ 丢失的原因 #xff1a;因为网络传输的不稳定性#xff0c;当生产者在向MQ发送消息的过程中#xff0c;MQ没有成功接收到消息#xff0c;但是生产者却以为MQ成功接收到了消息#xff0c;不会再次重复发送该消息#xff0c;从而导致消息的丢… 生产者没有成功把消息发送到MQ 丢失的原因 因为网络传输的不稳定性当生产者在向MQ发送消息的过程中MQ没有成功接收到消息但是生产者却以为MQ成功接收到了消息不会再次重复发送该消息从而导致消息的丢失。
解决办法 有两个解决办法事务机制和confirm机制最常用的是confirm机制发布确认机制。
注意 RabbitMQ的事务机制是同步的很耗型能会降低RabbitMQ的吞吐量。 confirm机制是异步的生成者发送完一个消息之后不需要等待RabbitMQ的回调就可以发送下一个消息当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。
两个机制说明如下:
confirm发布确认机制 解释RabbitMQ可以开启 confirm 模式在生产者那里设置开启 confirm 模式之后生产者每次写的消息都会分配一个唯一的 id如果消息成功写入 RabbitMQ 中RabbitMQ 会给生产者回传一个 ack 消息告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息会回调你的一个 nack 接口告诉你这个消息接收失败生产者可以重新发送。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态如果超过一定时间还没接收到这个消息的回调那么可以重发。
代码 yml配置 ----------------------------------------------------------------------------------------------------
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* 交换机回滚
*/
Component
Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 内含消息内容* ack 交换机接受成功或者失败。 true表示交换机接受消息成功 false表示交换机接受失败* cause 表示失败原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(hello world);String id correlationData.getId();String message new String(correlationData.getReturnedMessage().getBody());if (ack){log.info(交换机收到消息id为{}, 消息内容为{}, id, message);}else {log.info(交换机未收到消息id为{}, 消息内容为{}, 原因为{}, id, message, cause);}}
}
----------------------------------------队列防止消息丢失----------------------------------------------------------------
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 队列防止消息丢失*/
Slf4j
Component
public class QueueCallback implements RabbitTemplate.ReturnCallback{Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info(消息 {} 经交换机 {} 通过routingKey{} 路由到队列失败失败code为{} 失败原因为{},new String(message.getBody()), exchange, routingKey, replyCode, replyText);}
}
--------------------------引用-controller-----------------------------------------------
//交换机回滚
Autowired
private ExchangeCallback exchangeCallback;
//队列回滚
Autowired
private QueueCallback queueCallback;
/*** 初始化交换机监听*/
PostConstruct
public void init(){
//交换机
rabbitTemplate.setConfirmCallback(exchangeCallback);
/*** true交换机无法将消息进行路由时会将该消息返回给生产者* false如果发现消息无法进行路由则直接丢弃*/
rabbitTemplate.setMandatory(true);
//队列
rabbitTemplate.setReturnCallback(queueCallback);
}
/*** 发送消息* 结果:这是一条消息*/GetMapping(/sendMessageTest)public String sendMessageTest(){// 消息类型为object 发送对象也是可以的String msg 这是一条消息;// 第一个参数为发送消息到那个交换机上第二个是发送的路由键交换机进行需要符合绑定的队列第三个参数为发送的消息
//CommonUtils.dirExchange--自己的交换机名称
//CommonUtils.routingKey --路由Key值 rabbitTemplate.convertAndSend(1235,CommonUtils.routingKey,msg);System.out.println(消息发送成功:msg);return 发送成功;发送内容为:msg;}
运行结果: