当前位置: 首页 > news >正文

新网个人网站备案福利公众号

新网个人网站备案,福利公众号,免费服务器安全软件,个人备案的网站竞价排名做不了文章目录 文章导图RabbitMQ架构及相关概念四大核心概念名词解读 七大工作模式及四大交换机类型0、前置了解-默认交换机DirectExchange1、简单模式(Simple Queue)-默认DirectExchange2、 工作队列模式(Work Queues)-默认DirectExchange3、发布/订阅模式(Publish/Subscribe)-Fano… 文章目录 文章导图RabbitMQ架构及相关概念四大核心概念名词解读 七大工作模式及四大交换机类型0、前置了解-默认交换机DirectExchange1、简单模式(Simple Queue)-默认DirectExchange2、 工作队列模式(Work Queues)-默认DirectExchange3、发布/订阅模式(Publish/Subscribe)-FanoutExchange4、路由模式(Routing)-自定义DirectExchange5、主题模式(Topics)-TopicExchange总结 三种队列类型普通队列死信队列Dead Letter Queue, DLQ定义触发条件应用场景配置 延迟队列Delayed Queue定义实现方式应用场景 两者区别代码实战1. 延迟队列TTLDLX死信队列配置步骤 2. 延迟队列RabbitMQ延迟消息插件配置步骤 3、死信队列 basic.reject或basic.nack1. 引入依赖2. 配置交换机、队列和死信队列3. 生产者发送消息4. 消费者监听并拒绝消息5. 消费者监听死信队列总结 RabbitMQ系列文章深入RabbitMQ世界探索3种队列、4种交换机、7大工作模式及常见概念TODORabbitMQ可靠性TODORabbtiMQ顺序性TODORabbitMQ常见问题整理 文章导图 RabbitMQ架构及相关概念 四大核心概念 生产者 产生数据发送消息的程序是生产者。 交换机 交换机是 RabbitMQ 非常重要的一个部件一方面它接收来自生产者的消息另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息是将这些消息推送到特定队列还是推 送到多个队列亦或者是把消息丢弃这个得由交换机类型决定 。 队列 队列是RabbitMQ 内部使用的一种数据结构尽管消息流经 RabbitMQ 和应用程序但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。 消费者 消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者 名词解读 Broker接收和分发消息的应用RabbitMQ Server就是 Message BrokerVirtual host出于多租户和安全因素设计的把 AMQP 的基本组件划分到一个虚拟的分组中类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时可以划分出多个vhost每个用户在自己的 vhost 创建 exchangequeue 等Connectionpublisherconsumer 和 broker 之间的 TCP 连接 Channel如果每一次访问 RabbitMQ 都建立一个 Connection在消息量大的时候建立 TCP Connection的开销将是巨大的效率也较低。Channel 是在 connection 内部建立的逻辑连接 如果应用程序支持多线程通常每个thread创建单独的channel 进行通讯AMQP method 包含了channel id 帮助客户端和message broker 识别 channel所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销Exchangemessage 到达 broker 的第一站根据分发规则匹配查询表中的 routing key分发 消息到queue中去。常用的类型有direct (point-to-point), topic (publish-subscribe) and fanout (multicast)Queue消息最终被送到这里等待 consumer 取走 Bindingexchange 和 queue 之间的虚拟连接binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中用于 message 的分发依据 七大工作模式及四大交换机类型 网上查了很多资料有的说是五种有的说是四种可以看到在RabbitMQ在官网提到的共有7种工作模式https://www.rabbitmq.com/tutorials 第6种是RPC调用这个我们正常肯定不用这个实现RPC而第7种是confirm 确认模式可以用于保证生产者消息发送的可靠性这个我后面会再专门介绍。 现在我们主要讲前5种工作模式实际上总结来说5种又可以总结为是3种其实第1、2、4根据他们都是Direct交换机可以归结为一种下文我会详细讲解一下。 0、前置了解-默认交换机DirectExchange RabbitMQ有一个自带的交换机也被称为AMQP default exchange。当消息发送到RabbitMQ时如果没有指定交换机就会被发送到默认交换机。默认交换机的类型为direct类型路由键与队列名相同。 如果消息的路由键和某个队列的名称一致那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致那么消息会被丢弃。 默认交换机可以通过设置routing_key来指定消息的目的地例如 // 将消息发送到名称为test_queue的队列中空字符串代表默认交换机 channel.basic_publish(exchange, routing_keytest_queue, bodyHello, RabbitMQ!)但是建议应用程序在发送消息时显式地指定交换机以避免不必要的麻烦或错误。默认交换机只是一个简单的机制不应被用于复杂的应用程序。 1、简单模式(Simple Queue)-默认DirectExchange 这个和别的几种模式对比看着没有X这个其实用了默认的交换机我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下 //Config Bean Queue queue1() {return new Queue(simpleQueue); }// 生产者 Autowired private RabbitTemplate rabbitTemplate;public void sendSimpleMessage(String message) {rabbitTemplate.convertAndSend(simpleQueue, message); }// 消费者 RabbitListener(queues simpleQueue) public void receiveSimpleMessage(String message) {System.out.println(Received: message); }这个时候使用的其实是默认的直连交换机DirectExchangeDirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上例如消息队列名为 “simpleQueue”则 routingkey 为 “simpleQueue” 的消息会被该消息队列接收。 具体可以看下源码发送convertAndSend /** Alias for amq.direct default exchange. */ private static final String DEFAULT_EXCHANGE ;private static final String DEFAULT_ROUTING_KEY ;private volatile String exchange DEFAULT_EXCHANGE; private volatile String routingKey DEFAULT_ROUTING_KEY;Override public void convertAndSend(String routingKey, final Object object) throws AmqpException {//可以发现这个this.exchange就是DEFAULT_EXCHANGE convertAndSend(this.exchange, routingKey, object, (CorrelationData) null); }2、 工作队列模式(Work Queues)-默认DirectExchange 这种情况是这样的 一个生产者也是一个默认的交换机DirectExchange一个队列两个消费者。 一个队列对应了多个消费者默认情况下由队列对消息进行平均分配消息会被分到不同的消费者手中。消费者可以配置各自的并发能力进而提高消息的消费能力也可以配置手动 ack来决定是否要消费某一条消息。 和第一种对比主要体现在有多个消费者进行消费主要优势在于可以通过增加消费者来提高处理能力。 //Config Bean Queue queue1() {return new Queue(workQueue); }// Producer Autowired private RabbitTemplate rabbitTemplate;public void sendWorkMessage(String message) {rabbitTemplate.convertAndSend(workQueue, message); }// Consumer RabbitListener(queues workQueue) public void receiveWorkMessage(String message) {System.out.println(Received: message);// Simulate workThread.sleep(1000); }3、发布/订阅模式(Publish/Subscribe)-FanoutExchange FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上在这种策略中routingkey 将不起任何作用FanoutExchange 配置方式如下 在这里首先创建 FanoutExchange参数含义与创建 DirectExchange 参数含义一致然后创建两个 Queue再将这两个 Queue 都绑定到 FanoutExchange 上。 //Config Bean public FanoutExchange fanoutExchange() {return new FanoutExchange(fanoutExchange); }Bean public Queue fanoutQueue1() {return new Queue(fanoutQueue1); }Bean public Queue fanoutQueue2() {return new Queue(fanoutQueue2); }Bean public Binding binding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }Bean public Binding binding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } 接下来创建两个消费者,两个消费者分别消费两个消息队列中的消息然后在单元测试中发送消息如下 // Producer Autowired private RabbitTemplate rabbitTemplate;public void sendFanoutMessage(String message) {rabbitTemplate.convertAndSend(fanoutExchange, null, message); }// Consumer RabbitListener(queues fanoutQueue1) public void receiveFanoutMessage1(String message) {System.out.println(Queue1 Received: message); }RabbitListener(queues fanoutQueue2) public void receiveFanoutMessage2(String message) {System.out.println(Queue2 Received: message); }注意这里发送消息时不需要 routingkey指定 exchange 即可routingkey 可以直接传一个 null。 4、路由模式(Routing)-自定义DirectExchange DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上例如消息队列名为 “directQueue1”则 routingkey 为 “directQueue1” 的消息会被该消息队列接收。 // Config Bean public DirectExchange directExchange() {return new DirectExchange(directExchange); }Bean public Queue directQueue1() {return new Queue(directQueue1); }Bean public Queue directQueue2() {return new Queue(directQueue2); }Bean public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {return BindingBuilder.bind(directQueue1).to(directExchange).with(info); }Bean public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {return BindingBuilder.bind(directQueue2).to(directExchange).with(error); } 可以发现我们可以根据routingKey控制发送到哪个队列上这个本质上和我们前面2种模式都是一样的采用的都是DirectExchange只不过前面2种的交换机DirectExchange是默认值现在我们这里是指定了自己的DirectExchange // Producer Autowired private RabbitTemplate rabbitTemplate;public void sendDirectMessage(String message, String routingKey) {rabbitTemplate.convertAndSend(directExchange, routingKey, message); }// Consumer RabbitListener(queues directQueue1) public void receiveDirectMessage1(String message) {System.out.println(Queue1 Received: message); }RabbitListener(queues directQueue2) public void receiveDirectMessage2(String message) {System.out.println(Queue2 Received: message); }特别注意如果vhost中不存在RouteKey中指定的队列名则该消息会被抛弃。 5、主题模式(Topics)-TopicExchange 在 RabbitMQ 的主题模式Topics中消息通过带有路由键的主题交换机TopicExchange进行路由。消息的路由键是一个点分隔的字符串消费者可以使用绑定键带有通配符来订阅感兴趣的消息。 队列 topicQueue1 使用绑定键 *.orange.*匹配任意第一个和第三个单词以 orange 为第二个单词的消息。队列 topicQueue2 使用绑定键 *.*.rabbit匹配任意前两个单词以 rabbit 为第三个单词的消息。 // Config Bean public TopicExchange topicExchange() {return new TopicExchange(topicExchange); }Bean public Queue topicQueue1() {return new Queue(topicQueue1); }Bean public Queue topicQueue2() {return new Queue(topicQueue2); }Bean public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {return BindingBuilder.bind(topicQueue1).to(topicExchange).with(*.orange.*); }Bean public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with(*.*.rabbit); } topicQueue1 和 topicQueue2 接收匹配其绑定键的消息。灵活路由: 主题模式允许通过复杂的路由键实现灵活的消息路由。使用场景: 适用于需要按模式匹配路由消息的场景比如日志分级、区域性数据分发等。 // Producer Autowired private RabbitTemplate rabbitTemplate;public void sendTopicMessage(String message, String routingKey) {rabbitTemplate.convertAndSend(topicExchange, routingKey, message); }// Consumer RabbitListener(queues topicQueue1) public void receiveTopicMessage1(String message) {System.out.println(Queue1 Received: message); }RabbitListener(queues topicQueue2) public void receiveTopicMessage2(String message) {System.out.println(Queue2 Received: message); }总结 看了上面的5个例子其实本质上我们可以根据Exchange交换机类型归结为3种工作模式Direct、Fanout、Topic Direct定向把消息交给符合指定routing key 的队列 第1、2、4其实都是这种交换机Fanout广播将消息交给所有绑定到交换机的队列 第**第3种模式**Topic通配符把消息交给符合routing pattern路由模式 的队列**第5种模式** 这里提一下交换机还有一种类型Headers头匹配基于MQ的消息头匹配不过这种用的非常少可以忽略 不难发现这三种类型本质上是告诉交换机应该把消息发送给哪些那些队列的三种类别对应着三种判断角度。 direct —— 消息发送时都附带一个字段叫routing_keydirect 模式的交换机就会直接把该字段理解成队列名找到对应的队列并发送fanout —— 相当于广播不作任何选择发送给所有连接的队列topic —— 交换机会把routing_key理解成一个主题恰好队列绑定交换机时也可以以缩略形式指定主题所以找到匹配主题的队列就发送 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失 三种队列类型 普通队列 我们平常发送的正常都是普通队列比如上面5种工作模式说的都是普通队列就不多说了 死信队列Dead Letter Queue, DLQ 特别注意 队列和消息都有个TTL生存时间队列的TTL到达后队列会自动删除消息不会进入死信队列消息的生存时间到达后会进入死信队列。消息的生存时间可以在队列设置所有消息的TTL也可以对某个消息单独设置TTL。 定义 死信队列是用于处理无法被消费者正确处理的消息的队列。当消息在原始队列中无法被消费时会被转移到死信队列中。 触发条件 消息会变成死信并进入死信队列的几种情况 消息被消费者拒绝通过basic.reject或basic.nack并且requeuefalse。消息在队列中超过了TTLTime To Live时间。队列达到最大长度限制无法再接收新消息。 应用场景 处理无法被消费的消息避免消息堆积影响其他消息的消费。记录和监控消息处理错误方便进行后续处理 配置 通过设置 x-dead-letter-exchange 和 x-dead-letter-routing-key 将消息路由到死信队列。在原始队列中设置死信交换机和死信队列的相关参数 延迟队列Delayed Queue 定义 延迟队列是一种特殊的队列消息在发送到队列后需要等待一段时间后才能被消费。 实现方式 通过死信队列实现延迟任务 把死信队列就当成延迟队列,具体来说是这样 假如一条消息需要延迟 30 分钟执行我们就设置这条消息的有效期为 30 分钟同时为这条消息配置死信交换机和死信 routing_key并且不为这个消息队列设置消费者那么 30 分钟后这条消息由于没有被消费者消费而进入死信队列此时我们有一个消费者就在“蹲点”这个死信队列消息一进入死信队列就立马被消费了。 将消息发送到一个没有消费者的队列设置消息的TTL。消息过期后进入死信队列再由死信队列的消费者处理。 通过RabbitMQ延迟插件 使用RabbitMQ的延迟插件rabbitmq_delayed_message_exchange 插件消息在延迟一段时间后再投递到目标队列中进行消费。 应用场景 订单超时未支付自动取消。用户注册后未登录的提醒。预定会议前的通知 两者区别 使用TTL和死信队列实现延迟插件其实是会有一些问题的 问题一当我们的业务比较复杂的时候 需要针对不同的业务消息类型设置不同的过期时间策略 必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象 当业务复杂到一定程度时 这种方式维护成本过高问题二就是队列的先进先出原则导致的问题当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候消息是串行被消费的所以必然是等到先进入队列的消息的过期时间结束 后进入队列的消息的过期时间才会被监听然而实际上这个消息早就过期了这就导致了本来过期时间为3秒的消息实际上过了13秒才会被处理这在实际应用场景中肯定是不被允许的 延迟交换机插件可以在一定程度上解决上述两种问题。 特性死信队列延迟队列定义处理无法被消费的消息消息在指定时间后才被消费触发条件消息被拒绝、消息过期、队列满消息设置了TTL或使用延迟插件应用场景处理消费失败的消息避免队列堵塞订单超时取消、提醒通知等延迟处理场景实现方式配置死信交换机和死信队列使用TTL和死信队列或延迟插件消息处理进入死信队列后进行特殊处理延迟一段时间后再投递到目标队列 代码实战 1. 延迟队列TTLDLX死信队列 配置步骤 1、引入依赖 在pom.xml中引入Spring Boot和RabbitMQ的依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependency2、配置交换机和队列 在Spring Boot的配置类中配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机 import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class RabbitConfig {// 普通交换机Beanpublic DirectExchange normalExchange() {return new DirectExchange(normal_exchange);}// 死信交换机Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(dead_letter_exchange);}// 普通队列并绑定到普通交换机Beanpublic Queue normalQueue() {return QueueBuilder.durable(normal_queue).withArgument(x-dead-letter-exchange, dead_letter_exchange).withArgument(x-dead-letter-routing-key, dead_letter_routing_key).build();}Beanpublic Binding normalBinding(Qualifier(normalQueue) Queue queue, Qualifier(normalExchange) DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(normal_routing_key);}// 死信队列并绑定到死信交换机Beanpublic Queue deadLetterQueue() {return new Queue(dead_letter_queue);}Beanpublic Binding deadLetterBinding(Qualifier(deadLetterQueue) Queue queue, Qualifier(deadLetterExchange) DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(dead_letter_routing_key);} }3、生产者发送消息 在生产者发送消息时可以指定消息的TTLTime-To-Live。TTL到期后消息会被转发到死信队列 创建了一个匿名内部类实现了MessagePostProcessor接口并重写了postProcessMessage()方法。在该方法中设置了消息的延迟时间为传进来的delay延迟时间 java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;RestController public class ProducerController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/send)public String send(RequestParam String message, RequestParam int delay) {rabbitTemplate.convertAndSend(normal_exchange, normal_routing_key, message, msg - {//创建了一个匿名内部类实现了MessagePostProcessor接口并重写了postProcessMessage()方法。在该方法中设置了消息的延迟时间为传进来的delay延迟时间msg.getMessageProperties().setExpiration(String.valueOf(delay));return msg;});return Message sent with delay: delay;} }4、消费者监听死信队列 消费者监听死信队列接收到消息后处理 import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;Component public class Consumer {RabbitListener(queues dead_letter_queue)public void receiveMessage(String message) {System.out.println(Received message: message);} }2. 延迟队列RabbitMQ延迟消息插件 RabbitMQ有一个插件 rabbitmq-delayed-message-exchange 可以直接支持延迟消息队列。 配置步骤 1、安装RabbitMQ延迟消息插件 首先确保RabbitMQ服务器上已安装rabbitmq-delayed-message-exchange插件。 rabbitmq-plugins enable rabbitmq_delayed_message_exchange/21、**配置交换机和队列**2、在Spring Boot中配置使用延迟消息交换机 使用CustomExchange类创建一个自定义交换机对象。CustomExchange是Spring AMQP库提供的一个类用于创建自定义的交换机。构造方法的参数依次为交换机的名称、类型、是否持久化、是否自动删除和属性。交换机的名称为DELAYED_EXCHANGE类型为x-delayed-message持久化为true自动删除为false属性为之前创建的HashMap对象。 java复制代码import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class RabbitDelayedConfig {Beanpublic CustomExchange delayedExchange() {return new CustomExchange(delayed_exchange, x-delayed-message, true, false, Map.of(x-delayed-type, direct));}Beanpublic Queue delayedQueue() {return new Queue(delayed_queue);}Beanpublic Binding delayedBinding(Qualifier(delayedQueue) Queue queue, Qualifier(delayedExchange) CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(delayed_routing_key).noargs();} }3、生产者发送消息 生产者在发送消息时可以设置延迟时间 java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import java.util.HashMap; import java.util.Map;RestController public class DelayedProducerController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendDelayed)public String sendDelayed(RequestParam String message, RequestParam int delay) {MapString, Object headers new HashMap();headers.put(x-delay, delay);rabbitTemplate.convertAndSend(delayed_exchange, delayed_routing_key, message, msg - {msg.getMessageProperties().getHeaders().putAll(headers);return msg;});return Delayed message sent with delay: delay;} }4、消费者监听延迟队列 与TTLDLX方法相同消费者直接监听队列接收消息 java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;Component public class DelayedConsumer {RabbitListener(queues delayed_queue)public void receiveDelayedMessage(String message) {System.out.println(Received delayed message: message);} }3、死信队列 basic.reject或basic.nack 死信队列有3种情况 这里就举常见的手动ack的情况拒绝消息实现死信队列 要在Spring Boot中使用RabbitMQ实现死信队列Dead Letter QueueDLQ并处理消息被消费者拒绝的情况通过basic.reject或basic.nack并且requeuefalse可以按照以下步骤来实现。 1. 引入依赖 首先在pom.xml中引入Spring Boot和RabbitMQ的依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependency2. 配置交换机、队列和死信队列 接下来在Spring Boot的配置类中配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机。 import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class RabbitConfig {// 普通交换机Beanpublic DirectExchange normalExchange() {return new DirectExchange(normal_exchange);}// 死信交换机Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(dead_letter_exchange);}// 普通队列并绑定到普通交换机Beanpublic Queue normalQueue() {return QueueBuilder.durable(normal_queue).withArgument(x-dead-letter-exchange, dead_letter_exchange) // 设置死信交换机.withArgument(x-dead-letter-routing-key, dead_letter_routing_key) // 设置死信RoutingKey.build();}Beanpublic Binding normalBinding(Qualifier(normalQueue) Queue queue, Qualifier(normalExchange) DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(normal_routing_key);}// 死信队列并绑定到死信交换机Beanpublic Queue deadLetterQueue() {return new Queue(dead_letter_queue);}Beanpublic Binding deadLetterBinding(Qualifier(deadLetterQueue) Queue queue, Qualifier(deadLetterExchange) DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(dead_letter_routing_key);} }3. 生产者发送消息 在生产者中发送消息到普通队列 import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;RestController public class ProducerController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/send)public String send(RequestParam String message) {rabbitTemplate.convertAndSend(normal_exchange, normal_routing_key, message);return Message sent: message;} }4. 消费者监听并拒绝消息 注意这里的前提是要开启手动ack spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual # 手动ack消费者监听普通队列并有条件地拒绝消息将消息转发到死信队列 当发送的消息内容为reject时该消息会被拒绝并转发到死信队列。当发送其他内容的消息时消息会被正常消费。 import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message;Component public class Consumer {RabbitListener(queues normal_queue)public void receiveMessage(Message message, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {try {String msg new String(message.getBody());System.out.println(Received message: msg);// 根据某些条件判断是否拒绝消息if (reject.equals(msg)) {// 拒绝消息并且不重新入队requeuefalsechannel.basicReject(tag, false);System.out.println(Message rejected: msg);} else {// 消费成功确认消息channel.basicAck(tag, false);}} catch (Exception e) {// 异常情况也可以使用basicNack将消息拒绝并且不重新入队channel.basicNack(tag, false, false);}} }5. 消费者监听死信队列 最后消费者监听死信队列处理被拒绝的消息 import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;Component public class DeadLetterConsumer {RabbitListener(queues dead_letter_queue)public void receiveDeadLetterMessage(String message) {System.out.println(Received dead letter message: message);} }总结 配置普通队列和死信队列并通过设置x-dead-letter-exchange和x-dead-letter-routing-key来实现消息被拒绝后的处理。消费者可以根据业务逻辑通过basic.reject或basic.nack拒绝消息并指定不重新入队requeuefalse从而将消息转发到死信队列。死信队列中的消息可以被另一个消费者监听和处理。
http://www.hkea.cn/news/14368120/

