seo网站关键词优化方式,做ps可以在哪些网站上找素材,南县网站设计,企业网站结构图专栏导航 RabbitMQ入门指南 从零开始了解大数据 目录
专栏导航
前言
一、消费者确认机制
二、失败重试机制
三、失败处理策略
四、业务幂等性
1.通过唯一标识符保证操作的幂等性
2.通过业务判断保证操作的幂等性
总结 前言
RabbitMQ是一个高效、可靠的开源消息队列系… 专栏导航 RabbitMQ入门指南 从零开始了解大数据 目录
专栏导航
前言
一、消费者确认机制
二、失败重试机制
三、失败处理策略
四、业务幂等性
1.通过唯一标识符保证操作的幂等性
2.通过业务判断保证操作的幂等性
总结 前言
RabbitMQ是一个高效、可靠的开源消息队列系统广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。 当RabbitMQ向消费者投递消息后了解消费者的处理状态是非常重要的。因为消息的投递并不代表消费者一定能够正确地消费这些消息可能会出现各种故障
网络故障在消息投递过程中如果RabbitMQ和消费者之间的网络连接出现故障可能会导致消息无法正确投递给消费者。消费者宕机如果消费者在接收消息后突然宕机那么消息可能无法被正确处理。消费者处理异常消费者在接收到消息后由于处理不当或者出现异常情况可能会导致消息处理失败。
以上情况都可能导致消息丢失因此RabbitMQ需要知道消费者的处理状态以便在消息处理失败时重新投递消息。 一、消费者确认机制
RabbitMQ的消费者确认机制Consumer Acknowledgement是一种确保消息被成功处理的机制。当消费者处理消息结束后需要向RabbitMQ发送一个回执以告知消息的处理状态。这个机制对于确保消息的可靠传递非常重要因为它可以防止消息在消费者端处理失败而没有被正确处理的情况。
回执有三种可选值
ACK确认表示消费者成功处理了消息RabbitMQ会从队列中删除该消息。NACK否定确认表示消息处理失败RabbitMQ需要再次投递该消息。REJECT拒绝表示消息处理失败并且被拒绝RabbitMQ会从队列中删除该消息。
在实际应用中一般使用ACK或NACK两种方式。REJECT方式的使用相对较少通常只在消息格式存在问题即存在开发错误的情况下使用。因此大多数情况下需要将消息处理的代码通过try catch机制捕获消息处理成功时返回ACK处理失败时返回NACK。
在consumer服务的application.yml文件中添加配置修改Spring AMQP的ACK处理方式
spring:rabbitmq:listener:simple:acknowledge-mode: auto
RabbitMQ 支持三种不同的确认模式这些模式通过acknowledge-mode属性进行配置
none关闭ACK。消费者接收到消息后不需要发送任何确认给发送者发送者将继续发送下一条消息。在这种模式下如果消费者处理消息失败消息将会丢失无法保证消息的可靠性。manual手动ACK。消费者接收到消息后需要手动发送确认给发送者发送者才会继续发送下一条消息。在这种模式下如果消费者处理消息失败可以手动发送NACK给发送者告诉发送者这条消息处理失败以便发送者重新发送消息。这种模式可以保证消息的可靠性但需要消费者手动处理确认和NACK。auto自动ACK。Spring AMQP提供了一种自动的消息确认机制。它利用AOP面向切面编程对消息处理逻辑做了环绕增强。当业务正常执行时Spring AMQP会自动返回ACK。当业务出现异常时根据异常判断返回不同结果业务异常自动返回NACK消息处理或校验异常自动返回REJECT。
二、失败重试机制
当消费者出现异常后消息会不断requeue重入队到队列再重新发送给消费者。如果消费者再次执行依然出错消息会再次requeue到队列再次投递直到消息处理成功为止。 如果消费者持续出现异常消息会不断地在队列中重新排队并重新发送这可能会导致消息处理延迟和队列持续增长给系统带来不必要的压力。
为了解决这个问题Spring框架提供了消费者失败重试机制在消费者出现异常时利用本地重试而不是无限制的requeue到MQ队列 。
在consumer服务的application.yml文件中添加配置
spring:rabbitmq:listener:simple:retry:enabled: trueinitial-interval: 1000msmultiplier: 1max-attempts: 3stateless: true
enabled: true开启消费者失败重试initial-interval: 1000ms初始的等待时长multiplier: 1每次重试的等待时长是上次等待时长的倍数max-attempts: 3最大重试次数stateless: truetrue表示重试是无状态的即每次重试都是独立的不会考虑之前的重试状态。如果业务中包含事务需要改为false。
通过这样的配置当消费者出现异常时消息会在本地进行重试而不是无限期地重新排队发送。在达到最大重试次数后SpringAMQP会抛出AmqpRejectAndDontRequeueException异常并将消息从队列中删除。这意味着最后一次处理消息的结果是失败的并且消息不会被重新排队发送给消费者。
这种失败重试机制可以有效地减少消息处理的延迟和队列的增长提高系统的稳定性和可用性。当然在使用失败重试机制时也需要考虑到业务逻辑和异常处理的合理性避免因过度重试而导致的问题。
三、失败处理策略
在失败重试机制中当达到最大重试次数后消息会被直接丢弃。尽管这在某些场景中可能是可接受的但对于那些对消息可靠性要求极高的业务来说这显然是一个潜在的风险点。
Spring AMQP为此提供了强大的支持允许开发人员自定义重试次数耗尽后的消息处理策略。这个策略是由MessageRecovery接口来定义的它有三种不同的实现方式
RejectAndDontRequeueRecoverer当重试次数耗尽后直接拒绝消息并丢弃该消息。这是默认的处理方式。ImmediateRequeueMessageRecoverer当重试次数耗尽后返回一个NACK给生产者使消息重新入队以便再次发送。RepublishMessageRecoverer当重试次数耗尽后可以将失败的消息投递到一个指定的交换机和队列中这个交换机和队列专门用来存放异常的消息。
在处理策略中一种比较合适的方式是使用RepublishMessageRecoverer。当消息失败后它会被投递到一个特定的、专门用于存放异常消息的队列中。这个队列可以由人工进行集中处理使得开发人员可以更精细地处理和诊断问题。
在consumer服务中定义处理失败消息的交换机和队列 Beanpublic DirectExchange errorExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){return BindingBuilder.bind(errorQueue).to(errorExchange).with(error);}
定义一个RepublishMessageRecoverer
Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);
}
完整代码如下
Configuration
ConditionalOnProperty(name spring.rabbitmq.listener.simple.retry.enabled, havingValue true)
public class ErrorConfiguration {Beanpublic DirectExchange errorExchange() {return new DirectExchange(error.direct);}Beanpublic Queue errorQueue() {return new Queue(error.queue);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with(error);}Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}
通过这样的配置当消息在尝试多次重试后仍然失败时它们会被自动投递到定义的异常消息队列中。这样就可以集中处理这些异常消息进行进一步的诊断或处理。这种策略为开发人员在处理复杂分布式系统中的消息问题提供了一种更加专业和灵活的方式。
四、业务幂等性
在计算机科学和软件开发中幂等性是一个重要的概念。简单来说如果一个操作或函数不论执行一次还是多次其结果都是相同的那么称这个操作或函数是幂等的。在业务处理中幂等性尤其关键。它可以保证系统的稳定性确保在某些异常情况下多次执行某个业务操作不会对业务状态产生不一致的结果。幂等性的重要性在于它能够避免因重复执行操作而产生的数据不一致、状态冲突等问题。在涉及金融交易、库存管理、用户认证等关键领域幂等性是确保系统稳定和数据准确的重要前提。
1.通过唯一标识符保证操作的幂等性
为每个操作生成唯一的标识符如ID并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时可以跳过重复的操作。Spring AMQP的MessageConverter自带了MessageID的功能只要开启这个功能即可。
Jackson的消息转换器示例
Bean
public MessageConverter messageConverter(){// 定义消息转换器Jackson2JsonMessageConverter jjmc new Jackson2JsonMessageConverter();// 配置自动创建消息ID用于识别不同消息也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
在publisher服务中编写测试类并利用RabbitTemplate实现消息发送 Testvoid testSendMessage2Queue() {String queueName demo.queue;String msg Idempotent Test;rabbitTemplate.convertAndSend(queueName, msg);}
运行测试用例查看结果 2.通过业务判断保证操作的幂等性
业务判断是一种基于业务逻辑和状态的检查以确定是否对重复的请求或消息进行处理。在多种业务场景中这一策略的思路各有不同。
比如在支付订单的案例中业务逻辑主要是支付并将订单状态从“未支付”修改为“已支付”需要防止重复支付。因此在执行这一业务时可以判断订单的状态是否为“未支付”。若状态不是“未支付”则说明该订单已经被处理过无需重复处理。与基于唯一标识符的方案相比业务判断方案无需对原有数据库进行改造因此更为推荐。
以支付修改订单的业务为例 Overridepublic void markOrderPaySuccess(Long orderId) {// 查询订单Order order getById(orderId);// 判断订单状态,订单不存在或者订单状态不是1放弃处理if (order null || order.getStatus() ! 1) {return;}// 尝试更新订单order.setStatus(2);order.setPayTime(LocalDateTime.now());orderService.updateById(order);}
以上代码示例判断和更新是两步动作 极小概率下可能存在线程安全问题所以可以进行以下修改 Overridepublic void markOrderPaySuccess(Long orderId) {// UPDATE order SET status ? , pay_time ? WHERE id ? AND status 1orderService.lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();} 总结
RabbitMQ是一个开源的消息队列软件旨在提供可靠的消息传递和消息队列功能。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容希望对大家有所帮助。