你的网站正在建设中,互动营销策略,丽江网架公司,陕西网络公司目录 一、消息不丢失1.消息确认2.消息确认业务封装2.1 发送确认消息测试2.2 消息发送失败#xff0c;设置重发机制 一、消息不丢失
消息的不丢失#xff0c;在MQ角度考虑#xff0c;一般有三种途径#xff1a; 1#xff0c;生产者不丢数据 2#xff0c;MQ服务器不丢数据… 目录 一、消息不丢失1.消息确认2.消息确认业务封装2.1 发送确认消息测试2.2 消息发送失败设置重发机制 一、消息不丢失
消息的不丢失在MQ角度考虑一般有三种途径 1生产者不丢数据 2MQ服务器不丢数据 3消费者不丢数据 保证消息不丢失有两种实现方式 1开启事务模式 2消息确认模式 说明开启事务会大幅降低消息发送及接收效率使用的相对较少因此我们生产环境一般都采取消息确认模式以下我们只是讲解消息确认模式
1.消息确认
消息持久化 如果希望RabbitMQ重启之后消息不丢失那么需要对以下3种实体均配置持久化 Exchange 声明exchange时设置持久化durable true并且不自动删除(autoDelete false) Queue 声明queue时设置持久化durable true并且不自动删除(autoDelete false) message 发送消息时通过设置deliveryMode2持久化消息
处理消息队列丢数据的情况一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用你可以在消息持久化磁盘后再给生产者发送一个Ack信号。这样如果消息持久化磁盘之前rabbitMQ阵亡了那么生产者收不到Ack信号生产者会自动重发。那么如何持久化呢其实也很容易就下面两步
1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode2
这样设置以后rabbitMQ就算挂了重启后也能恢复数据发送确认 有时业务处理成功消息也发了但是我们并不知道消息是否成功到达了rabbitmq如果由于网络等原因导致业务成功而消息发送失败那么发送方将出现不一致的问题此时可以使用rabbitmq的发送确认功能即要求rabbitmq显式告知我们消息是否已成功发送。
手动消费确认 有时消息被正确投递到消费方但是消费方处理失败那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分但是积分消费方处理却都失败了用户就会问我购买了东西为什么积分并没有增加呢 要解决这个问题需要引入消费方确认即只有消息被成功处理之后才告知rabbitmq以ack否则告知rabbitmq以nack
2.消息确认业务封装
service-mq修改配置 开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了
spring:rabbitmq:host: 192.168.121.140port: 5672username: adminpassword: adminpublisher-confirms-type: correlated #交换机的确认publisher-returns: true #队列的确认listener:simple:acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的如果要手动确认消息则需要修改确认模式为manualprefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为轮询分发设置为1为公平分发搭建rabbit-util模块 由于消息队列是公共模块我们把mq的相关业务封装到该模块其他service微服务模块都可能使用因此我们把他封装到一个单独的模块需要使用mq的模块直接引用该模块即可 搭建方式如 pom.xml dependencies!--rabbitmq消息队列--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency!--rabbitmq 协议--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-bus-amqp/artifactId/dependency/dependencies4.2.4 封装发送端消息确认
/*** Description 消息发送确认* p* ConfirmCallback 只确认消息是否正确到达 Exchange 中* ReturnCallback 消息没有正确到达队列时触发回调如果正确到达队列不执行* p* 1. 如果消息没有到exchange,则confirm回调,ackfalse* 2. 如果消息到达exchange,则confirm回调,acktrue* 3. exchange到queue成功,则不回调return* 4. exchange到queue失败,则回调return* */
Component
Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {Autowiredprivate RabbitTemplate rabbitTemplate;// 修饰一个非静态的void方法,在服务器加载Servlet的时候运行并且只会被服务器执行一次在构造函数之后执行init方法之前执行。PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallbackrabbitTemplate.setReturnCallback(this); //指定 ReturnCallback}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info(消息发送成功 JSON.toJSONString(correlationData));} else {log.info(消息发送失败 cause 数据 JSON.toJSONString(correlationData));}}Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 反序列化对象输出System.out.println(消息主体: new String(message.getBody()));System.out.println(应答码: replyCode);System.out.println(描述 replyText);System.out.println(消息使用的交换器 exchange : exchange);System.out.println(消息使用的路由键 routing : routingKey);}}封装消息发送
Service
public class RabbitService {Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息* param exchange 交换机* param routingKey 路由键* param message 消息*/public boolean sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);return true;}}2.1 发送确认消息测试
消息发送端
RestController
RequestMapping(/mq)
public class MqController {Autowiredprivate RabbitService rabbitService;/*** 消息发送*///http://localhost:8282/mq/sendConfirmGetMapping(sendConfirm)public Result sendConfirm() {rabbitService.sendMessage(exchange.confirm, routing.confirm, 来人了开始接客吧);return Result.ok();}
}消息接收端
Component
public class ConfirmReceiver {SneakyThrows
RabbitListener(bindingsQueueBinding(value Queue(value queue.confirm,autoDelete false),exchange Exchange(value exchange.confirm,autoDelete true),key {routing.confirm}))
public void process(Message message, Channel channel){System.out.println(RabbitListener:new String(message.getBody()));// false 确认一个消息true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}测试http://localhost:8282/mq/sendConfirm
2.2 消息发送失败设置重发机制
实现思路借助redis来实现重发机制 模块中添加依赖
!-- redis --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId
/dependency!-- spring2.X集成redis所需common-pool2--
dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-pool2/artifactId
/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactId
/dependency自定义一个实体类来接收消息
Data
public class GmallCorrelationData extends CorrelationData {// 消息主体private Object message;// 交换机private String exchange;// 路由键private String routingKey;// 重试次数private int retryCount 0;// 消息类型 是否是延迟消息private boolean isDelay false;// 延迟时间private int delayTime 10;
}
修改发送方法
// 封装一个发送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){// 将发送的消息 赋值到 自定义的实体类GmallCorrelationData gmallCorrelationData new GmallCorrelationData();// 声明一个correlationId的变量String correlationId UUID.randomUUID().toString().replaceAll(-,);gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);// 发送消息的时候将这个gmallCorrelationData 对象放入缓存。redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);// 调用发送消息方法//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);// 默认返回truereturn true;
}发送失败调用重发方法 MQProducerAckConfig 类中修改
Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {// ack true 说明消息正确发送到了交换机if (ack){System.out.println(哥们你来了.);log.info(消息发送到了交换机);}else {// 消息没有到交换机log.info(消息没发送到交换机);// 调用重试发送方法this.retrySendMsg(correlationData);}
}Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {System.out.println(消息主体: new String(message.getBody()));System.out.println(应答码: code);System.out.println(描述 codeText);System.out.println(消息使用的交换器 exchange : exchange);System.out.println(消息使用的路由键 routing : routingKey);// 获取这个CorrelationData对象的Id spring_returned_message_correlationString correlationDataId (String) message.getMessageProperties().getHeaders().get(spring_returned_message_correlation);// 因为在发送消息的时候已经将数据存储到缓存通过 correlationDataId 来获取缓存的数据String strJson (String) this.redisTemplate.opsForValue().get(correlationDataId);// 消息没有到队列的时候则会调用重试发送方法GmallCorrelationData gmallCorrelationData JSON.parseObject(strJson,GmallCorrelationData.class);// 调用方法 gmallCorrelationData 这对象中至少的有交换机路由键消息等内容.this.retrySendMsg(gmallCorrelationData);
}/*** 重试发送方法* param correlationData 父类对象 它下面还有个子类对象 GmallCorrelationData*/
private void retrySendMsg(CorrelationData correlationData) {// 数据类型转换 统一转换为子类处理GmallCorrelationData gmallCorrelationData (GmallCorrelationData) correlationData;// 获取到重试次数 初始值 0int retryCount gmallCorrelationData.getRetryCount();// 判断if (retryCount3){// 不需要重试了log.error(重试次数已到发送消息失败:JSON.toJSONString(gmallCorrelationData));} else {// 变量更新retryCount1;// 重新赋值重试次数 第一次重试 0-1 1-2 2-3gmallCorrelationData.setRetryCount(retryCount);System.out.println(重试次数\tretryCount);// 更新缓存中的数据this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);// 调用发送消息方法 表示发送普通消息 发送消息的时候不能调用 new RabbitService().sendMsg() 这个方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);}
}测试:只需修改(错误信息)