相关文章:

  • 定制柜设计网站视频拍摄报价单
  • 怎么做网站内部链接上海优化外包公司
  • 如何选择盐城网站开发建筑设计方案怎么做
  • 如皋网站定制域名网站教程
  • 有域名了也备案了怎么做网站成都设计院待遇
  • 无锡公司做网站百度seo发帖推广
  • 惠州网站建设找哪个公司wordpress仿家居商城
  • 高等院校网站建设方案全立体网站建设
  • 潍坊网站排名优化建设一个怎样的自己的网站首页
  • 管理系统是网站吗坪山医院网站建设
  • 淮北论坛人才招聘网柳州网站优化
  • 大气时尚的网站网站流程图设计
  • 上海网站建设-中国互联wordpress 怎么安装
  • 邢台建网站哪里有免费推广引流平台推荐
  • wordpress怎么当站长手机网页被禁止访问了怎么办
  • 建设网站的费用微信小程序的制作流程
  • 珠宝企业的门户网站开发app设计思路
  • 如何制作可以下单的网站艺术字体在线生成器华康海报
  • 网站手机端做排名北京网站建设 爱牛
  • 做网站套模板爱做网址
  • 怎么做网站服务器吗广州有什么好玩的好吃的
  • 推广网站可以做跳转吗健康饮食网站设计论文
  • 互联网有多少网站建立自我追求无我什么意思
  • 揭阳网站制作方案太原seo外包平台
  • iis7 网站权限设置seo排名工具
  • wordpress 支持 插件下载衡水做网站优化
  • 手机网站怎么dw做中国最大型网站
  • 网站开发案例详解下载wordpress七牛云
  • 一一影视网站源码做营销网站 如何让商家入驻
  • 用自己的电脑做网站wordpress侧边栏加图片