高性价比网站建设,太原网站建设谁家好,瑞安商业网站建设,网络架构三层目录前言工作流程-灵魂画手名词解释交换机类型一、安装1.1 [RabbitMQ官网安装](https://www.rabbitmq.com/download.html)1.2 Docker安装并启动二、食用教程2.1.导入依赖2.2 添加配置2.3 代码实现2.3.1 直连#xff08;Direct#xff09;类型2.3.2 引入消息手动确认机制2.3.2…
目录前言工作流程-灵魂画手名词解释交换机类型一、安装1.1 [RabbitMQ官网安装](https://www.rabbitmq.com/download.html)1.2 Docker安装并启动二、食用教程2.1.导入依赖2.2 添加配置2.3 代码实现2.3.1 直连Direct类型2.3.2 引入消息手动确认机制2.3.2 广播Fanout类型2.3.3 主题Topic类型三、实战应用场景3.1 如何控制消息有序3.2 保证消息不被重复消费幂等性3.3 保证消息的可靠性3.4 死信队列解决订单超时未支付总结前言
RabbitMQ是一个由erlang语言开发实现了AMQP(Advanved Message Queue Protocol)高级消息队列协议的消息服务中间件。
工作流程-灵魂画手 1、生产者Producer和消费者Consumer都需要在与RabbitMQ建立长连接Connection的前提下才能收发消息 2、客户端生产者、消费者和服务端RabbitMQ只能建立一条长连接在长连接中开辟一条条的信道进行收发消息 3、生产者发送消息消息到达Broker指定虚拟主机服务会配置的指定交换机发送消息会指定根据路由键和交换机与队列的绑定关系把消息发送给对应的队列 3、消费者通过信道监听队列消息进入队列就可以被消费者实时拿到
名词解释
1、Broker message broker 消息代理消息队列服务器实体简单理解为邮局寄收件都要通过它。 2、JMSJava Message ServiceJAVA消息服务。是基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现 3、AMQPAdvanced Message Queuing Protocol 高级消息队列协议也是一个消息代理的规范兼容JMS RabbitMQ是AMQP的实现 4、Message 消息由消息头和消息体组成。消息体是不透明的而消息头则由一系列的可选属性组成这些属性包括routing-key路由键、priority相对于其他消息的优先权、delivery-mode指出该消息可能需要持久性存储等。 5、Producer消息的生产者也是一个向交换器发布消息的客户端应用程序。 6、Exchange 交换机 用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange常用有3种类型direct、fanout、 topic、不同类型的Exchange转发消息的策略有所区别 7、Queue 消息队列用来保存消息直到发送给消费者。它是消息的容器也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面等待消费者连接到这个队列将其取走。 8、Binding绑定用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则所以可以将交换器理解成一个由绑定构成的路由表。 Exchange 和 Queue 的绑定可以是多对多的关系。 9、Connection 网络连接比如一个TCP连接。 10、Channel 信道多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接AMQP 命令都是通过信道发出去的不管是发布消息、订阅队列还是接收消息这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销所以引入了信道的概念以复用一条 TCP 连接。 11、Consumer 消费者表示一个从消息队列中取得消息的客户端应用程序。 12、Virtual Host 虚拟主机表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础必须在连接时指定RabbitMQ 默认的 vhost 是 / 。
交换机类型 一、安装
1.1 RabbitMQ官网安装
1.2 Docker安装并启动
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management# 开机自启
docker update rabbitmq --restartalways● 5672 (AMQP端口) ● 15672 (web管理后台端口)
本地安装可通过http://127.0.0.1:15672/访问用户名密码默认guest
二、食用教程
2.1.导入依赖
!--RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency2.2 添加配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: root #用户名 默认guestpassword: root #密码 默认guestvirtual-host: springboot-test #虚拟主机 默认/2.3 代码实现
2.3.1 直连Direct类型
直连型交换机根据消息携带的路由键将消息投递给对应队列。 直连类型初始化配置 import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 1、直连交换机配置*/
Configuration
public class DirectRabbitConfig {public static final String DIRECT_QUEUE DirectQueue;public static final String DIRECT_EXCHANGE DirectExchange;public static final String DIRECT_ROUTING DirectRouting;/*** durable:是否持久化,默认是false,持久化队列会被存储在磁盘上当消息代理重启时仍然存在暂存队列当前连接有效* exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable* autoDelete:是否自动删除当没有生产者或者消费者使用此队列该队列会自动删除。* return new Queue(TestDirectQueue,true,true,false);*/Beanpublic Queue directQueue() {return new Queue(DIRECT_QUEUE,false);}BeanDirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE,false,false);}BeanBinding binding() {return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING);}
}该配置主要把队列、交换机、绑定都交由spring管理记得声明队列、交换机、建立绑定关系。消息指定交换机发送后交换机就可以根据路由键把消息发送到匹配的队列上。 消费者 import com.chendi.springboot_rabbitmq.config.DirectRabbitConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;Slf4j
Component
public class DirectReceiver {RabbitListener(queues DirectRabbitConfig.DIRECT_QUEUE)public void receiver(String dataMsg) {log.info(接收者A dataMsg:{} ,dataMsg);}RabbitListener(queues DirectRabbitConfig.DIRECT_QUEUE)public void receiver(String dataMsg) {log.info(接收者B dataMsg:{} ,dataMsg);}
}生产者 RestController
RequiredArgsConstructor
public class RabbitMQTestController {final RabbitTemplate rabbitTemplate;GetMapping(/sendDirectMessage)public String sendDirectMessage() {for (int i 0; i 10; i) {String messageData Hello World i;//可自定义消息体类型rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, DirectRabbitConfig.DIRECT_ROUTING, messageData);}return 发送完成;}
}运行发现默认情况下RabbitMQ轮询分发将按顺序将每个消息发送给下一个使用者。有如下缺点 1、无法保证消息已被消费 2、处理消息快的服务得到的消息和处理消息慢的服务是一样多的公平分发、能者多劳。 2.3.2 引入消息手动确认机制 配置文件 spring:rabbitmq:listener:simple:acknowledge-mode: manual # 设置消费端手动 ack#表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条prefetch: 1 # 预加载消息数量--QOS消费者应答 Slf4j
Component
public class DirectReceiver {RabbitListener(queues DirectRabbitConfig.DIRECT_QUEUE)public void receiver(String dataMsg, Channel channel, Message message) throws IOException, InterruptedException {long deliveryTag message.getMessageProperties().getDeliveryTag();Thread.sleep(1000);log.info(接收者A deliveryTag:{} dataMsg:{} ,deliveryTag ,dataMsg);channel.basicAck(deliveryTag,true);}RabbitListener(queues DirectRabbitConfig.DIRECT_QUEUE)public void receiver2(String dataMsg, Channel channel, Message message) throws IOException {long deliveryTag message.getMessageProperties().getDeliveryTag();log.info(接收者B deliveryTag:{} dataMsg:{} ,deliveryTag ,dataMsg);channel.basicAck(deliveryTag,true);}
}回执方法( 1、channel.basicAck表示成功确认使用此回执方法后消息会被rabbitmq broker 删除。 2、channel.basicNack表示失败确认一般在消费消息业务异常时用到此方法、可决定消息是否重新入列 3、channel.basicReject 拒绝消息与basicNack区别在于不能进行批量操作其他用法很相似。 2.3.2 广播Fanout类型
扇型交换机这个交换机没有路由键概念这个交换机在接收到消息后会直接转发到绑定到它上面的所有队列。 广播类型配置类 import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** 2、广播、扇出交换机*/
Configuration
public class FanoutRabbitConfig {public final static String FANOUT_EXCHANGE fanoutExchange;public static final String FANOUT_QUEUE_A fanoutQueueA;public static final String FANOUT_QUEUE_B fanoutQueueB;public static final String FANOUT_QUEUE_C fanoutQueueC;/*** 创建三个队列* 将三个队列都绑定在交换机 fanoutExchange 上* 因为是扇型交换机, 路由键无需配置,配置也不起作用*/Beanpublic Queue queueA() {return new Queue(FANOUT_QUEUE_A);}Beanpublic Queue queueB() {return new Queue(FANOUT_QUEUE_B);}Beanpublic Queue queueC() {return new Queue(FANOUT_QUEUE_C);}BeanFanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}
}消费者 import com.chendi.springboot_rabbitmq.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;//如果开启了消息手动确认机制一定要记得应答消息噢
//不然消息会一直堆积在mq里
Slf4j
Component
public class FanoutReceiver {RabbitListener(queues FanoutRabbitConfig.FANOUT_QUEUE_A)public void fanout_A(String message) {log.info(fanout_A {} , message);}RabbitListener(queues FanoutRabbitConfig.FANOUT_QUEUE_B)public void fanout_B(String message) {log.info(fanout_B {} , message);}RabbitListener(queues FanoutRabbitConfig.FANOUT_QUEUE_C)public void fanout_C(String message) {log.info(fanout_C {} , message);}
}测试生产者 Controller加上 GetMapping(/sendFanoutMessage)
public String sendFanoutMessage() {String messageData 这是一条广播消息;rabbitTemplate.convertAndSend(FanoutRabbitConfig.FANOUT_EXCHANGE, , messageData);return 发送完成;
}2.3.3 主题Topic类型
主题交换机特点就是在它的路由键和绑定键之间是有规则的。 「*」 (星号) 用来表示一个单词 (必须出现的) 「#」 (井号) 用来表示任意数量零个或多个单词 主题交换机不绑定路由键时是直连交换机绑定「#」号时是扇形交换机。 主题模式配置类 import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** 主题交换机* 转发规则* #匹配一个或者多个词* *匹配一个或者0个词* 比如 有msg.# 、msg.* 匹配规则* msg.# 会匹配 msg.email、msg.email.b、msg.email.a* msg.* 只会匹配 msg.email 和 msg */
Configuration
public class TopicRabbitConfig {//绑定键public final static String MSG_EMAIL msg.email;public final static String MSG_EMAIL_A msg.email.a;public final static String MSG_SMS msg.sms;public final static String TOPIC_EXCHANGE topicExchange;Beanpublic Queue firstQueue() {return new Queue(TopicRabbitConfig.MSG_EMAIL);}Beanpublic Queue secondQueue() {return new Queue(TopicRabbitConfig.MSG_EMAIL_A);}Beanpublic Queue thirdQueue() {return new Queue(TopicRabbitConfig.MSG_SMS);}BeanTopicExchange exchange() {return new TopicExchange(TOPIC_EXCHANGE);}BeanBinding bindingExchangeMessage() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(MSG_EMAIL);}BeanBinding bindingExchangeMessage2() {return BindingBuilder.bind(secondQueue()).to(exchange()).with(msg.#);}BeanBinding bindingExchangeMessage3() {return BindingBuilder.bind(thirdQueue()).to(exchange()).with(msg.*);}
}消费者 import com.chendi.springboot_rabbitmq.config.TopicRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
Slf4j
Component
public class TopicReceiver {RabbitListener(queues TopicRabbitConfig.MSG_EMAIL)public void topic_man(String message) {log.info(队列{} 收到消息{} ,TopicRabbitConfig.MSG_EMAIL, message);}RabbitListener(queues TopicRabbitConfig.MSG_SMS)public void topic_woman(String message) {log.info(队列{} 收到消息{} ,TopicRabbitConfig.MSG_SMS, message);}RabbitListener(queues TopicRabbitConfig.MSG_EMAIL_A)public void xxx(String message) {log.info(队列{} 收到消息{} ,TopicRabbitConfig.MSG_EMAIL_A, message);}
}测试生产者 Controller加上 GetMapping(/sendTopicMessage)
public String sendTopicMessage() {rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_EMAIL, Hello Topic所有队列都可以收到这条信息);rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_EMAIL_A, 只有 msg.email.a可以收到这条信息);rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_SMS, msg.email.a 和 msg.sms可以收到这条信息);return 发送完成;
}如果开启了消息手动确认机制一定要记得应答消息噢 以上整合就完成了。
三、实战应用场景
3.1 如何控制消息有序 1、当只有一个消费者可以保证消息有序但是效率低。 2、生产者顺序发送消息到队列但是多个消费者监听一个队列时会轮询分发导致乱序。修改为一个消费者只监听一个队列生产者自定义投放策略1、2、3投放到A队列4、5、6投放到B队列顺序的消息为一个整体投放至一个队列。 3.2 保证消息不被重复消费幂等性 在消费者消费结束后正常情况下会发送回执给消息队列证明该消息已被消费。但是此时消费者网络传输故障或者宕机了消息队列收不到消息被消费的回执会将消息再分发给其他消费者进而导致消息被消费多次。 ······· 解决方法具体问题具体分析 1、在redis中维护一个set生产者在发送消息前加上全局唯一的id消费者消费之前去redis中查一下看是否消费过如果没有消费过则继续执行。 //生产者
public void sendMessageIde() {MessageProperties properties new MessageProperties();properties.setMessageId(UUID.randomUUID().toString());Message message new Message(消息.getBytes(), properties);rabbitTemplate.convertAndSend(exchange, , message);
}//消费者
RabbitListener(queues queue)
RabbitHandler
public void processIde(Message message, Channel channel) throws IOException {if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),1)){// 业务操作...System.out.println(消费消息 new String(message.getBody(), UTF-8));// 手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}3.3 保证消息的可靠性 消息发送流程 可以看出生产者发送的消息准确抵达消费者分为两部分 1、发送端 消息投递到Broker成功时回调confirmCallback交换机投递到队列失败时回调returnCallback 2、消费端的ack 配置文件 spring:rabbitmq:publisher-returns: true # 开启消息抵达队列的确认 # 低版本 publisher-confirms: truepublisher-confirm-type: correlated # 开启发送端确认配置类 /*** 常用的三个配置如下* 1---设置手动应答acknowledge-mode: manual* 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中不管成不成功都会回调* publisher-confirm-type: correlated* #保证交换机能把消息推送到队列中* publisher-returns: true* template:* #以下是rabbitmqTemplate配置* mandatory: true)* 3---设置重试*/
Slf4j
Configuration
public class RabbitConfig {Autowiredprivate ConnectionFactory rabbitConnectionFactory;// 存在此名字的bean 自带的容器工厂会不加载yml下rabbitmq下的template的配置如果想自定义来区分开 需要改变bean 的名称Beanpublic RabbitTemplate rabbitTemplate(){RabbitTemplate rabbitTemplatenew RabbitTemplate(rabbitConnectionFactory);//默认是用jdk序列化//数据转换为json存入消息队列方便可视化界面查看消息数据rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);//此处设置重试template后会再生产者发送消息的时候调用该template中的调用链rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {if(!ack){System.out.println(ConfirmCallback 相关数据 correlationData);System.out.println(ConfirmCallback 确认情况 ack);System.out.println(ConfirmCallback 原因 cause);}});rabbitTemplate.setReturnsCallback((ReturnedMessage returned) - {System.out.println(ReturnsCallback 消息 returned.getMessage());System.out.println(ReturnsCallback 回应码 returned.getReplyCode());System.out.println(ReturnsCallback 回应消息 returned.getReplyText());System.out.println(ReturnsCallback 交换机 returned.getExchange());System.out.println(ReturnsCallback 路由键 returned.getRoutingKey());});return rabbitTemplate;}//重试的TemplateBeanpublic RetryTemplate rabbitRetryTemplate() {RetryTemplate retryTemplate new RetryTemplate();// 设置监听 调用重试处理过程retryTemplate.registerListener(new RetryListener() {Overridepublic T, E extends Throwable boolean open(RetryContext retryContext, RetryCallbackT, E retryCallback) {// 执行之前调用 返回false时会终止执行//log.info(执行之前调用 返回false时会终止执行);return true;}Overridepublic T, E extends Throwable void close(RetryContext retryContext, RetryCallbackT, E retryCallback, Throwable throwable) {// 方法结束的时候调用if(retryContext.getRetryCount() 0){log.info(最后一次调用);}}Overridepublic T, E extends Throwable void onError(RetryContext retryContext, RetryCallbackT, E retryCallback, Throwable throwable) {// 方法异常时会调用log.info(第{}次调用, retryContext.getRetryCount());}});return retryTemplate;}
}发送端测试 import com.chendi.springboot_rabbitmq.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;RestController
public class SendCallbackMessageController {AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法ResponseBodyGetMapping(/sendMessageToExchangeFail)public Object sendMessageToExchangeFail() {String messageData 这条消息不会到达交换机;rabbitTemplate.convertAndSend(不存在的交换机, , messageData, new CorrelationData(UUID.randomUUID().toString()));return messageData;}ResponseBodyGetMapping(/sendMessageToQueueFail)public Object sendMessageToQueueFail() {String messageData 这条消息不会到达队列;rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, 不存在的路由键, messageData, new CorrelationData(UUID.randomUUID().toString()));return messageData;}
}
请求结果
3.4 死信队列解决订单超时未支付 场景当顾客购买一件商品存在的操作 生成订单 》 扣减库存 》 完成支付 当库存只剩1件时A用户下单但是迟迟未支付会导致B用户下单时判断库存不足导致生成订单失败。 此时就需要解决订单超时未支付的问题。 流程 初始化两组正常队列和交换机A、BA组的初始化参数x-dead-letter-exchange、x-dead-letter-routing-key指向B组的交换机和路由键。意在A中删除或过期的数据可以放入指定交换机指定路由键的队列中。 -这样如果设置了订单超过5min未支付 发送方在发送消息时指定过期时间为5 * 60 * 1000 时间过期后此消息会投递到队列B死信队列中队列B根据订单id去判断是否支付去做加库存等相应的操作。 死信队列配置类 import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 解决订单超时未支付的问题** 创建两个队列* 1、队列A正常的队列只是设置了某些参数设置队列中的超时未消费信息指定丢到对应的队列B* 2、队列B也是一个正常的队列只是把超时的信息丢给它所以称呼为死信队列*/Configuration
public class DeadLetterExchangeConfig {/*** x-message-tti(Time-To-Live)发送到队列的消息在丟弃之前可以存活多长时间(毫秒)* x-max-length限制队列最大长度新增后挤出最早的单位个数* x-expires队列没有访问超时时自动删除包含没有消费的消息单位毫秒* x-max-length-bytes限制队列最大容量* x-dead-letter-exchange死信交换机将删除/过期的数据放入指定交换机* x-dead-letter-routing-key死信路由将删除/过期的数据放入指定routingKey* x-max-priority队列优先级* x-queue-mode对列模式默认lazy将数据放入磁盘消费时放入内存* x-queue-master-locator镜像队列*/Beanpublic Queue orderQueue(){MapString, Object args new HashMap(2);// 绑定我们的死信交换机args.put(x-dead-letter-exchange, orderDeadExChange);// 绑定我们的路由keyargs.put(x-dead-letter-routing-key, orderDeadRoutingKey);return new Queue(orderQueue, true, false, false, args);}Beanpublic Queue orderDeadQueue(){return new Queue(orderDeadQueue);}Beanpublic DirectExchange orderExchange(){return new DirectExchange(orderExchange);}Beanpublic DirectExchange orderDeadExchange(){return new DirectExchange(orderDeadExChange);}//绑定正常队列到交换机Beanpublic Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);}//绑定死信队列到死信交换机Beanpublic Binding deadBindingExchange(Queue orderDeadQueue, DirectExchange orderDeadExchange) {return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with(orderDeadRoutingKey);}
}消费者 import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*** 死信队列的消费者*/
Slf4j
Component
public class DeadLetterReceiver {RabbitListener(queues orderDeadQueue)public void orderDeadQueueReceiver(String dataMsg, Channel channel, Message message) {try{long deliveryTag message.getMessageProperties().getDeliveryTag();log.info(死信队列接收者A收到消息根据订单id查询订单是否支付未支付解锁库存 deliveryTag:{} dataMsg:{} ,deliveryTag ,dataMsg);channel.basicAck(deliveryTag,false);} catch (Exception e){log.info(如果报错了执行补偿机制);}}
}生产者 GetMapping(/createOrder)
public String createOrder() {rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, 我是订单json, message - {//设置过期时间10smessage.getMessageProperties().setExpiration(10000);return message;});return 发送完成;
}总结
MQ的应用场景
异步处理注册发邮件发短消息应用解耦用户下单后,订单系统需要通知库存系统扣减库存就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.流量削峰秒杀活动一般会因为流量过大导致应用挂掉设置消息队列参数如果长度超过最大值,则直接抛弃用户请求或跳转到错误页面