网站模板套用,山东专业网站解决方案制作,在哪里做推广效果好,哪些网站做外链好一#xff1a; RabbitMQ 高级特性
前面主要讲解了 RabbitMQ 的概念和应用。RabbitMQ 实现了 AMQP 0-9-1 规范#xff0c;并在此基础上进行了多项扩展。在 RabbitMQ 官方网站中详细介绍了其特性#xff0c;我们将其中一些重要且常用的特性挑选出来进行讲解。 1.1 消息确认
…一 RabbitMQ 高级特性
前面主要讲解了 RabbitMQ 的概念和应用。RabbitMQ 实现了 AMQP 0-9-1 规范并在此基础上进行了多项扩展。在 RabbitMQ 官方网站中详细介绍了其特性我们将其中一些重要且常用的特性挑选出来进行讲解。 1.1 消息确认
生产者发送的消息在到达消费者端后可能会出现以下几种情况
情况描述消息处理成功消费端成功处理消息完成相应的业务逻辑。消息处理异常消费端在处理消息时发生异常可能导致消息未被正确消费。 RabbitMQ 在向消费者发送消息后会立即将该消息从队列中删除但如果消费者处理消息时出现异常则可能导致消息丢失。为了解决这一问题RabbitMQ 提供了消息确认机制用于确保消息被消费者成功接收并正确处理。在消费者订阅队列时可以通过设置 autoAck 参数来控制消息确认机制根据该参数的不同消息确认机制分为两种模式。
确认模式描述适用场景自动确认当 autoAck 等于 true 时RabbitMQ 会在消息发送后立即将其置为确认并从内存或磁盘中删除无论消费者是否真正消费成功。适用于对消息可靠性要求不高的场景。手动确认当 autoAck 等于 false 时RabbitMQ 会等待消费者显式调用 Basic.Ack 命令确认后才删除消息。适用于对消息可靠性要求较高的场景确保消息被正确处理后再删除。
// 创建一个 DefaultConsumer 对象用于处理接收到的消息
DefaultConsumer consumer new DefaultConsumer(channel) {// 当有消息送达时会触发该方法Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 输出接收到的消息内容将字节数组转换为字符串System.out.println(接收到消息: new String(body));}
};// 开始监听队列指定队列名称、是否自动确认消息以及消费者对象
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, false, consumer);当 autoAck 参数设置为 false 时对于 RabbitMQ 服务端而言队列中的消息会分为两部分一部分是尚未投递给消费者的消息另一部分是已投递但尚未收到消费者确认的消息。如果 RabbitMQ 长时间未收到消费者的确认信号并且该消费者已断开连接则 RabbitMQ 会将消息重新放回队列等待重新投递给下一个消费者或者可能再次投递给原来的消费者。 从 RabbitMQ 的 Web 管理平台上也可以看到当前队列中 Ready 状态和 Unacked 状态的消息数
状态描述Ready等待投递给消费者的消息数。Unacked已投递给消费者但尚未收到消费者确认信号的消息数。 1.1.1 自动确认
自动确认之前已经提到过这里不再赘述。
1.1.2 手动确认
消费者在收到消息后可以选择确认消息、拒绝消息或跳过消息。RabbitMQ 提供了多种确认应答方式消费者可以通过调用其对应的 channel 方法进行操作主要包括以下三种方式
确认类型描述方法肯定确认RabbitMQ 知道消息已被成功处理可以将其丢弃。Channel.basicAck(long deliveryTag, boolean multiple)否定确认消费者调用方法通知 RabbitMQ 拒绝该消息可选择是否重新入队。Channel.basicReject(long deliveryTag, boolean requeue)批量否定确认如果需要批量拒绝消息可以使用 Basic.Nack 命令通知 RabbitMQ 拒绝多条消息可选择是否重新入队。Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)
下面是参数解释
参数作用deliveryTag消息的唯一标识用于标识 RabbitMQ 中的每条消息。它是一个由 RabbitMQ 生成的单调递增的 64 位长整型值每个通道Channel独立维护确保唯一性消费者在确认、拒绝或重新入队消息时必须通过对应的 deliveryTag 来标识消息。multiple是否启用批量操作用于减少网络流量。在消息确认或拒绝时如果设置为 true则会批量操作所有小于或等于指定 deliveryTag 的消息。如果设置为 false则仅对当前指定的 deliveryTag 消息进行操作。requeue表示拒绝这条消息后如何处理控制被拒绝的消息是否重新入队用于消息的再投递。true: 消息会重新进入队列等待被其他消费者或同一消费者再次消费。false: 消息会直接从队列中移除不再被重新投递。适用于无法处理的消息或需要丢弃的场景。 我们将基于 SpringBoot 演示消息的确认机制。与直接使用 RabbitMQ Java Client 库相比Spring-AMQP 对消息确认机制的使用方式有所不同但是也提供了三种策略来实现消息确认。
确认模式描述特点AcknowledgeMode.NONE消息一旦投递给消费者不管是否成功处理RabbitMQ 都会自动确认并移除消息。如果消费者处理失败消息可能会丢失。适用于对消息可靠性要求较低的场景。AcknowledgeMode.AUTO默认模式。消费者在成功处理消息时会自动确认但如果处理过程中抛出异常则不会确认消息。提供了一定的可靠性但在异常情况下消息会回到队列。AcknowledgeMode.MANUAL手动确认模式。消费者需要在成功处理消息后显式调用 basicAck 方法确认。如果消息未被确认RabbitMQ 会重新投递消息。提高消息处理的可靠性确保消息不会因处理失败而丢失。适用于高可靠性场景。
如果需要配置的话在配置文件中进行配置即可下面演示一种确认模式的使用
spring:rabbitmq:addresses: amqp://study:study110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none生产者代码 public static final String ACK_EXCHANGE_NAME ack_exchange;public static final String ACK_QUEUE ack_queue;Configuration
public class RabbitMQConfig {// 声明交换机Bean(ackExchange)public Exchange ackExchange() {return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}// 声明队列Bean(ackQueue)public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}// 绑定队列和交换机Bean(ackBinding)public Binding ackBinding(Qualifier(ackExchange) Exchange exchange,Qualifier(ackQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ack).noargs();}
}RestController
RequestMapping(/producer)
public class ProductController {Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息到指定交换机和路由键RequestMapping(/ack)public String ack() {rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, ack, consumer ack test...);return 发送成功!;}
}消费者代码
Component
public class AckQueueListener {// 监听队列 Constant.ACK_QUEUERabbitListener(queues Constant.ACK_QUEUE)public void listenerQueue(Message message, Channel channel) throws Exception {try {// 获取消息内容和 deliveryTagString messageBody new String(message.getBody(), UTF-8);long deliveryTag message.getMessageProperties().getDeliveryTag();System.out.printf(接收到消息: %s, deliveryTag: %d%n, messageBody, deliveryTag);// 模拟处理失败int num 3 / 0;System.out.println(处理完成);} catch (Exception e) {// 处理异常逻辑日志记录或其他操作System.err.println(消息处理失败: e.getMessage());// 根据业务需求这里可以选择是否重新入队或丢弃消息channel.basicNack(deliveryTag, false, true);}}
}1.2 持久性
前面我们讨论了如何在消费端处理消息时确保消息不丢失但如果 RabbitMQ 服务停止或崩溃后如何确保生产者发送的消息不丢失呢默认情况下RabbitMQ 在退出或崩溃时会丢失队列和消息除非明确配置其持久化机制。RabbitMQ 的持久化包括三个部分交换机的持久化、队列的持久化和消息的持久化。
1.2.1 交换机持久化
交换器的持久化通过在声明交换机时将 durable 参数设置为 true 来实现这会将交换机的属性保存到服务器中。当 RabbitMQ 服务器发生意外或关闭后重启时交换机会自动恢复无需重新创建相当于一直存在。如果未设置持久化则在 RabbitMQ 重启后交换机的元数据会丢失。对于长期使用的交换机建议将其设置为持久化以确保其可靠性和持久性。
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()1.2.2 队列的持久化
队列的持久化通过在声明队列时将 durable 参数设置为 true 来实现。如果队列未设置持久化在 RabbitMQ 服务重启后该队列会被删除同时其中的消息也会丢失队列消失消息无处存储。队列的持久化可以保证队列本身的元数据在异常情况下不会丢失但不能保证队列中的消息不丢失。要确保消息的可靠性还需将消息设置为持久化。我们之前创建的队列都是持久化队列。
QueueBuilder.durable(Constant.ACK_QUEUE).build(); // 持久化队列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build(); // 非持久化队列1.2.3 消息的持久化
要实现消息的持久化需要 MessageProperties 中的 deliveryMode 设置为 2即 MessageDeliveryMode.PERSISTENT。这样可以确保消息在服务器重启或出现异常时不会丢失。
public enum MessageDeliveryMode {NON_PERSISTENT, // 非持久化PERSISTENT // 持久化
}设置队列和消息的持久化后RabbitMQ 服务重启后消息依旧存在。如果仅设置队列持久化重启后消息会丢失如果仅设置消息持久化重启后队列消失消息也无法存储。因此单独设置消息持久化而不设置队列持久化是没有意义的二者需同时设置才能确保消息可靠性。
public static final BasicProperties PERSISTENT_TEXT_PLAIN new BasicProperties(text/plain, // 内容类型 (ContentType)null, // 内容编码 (ContentEncoding)null, // 头信息 (Headers)2, // deliveryMode: 持久化 (2 表示消息持久化)0, // 优先级 (Priority)null, // CorrelationIdnull, // ReplyTonull, // Expirationnull, // MessageIdnull, // Timestampnull, // Typenull, // UserIdnull, // AppIdnull // ClusterId
);// 发送非持久化消息
channel.basicPublish(, QUEUE_NAME, null, msg.getBytes()
);// 发送持久化消息
channel.basicPublish(, QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化msg.getBytes()
);RabbitMQ 默认会将消息视为持久化消息除非队列被声明为非持久化或消息在发送时被标记为非持久化。但是需要注意将所有消息都设置为持久化会显著影响 RabbitMQ 的性能因为将消息写入磁盘的速度远远慢于写入内存的速度。对于可靠性要求不高的消息可以选择不进行持久化处理以提升系统的整体吞吐量。我们在选择是否持久化消息时需要在消息的可靠性和系统吞吐量之间进行权衡。如果使用 RabbitTemplate 发送持久化消息代码示例如下
// 要发送的消息内容
String message This is a persistent message;// 创建一个 Message 对象并设置为持久化
MessageProperties messageProperties new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message messageObject new Message(message.getBytes(), messageProperties);// 使用 RabbitTemplate 发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, ack, messageObject
);将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗答案是否定的。
问题描述解决方法消费者未处理完成时宕机如果消费者在订阅队列时将 autoAck 参数设置为 true那么消息在被消费者接收后尚未处理完毕就发生宕机消息将丢失。将 autoAck 参数设置为 false并采用手动确认方式确保消息被正确处理后再确认。详细可参考消息确认章节。消息未及时落盘时 RabbitMQ 宕机持久化的消息在正确写入 RabbitMQ 后还需要一段时间才能真正存入磁盘。在此过程中RabbitMQ 并不会为每条消息都立即调用内核的 fsync 方法进行同步存盘而是可能暂时存储在操作系统缓存中。如果此时 RabbitMQ 节点发生宕机或重启未落盘的消息会丢失。1.开启 RabbitMQ 的发布确认模式确保消息被可靠写入磁盘后再确认发送成功。 2. 引入事务机制 3. 引入 RabbitMQ 的仲裁队列 (后⾯再讲),
1.3 发送发确认
持久化的消息在正确写入 RabbitMQ 后还需要一段时间才能真正存入磁盘。在此过程中RabbitMQ 并不会为每条消息都立即调用内核的 fsync 方法进行同步存盘而是可能暂时存储在操作系统缓存中。如果此时 RabbitMQ 节点发生宕机或重启未落盘的消息会丢失为了解决这一问题RabbitMQ 提供了两种机制来确保消息投递的可靠性。
解决方案描述事务机制通过事务机制确保消息可靠投递但性能消耗较大实际工作中使用较少。发送方确认publisher confirm通过 confirm 机制实现消息的可靠投递是实际工作中常用的解决方案。
控制消息可靠性投递方式描述confirm 确认模式用于确认消息是否成功到达 RabbitMQ 服务器。return 退回模式用于在消息无法路由到指定队列时将消息退回给生产者确保消息不会意外丢失。
1.3.1 confirm 确认模式
在 confirm 确认模式下生产者在发送消息时可以设置一个 ConfirmCallback 监听器无论消息是否成功到达交换机该监听器都会被触发。如果消息成功到达交换机则 ACK确认标识为 true如果未到达则 ACK 为 false。
配置 RabbitMQ
spring:rabbitmq:addresses: amqp://study:study110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual # 消息接收确认publisher-confirm-type: correlated # 消息发送确认设置确认回调逻辑并发送消息无论消息确认成功还是失败都会触发 ConfirmCallback 的 confirm 方法。如果消息成功发送到 Broker则 ack 为 true如果消息发送失败则 ack 为 false同时 cause 提供失败的原因。
Configuration
public class RabbitTemplateConfig {// 配置带 ConfirmCallback 的 RabbitTemplateBean(confirmRabbitTemplate)public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 设置 ConfirmCallback 回调函数通过 Lambda 表达式实现 ConfirmCallback 接口的 confirm 方法。rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {if (ack) {System.out.printf(消息接收成功, id: %s%n, correlationData ! null ? correlationData.getId() : null);} else {System.out.printf(消息接收失败, id: %s, cause: %s%n,correlationData ! null ? correlationData.getId() : null, cause);}});return rabbitTemplate;}
}RestController
RequestMapping(/producer)
public class ConfirmController {Resource(name confirmRabbitTemplate)private RabbitTemplate confirmRabbitTemplate;// 发送消息并带有确认机制RequestMapping(/confirm)public String confirm() {CorrelationData correlationData new CorrelationData(1);// 发送消息confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, confirm, confirm test..., correlationData);return 消息发送确认完成;}
}区别ConfirmListenerConfirmCallback所属库RabbitMQ Java Client 库Spring AMQP 框架使用场景用于直接与 RabbitMQ 服务器交互专为 Spring 环境设计简化与 RabbitMQ 的交互过程方法提供两个方法handleAck 和 handleNack提供一个方法confirm功能处理消息的确认和否定确认事件处理消息确认的回调集成性需要手动与 RabbitMQ 的 Channel 进行交互与 Spring 框架无缝集成支持依赖注入和配置管理使用场景推荐非 Spring 项目中直接使用 RabbitMQ Java Client 库Spring Boot 或 Spring 应用中推荐使用 ConfirmCallback
1.3.2 return 退回模式
在 RabbitMQ 的 return 退回模式中消息到达 Exchange 后会根据路由规则匹配将消息投递到队列。如果消息无法被任何队列消费可以选择将消息退回给发送者。此时可以通过设置一个返回回调方法对被退回的消息进行处理。
配置 RabbitMQ
spring:rabbitmq:addresses: amqp://study:study110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual # 消息接收确认publisher-confirm-type: correlated # 消息发送确认设置返回回调逻辑并发送消息当消息无法被路由到任何队列时会被退回给发送者此时触发通过 setReturnCallback 设置的回调方法进行处理。
Bean(confirmRabbitTemplate)
public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 设置强制退回机制rabbitTemplate.setMandatory(true);// 设置 ReturnsCallback 回调方法rabbitTemplate.setReturnsCallback(returned - System.out.printf(消息被退回: %s%n, returned));return rabbitTemplate;
}RestController
RequestMapping(/producer)
public class MessageController {Resource(name confirmRabbitTemplate)private RabbitTemplate confirmRabbitTemplate;// 测试消息退回RequestMapping(/msgReturn)public String msgReturn() {// 创建 CorrelationData 对象CorrelationData correlationData new CorrelationData(2);// 发送消息confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, confirm11, message return test..., correlationData);return 消息发送成功;}
}1.4 总结一下 RabbitMQ 如何保证消息的可靠传输
先展示一张 RabbitMQ 消息传递的示意图 场景可能原因解决办法生产者将消息发送到 RabbitMQ 失败网络问题等参考本章节发送方确认-confirm确认模式消息在交换机中无法路由到指定队列代码或配置层面错误导致消息路由失败参考本章节发送方确认-return模式消息队列自身数据丢失消息到达 RabbitMQ 后RabbitMQ 服务器宕机导致消息丢失开启 RabbitMQ 持久化机制消息会写入磁盘在服务器恢复后RabbitMQ 会自动读取之前存储的数据。 在极端情况下消息未持久化时服务器宕机可能会导致少量数据丢失可以通过集群方式提升可靠性。消费者异常导致消息丢失消息到达消费者后未及时消费消费者宕机或消费者逻辑问题参考本章节消息确认。启用消费者手动确认机制当消费者确认消息成功后才会删除消息从而避免消息丢失。 除此之外可配置重试机制确保消息消费的可靠性。
1.5 重试机制
在消息传递过程中可能会因网络故障、服务不可用或资源不足等问题导致消息处理失败。为了解决这些问题RabbitMQ 提供了重试机制允许消息在处理失败后重新发送如果对异常进⾏捕获那么就不会进⾏重试。然而如果失败是由程序逻辑错误引起的多次重试也无济于事此时可以通过设置重试次数来避免无限重试的情况。
spring:rabbitmq:addresses: amqp://study:study110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto # 消息接收确认retry:enabled: true # 开启消费者失败重试initial-interval: 5000 # 初始失败等待时长为5秒max-attempts: 5 # 最大重试次数包括首次消费在手动确认模式下重试次数的限制不会像自动确认模式那样直接生效因为是否重试以及何时重试更多取决于应用程序的逻辑和消费者的实现。在自动确认模式下RabbitMQ 会在消息投递给消费者后自动确认消息。如果消费者在处理消息时抛出异常RabbitMQ 会根据配置的重试参数自动将消息重新入队从而实现重试。重试次数和重试间隔等参数可以直接在 RabbitMQ 的配置中设定RabbitMQ 会负责执行这些重试策略。
而在手动确认模式下消费者需要显式地对消息进行确认。如果消费者在处理消息时遇到异常可以选择不确认消息让消息重新入队。此时重试的控制权完全由应用程序掌握而不是依赖 RabbitMQ 的内部机制。应用程序可以通过自身逻辑和结合 RabbitMQ 的高级特性灵活实现自定义的重试策略。
1.6 TTL
TTLTime to Live过期时间是 RabbitMQ 提供的一种机制可以为消息或队列设置存活时间。当消息超过指定的存活时间且未被消费时会被自动清除。这一机制常用于一些特定场景例如在电商平台中下单后超过24小时未付款订单会自动取消或者申请退款后超过7天未处理则系统会自动退款。 目前有两种方式可以设置消息的 TTL一种是为队列设置 TTL此时队列中的所有消息共享相同的过期时间另一种是为每条消息单独设置 TTL使每条消息的过期时间可以不同。如果同时使用这两种方式消息的实际 TTL 以两者中较小的数值为准。
1.6.1 设置消息的TTL
RestController
RequestMapping(/producer)
public class TtlController {Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息并设置 TTLRequestMapping(/ttl)public String sendTtlMessage() {String ttlTime 10000; // 10秒// 发送消息并设置过期时间rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, , ttl test..., messagePostProcessor - {// messagePostProcessor 是一个消息后处理器MessageProperties 包含了消息的元数据信息例如内容类型、优先级、消息ID以及过期时间等。setExpiration(ttlTime) 是 MessageProperties 类中的一个方法用于设置消息的过期时间。messagePostProcessor.getMessageProperties().setExpiration(ttlTime);return messagePostProcessor;});return 发送成功!;}
}1.6.2设置队列的TTL
设置队列 TTL 可以在创建队列时通过添加参数 x-message-ttl 实现单位为毫秒。
Configuration
public class TtlQueueConfig {// 声明队列 ttlQueue2设置过期时间为 20 秒Bean(ttlQueue2)public Queue ttlQueue2() {return QueueBuilder.durable(Constant.TTL_QUEUE2) // 设置队列持久化.ttl(20 * 1000) // 设置 TTL 为 20 秒.build();}// 或者通过参数设置过期时间为 20 秒Bean(ttlQueue2Alt)public Queue ttlQueue2Alt() {MapString, Object arguments new HashMap();arguments.put(x-message-ttl, 20000); // 20 秒过期return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(arguments).build();}// 队列和交换机绑定Bean(ttlBinding2)public Binding ttlBinding2(Qualifier(ttlExchange) FanoutExchange exchange,Qualifier(ttlQueue2) Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
}运行后会发现新增了一个队列该队列的 Features 中显示有一个 TTL 标识表示已成功设置消息的过期时间。 1.6.3 两种方式的区别
设置方式描述原因设置队列 TTL 属性消息过期后会立即从队列中删除。队列中的过期消息一定位于队列头部RabbitMQ 只需定期从队列头开始扫描并删除过期消息效率较高。设置消息 TTL 属性消息过期后不会立即从队列中删除而是在消息投递给消费者之前判定是否过期若过期则删除。每条消息的过期时间不同若需要删除所有过期消息需要扫描整个队列成本较高因此延迟到消费时再判定是否过期。
1.7 死信队列
死信是指因各种原因无法被正常消费的消息。当消息在一个队列中变成死信后会被重新发送到一个专门的交换机称为死信交换机Dead Letter ExchangeDLX。绑定到死信交换机的队列被称为死信队列Dead Letter QueueDLQ。消息通常因以下原因变成死信
原因描述消息被拒绝消费者使用 Basic.Reject 或 Basic.Nack 拒绝消息并且 requeue 参数设置为 false。消息过期消息的 TTL存活时间到期后未被消费。队列达到最大长度队列中的消息数量达到最大限制新消息无法写入导致消息变成死信。
死信交换机和死信队列本质上与普通的交换机和队列没有区别它们只是被专门用来处理无法被正常消费的消息。
//死信队列
public static final String DLX_EXCHANGE_NAME dlx_exchange;
public static final String DLX_QUEUE dlx_queue;Configuration
public class DLXConfig {// 声明死信交换机Bean(dlxExchange)public Exchange dlxExchange() {return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true) // 设置为持久化.build();}// 声明死信队列Bean(dlxQueue)public Queue dlxQueue() {return QueueBuilder.durable(Constant.DLX_QUEUE) // 设置为持久化.build();}// 绑定死信队列和死信交换机Bean(dlxBinding)public Binding dlxBinding(Qualifier(dlxExchange) Exchange exchange,Qualifier(dlxQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(dlx) // 设置路由键.noargs();}
}上述代码只是声明了一个队列和交换机此时他们还不是死信的我们需要通过下面的代码指定死信队列和死信交换机
Bean(normalQueue)
public Queue normalQueue() {// 设置队列参数MapString, Object arguments new HashMap();// 这两行代码指定哪些队列和交换机是死信的arguments.put(x-dead-letter-exchange, Constant.DLX_EXCHANGE_NAME); // 绑定死信交换机arguments.put(x-dead-letter-routing-key, dlx); // 设置死信队列的路由键// 创建普通队列并设置参数return QueueBuilder.durable(Constant.NORMAL_QUEUE) // 设置队列持久化.withArguments(arguments) // 添加参数.build();
}死信队列和死信交换机的指定可以简写为
return QueueBuilder.durable(Constant.NORMAL_QUEUE) // 声明普通队列并设置持久化.deadLetterExchange(Constant.DLX_EXCHANGE_NAME) // 设置死信交换机.deadLetterRoutingKey(dlx) // 设置死信队列的路由键.build(); // 构建队列队列 Features 说明
特性描述Ddurable 的缩写表示队列设置为持久化。TTLTime to Live表示队列设置了 TTL过期时间。Lim表示队列设置了长度限制x-max-length。DLX表示队列设置了死信交换机x-dead-letter-exchange。DLK表示队列设置了死信路由键x-dead-letter-routing-key。
对于 RabbitMQ 来说死信队列是一个非常有用的特性。它可以在消息无法被消费者正常消费时将这些消息放入死信队列中进行处理。应用程序可以通过消费死信队列中的消息分析异常原因从而改善和优化系统的稳定性和可靠性。
在支付场景中当用户完成支付后支付系统会将支付结果发送到订单系统。为了避免在消息传递或处理过程中因异常导致支付信息丢失可以使用死信队列机制。当消息处理失败时异常消息会被投递到死信队列。订单系统的其他消费者可以监听死信队列对这些消息进行进一步处理例如生成工单进行人工核实或执行补偿操作确保支付信息的完整性和系统的可靠性死信队列还有别的应用场景
应用场景描述消息重试将死信消息重新发送到原队列或其他队列进行重试处理尝试再次消费消息。消息丢弃直接丢弃无法处理的消息避免这些消息占用系统资源影响正常消息的处理。日志收集将死信消息作为日志收集起来用于后续的分析、问题定位和系统优化。
1.8 延迟队列
延迟队列Delayed Queue是一种消息队列机制消息在发送后不会立即被消费者获取而是经过指定的延迟时间后才可供消费者消费延迟队列有广泛的应用场景例如
应用场景描述智能家居用户希望通过手机远程控制智能设备在指定时间执行操作。通过延迟队列指令会在设定时间到达后推送到设备。日常管理预定会议后可在会议开始前 15 分钟通过延迟队列发送提醒通知给参会人。提高用户活跃度用户注册成功后延迟 7 天发送短信提醒提升用户活跃度或回访率。其他场景根据实际需求可将延迟队列用于需要定时触发的各类业务逻辑中。
RabbitMQ 本身不直接支持延迟队列功能通常可以通过以下两种方式实现
方法优点缺点基于死信实现的延迟队列灵活不需要额外插件支持存在消息顺序问题 需要额外逻辑处理死信队列的消息增加系统复杂性基于插件实现的延迟队列通过插件直接创建延迟队列简化延迟消息的实现 避免了死信队列的时序问题需要依赖特定插件增加运维工作 插件适用范围有限仅支持特定版本的 RabbitMQ
通过结合 TTL 和死信队列的方式可以实现模拟延迟队列的功能。例如在一个应用中需要让每条消息延迟 10 秒后被消费。生产者将消息通过 normal_exchange 发送到 normal_queue 队列并为该队列设置消息的 TTL 为 10 秒。当消息在 normal_queue 中过期后会被转移到绑定了死信交换机的 dlx_queue 队列。消费者订阅 dlx_queue 队列从而在延迟 10 秒后消费到这条消息。 注意事项当发送 20 秒过期的消息后再发送 10 秒过期的消息会发现 10 秒过期的消息也是在 20 秒后才进入死信队列。这是因为 RabbitMQ 只会检查队首消息是否过期如果队首消息未过期即使后续消息已达到过期时间也不会被立即丢弃并转移到死信队列。这会导致第一个消息的延时时间较长时第二个消息的延时时间即使较短也无法优先执行。
因此在使用 TTL 死信队列实现延迟任务队列时需要确保业务中每个任务的延迟时间是一致的。如果业务需要为不同的任务类型设置不同的延迟时间则需要为每种延迟时间分别创建单独的消息队列以避免消息处理顺序受到影响。
1.8.1 延迟队列插件
RabbitMQ 官方提供了一个延迟插件来实现延迟功能详情可参考 RabbitMQ 官方文档。安装插件后可以通过 RabbitMQ 管理平台验证是否成功在新建交换机时查看是否出现延迟消息选项Delayed Message。如果该选项可用说明延迟消息插件已成功运行。 1.8.2 基于插件延迟队列实现延迟队列
声明交换机、队列和绑定关系
Configuration
public class DelayedConfig {// 声明延迟交换机Bean(delayedExchange)public Exchange delayedExchange() {return ExchangeBuilder.directExchange(Constant.DELAYED_EXCHANGE_NAME).durable(true) // 设置为持久化.delayed() // 开启延迟功能.build();}// 声明队列Bean(delayedQueue)public Queue delayedQueue() {return QueueBuilder.durable(Constant.DELAYED_QUEUE) // 设置为持久化.build();}// 绑定队列和延迟交换机Bean(delayedBinding)public Binding delayedBinding(Qualifier(delayedExchange) Exchange exchange,Qualifier(delayedQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(delayed) // 设置路由键.noargs();}
}编写生产者代码
RestController
RequestMapping(/producer)
public class DelayedMessageController {Autowiredprivate RabbitTemplate rabbitTemplate;// 发送带延迟的消息RequestMapping(/delay2)public String sendDelayedMessages() {// 发送 20 秒延迟的消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME,delayed,delayed test 20s... new Date(),messagePostProcessor - {messagePostProcessor.getMessageProperties().setDelay(20000); // 设置 20 秒延迟return messagePostProcessor;});// 发送 10 秒延迟的消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME,delayed,delayed test 10s... new Date(),messagePostProcessor - {messagePostProcessor.getMessageProperties().setDelay(10000); // 设置 10 秒延迟return messagePostProcessor;});return 发送成功!;}
}编写消费者代码
Component
public class DelayedQueueListener {// 监听延迟队列的消息RabbitListener(queues Constant.DELAYED_QUEUE)public void listenerDLXQueue(Message message, Channel channel) throws Exception {// 打印接收到的消息和当前时间System.out.printf(%tc 死信队列接收到消息: %s%n, new Date(), new String(message.getBody(), UTF-8));}
}1.9 事务
RabbitMQ 基于 AMQP 协议实现该协议支持事务机制因此 RabbitMQ 也具备事务功能。同时Spring AMQP 提供了对事务操作的支持。通过 RabbitMQ 的事务机制开发者可以确保消息的发送和接收具有原子性即要么全部成功要么全部失败。
配置事务管理器
Configuration
public class TransactionConfig {// 配置 RabbitMQ 事务管理器Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}// 配置支持事务的 RabbitTemplateBeanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true); // 启用事务支持return rabbitTemplate;}
}声明队列
Bean(transQueue)
public Queue transQueue() {return QueueBuilder.durable(trans_queue) // 声明持久化队列.build();
}编写生产者代码
RestController
RequestMapping(/trans)
public class TransactionProducer {Autowiredprivate RabbitTemplate rabbitTemplate;// 发送事务消息TransactionalRequestMapping(/send)public String send() {// 发送第一条消息rabbitTemplate.convertAndSend(, trans_queue, trans test 1...);// 模拟异常int a 5 / 0;// 发送第二条消息rabbitTemplate.convertAndSend(, trans_queue, trans test 2...);return 发送成功;}
}如果不添加 Transactional即使代码中发生异常消息1仍然会成功发送当添加了 Transactional 后发生异常时事务将回滚导致消息1和消息2都不会发送成功。
1.10 消息分发
当 RabbitMQ 队列有多个消费者时消息会被分发给不同的消费者每条消息只会发送给一个订阅了该队列的消费者。这种方式非常适合扩展处理能力当负载加重时可以通过增加更多的消费者来分担消息处理任务。然而默认情况下RabbitMQ 以轮询方式分发消息而不考虑消费者是否已处理并确认消息。这可能会导致部分消费者处理速度慢、消息堆积而其他消费者处理速度快却处于空闲状态从而降低系统整体的吞吐量。
为了解决这一问题可以使用 channel.basicQos(int prefetchCount) 方法来限制每个消费者未确认消息的最大数量。例如当消费者调用了 channel.basicQos(5) 后RabbitMQ 会为该消费者计数每发送一条消息计数加1每确认一条消息计数减1。当未确认消息数量达到5时RabbitMQ 将停止向该消费者发送新消息直到其确认了部分消息。这种机制类似于 TCP/IP 中的滑动窗口能够有效均衡消费者的负载提高系统效率消息分发的常见应用场景如下:
应用场景描述限流通过 channel.basicQos 方法设置每个消费者未确认消息的最大数量防止某些消费者处理过慢导致消息堆积。非公平分发默认情况下RabbitMQ 以轮询方式分发消息而不考虑消费者的处理能力这种方式称为非公平分发也叫负载均衡。
1.10.1 限流
在订单系统中订单系统每秒最多可处理 5000 个请求正常情况下能够满足需求但在秒杀高峰期请求量瞬间激增到每秒 1 万个如果这些请求全部通过 MQ 发送到订单系统可能会导致系统崩溃。为了解决这一问题RabbitMQ 提供了限流机制通过设置 prefetchCount 参数控制消费者每次从队列中预取的消息数量从而实现流量控制和负载均衡。同时限流机制要求将消息应答方式设置为手动应答以确保消费者处理完消息后再拉取新的消息。 配置 prefetch 参数, 设置应答方式为⼿动应答
# ack 确认方式开启手动 ack
listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5 # 每次预取 5 条消息配置交换机和队列
Configuration
public class QosConfig {// 声明限流交换机Bean(qosExchange)public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME) // 直连交换机.durable(true) // 持久化.build();}// 声明队列Bean(qosQueue)public Queue qosQueue() {return QueueBuilder.durable(Constant.QOS_QUEUE) // 持久化队列.build();}// 绑定队列和交换机Bean(qosBinding)public Binding qosBinding(Qualifier(qosExchange) Exchange exchange,Qualifier(qosQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(qos) // 路由键.noargs();}
}发送消息, ⼀次发送 20 条消息
RestController
RequestMapping(/qos)
public class QosController {Autowiredprivate RabbitTemplate rabbitTemplate;// 发送消息RequestMapping(/send)public String sendQosMessages() {for (int i 0; i 20; i) {rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME, qos, qos test... i);}return 发送成功!;}
}编写消费者代码
Component
public class QosQueueListener {// 监听指定队列的消息RabbitListener(queues Constant.QOS_QUEUE)public void listenerQueue(Message message, Channel channel) throws Exception {// 获取消息的 deliveryTaglong deliveryTag message.getMessageProperties().getDeliveryTag();// 打印接收到的消息和 deliveryTagSystem.out.printf(接收到消息: %s, deliveryTag: %d%n, new String(message.getBody(), UTF-8), deliveryTag);// 手动签收可启用channel.basicAck(deliveryTag, true);}
}1.10.2 负载均衡
我们可以通过设置 prefetch1 的方式实现消息的负载均衡。比如在有两个消费者的情况下如果一个消费者处理任务非常快而另一个处理较慢默认情况下RabbitMQ 只在消息进入队列时分发消息不考虑消费者未确认消息的数量可能会导致一个消费者一直繁忙而另一个消费者空闲。通过设置 prefetch1可以让 RabbitMQ 一次只分配一条消息给消费者直到该消费者处理并确认当前消息后才会发送下一条消息。如果某个消费者繁忙RabbitMQ 会将消息分派给下一个空闲的消费者从而更好地实现负载均衡。 配置 prefetch 参数, 设置应答方式为⼿动应答
# 配置 ack 确认方式开启手动 ack 和预取限制
listener:simple:acknowledge-mode: manual # 手动确认prefetch: 1 # 每次预取 1 条消息Component
public class QosQueueListener {// 指定监听队列的名称消费者1RabbitListener(queues Constant.QOS_QUEUE)public void listenerQosQueue(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();// 打印接收到的消息System.out.printf(接收到消息: %s, deliveryTag: %d%n,new String(message.getBody(), UTF-8), deliveryTag);// 手动签收channel.basicAck(deliveryTag, true);}// 指定监听队列的名称消费者2RabbitListener(queues Constant.QOS_QUEUE)public void listenerQueue2(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();// 打印接收到的消息System.out.printf(消费者2接收到消息: %s, deliveryTag: %d%n,new String(message.getBody(), UTF-8), deliveryTag);// 模拟处理流程慢Thread.sleep(100);// 手动签收channel.basicAck(deliveryTag, true);}
}接收到消息: qos test...1, deliveryTag: 1
消费者2接收到消息: qos test...0, deliveryTag: 1
接收到消息: qos test...2, deliveryTag: 2
接收到消息: qos test...3, deliveryTag: 3
接收到消息: qos test...4, deliveryTag: 4
接收到消息: qos test...5, deliveryTag: 5
消费者2接收到消息: qos test...6, deliveryTag: 2
接收到消息: qos test...7, deliveryTag: 6
接收到消息: qos test...8, deliveryTag: 7
接收到消息: qos test...9, deliveryTag: 8
接收到消息: qos test...10, deliveryTag: 9
消费者2接收到消息: qos test...11, deliveryTag: 3
接收到消息: qos test...12, deliveryTag: 10
接收到消息: qos test...13, deliveryTag: 11
接收到消息: qos test...14, deliveryTag: 12
接收到消息: qos test...15, deliveryTag: 13
消费者2接收到消息: qos test...16, deliveryTag: 4
接收到消息: qos test...17, deliveryTag: 14
接收到消息: qos test...18, deliveryTag: 15
接收到消息: qos test...19, deliveryTag: 16deliveryTag 出现重复是因为两个消费者使用的是不同的 Channel每个 Channel 上的 deliveryTag 都是独立计数的因此在不同的 Channel 中可能会出现相同的 deliveryTag 值。