大连 响应式网站制作,宁波建设网 公积金网点,网站建设费用计入无形资产,网站建设公司的服务特点前言
在上一篇文章Spring Boot自动装配原理以及实践我们完成了服务通用日志监控组件的开发#xff0c;确保每个服务都可以基于一个注解实现业务功能的监控。 而本文我们尝试基于RocketMQ实现下单的分布式的事务。可能会有读者会有疑问#xff0c;之前我们不是基于Seata完成了…前言
在上一篇文章Spring Boot自动装配原理以及实践我们完成了服务通用日志监控组件的开发确保每个服务都可以基于一个注解实现业务功能的监控。 而本文我们尝试基于RocketMQ实现下单的分布式的事务。可能会有读者会有疑问之前我们不是基于Seata完成了分布式事务为什么我们还要用到RocketMQ呢
我们的再来回顾一下我们下单功能大抵是做以下三件事情:
创建订单将订单记录存到数据库中。扣款记录用户扣款后钱包所剩下的额度。扣除商品库存并发放商品。
我们将该场景放到高并发场景下这个功能势必要考虑性能和可靠性问题所以我们在业务需求清楚明了的情况下就希望能有一种方式确保下单功能在高并发场景保证性能、可靠性。 而Seata的AT模式确实可以保证最终一致性但由于需要用到undo_log和lock_table等涉及数据持久化以及锁相关的操作可能存在一定的性能问题。而且Seata一旦报错会直接回滚事务不存在任何重试机制对于我们这种付款下单的场景是非常不可取的。 而RocketMQ实现分布式的方式是基于消息通信的既确保了业务功能解耦保证了并发场景的性能而且RocketMQ还对消息消费可靠性做了许多不错的优化例如:失败重试、死信队列等所以我们还是尝试使用RocketMQ来改良我们的下单分布式事务问题。
需求介绍以及实现思路
用户下单大抵需要在三个服务中完成:订单创建、钱包扣款、库存扣减等业务逻辑。这其中会跨域三个服务分别是订单服务创建订单、账户服务扣款、商品服务扣减库存。 以我们业务为最终目标RocketMQ实现分布式事务的原理是基于2PC的流程大抵如下:
订单服务发送一个事务消息到消息队列消息内容就是我们的订单信息这里面包含用户账号、购买的产品代码、购买产品数量等数据。MQ收到half消息并回复确认。生产者(订单服务order-service)得知我们发送的消息已被收到订单服务则执行本地事务并提交事务即将订单数据插入数据库中。生产者(订单服务order-service)完成本地事务的提交告知MQ将事务消息commit此时消费者就可以消费这条消息了注意若生产者消费失败则将消息rollback一切就当没有发生过。如果上述的消息是commit则将消息持久化到commitLog中以便后续MQ宕机或者服务宕机后依然可以继续消费这条没有被消费的消息。(非必要步骤)若MQ长时间没有收到生产者的commit或者rollback的信号则会主动找生产者索要当前消息状态。消费者即我们的用户服务或者库存服务收到消息则执行本地事务并提交若失败则会不断重试直到达到上限则将消息存到死信队列中。 常见问题
什么是half消息
half消息即半消息它和普通消息一样都是存储在MQ中唯一区别就是这个消息不会立马被消费者消费到。只有生产者本地事务成功并发送commit通知后这个消息才会被提交到topic队列中后消费者拿到这个消息并进行消费。
如何发送half消息
基于MQ事务消息的实现接口完成实现(具体后文会演示)。
为什么要先发送half消息再执行本地事务先执行本地事务成功后在发送不行吗
先发送half消息的原因是为了尽可能确保生产者和消息队列通信正常只有通信正常了才能确保生产者本地事务提交后发送的commit通知可以消息队列收到通知从而将消息提交到topic队列中让消费者消费由此保证分布式事务的可靠性。
如果mq收到half消息准备发送success的消息给生产者但因为网络波动导致生产者没有收到这个消息要怎么办
这也就意味着生产者没有收到确认的通知随后消息队列就会因为长时间没有收到生产者commit或者rollback的通知而去回调生产者的接口询问事务提交结果。
MQ没有收到生产者(订单服务)的commit或者rollback信号我们如何回查怎么提供回查的依据
常规的做法就是建立一张表记录日志只要我们订单信息插入成功就需要日志一下这条数据所以我们必须保证订单数据插入和日志插入表中的原子性这一点我们基于spring的事务注解即可实现。
如果生产者执行本地事务失败了怎么办
首先将本地事务回滚再向消息队列提交一个rollback的请求对应的half消息就会回滚而不会被消费者消费保证最终一致性。
前面说的都是事务流程这和事务消息如何保证数据最终一致性有什么关系
生产者和消息队列事务流程可以确保生产者和消息队列写操作的一致性确保写操作都是成功或者失败。只有保证两者正常通信才能确保消费者可以消费MQ中的消息从而完成数据最终一致性。
消费者提交本地事务失败了怎么办
我们都知道消息队列只能保证消息可靠性而无法保证分布式事务的强一致性出现这种情况消息队列会进行N次重试如果还是失败则可以到死信队列中查看失败消息然后通过补偿机制实现分布式事务最终一致性。
实践-基于RocketMQ实现分布式事务
部署RocketMQ
在编写业务代码之前我们必须完成一下RocketMQ的部署首先我们自然要下载一下RocketMQ下载地址如下笔者下载的是rocketmq-all-4.8.0-bin-release这个版本
https://rocketmq.apache.org/download/
完成完成后我们将其解压到自定义的路径并配置一个名为ROCKETMQ_HOME的环境变量以笔者为例因为mq存放在D:\myinstall\rocketmq所以我们将这个路径配置到环境变量中。 完成环境变量配置后我们到达mq的bin目录先键入这条命令启动nameserver
start mqnamesrv.cmd如果弹窗输出下面这条结果则说明mq的NameServer启动成功。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeTypeJSON然后我们再键入下面这条命令启动broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnabletrue若弹窗输出下面所示的文字则说明broker启动成功自此mq就在windows环境部署成功了。我们就可以开始编码工作了。
The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeTypeJSON and name server is 127.0.0.1:9876服务引入MQ完成下单功能开发
服务引入RocketMQ依赖
完成RocketMQ部署之后我们就可以着手编码工作了首先我们要在在三个服务中引入RocketMQ的依赖由于笔者的spring-boot版本比较老所以这里笔者为了统一管理在父pom中指定了mq较新的版本号: !--rocketmq--!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.1.1/version/dependency然后我们分别对order、account、product三个服务中引入依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactId/dependency注册中心配置RocketMQ信息
由于我们的分布式事务涉及3个服务而且mq的消费模式采用的是发布订阅模式所以我们的生产者(order-service)和消费者(account-serivce)都配置为cloud-group
rocketmq:name-server: 127.0.0.1:9876producer:group: cloud-group之所以没有没将消费者2(product-service)也配置到cloud-group中的原因也很简单同一个消息只能被同一个消费者组中的一个成员消费假如我们的将product-service配置到同一个消费者组中就会出现一条消息只能被一个Java服务消费。 对此我们实现思路有两种:
将服务都放到同一个消费者组消费模式改为广播模式。将product-service设置到别的消费者组中。
考虑后续扩展笔者选择方案2设置到别的组中。
rocketmq:name-server: 127.0.0.1:9876producer:group: cloud-group2创建消息日志表
我们在上文进行需求梳理时有提到一个MQServer没收到生产者本地事务执行状态的情况所以我们在生产者在执行本地事务时需要创建一张表记录生产者本地事务执行状态建表SQL如下:
DROP TABLE IF EXISTS rocketmq_transaction_log;
CREATE TABLE rocketmq_transaction_log (id int(11) NOT NULL AUTO_INCREMENT,transaction_id varchar(50) DEFAULT NULL,log varchar(500) DEFAULT NULL,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;完成order服务half消息发送、监听、回查回调逻辑
我们的订单服务需要做以下三件事:
发送half消息给MQ。half消息发送成功执行本地事务并记录日志。告知MQ可以提交事务消息。
所以我们需要定义一下消息格式对象类中必须包含订单号、产品编码、用户编码、购买产品数量等信息。
NoArgsConstructor
AllArgsConstructor
Getter
Setter
public class OrderDto {private static final long serialVersionUID 1L;//设置主键自增避免插入时没必要的报错TableId(value ID, type IdType.AUTO)private Integer id;/*** 订单号*/private String orderNo;/*** 用户编码*/private String accountCode;/*** 产品编码*/private String productCode;/*** 产品扣减数量*/private Integer count;/*** 余额*/private BigDecimal amount;/*** 本次扣减金额*/private BigDecimal price;
}然后我们就可以编写控制层的代码了通过获取前端传输的参数调用orderService完成half消息发送。
PostMapping(/order/createByMQ)public ResultDataString createByMQ(RequestBody OrderDto orderDTO) {log.info(基于mq完成用户下单流程请求参数: JSON.toJSONString(orderDTO));orderService.createByRocketMQ(orderDTO);return ResultData.success(基于mq完成用户下单完成);}orderService的实现逻辑很简单定义好消息设置消息头内容和消息载体的对象通过sendMessageInTransaction方法完成半消息发送需要了解一下消息的主题(topic)为createByRocketMQ只有订阅这个主题的消费者才能消费这条消息。
Autowiredprivate RocketMQTemplate rocketMQTemplate;Overridepublic void createByRocketMQ(OrderDto orderDto) {//创建half消息消息内容为告知account服务要退款给用户String transactionId UUID.randomUUID().toString();MessageOrderDto message MessageBuilder.withPayload(orderDto).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader(accountCode, orderDto.getAccountCode()).setHeader(productCode, orderDto.getProductCode()).setHeader(count, orderDto.getCount()).setHeader(amount, orderDto.getPrice().multiply(new BigDecimal(orderDto.getCount()))).build();//发送half消息rocketMQTemplate.sendMessageInTransaction(createByRocketMQ, message, orderDto);}完成half消息发送之后我们就必须知晓消息发送结果才能确定是否执行本地事务并提交所以我们的订单服务必须创建一个监听器了解half消息的发送情况executeLocalTransaction方法就是mq成功收到半消息后的回调函数一旦我们得知消息成功发送之后MQ就会执行这个方法笔者通过这个方法获取消息头的参数创建订单对象调用createOrderWithRocketMqLog完成订单的创建的本地事务成功的日志记录。
Slf4j
RocketMQTransactionListener
RequiredArgsConstructor(onConstructor __(Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {private final IOrderService orderService;private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;/*** 监听到发送half消息执行本地事务*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {log.info(order执行本地事务);try {MessageHeaders headers message.getHeaders();String amount (String) headers.get(amount);Order order Order.builder().accountCode((String) headers.get(accountCode)).amount(new BigDecimal(amount) ).productCode((String) headers.get(productCode)).count(Integer.valueOf(String.valueOf(headers.get(count)))).build();String transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);orderService.createOrderWithRocketMqLog(order, transactionId);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {log.error(创建订单失败失败原因: e.getMessage(), e);return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事务的检查检查本地事务是否成功*/Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers message.getHeaders();//获取事务IDString transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(检查本地事务,事务ID:{}, transactionId);//根据事务id从日志表检索QueryWrapperRocketmqTransactionLog queryWrapper new QueryWrapper();queryWrapper.eq(transaction_id, transactionId);RocketmqTransactionLog rocketmqTransactionLog rocketMqTransactionLogMapper.selectOne(queryWrapper);if (null ! rocketmqTransactionLog) {return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;}
}createOrderWithRocketMqLog做了两件事分别是插入订单信息和创建消息日志这里笔者用到了事务注解确保了两个操作的原子性。 这样一来MQserver后续的回查逻辑完全可以基于RocketmqTransactionLog 进行判断如果消息的事务id在表中存在则说明生产者本地事务成功反之就是失败。 Transactional(rollbackFor RuntimeException.class)Overridepublic void createOrderWithRocketMqLog(Order order, String transactionId) {order.setOrderNo(UUID.randomUUID().toString());orderMapper.insert(order);RocketmqTransactionLog log RocketmqTransactionLog.builder().transactionId(transactionId).log(执行创建订单操作).build();rocketmqTransactionLogMapper.insert(log);}补充一下基于MP生成的RocketmqTransactionLog 类代码
TableName(rocketmq_transaction_log)
ApiModel(value RocketmqTransactionLog对象, description )
Data
NoArgsConstructor
AllArgsConstructor
Builder
public class RocketmqTransactionLog implements Serializable {private static final long serialVersionUID 1L;TableId(value ID, type IdType.AUTO)private Integer id;private String transactionId;private String log;}完成account、product监听事件
然后我们就可以实现用户服务和商品服务的监听事件了一旦生产者提交事务消息之后这几个消费者都会收到这个topic(主题)的消息进而完成当前服务的业务逻辑。
先来看看实现扣款的用户服务我们的监听器继承了RocketMQListener基于RocketMQMessageListener注解设置它订阅的主题为createByRocketMQ一旦收到这个主题的消息时这个监听器就会执行onMessage方法我们的逻辑很简单就是获取消息的内容完成扣款唯一需要注意的就是线程安全问题。我们的压测的情况下单用户可能会频繁创建订单在并发期间同一个用户的扣款消息可能同时到达扣款服务中这就导致单位时间内扣款服务从数据库中查询到相同的余额执行相同的扣款逻辑导致金额少扣了。 所以我们必须保证扣款操作互斥和原子化考虑到笔者当前项目环境是单体所以就用简单的synchronized 关键字解决问题。
Slf4j
Service
RocketMQMessageListener(topic createByRocketMQ, consumerGroup cloud-group)
RequiredArgsConstructor(onConstructor __(Autowired))
public class SubtracAmountListener implements RocketMQListenerOrderDto {Resourceprivate AccountMapper accountMapper;//强制转为runTimeExceptionSneakyThrowsOverridepublic void onMessage(OrderDto orderDto) {log.info(账户服务收到消息开始消费);QueryWrapperAccount query new QueryWrapper();query.eq(account_code, orderDto.getAccountCode());//解决单体服务下线程安全问题synchronized (this){Account account accountMapper.selectOne(query);BigDecimal subtract account.getAmount().subtract(orderDto.getAmount());if (subtract.compareTo(BigDecimal.ZERO)0){throw new Exception(用户余额不足);}account.setAmount(subtract);log.info(更新账户服务请求参数:{}, JSON.toJSONString(account));accountMapper.updateById(account);}}
}然后就说商品服务逻辑也很简单也同样要注意一下线程安全问题
Slf4j
Service
RocketMQMessageListener(topic createByRocketMQ, consumerGroup cloud-group2)
RequiredArgsConstructor(onConstructor __(Autowired))
public class ProductSubtractListener implements RocketMQListenerOrderDto {Resourceprivate ProductMapper productMapper;Overridepublic void onMessage(OrderDto orderDto) {log.info(产品服务收到消息开始消费);QueryWrapperProduct queryWrappernew QueryWrapper();queryWrapper.eq(product_code,orderDto.getProductCode());synchronized (this){Product product productMapper.selectOne(queryWrapper);if (product.getCount()orderDto.getCount()){throw new RuntimeException(库存不足);}product.setCount(product.getCount()-orderDto.getCount());log.info(更新产品库存信息请求参数:{}, JSON.toJSONString(product));productMapper.updateById(product);}}
}
测试
完整编码工作后自测是非常有必要的我们日常完成开发任务后都会结合需求场景以及功能编排一些自测用例查看最终结果是否与预期一致。 需要注意的是由于订单业务逻辑较为复杂很多业务场景一篇博客是不可能全部覆盖所以这里我们就测试一下基于RocketMQ实现分布式事务常见的几个问题场景是否和预期一致。
在测试前我们必须做好前置准备工作准备功能测试时涉及到的SQL语句以本次用户购买产品的业务为例涉及到订单表、用户账户信息表、产品表、以及生产者本地事务日志表。
SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;在每次测试完成之后我们希望数据能够还原所以这里也需要准备一下每次测试结束后的更新语句由于订单表和消息日志表都是主键自增考虑到这两张表只涉及插入所以笔者为了重置主键的值采取的是truncate语句。
truncate table t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount10000 ;
UPDATE product set count10000;测试用例1
第一个用例是查看所有服务都正常的情况下订单表是否有数据用户表的用户是否会正常扣款以及商品表库存是否会扣减。
测试前我们先查看订单表确认没有数据 查看我们的测试用户钱包额度为10000 再查看库存表可以看到数量为1000 确认完数据之后我们就可以测试服务是否按照预期的方式执行将所有服务启动 我们通过网关发起调用,请求地址如下:
http://localhost:8090/order/order/createByMQ请求参数如下从参数可以看出这个请求意为用户代码(accountCode)为demoData这个用户希望购买1个(count)产品代码(productCode)为P001的产品该产品当前售价(price)为1元。
{accountCode: demoData,productCode: P001,count: 1,amount: 1,price: 1
}调用完成后查看订单表订单数据生成无误: 查看用户服务是否完成用户扣款扣款无误 查看产品表可以看到产品数量也准确扣减 测试用例2
我们希望测试一下发送完half消息之后执行本地事务完成但是未提交commit请求时MQServer是否会调用回查逻辑。
为了完成这一点我们必须按照以下两个步骤执行:
在订单服务提交事务消息处打个断点。 发起请求当代码执行到这里的时候通过jps定位到进程号将其强制杀死。如下所示我们的代码执行到了提交事务消息这一步: 我们通过jps定位并将其杀死 完成这些步骤后我们再次将服务启动等待片刻之后可以发现MQServer会调用checkLocalTransaction回查生产者本地事务的情况。我们放行这块代码让程序执行下去最后再查看数据库中的数据结果是否符合预期。 测试用例3
测试消费者执行报错后是否会进行重试这一点就比较好测试了我们在消费者监听器中插入随便插入一个报错查看其是否会不断重试。这里笔者就不多做演示实验结果是会进行不断重试当重试次数达到阈值时会将结果存到死信队列中。 压测MQ和Seata的性能
由于MQ是采用异步消费的形式解耦了服务间的业务而我们的Seata采用默认的AT模式每次执行分布式事务时都会需要借助undo-log、全局锁等的方式保证最终一致性。所以理论上RocketMQ的性能肯定是高于Seata的对此我们不妨使用Jmeter进行压测来验证一下。
本次压测只用了10个并发MQ和seata的压测结果如下,可以看到MQ无论从执行时间还是成功率都远远优秀于Seata的。
MQ的压测结果: Seata的压测结果: 参考文献
SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务
Lombok注解-SneakyThrows
RocketMq 广播模式
使用RocketMQTemplate发送各种消息
RocketMQ事务消息如何保证数据的最终一致性