个人网站 建设,百度wap,桂林人生活论坛,衡水住房和城乡建设局网站1. 简介
RocketMQ自身实现了事务消息#xff0c;可以通过这个机制来实现一些对数据一致性有强需求的场景#xff0c;保证上下游数据的一致性。
以电商交易场景为例#xff0c;用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统…1. 简介
RocketMQ自身实现了事务消息可以通过这个机制来实现一些对数据一致性有强需求的场景保证上下游数据的一致性。
以电商交易场景为例用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括
主分支订单系统状态更新由未支付变更为支付成功。物流系统状态新增新增待发货物流记录创建订单物流记录。积分系统状态变更变更用户积分更新用户积分表。购物车系统状态变更清空购物车更新用户购物车记录。
引入RocketMQ之后保证上下游数据的一致性。 使用普通消息和订单事务无法保证一致的原因本质上是由于普通消息无法像单机数据库事务一样具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能在普通消息基础上支持二阶段的提交能力。将二阶段提交和本地事务绑定实现全局提交结果的一致性。
事务消息发送分为两个阶段。第一阶段会发送一个半事务消息半事务消息是指暂不能投递的消息生产者已经成功地将消息发送到了 Broker但是Broker 未收到生产者对该消息的二次确认此时该消息被标记成“暂不能投递”状态如果发送成功则执行本地事务并根据本地事务执行成功与否向 Broker 半事务消息状态commit或者rollback半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因导致某条事务消息的二次确认丢失Broker 端会通过扫描发现某条消息长期处于“半事务消息”时需要主动向消息生产者询问该消息的最终状态Commit或是Rollback。这样最终保证了本地事务执行成功下游就能收到消息本地事务执行失败下游就收不到消息。总而保证了上下游数据的一致性。
整个事务消息的详细交互流程如下图所示 事务消息详细步骤
生产者将半事务消息发送至 RocketMQ Broker。RocketMQ Broker 将消息持久化成功之后向生产者返回 Ack 确认消息已经发送成功此时消息暂不能投递为半事务消息。生产者开始执行本地事务逻辑。生产者根据本地事务执行结果向服务端提交二次确认结果Commit或是Rollback服务端收到确认结果后处理逻辑如下 二次确认结果为Commit服务端将半事务消息标记为可投递并投递给消费者。二次确认结果为Rollback服务端将回滚事务不会将半事务消息投递给消费者。 在断网或者是生产者应用重启的特殊情况下若服务端未收到发送者提交的二次确认结果或服务端收到的二次确认结果为Unknown未知状态经过固定时间后服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。需要注意的是服务端仅仅会按照参数尝试指定次数超过次数后事务会强制回滚因此未决事务的回查时效性非常关键需要按照业务的实际风险来设置。
2. 实战
分别启动namesrv、broker、consumer、producer
producer示例代码
public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {TransactionListener transactionListener new TransactionListenerImpl();TransactionMQProducer producer new TransactionMQProducer(please_rename_unique_group_name);ExecutorService executorService new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueueRunnable(2000), new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;}});producer.setNamesrvAddr(127.0.0.1:9876);producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags new String[] {TagA, TagB, TagC, TagD, TagE};for (int i 0; i 10; i) {try {Message msg new Message(TopicTest, tags[i % tags.length], KEY i,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.sendMessageInTransaction(msg, null);System.out.printf(%s%n, sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i 0; i 100000; i) {Thread.sleep(1000);}producer.shutdown();}static class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex new AtomicInteger(0);private ConcurrentHashMapString, Integer localTrans new ConcurrentHashMap();Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value transactionIndex.getAndIncrement();int status value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status localTrans.get(msg.getTransactionId());if (null ! status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}}
}消费者示例代码
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name_4);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setNamesrvAddr(127.0.0.1:9876);consumer.subscribe(TopicTest1234, *);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}将producer和consumer的topic都设置为TopicTest1234。然后再启动consumer和producer。可以看到TransactionProducer控制台输出日志 Consumer控制台输出日志
3. 原理分析
分别启动namesrv和broker服务随后运行org.apache.rocketmq.example.transaction.TransactionProducer.java构建出一个TransactionListenerImpl对象之后添加到TransactionProducer中为半消息的发送以及本地事务校验做做准备。
核心入口TransactionProducer#sendMessageInTransaction() Overridepublic TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg) throws MQClientException {if (null this.transactionListener) {throw new MQClientException(TransactionListener is null, null);}// 包装Topic判断是否是重试Topic以及DLQ的Topicmsg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));// 发送事务消息return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);}DefaultMQProducerImpl#sendMessageInTransaction() public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {// 获取事务监听器就程序开头的时候传入的TransactionListenerImpl对象TransactionListener transactionListener getCheckListener();// 新版本推荐事务监听器已不使用localTransactionExecuterif (null localTransactionExecuter null transactionListener) {throw new MQClientException(tranExecutor is null, null);}// 设置DelayTimeLevel 参数if (msg.getDelayTimeLevel() ! 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}// 消息校验Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult null;// 设置属性TRAN_MSG为trueMessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, true);// 设置属性PGROUP为producerGroupMessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {// ① 发送半消息sendResult this.send(msg);} catch (Exception e) {throw new MQClientException(send message Exception, e);}LocalTransactionState localTransactionState LocalTransactionState.UNKNOW;Throwable localException null;switch (sendResult.getSendStatus()) {case SEND_OK: {try {// 事务ID不为空if (sendResult.getTransactionId() ! null) {// 设置事务idmsg.putUserProperty(__transactionId__, sendResult.getTransactionId());}String transactionId msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null ! transactionId !.equals(transactionId)) {msg.setTransactionId(transactionId);}if (null ! localTransactionExecuter) {// 执行本地事务检查localTransactionState localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener ! null) {log.debug(Used new transaction API);// ② 执行本地事务检查 localTransactionState transactionListener.executeLocalTransaction(msg, arg);}if (null localTransactionState) {localTransactionState LocalTransactionState.UNKNOW;}if (localTransactionState ! LocalTransactionState.COMMIT_MESSAGE) {log.info(executeLocalTransactionBranch return {}, localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info(executeLocalTransactionBranch exception, e);log.info(msg.toString());localException e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {// ③ 调用endTransaction判断是否结束事务this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn(local transaction execute localTransactionState , but end broker transaction failed, e);}// 构建事务消息发送结果TransactionSendResult transactionSendResult new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;}在该方法中核心步骤分别位于①②③标识处。
①调用链路为最终抵达DefaultMQProducerImpl#sendDefaultImpl()也就是producer发消息核心方法。 在sendDefaultImpl()中有一处比较关键的代码 // 事务final String tranMsg msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag | MessageSysFlag.TRANSACTION_PREPARED_TYPE;}此处代码给这条消息标识上事务消息的标志最后通过mQClientAPIImpl往broker发送一条RPC的请求。
Broker端接受到请求之后经过请求code的分发这条请求将由SendMessageProcessor#asyncSendMessage进行处理。 private CompletableFutureRemotingCommand asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 相关代码省略...// 事务消息标识从请求头中获取String transFlag origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(the broker[ this.brokerController.getBrokerConfig().getBrokerIP1() ] sending transaction message is forbidden);return CompletableFuture.completedFuture(response);}// 事务消息处理putMessageResult this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 将消息内容存储到CommitLog文件putMessageResult this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}asyncSendMessage方法处会从消息属性中获取到transFlag标识判断到这条消息是一条半消息然后调用this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner)。
方法调用链最终来到TrasactionalMessageBridge#asyncPutHalfMessage() public CompletableFuturePutMessageResult asyncPutHalfMessage(MessageExtBrokerInner messageInner) {return store.asyncPutMessage(parseHalfMessageInner(messageInner));}/*** 消息事务半消息* param msgInner* return*/private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {// 设置真正要发送的TopicMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());// 设置真正要发送的queueIdMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));// 设置半消息TopicmsgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());// 设置半消息queueIdmsgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));return msgInner;}parseHalfMessageInner方法做了两件事将这条半消息真正要发送的Topic以及queueId另外存起来然后将这条半消息要发送到指定存储半消息的的RMQ_SYS_TRANS_HALF_TOPIC中然后将queueId设置为0。存储半消息的Topic只有1个queueId 最后将这条半消息存入到CommitLog中则步骤①执行完成。
② 步骤①发送半消息到CommitLog中存储之后此时消息发送结果为SEND_OK接着将调用TransactionListener#executeLocalTransaction()方法检查本地事务的状态此处由开发者自行实现代码逻辑。
③ 检查本地事务结束之后会调用endTransaction()方法来尝试结束此次的事务消息。
public void endTransaction(final Message msg,final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {final MessageId id;if (sendResult.getOffsetMsgId() ! null) {id MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id MessageDecoder.decodeMessageId(sendResult.getMsgId());}// 获取消息事务idString transactionId sendResult.getTransactionId();// 获取broker地址final String brokerAddr this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());EndTransactionRequestHeader requestHeader new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);// 设置消息comimtlog偏移量requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {case COMMIT_MESSAGE: // 提交消息requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE: // 回滚消息requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW: // 未知状态requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark localException ! null ? (executeLocalTransactionBranch exception: localException.toString()) : null;// 往broker发送endTransactionOneway请求this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());}broker端EndTransactionProcessor#processRequest接收到endTransaction请求
Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.debug(Transaction request:{}, requestHeader);if (BrokerRole.SLAVE brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn(Message store is slave mode, so end transaction is forbidden. );return response;}// 判断是事务check类型if (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn(Check producer[{}] transaction state, but its pending status. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn(Check producer[{}] transaction state, the producer commit the message. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn(Check producer[{}] transaction state, the producer rollback the message. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn(The producer[{}] end transaction in sending message, and its pending status. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn(The producer[{}] end transaction in sending message, rollback the message. RequestHeader: {} Remark: {},RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}OperationResult result new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE requestHeader.getCommitOrRollback()) {// 这里提交requestHeader实际是从commitlog中获取halfMessageresult this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() ResponseCode.SUCCESS) {// 检查halfMessage状态RemotingCommand res checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() ResponseCode.SUCCESS) {// ④ 半消息处理成功之后调用endMessageTransaction()发起事务消息结束MessageExtBrokerInner msgInner endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());// 清楚事务标识MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);// ⑤ 发送真正的完整的消息RemotingCommand sendResult sendFinalMessage(msgInner);if (sendResult.getCode() ResponseCode.SUCCESS) {// ⑦ 删除半消息this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE requestHeader.getCommitOrRollback()) {// ⑥ 发起消息回滚获取要回滚的半消息result this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() ResponseCode.SUCCESS) {// 查询半消息RemotingCommand res checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() ResponseCode.SUCCESS) {// ⑦ 删除半消息this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response;}④ 调用endMessageTransaction
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {MessageExtBrokerInner msgInner new MessageExtBrokerInner();// 取出真正要发送是TopicmsgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));// 取出真正要发送的queueidmsgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());msgInner.setWaitStoreMsgOK(false);msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));msgInner.setSysFlag(msgExt.getSysFlag());TopicFilterType topicFilterType (msgInner.getSysFlag() MessageSysFlag.MULTI_TAGS_FLAG) MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG: TopicFilterType.SINGLE_TAG;long tagsCodeValue MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());msgInner.setTagsCode(tagsCodeValue);MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);return msgInner;}该方法将这条半消息设置成真正发送的Topic以及queueId。
⑤ 调用EndTransactionProcessor#sendFinalMessage()发送真正的消息。最终调用MessageStore#putMessage()将该条消息存入CommitLog中。
⑥ 调用TransactionalMessageServiceImpl#rollbackMessage()对半消息进行回滚但该方法实则是读取到半消息然后再检查这条半消息。
⑦ 发送真正的消息或者是回滚半消息成功之后随后调用TransactionalMessageServiceImpl#deletePrepareMessage()删除半消息。
经过①~⑦的步骤整个RocketMQ的事务消息流程也就结束了但是这里有一个关键点还没有讲解当半消息发送成功了本地事务执行成功发送本地事务状态时发生了broker断电或者是本地事务状态没有发送成功时该如何保证整个流程能够正常运行呢答案就是broker端会在启动时启动一个定时任务区检查本地事务的状态也就是方法TransactionListener#checkLocalTransaction()。
也就是Broker端会向消息生产者发起事务回查第一次回查后仍未获取到事务状态则之后每隔一段时间会再次回查。此外需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制回查时Broker端如果发现原始生产者已经崩溃则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。
参考
RocketMQ官网-事务消息发送