做国际贸易都用什么网站,手机上如何申请营业执照,网页设计推荐网站,高校网络架构消息队列中的事务#xff0c;主要是解决消息生产者和消息消费者数据一致性的问题。
应用场景
比如订单系统创建订单后#xff0c;会发消息给购物车系统#xff0c;将已下单的商品从购物车中删除。
由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤主要是解决消息生产者和消息消费者数据一致性的问题。
应用场景
比如订单系统创建订单后会发消息给购物车系统将已下单的商品从购物车中删除。
由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤所以使用消息队列来异步清理购物车是更合理的设计。 对于订单系统来说它做了两件事情
在订单库中插入了一条订单数据创建了订单给 MQ 发送了一条订单消息。
对于购物车系统来说它做了一件事情
接收订单消息删除购物车库中的商品清理购物车。
在分布式系统中上面的这几个步骤都有可能失败如果失败了不做处理的话就会造成订单数据和购物车数据不一致的情况。
比如
创建了订单没有清理购物车购物车中的商品清掉了订单没有创建成功。
所以我们需要做的就是要保证在任何步骤失败的情况下订单数据和购物车数据的一致性。
对于购物车系统失败的处理比较简单只有成功删除商品后再提交消费确认如果发生失败因为没有提交消费确认消息队列会重试。
所以问题的重点在于怎么保证订单系统创建订单和发送消息的步骤要么都成功要么都失败不能一个成功一个失败。
分布式事务
消息队列是如何实现分布式事务的就要用到事务消息了。
事务消息需要消息队列提供相应的功能才能实现Kafka 和 RocketMQ 都提供了事务相关功能。
半消息和普通消息的唯一区别是在事务提交之前对于消费者来说这个消息是不可见的。
在上面的步骤中如果第 4 步提交事务消息失败了比如网络异常怎么办
对于这个问题Kafka 和 RocketMQ 给出了 2 种不同的解决方案。
Kafka 简单粗暴直接抛出异常让用户自行处理。可以在业务代码中反复重试提交直到提交成功或者删除之前创建的订单进行补偿RocketMQ事务反查机制。
RocketMQ方案
在 RocketMQ 的分布式事务实现中增加了事务反查机制来解决事务消息提交失败的问题。
如果订单系统在第 4 步提交或回滚事务消息失败如网络异常Broker 迟迟没有收到提交或回滚的消息Broker 会定期去订单系统上反查这个事务对应的本地事务的状态然后根据反查结果决定提交或者回滚这个事务。
所以订单系统需要提供一个反查本地事务状态的接口即根据消息中的订单ID在订单库中查询这个订单是否存在即可如果订单存在则返回成功否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。 使用限制
消息类型一致性
事务消息仅支持在 MessageType 为 Transaction 的主题内使用即事务消息只能发送至类型为事务消息的主题中发送的消息的类型必须和主题的类型一致。
消费事务性
RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理建议消费端做好消费重试如果有短暂失败可以利用重试机制保证最终处理成功。
中间状态可见性
RocketMQ 事务消息为最终一致性即在消息提交到下游消费端处理完成之前下游分支和上游事务之间的状态会不一致。因此事务消息仅适合接受异步执行的事务场景。
事务超时机制
RocketMQ 事务消息的生命周期存在超时机制即半事务消息被生产者发送服务端后如果在指定时间内服务端无法确认提交或者回滚状态则消息默认会被回滚。
使用建议
避免大量未决事务导致超时
RocketMQ支持在事务提交阶段异常的情况下发起事务回查保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损容易导致事务处理延迟。
正确处理进行中的事务
消息回查时对于正在进行中的事务不要返回Rollback或Commit结果应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为事务执行较慢消息回查太快。解决方案如下
将第一次事务回查时间设置较大一些但可能导致依赖回查的事务提交延迟较大。程序能正确识别正在进行中的事务。
使用示例
创建事务主题
sh bin/mqadmin updatetopic -n localhost:9876 -t TransactionTopic -c DefaultCluster -a message.typeTRANSACTION生产者代码
模拟正常流程本地事务成功提交
public class ProducerTransactionExample {public static void main(String[] args) throws Exception {String endpoint 182.92.198.60:8080;String topic TransactionTopic;ClientServiceProvider provider ClientServiceProvider.loadService();ClientConfigurationBuilder builder ClientConfiguration.newBuilder().setEndpoints(endpoint);builder.setRequestTimeout(Duration.ofSeconds(20));ClientConfiguration configuration builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).setTransactionChecker(messageView - {System.out.println(5.broker回查事务状态);String orderId messageView.getProperties().get(orderId);if (Strings.isNullOrEmpty(orderId)) {return TransactionResolution.ROLLBACK;}if (checkOrderById(orderId)) {System.out.println(7.本地事务状态成功提交消息);return TransactionResolution.COMMIT;} else {System.out.println(7.本地事务状态失败回滚消息);return TransactionResolution.ROLLBACK;}}).build();//开启事务分支。final Transaction transaction;try {transaction producer.beginTransaction();System.out.println(1.开启事务);} catch (ClientException e) {e.printStackTrace();//事务分支开启失败直接退出。System.out.println(1.事务开启失败);return;}// 普通消息发送。Message message provider.newMessageBuilder().setTopic(topic)// 设置消息Tag用于消费端根据指定Tag过滤消息。.setTag(transaction).addProperty(orderId, o10086)// 消息体。.setBody((测试事务消息订单号o10086).getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt producer.send(message, transaction);System.out.println(2.半消息发送成功messageId: sendReceipt.getMessageId());} catch (ClientException e) {//半事务消息发送失败事务可以直接退出并回滚。System.out.println(2.半消息发送失败);return;}boolean localTransactionOk doLocalTransaction();if (localTransactionOk) {try {transaction.commit();System.out.println(4.commit事务消息);} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试如果放弃重试可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();System.out.println(4.commit事务消息失败);}} else {try {transaction.rollback();System.out.println(4.rollback事务消息);} catch (ClientException e) {// 建议记录异常信息回滚异常时可以无需重试依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();System.out.println(4.rollback事务消息失败);}}}/*** 模拟本地事务的执行结果** return*/private static boolean doLocalTransaction() {System.out.println(3.执行本地事务,处理中);try {TimeUnit.SECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(3.执行本地事务成功提交事务);return true;}/*** 模拟本地事务反查** param orderId* return*/private static boolean checkOrderById(String orderId) {System.out.println(6.反查本地事务状态订单号 orderId 能查到);return true;}
}消费端在第4步后可以消费到消息。 模拟异常流程将第4步提交/回滚的代码注释掉 消费端在第7步后可以消费到消息。 设置第一次事务回查时间 CHECK_IMMUNITY_TIME_IN_SECONDS 属性定义了从事务消息发送到 Broker 后Broker 在多长时间内不会对这条消息发起回查。这个时间窗口为生产者提供了一个缓冲期以确保即使在网络延迟或短暂的服务中断情况下事务消息也不会被过早地回查。
Message message provider.newMessageBuilder().setTopic(topic)// 设置消息Tag用于消费端根据指定Tag过滤消息。.setTag(transaction).addProperty(orderId, o10086).addProperty(CHECK_IMMUNITY_TIME_IN_SECONDS, 300)// 消息体。.setBody((测试事务消息订单号o10086).getBytes()).build();消费端消费