西安免费做网站价格,网络优化器下载,杭州网站建设开发有限公司,wordpress 扁平化生产者重连 消费者重试 Confirm模式简介
消息的confirm确认机制#xff0c;是指生产者投递消息后#xff0c;到达了消息服务器Broker里面的exchange交换机#xff0c;则会给生产者一个应答#xff0c;生产者接收到应答#xff0c;用来确定这条消息是否正常的发送到Broker…生产者重连 消费者重试 Confirm模式简介
消息的confirm确认机制是指生产者投递消息后到达了消息服务器Broker里面的exchange交换机则会给生产者一个应答生产者接收到应答用来确定这条消息是否正常的发送到Broker的exchange中这也是消息可靠性投递的重要保障 具体代码设置 yml
server:port: 8080spring:application:name: confirm-learn1rabbitmq:host: 192.168.126.130port: 5672username: adminpassword: 123456virtual-host: powernodepublisher-confirm-type: correlated # 开启生产者的确认模式设置关联模式my:exchangeName: exchange.confirm.1queueName: queue.confirm.1 配置类
package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {Value(${my.exchangeName})private String exchangeName;Value(${my.queueName})private String queueName;Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}Beanpublic Queue queue(){return QueueBuilder.durable(queueName).build();}Beanpublic Binding binding(DirectExchange directExchange,Queue queue){return BindingBuilder.bind(queue).to(directExchange).with(info);}}写法一 配置回调类 package com.powernode.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;Component
Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(关联id为{},correlationData.getId());if (ack){log.info(消息正确的达到交换机);return;}//ack false 没有到达交换机log.error(消息没有到达交换机原因为{},cause);}
}发送消息类
package com.powernode.service;import com.powernode.config.MyConfirmCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService {Resourceprivate RabbitTemplate rabbitTemplate;Resourceprivate MyConfirmCallBack confirmCallBack;PostConstruct //构造方法后执行它相当于初始化作用public void init(){rabbitTemplate.setConfirmCallback(confirmCallBack);}public void sendMsg(){Message message MessageBuilder.withBody(hello world.getBytes()).build();CorrelationData correlationDatanew CorrelationData(); //关联数据correlationData.setId(order_123456); //发送订单信息rabbitTemplate.convertAndSend(exchange.confirm.1,info,message,correlationData);log.info(消息发送完毕发送时间为{},new Date());}
}启动类
package com.powernode;import com.powernode.service.MessageService;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;SpringBootApplication
public class Application implements ApplicationRunner {Resourceprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class, args);}Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
} 方法二
利用lambda 可以省掉配置回调类
package com.powernode.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService{Resourceprivate RabbitTemplate rabbitTemplate;PostConstruct //构造方法后执行它相当于初始化作用public void init(){rabbitTemplate.setConfirmCallback(//lambda 表达式(correlationData, ack, cause)-{log.info(关联id为{},correlationData.getId());if (ack){log.info(消息正确的达到交换机);return;}//ack false 没有到达交换机log.error(消息没有到达交换机原因为{},cause);});}public void sendMsg(){Message message MessageBuilder.withBody(hello world.getBytes()).build();CorrelationData correlationDatanew CorrelationData(); //关联数据correlationData.setId(order_123456); //发送订单信息rabbitTemplate.convertAndSend(exchange.confirm.4dddd,info,message,correlationData);log.info(消息发送完毕发送时间为{},new Date());}}RabbitMQ消息Return模式 消息可靠性投递 rabbitmq 整个消息投递的路径为 producer — exchange — queue — consumer 消息从 producer 到 exchange 则会返回一个 confirmCallback 消息从 exchange – queue 投递失败则会返回一个 returnCallback 我们可以利用这两个callback控制消息的可靠性投递 开启 确认模式
使用rabbitTemplate.setConfirmCallback设置回调函数当消息发送到exchange后回调confirm方法。在方法中判断ack如果为true则发送成功如果为false则发送失败需要处理 注意配置文件中开启 退回模式 spring.rabbitmq.publisher-returns: true 使用rabbitTemplate.setReturnCallback设置退回函数当消息从exchange路由到
queue失败后则会将消息退回给producer并执行回调函数returnedMessage yml server:port: 8080spring:application:name: ttl-learn1rabbitmq:host: 192.168.126.130port: 5672username: adminpassword: 123456virtual-host: powernodepublisher-returns: true #开启return模式my:exchangeName: exchange.return.1queueName: queue.return.1
配置类
package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {Value(${my.exchangeName})private String exchangeName;Value(${my.queueName})private String queueName;Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).build();}Beanpublic Queue queue(){return QueueBuilder.durable(queueName).build();}Beanpublic Binding binding(DirectExchange directExchange,Queue queue){return BindingBuilder.bind(queue).to(directExchange).with(info);}
}方式一
发送消息类
package com.powernode.service;import com.powernode.config.MyReturnCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService {Resourceprivate RabbitTemplate rabbitTemplate;Resourceprivate MyReturnCallBack myReturnCallBack;PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(myReturnCallBack); //设置回调}public void sendMsg(){Message message MessageBuilder.withBody(hello world.getBytes()).build();rabbitTemplate.convertAndSend(exchange.return.1,info1111,message);log.info(消息发送完毕发送时间为{},new Date());}
}回调配置类
package com.powernode.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 当消息从交换机 没有正确地 到达队列则会触发该方法* 如果消息从交换机 正确地 到达队列了那么就不会触发该方法** param returned*/
Component
Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error(消息从交换机没有正确的路由到投递到队列原因为{},returnedMessage.getReplyText());}
}方式二 lambda
package com.powernode.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService{Resourceprivate RabbitTemplate rabbitTemplate;PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(//使用lambda表达式message-{log.error(消息从交换机没有正确的路由到投递到队列原因为{},message.getReplyText());}); //设置回调}public void sendMsg(){Message message MessageBuilder.withBody(hello world.getBytes()).build();rabbitTemplate.convertAndSend(exchange.return.4,info,message);log.info(消息发送完毕发送时间为{},new Date());}}RabbitMQ交换机详细属性
3.1具体参数
1、Name交换机名称就是一个字符串
2、Type交换机类型direct, topic, fanout, headers四种
3、Durability持久化声明交换机是否持久化代表交换机在服务器重启后是否还存在
4、Auto delete是否自动删除曾经有队列绑定到该交换机后来解绑了那就会自动删除该交换机
5、Internal内部使用的如果是yes客户端无法直接发消息到此交换机它只能用于交换机与交换机的绑定。
6、Arguments只有一个取值alternate-exchange表示备用交换机
3.2代码演示
结论1没发消息之前不会创建交换机和对列
结论2发消息后如果交换机不存在才开始创建交换机如果队列不存在则创建新的对列
结论3创建交换机或者队列完成后再重新创建如果修改交换机或队列参数则会报错
406错误inequivalent arg durable for exchange exchange.durability in vhost powernode: received false but current is true, class-id40, method-id10)
结论4设置持久化为false 重启rabbitmq-server则交换机丢失实验durable参数先看下控制台然后重启rabbitmq-server
结论5实验自动删除为 true 从控制台上手动解绑会发现自动删除
3.3 备用交换机
3.3.1 备用交换机使用场景
当消息经过交换器准备路由给队列的时候发现没有对应的队列可以投递信息在rabbitmq中会默认丢弃消息如果我们想要监测哪些消息被投递到没有对应的队列我们可以用备用交换机来实现可以接收备用交换机的消息然后记录日志或发送报警信息。 3.3.2 主要代码和注意事项
备用交换机示例如下
注意备用交换机一般使用fanout交换机
测试时指定一个错误路由
重点普通交换机设置参数绑定到备用交换机 MapString, Object arguments new HashMap(); //指定当前正常的交换机的备用交换机是谁 arguments.put(alternate-exchange, EXCHANGE_ALTERNATE); //DirectExchange(String name, boolean durable, boolean autoDelete, MapString, Object arguments) return new DirectExchange(EXCHANGE, true, false, arguments); //return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();
3.3.3 参考配置代码
yml
server:port: 8080spring:application:name: ttl-learn1rabbitmq:host: 192.168.126.130port: 5672username: adminpassword: 123456virtual-host: powernodemy:exchangeNormalName: exchange.normal.alternate #正常交换机exchangeAlternateName: exchange.alternate.1 #备用交换机queueNormalName: queue.normal.alternate #正常队列queueAlternateName: queue.alternate.1 # 备用队列
配置类
package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {Value(${my.exchangeNormalName})private String exchangeNormalName;Value(${my.exchangeAlternateName})private String exchangeAlternateName;Value(${my.queueNormalName})private String queueNormalName;Value(${my.queueAlternateName})private String queueAlternateName;Beanpublic DirectExchange normalExchange(){return ExchangeBuilder // 默认为持久化的默认不自动删除.directExchange(exchangeNormalName) // 交换机的名字.alternate(exchangeAlternateName) //设置备用交换机 alternate-exchange.build();}Beanpublic Queue queueNormal(){return QueueBuilder.durable(queueNormalName).build();}Beanpublic Binding binding(DirectExchange normalExchange,Queue queueNormal){return BindingBuilder.bind(queueNormal).to(normalExchange).with(info);}Bean //备用交换机public FanoutExchange alternateExchange(){return ExchangeBuilder.fanoutExchange(exchangeAlternateName).build();}Beanpublic Queue alternateQueue(){return QueueBuilder.durable(queueAlternateName).build();}Beanpublic Binding bindingAlternate(FanoutExchange alternateExchange,Queue alternateQueue){return BindingBuilder.bind(alternateQueue).to(alternateExchange);}
}3.3.4 参考发送消息代码 Service public class MessageService { Resource private RabbitTemplate rabbitTemplate; /** * 发送消息 */ public void sendMessage() { //我们故意写错路由key,由于我们正常交换机设置了备用交换机所以该消息就会进入备用交换机//从而进入备用对列我们可以写一个程序接收备用对列的消息接收到后通知相关人员进行处理//如果正常交换机没有设置备用交换机则该消息会被抛弃。 rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, info1223, hello); System.out.println(消息发送完毕......); } } RabbitMQ队列详细属性
4.1 具体参数
Type队列类型
Name队列名称就是一个字符串随便一个字符串就可以
Durability声明队列是否持久化代表队列在服务器重启后是否还存在
Auto delete 是否自动删除如果为true当没有消费者连接到这个队列的时候队列会自动删除
Exclusiveexclusive属性的队列只对首次声明它的连接可见并且在连接断开时自动删除
基本上不设置它设置成false
Arguments队列的其他属性例如指定DLX死信交换机等
1、x-expiresNumber
当Queue队列在指定的时间未被访问则队列将被自动删除
2、x-message-ttlNumber
发布的消息在队列中存在多长时间后被取消单位毫秒
3、x-overflowString
设置队列溢出行为当达到队列的最大长度时消息会发生什么有效值为Drop Head或Reject Publish
4、x-max-lengthNumber
队列所能容下消息的最大长度当超出长度后新消息将会覆盖最前面的消息类似于Redis的LRU算法 5、 x-single-active-consumer默认为false
激活单一的消费者也就是该队列只能有一个消息者消费消息
6、x-max-length-bytesNumber
限定队列的最大占用空间当超出后也使用类似于Redis的LRU算法
7、x-dead-letter-exchangeString
指定队列关联的死信交换机有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉而是走到另一个队列中保存起来
8.x-dead-letter-routing-keyString
指定死信交换机的路由键一般和6一起定义
9.x-max-priorityNumber
如果将一个队列加上优先级参数那么该队列为优先级队列
1、给队列加上优先级参数使其成为优先级队列
x-max-priority10【0-255取值范围】
2、给消息加上优先级属性
通过优先级特性将一个队列实现插队消费 MessageProperties messagePropertiesnew MessageProperties();messageProperties.setPriority(8); 10、x-queue-modeString理解下即可
队列类型x-queue-modelazy懒队列在磁盘上尽可能多地保留消息以减少RAM使用如果未设置则队列将保留内存缓存以尽可能快地传递消息
11、x-queue-master-locatorString用的较少不讲
在集群模式下设置队列分配到的主节点位置信息
每个queue都有一个master节点所有对于queue的操作都是事先在master上完成之后再slave上进行相同的操作
每个不同的queue可以坐落在不同的集群节点上这些queue如果配置了镜像队列那么会有1个master和多个slave。
基本上所有的操作都落在master上那么如果这些queues的master都落在个别的服务节点上而其他的节点又很空闲这样就无法做到负载均衡那么势必会影响性能
关于master queue host 的分配有几种策略可以在queue声明的时候使用x-queue-master-locator参数或者在policy上设置queue-master-locator或者直接在rabbitmq的配置文件中定义queue_master_locator有三种可供选择的策略
1min-masters选择master queue数最少的那个服务节点host
2client-local选择与client相连接的那个服务节点host
3random随机分配
4.2 参考代码 Configuration public class RabbitConfig { public static final String EXCHANGE exchange; public static final String QUEUE queue; public static final String KEY info; QueueBuilder builder; Bean public DirectExchange directExchange() { return ExchangeBuilder.directExchange(EXCHANGE).build(); } Bean public Queue queue() { MapString, Object arguments new HashMap(); //arguments.put(x-expires, 5000); //arguments.put(x-max-length, 5); //arguments.put(x-overflow, reject-publish); arguments.put(x-single-active-consumer, false); //TODO ??? //arguments.put(x-max-length-bytes, 20); // 单位是字节 //arguments.put(x-max-priority, 10); // 0-255 //表示把当前声明的这个队列设置成了优先级队列那么该队列它允许消息插队 //将队列设置为延迟模式在磁盘上保留尽可能多的消息以减少RAM内存的使用如果未设置队列将保留内存缓存以尽可能快地传递消息 //有时候我们把这种队列叫惰性队列 //arguments.put(x-queue-mode, lazy); //设置队列版本。默认为版本1。 //版本1有一个基于日志的索引它嵌入了小消息。 //版本2有一个不同的索引可以在许多场景中提高内存使用率和性能并为以前嵌入的消息提供了按队列存储。 //arguments.put(x-queue-version, 2); // x-queue-master-locator在集群模式下设置镜像队列的主节点信息。 //arguments.put(x-queue-master-locator, QueueBuilder.LeaderLocator.clientLocal.getValue()); //------------------------- //arguments.put(x-expires, 10000); //自动过期10秒 //arguments.put(x-message-ttl, 10000); //自动过期10秒不会删除队列 //QueueBuilder 类里面有定义设置队列溢出行为当达到队列的最大长度时消息会发生什么有效值是drop-head、reject-publish //arguments.put(x-max-length, 5); //arguments.put(x-overflow, QueueBuilder.Overflow.dropHead.getValue()); //表示队列是否是单一活动消费者true时注册的消费组内只有一个消费者消费消息其他被忽略false时消息循环分发给所有消费者(默认false) //arguments.put(x-single-active-consumer, true); // x-max-length-bytes队列消息内容占用最大空间受限于内存大小超过该阈值则从队列头部开始删除消息 //arguments.put(x-max-length-bytes, 10); //参数是1到255之间的正整数表示队列应该支持的最大优先级数字越大代表优先级越高没有设置priority优先级字段那么priority字段值默认为0如果优先级队列priority属性被设置为比x-max-priority大那么priority的值被设置为x-max-priority的值。 //arguments.put(x-max-priority, 10); //将队列设置为延迟模式在磁盘上保留尽可能多的消息以减少RAM的使用;如果未设置队列将保留内存缓存以尽可能快地传递消息 //arguments.put(x-queue-mode, lazy); arguments.put(x-queue-version, 2); // x-queue-master-locator在集群模式下设置镜像队列的主节点信息。 arguments.put(x-queue-master-locator, QueueBuilder.LeaderLocator.clientLocal.getValue()); //--------------------------------------------- // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Nullable MapString, Object arguments) return new Queue(QUEUE, true, false, false, arguments); } Bean public Binding binding(DirectExchange directExchange, Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with(KEY); } }
实验durable 参数 重启rabbitmq-server队列丢失
实验autodelete参数加入接收者发现停掉服务那么久没有消费者了对列就会自动删除 消息可靠性投递
消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功那么这肯定会牺牲一些性能性能与可靠性是无法兼得的
如果业务实时一致性要求不是特别高的场景可以牺牲一些可靠性来换取性能。 确保消息在队列正确地存储
可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失即③出现问题
解决方案
、队列持久化
代码 QueueBuilder.durable(QUEUE).build();
、交换机持久化
代码 ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
、消息持久化
代码
默认持久化 MessageProperties messageProperties new MessageProperties(); //设置消息持久化当然它默认就是持久化所以可以不用设置可以查看源码 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 、集群镜像队列高可用 确保消息从队列正确地投递到消费者
采用消息消费时的手动ack确认机制来保证
如果消费者收到消息后未来得及处理即发生异常或者处理过程中发生异常会导致④失败。
为了保证消息从队列可靠地达到消费者RabbitMQ提供了消息确认机制message acknowledgement
开启手动ack消息消费确认 spring.rabbitmq.listener.simple.acknowledge-modemanual yml server:port: 8080spring:application:name: rabbit-12-reliabilityrabbitmq:host: 192.168.126.130port: 5672username: adminpassword: 123456virtual-host: powernodepublisher-confirm-type: correlated # 开启发布者的确认模式publisher-returns: true # 开启发布者的return模式listener:simple:acknowledge-mode: manual # 开始消费者的手动确认模式my:exchangeName: exchange.reliabilityqueueName: queue.reliability 配置类
package com.powernode.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {Value(${my.exchangeName})private String exchangeName;Value(${my.queueName})private String queueName;Beanpublic DirectExchange directExchange(){//默认就是持久化的return ExchangeBuilder.directExchange(exchangeName).build();}Beanpublic Queue queue(){//队列持久化return QueueBuilder.durable(queueName).build();}Beanpublic Binding binding(DirectExchange directExchange,Queue queue){return BindingBuilder.bind(queue).to(directExchange).with(info);}
}消息类
package com.powernode.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;Service
Slf4j
public class MessageService {Resourceprivate RabbitTemplate rabbitTemplate;/*** 构造方法执行后自动执行*/PostConstructpublic void init(){//开启生产者的确定模式rabbitTemplate.setConfirmCallback((correlationData, ack, cause)-{if(!ack){log.error(消息没有到达交换机原因为{},cause);//TODO 重发消息或者记录错误日志}});rabbitTemplate.setReturnsCallback(returnedMessage-{log.error(消息没有从交换机正确的投递路由到队列原因为{},returnedMessage.getReplyText());//TODO 记录错误日志给程序员发短信或者或者邮件});}public void sendMsg(){MessageProperties messagePropertiesnew MessageProperties();//设置单条消息的持久化默认就是持久化messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message MessageBuilder.withBody(hello world.getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend(exchange.reliability,info,message);log.info(消息发送完毕发送时间为{},new Date());}
}发送消息
package com.powernode;import com.powernode.service.MessageService;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;SpringBootApplication
public class Application implements ApplicationRunner {Resourceprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class, args);}Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
} 消费者消费消息类手动确认
package com.powernode.message;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;Component
Slf4j
public class ReceiveMessage {RabbitListener(queues {queue.reliability})public void receiveMsg(Message message, Channel channel){//获取消息的唯一标识long deliveryTag message.getMessageProperties().getDeliveryTag();try {log.info(接收到的消息为{},new String(message.getBody()));// TODO 插入订单等
// int a10/0;//手动确认channel.basicAck(deliveryTag,false);} catch (Exception e) {log.error(消息处理出现问题);try {channel.basicNack(deliveryTag,false,true);} catch (IOException ex) {throw new RuntimeException(ex);}throw new RuntimeException(e);}}
}消费者在订阅队列时通过上面的配置不自动确认采用手动确认RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息
如果消息消费失败也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true可以把这条消息重新存入队列以便发给下一个消费者当然只有一个消费者的时候这种方式可能会出现无限循环重复消费的情况可以投递到新的队列中或者只打印异常日志 消息的幂等性
消息消费时的幂等性消息不被重复消费 同一个消息第一次接收正常处理业务如果该消息第二次再接收那就不能再处理业务否则就处理重复了
幂等性是对于一个资源不管你请求一次还是请求多次对该资源本身造成的影响应该是相同的不能因为重复的请求而对该资源重复造成影响
以接口幂等性举例
接口幂等性是指一个接口用同样的参数反复调用不会造成业务错误那么这个接口就是具有幂等性的
注册接口
发送短信验证码接口
比如同一个订单我支付两次但是只会扣款一次第二次支付不会扣款这说明这个支付接口是具有幂等性的
如何避免消息的重复消费问题消息消费时的幂等性
全局唯一ID Redis
生产者在发送消息时为每条消息设置一个全局唯一的messageId消费者拿到消息后使用setnx命令将messageId作为key放到redis中setnx(messageId, 1)若返回1说明之前没有消费过正常消费若返回0说明这条消息之前已消费过抛弃
具体代码参考以下代码
参考代码 //1、把消息的唯一ID写入redis boolean flag stringRedisTemplate.opsForValue().setIfAbsent(idempotent: orders.getId(), String.valueOf(orders.getId())); //如果redis中该key不存在那么就设置存在就不设置 if (flag) { //key不存在返回true //相当于是第一次消费该消息 //TODO 处理业务 System.out.println(正常处理业务..... orders.getId()); }