做暧暧视频网站下载,做外贸是不是必须有网站,营销型网站建设实战感想,孝感企业做网站一、springboot整合RabbitMQ#xff08;jdk17#xff09;#xff08;创建两个项目#xff0c;一个生产者项目#xff0c;一个消费者项目#xff09;
上面使用原生JAVA操作RabbitMQ较为繁琐#xff0c;很多的代码都是重复书写的#xff0c;使用springboot可以简化代码的…一、springboot整合RabbitMQjdk17创建两个项目一个生产者项目一个消费者项目
上面使用原生JAVA操作RabbitMQ较为繁琐很多的代码都是重复书写的使用springboot可以简化代码的编写。
生产者项目 第一步创建springboot工程然后引入rabbitmq的依赖
!-- RabbitMQ起步依赖 --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency第二步编写配置文件
spring:rabbitmq:host: 192.168.70.130 # 虚拟机的地址port: 5672username: adminpassword: adminvirtual-host: /#日志格式
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n
第三步编写RabbitMQ的配置类
Configuration
public class RabbitmqConfig1 {private final String EXCHANGE_NAME boot_exchange;private final String QUEUE_NAME boot_queue;private final String ROUTE_NAME boot_route;//创建交换机Bean(EXCHANGE_NAME)public Exchange getExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//创建队列Bean(QUEUE_NAME)public Queue getQueue(){return new Queue(QUEUE_NAME);}//交换机和队列绑定Beanpublic Binding exchangeBindQueue(Qualifier(QUEUE_NAME) Queue queue, Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTE_NAME).noargs();}
}
第四步编写发送消息测试类
//编写发送消息测试类
SpringBootTest
public class RabbitmqTest {// 注入RabbitTemplate工具类Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage(){/*** 发送消息* 参数1交换机* 参数2路由key* 参数3要发送的消息*/rabbitTemplate.convertAndSend(boot_exchange,boot_route,你好我有一个毛衫);System.out.println(发送消息成功);}
}消费者项目 第一步创建springboot工程然后引入rabbitmq的依赖
!-- RabbitMQ起步依赖 --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency第二步编写配置文件
spring:rabbitmq:host: 192.168.70.130 # 虚拟机的地址port: 5672username: adminpassword: adminvirtual-host: /#日志格式
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n
第三步编写消费者监听队列
Component
public class Consumer1 {/*** 监听队列* param message* queues表示监听的队列的名称*/RabbitListener(queues boot_queue)public void listener(String message){System.out.println(接受到消息 message);}
}
4、rabbitmq的消息可靠性 RabbitMQ消息投递的路径为 生产者---交换机---队列---消费者 在RabbitMQ工作的过程中每个环节消息都可能传递失败那么RabbitMQ是如何监听消息是否成功投递的呢 确认模式confirm可以监听消息是否从生产者成功传递到交换机。 退回模式return可以监听消息是否从交换机成功传递到队列。 消费者消息确认Consumer Ack可以监听消费者是否成功处理消息。
【一】rabbitmq的消息可靠性——确认模式
确认模式confirm可以监听消息是否从生产者成功传递到交换机。创建一个新的生产者项目导入mq上面的第一步操作依赖进行开发也可以在原来的基础上修改信息 代码组成和上面的生产者项目是一样的也是三步内容。
第一步修改配置文件
只是添加了一句代码
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: / # 表示使用默认的virtual-host#开启确认模式publisher-confirm-type: correlated#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n第二步在生产者的配置类创建交换机和队列RabbitMQ的配置类
Configuration
public class RabbitmqConfig2Confirm {public final String EXCHANGE_NAME confirm_exchange;public final String QUEUE_NAME confirm_queue;public final String ROUTING_NAME confirm_routing;// 创建交换机Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 创建队列Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}
// 创建交换机和队列绑定Beanpublic Binding exchangeBindQueue(Qualifier(QUEUE_NAME) Queue queue, Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第三步编写测试类发生消息生产者定义确认模式的回调方法springboot的测试类能够加载到第二步的配置类 Testvoid testConfirm() {//回调确认rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** param correlationData 配置信息* param b 是否成功true 是 false 否* param s 失败原因*/Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println(发送成功);}else{System.out.println(发送失败原因s);}}});//发送消息/*** 发送消息* 参数1交换机* 参数2路由key* 参数3要发送的消息*/rabbitTemplate.convertAndSend(confirm_exchange,confirm_routing,send message...confirm);}由于rabbitmq的confirm确认模式是确认消息是否从生产者成功传递到交换机的所以就没必要写消费者进行信息的消费了
当我们执行测试类的时候先执行rabbitTemplate.convertAndSend(“confirm_exchange”,“confirm_routing”,“send message…confirm”);无论消息是否成功发送都会调用 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()方法如果发送成功则执行if语句的代码如果发送失败则调用else语句的代码。 根据执行的是if或者else的语句就能判断消息是否成功传递到交换机了。
【二】rabbitmq的消息可靠性——退回模式
退回模式return可以监听消息是否从交换机成功传递到队列。创建一个新的生产者项目导入mq上面的第一步操作依赖进行开发也可以在原来的基础上修改信息 代码组成和上面的生产者项目是一样的也是三步内容。
第一步修改配置文件
只是添加了一句
# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开启确认模式publisher-confirm-type: correlated#开始回退模式publisher-returns: true#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n第二步编写配置类RabbitMQ的配置类
Configuration
public class RabbitmqConfig3Return {public final String EXCHANGE_NAME return_exchange;public final String QUEUE_NAME return_queue;public final String ROUTING_NAME return_routing;
// 创建交换机Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 创建队列Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}// 创建交换机和队列绑定Beanpublic Binding exchangeBindQueue(Qualifier(QUEUE_NAME) Queue queue, Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第三步编写测试类发生消息生产者定义退回模式的回调方法springboot的测试类能够加载到第二步的配置类
Testvoid testReturnSendMessage(){
// 调用回退模式的回调方法只有失败才会回调成功不会回调哦
// 失败后将失败信息封装到参数中rabbitTemplate.setReturnsCallback(returned -{Message message returned.getMessage();System.out.println(消息对象message);System.out.println(错误码returned.getReplyCode());System.out.println(错误信息returned.getReplyText());System.out.println(交换机returned.getExchange());System.out.println(路由键returned.getRoutingKey());});// 发送消息/*** 发送消息* 参数1交换机* 参数2路由key* 参数3要发送的消息*/rabbitTemplate.convertAndSend(return_exchange,return_routing,send message...return);}由于rabbitmq的return回退模式是确认消息是否从交换机成功传递到队列的还没有传递到消费者所以就没必要写消费者进行信息的消费了
当我们执行测试类的时候先执行rabbitTemplate.convertAndSend(“return_exchange”,“return_routing”,“send message…return”);如果消息成功发送到队列上则不会调用 rabbitTemplate.setReturnsCallback方法如果发送步成功则调用回调方法rabbitTemplate.setReturnsCallback 根据运行结果就可以知道在传递消息到队列上的时候哪里发生错误了 【三】rabbitmq的消息可靠性——消费者消息确认Consumer Ack
在RabbitMQ中消费者接收到消息后会向队列发送确认签收的消息只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认Consumer Acknowledge简称Ack。 类似快递员派送快递也需要我们签收否则一直存在于快递公司的系统中。 消费者消息确认Consumer Acknowledge简称Ack分为自动确认和手动确认。 自动确认指消息只要被消费者接收到无论是否成功处理消息则自动签收并将消息从队列中移除。但是在实际开发中收到消息后可能业务处理出现异常那么消息就会丢失。此时需要设置手动签收即在业务处理成功后再通知签收消息如果出现异常则拒签消息让消息依然保留在队列当中。
● 自动确认spring.rabbitmq.listener.simple.acknowledge“none” ● 手动确认spring.rabbitmq.listener.simple.acknowledge“manual”
创建一个新的生产者项目和新的消费者项目导入mq上面的第一步操作依赖进行开发也可以在原来的基础上修改信息 代码组成和上面的生产者项目是一样的也是三步内容。
生产者项目第一步修改配置文件
不用修改
# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开启确认模式publisher-confirm-type: correlated#开始回退模式publisher-returns: true#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n生产者项目第二步编写配置类RabbitMQ的配置类
Configuration
public class RabbitmqConfig4ACK {public final String EXCHANGE_NAME ack_exchange;public final String QUEUE_NAME ack_queue;public final String ROUTING_NAME ack_routing;
// 创建交换机Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 创建队列Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME).build();}// 创建交换机和队列绑定Beanpublic Binding exchangeBindQueue(Qualifier(QUEUE_NAME) Queue queue, Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}生产者项目第三步编写测试类发生消息springboot的测试类能够加载到第二步的配置类 Testvoid testAck(){// 发送消息rabbitTemplate.convertAndSend(ack_exchange,ack_routing,send message...ack);}消费者项目自动确认第一步修改配置文件
消费者消息确认——自动确认的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: none # 默认就是自动确认
#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n自动签收模式就是消息只要被消费者接收到无论是否成功处理消息则自动签收并将消息从队列中移除。当我们拿到消息的时候业务出现异常了所以无法正确处理消息导致消息丢失了。
消费者项目自动确认第二步编写消费者类监听队列
自动确认的消费者类
Component
public class AckConsumer {
// 自动签收RabbitListener(queues ack_queue)public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息String s new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(s);
// TODO处理事务
// 故意出错int i 1/0;}}
消费者项目手动确认第一步修改配置文件
消费者消息确认——手动确认的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual
#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n消费者项目手动确认第二步编写消费者类监听队列
手动确认
Component
public class AckConsumer {// 手动签收RabbitListener(queues ack_queue)public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 消息投递序号消息每次投递该值都会1long deliveryTag message.getMessageProperties().getDeliveryTag();try {
// int i 1/0; //模拟处理消息出现bugSystem.out.println(成功接受到消息:message);// 签收消息/*** 参数1消息投递序号* 参数2是否一次可以签收多条消息*/channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println(消息消费失败);Thread.sleep(2000);// 拒签消息/*** 参数1消息投递序号* 参数2是否一次可以拒签多条消息* 参数3拒签后消息是否重回队列*/channel.basicNack(deliveryTag,true,true);}}
} 手动签收模式就是如果出现异常则拒签消息让消息依然保留在队列当中。方便下次请求能够请求到这次因为异常而没有接收到的消息。
【四】RabbitMQ高级特性——消费端限流 前面说过MQ可以对请求进行“削峰填谷”即通过消费端限流的方式限制消息的拉取速度达到保护消费端的目的。使用【三】rabbitmq的消息可靠性——消费者消息确认Consumer Ack的项目消费者使用手动确认模式的代码即可但是要修改配置文件
第一步先在生产者项目中发送多个消息
Testpublic void testLimitSendBatch() {// 发送十条消息for (int i 0; i 10; i) {rabbitTemplate.convertAndSend(ack_exchange, ack_routing, 这是第i条消息);}}第二步修改消费者项目的配置文件
最主要就是配置文件的修改
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual #none是默认的prefetch: 5 # 每次消费者从队列拉取的消息数量限制#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n第三步重新编写消费者类
Component
public class ConsumerLimit {
// 手动签收RabbitListener(queues limit_queue)public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息String s new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(s);// 模拟业务处理Thread.sleep(3000);long deliveryTag message.getMessageProperties().getDeliveryTag();
// 手动签收channel.basicAck(deliveryTag,true);}}
其实就是修改了消费者项目的配置文件添加一条配置信息限制消费者消息的拉取速度。
【五】RabbitMQ高级特性——利用限流实现不公平分发
在RabbitMQ中多个消费者监听同一条队列则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好例如消费者1处理任务的速度非常快而其他消费者处理速度却很慢。此时如果采用公平分发则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发即谁处理的快谁处理的消息多。
在【四】RabbitMQ高级特性——消费端限流的基础上修改一消费者项目的配置文件然后在消费者类中多写几个监听消息的方法或者多写几个消费者类。
第一步修改消费者项目的配置文件
最主要就是配置文件的修改
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual #none是默认的prefetch: 1 # 消费端最多拉取1条消息消费这样谁处理的快谁拉取下一条消息实现了不公平分发#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n
第二步修改消费者类编写多个监听方法
Component
public class ConsumerUnfair {
// 消费者1RabbitListener(queues ack_queue)public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息String s new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(消费者1s);Thread.sleep(3000);long deliveryTag message.getMessageProperties().getDeliveryTag();
// 手动签收channel.basicAck(deliveryTag,true);}// 消费者2RabbitListener(queues ack_queue)public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息String s new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(消费者2s);Thread.sleep(1000);long deliveryTag message.getMessageProperties().getDeliveryTag();
// 手动签收channel.basicAck(deliveryTag,true);}// .......监听方法
}
最主要的就是消费者项目的配置文件的修改 配置消费端最多拉取1条消息消费这样谁处理的快谁拉取下一条消息实现了不公平分发。
【六】RabbitMQ高级特性——消息存活时间
RabbitMQ可以设置消息的存活时间Time To Live简称TTL当消息到达存活时间后还没有被消费会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间也可以对某条消息设置存活时间。
使用【三】rabbitmq的消息可靠性——消费者消息确认Consumer Ack的项目消费者使用手动确认模式的代码
第一步修改生产者项目的配置类 Configuration
public class RabbitmqConfig7ttl {public final String EXCHANGE_NAME ack_exchange;public final String QUEUE_NAME ack_queue;public final String ROUTING_NAME ack_routing;
// 创建交换机Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 创建队列Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME)
// 设置队列的超时的时间单位是毫秒.ttl(10000).build();}// 创建交换机和队列绑定Beanpublic Binding exchangeBindQueue(Qualifier(QUEUE_NAME) Queue queue, Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第二步修改生产者项目的测试类
设置单条消息存活时间 Testpublic void testTtlSendBatch() {// 发送十条消息for (int i 0; i 100; i) {if (i%5 0) {//设置消息属性MessageProperties messageProperties new MessageProperties();//设置存活时间messageProperties.setExpiration(10000);// 创建消息对象可以配置消息的一些配置Message message new Message((这是第 i 条消息).getBytes(StandardCharsets.UTF_8), messageProperties);// 发送消息rabbitTemplate.convertAndSend(ack_exchange, ack_routing, message);}else {rabbitTemplate.convertAndSend(ack_exchange, ack_routing, 这是第 i 条消息);}}}如果设置了单条消息的存活时间也设置了队列的存活时间以时间短的为准。 消息过期后并不会马上移除消息只有消息消费到队列顶端时才会移除该消息
【七】RabbitMQ高级特性——优先级队列
假设在电商系统中有一个订单催付的场景即客户在一段时间内未付款会给用户推送一条短信提醒但是系统中分为大型商家和小型商家。比如像苹果小米这样大商家一年能给我们创造很大的利润所以在订单量大时他们的订单必须得到优先处理此时就需要为不同的消息设置不同的优先级此时我们要使用优先级队列。
使用【三】rabbitmq的消息可靠性——消费者消息确认Consumer Ack的项目消费者使用手动确认模式的代码
第一步修改生产者项目的配置类 Configuration
public class RabbitmqConfig8Priority {public final String EXCHANGE_NAME priority_exchange;public final String QUEUE_NAME priority_queue;public final String ROUTING_NAME priority_routing;
// 创建交换机Bean(EXCHANGE_NAME)public Exchange exchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}// 创建队列Bean(QUEUE_NAME)public Queue queue(){return QueueBuilder.durable(QUEUE_NAME)
// 设置队列的优先级,值越大优先级越高一般不超过10.maxPriority(10).build();}// 创建交换机和队列绑定Beanpublic Binding exchangeBindQueue(Qualifier(QUEUE_NAME) Queue queue, Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_NAME).noargs();}
}
第二步修改生产者项目的测试 Testpublic void testPrioritySendBatch() {// 发送十条消息for (int i 0; i 100; i) {if (i%5 0) {//设置消息属性MessageProperties messageProperties new MessageProperties();
// 设置优先级messageProperties.setPriority(9);// 创建消息对象可以配置消息的一些配置Message message new Message((这是第 i 条消息).getBytes(StandardCharsets.UTF_8), messageProperties);// 发送消息rabbitTemplate.convertAndSend(priority_exchange, priority_routing, message);}else {rabbitTemplate.convertAndSend(priority_exchange, priority_routing, 这是第 i 条消息);}}}设置了消息的优先级那么消费者项目在消费消息的时候就会优先消费等级高的消息。
【八】RabbitMQ高级特性——死信队列
在MQ中当消息成为死信Dead message后消息中间件可以将其从当前队列发送到另一个队列中当前队列就是死信队列。而在RabbitMQ中由于有交换机的概念实际是将死信发送给了死信交换机Dead Letter Exchange简称DLX。死信交换机和死信队列和普通的没有区别。 消息成为死信的情况 队列消息长度到达限制。 消费者拒接消费消息basicNack/basicReject,并且不把消息重新放入原目标队列,requeuefalse 消息到达存活时间未被消费。
生产者项目第一步修改配置文件
# rabbitmq???
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开启确认模式publisher-confirm-type: correlated#开始回退模式publisher-returns: true#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n生产者项目第二步编写配置类RabbitMQ的配置类
Configuration
public class RabbitmqConfig9Dead {// 死信private final String DEAD_EXCHANGE dead_exchange;private final String DEAD_QUEUE dead_queue;private final String DEAD_ROUTING dead_routing;// 死信交换机Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}// 死信队列Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}// 死信交换机绑定死信队列Beanpublic Binding bindDeadQueue(Qualifier(DEAD_EXCHANGE) Exchange exchange,Qualifier(DEAD_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING).noargs();}// 普通private final String NORMAL_EXCHANGE normal_exchange;private final String NORMAL_QUEUE normal_queue;private final String NORMAL_ROUTING normal_routing;// 普通交换机Bean(NORMAL_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).durable(true).build();}// 普通队列Bean(NORMAL_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字.ttl(10000) // 消息存活10s.maxLength(10) // 队列最大长度为10.build();}// 普通交换机绑定普通队列Beanpublic Binding bindNormalQueue(Qualifier(NORMAL_EXCHANGE) Exchange exchange,Qualifier(NORMAL_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING).noargs();}
}
生产者项目第三步编写测试类发生消息springboot的测试类能够加载到第二步的配置类
Test
public void testDlx(){// 存活时间过期后变成死信// rabbitTemplate.convertAndSend(normal_exchange,normal_routing,测试死信);// 超过队列长度后变成死信// for (int i 0; i 20; i) {// rabbitTemplate.convertAndSend(normal_exchange,normal_routing,测试死信);// }// 消息拒签但不返回原队列后变成死信rabbitTemplate.convertAndSend(normal_exchange,normal_routing,测试死信);
}消费者项目手动确认第一步修改配置文件
消费者消息确认——手动确认的配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual
#????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n消费者项目手动确认第二步编写消费者类监听队列
手动确认
Component
public class ConsumerDead {RabbitListener(queues normal_queue)public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException, IOException {
// 获取消息String s new String(message.getBody(), StandardCharsets.UTF_8);System.out.println(消费者1s);Thread.sleep(500);long deliveryTag message.getMessageProperties().getDeliveryTag();
// 拒绝签收channel.basicNack(deliveryTag,true,false);}
死信队列小结 死信交换机和死信队列和普通的没有区别 当消息成为死信后如果该队列绑定了死信交换机则消息会被死信交换机重新路由到死信队列 消息成为死信的三种情况 队列消息长度到达限制 消费者拒接消费消息并且不重回队列 原队列存在消息过期设置消息到达超时时间未被消费
【九】RabbitMQ高级特性——延迟队列
延迟队列即消息进入队列后不会立即被消费只有到达指定时间后才会被消费。 例如 下单后30分钟未支付取消订单回滚库存。 新用户注册成功7天后发送短信问候。 实现方式 定时器 延迟队列
RabbitMQ中并未提供延迟队列功能我们可以使用死信队列实现延迟队列的效果。 延迟队列 指消息进入队列后可以被延迟一定时间再进行消费。 RabbitMQ没有提供延迟队列功能但是可以使用 TTL DLX 来实现延迟队列效果。
第一步创建springboot项目并添加依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId
/dependency
dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId
/dependency
第二步编写配置文件
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动手动签收listener:simple:acknowledge-mode: manual
# ????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n
第三步编写配置类
Configuration
public class RabbitMQConfig {private final String DEAD_EXCHANGE order_expire_exchange;private final String DEAD_QUEUE order_expire_queue;private final String DEAD_ROUTING order_expire_routing;private final String ORDER_EXCHANGE order_exchange;private final String ORDER_QUEUE order_queue;private final String ORDER_ROUTING order_routing;// 死信交换机Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}// 死信队列Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}// 死信交换机绑定死信队列Beanpublic Binding bindDeadQueue(Qualifier(DEAD_EXCHANGE) Exchange exchange, Qualifier(DEAD_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING).noargs();}// 普通交换机Bean(ORDER_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}// 普通队列Bean(ORDER_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(ORDER_QUEUE).deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey(DEAD_ROUTING) // 死信队列路由关键字.ttl(10000) // 消息存活10s模拟30min超时.build();}// 普通交换机绑定普通队列Beanpublic Binding bindNormalQueue(Qualifier(ORDER_EXCHANGE) Exchange exchange,Qualifier(ORDER_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(ORDER_ROUTING).noargs();}
}第四步创建控制器完成下单功能
RestController
public class OrderController {//注入MQAutowiredprivate RabbitTemplate rabbitTemplate;RequestMapping(/addOrder)public String addOrder(){//生成订单号String orderNumber 2030061812251234;//在service层完成订单逻辑//将订单号发送到订单mq,30分钟过期进入死信队列死信队列消费查询订单支付状态做对应处理rabbitTemplate.convertAndSend(order_exchange,order_routing,orderNumber);return 下单成功! 您的订单号为 orderNumber;}
}第五步创建消费者监听消息
Component
public class ListenerOrder {//监听订单过期队列RabbitListener(queues order_expire_queue)public void orderListener(String orderId){System.out.println(orderId orderId);//根据订单id查询订单状态是否支付/*** 监听死信队列的类回去30min超时订单号根据订单号查询订单的支付状态* 支付走下一步流程* 未支付关闭订单库存回滚*/}
}手动签收模式的结果
在手动签收模式的时候当我们启动项目访问订单功能时立刻生成了一个队列消息 然后因为是手动签收模式所以在消息的存活时间过去了之后成为了死信消息所以被消息被拒收了但是还存在队列中。
自动签收模式的结果
spring:rabbitmq:host: 192.168.70.130port: 5672username: adminpassword: adminvirtual-host: /#开动自动签收listener:simple:acknowledge-mode: none # 默认的
# ????
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n
在自动签收模式的时候当我们启动项目访问订单功能时立刻生成了一个队列消息 因为是自动签收的所以消息过了存活时间之后就没了自动确认指消息只要被消费者接收到无论是否成功处理消息则自动签收并将消息从队列中移除
RabbitMQ一、RabbitMQ的介绍与安装docker
RabbitMQ二、RabbitMQ的六种模式