网站栏目模板如何选择,wordpress project,手游推广渠道平台,苏宁易购网站建设的不足之处文章目录 事务消息概念介绍交互流程事务消息原理TransactionListener接⼝TransactionProducer.javaTransactionConsumer.java 事务消息
内置topic中的消息对消费者不可见 本地事务mq消息事务消息
消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求… 文章目录 事务消息概念介绍交互流程事务消息原理TransactionListener接⼝TransactionProducer.javaTransactionConsumer.java 事务消息
内置topic中的消息对消费者不可见 本地事务mq消息事务消息
消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求的场景。
概念介绍
事务消息消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能通过消息队列RocketMQ 事务消息能达到分布式事务的最终⼀致。
半事务消息暂不能投递的消息发送⽅已经成功地将消息发送到了消息队列 RocketMQ 版服务端但是服务端未收到⽣产者对该消息的⼆次确认此时该消息被标记成“暂不能投递”状态处于该种状态下的消息即半事务消息。
消息回查由于⽹络闪断、⽣产者应⽤重启等原因导致某条事务消息的⼆次确认丢失消息队列RocketMQ 版服务端通过扫描发现某条消息⻓期处于“半事务消息”时需要主动向消息⽣产者询问该消息的最终状态Commit 或是 Rollback该询问过程即消息回查。
交互流程
事务消息交互流程如下图所示。 事务消息发送步骤如下
发送⽅将半事务消息发送⾄消息队列 RocketMQ 版服务端。消息队列 RocketMQ 版服务端将消息持久化成功之后向发送⽅返回 Ack 确认消息已经发送成功此时消息为半事务消息。发送⽅开始执⾏本地事务逻辑。发送⽅根据本地事务执⾏结果向服务端提交⼆次确认Commit 或是 Rollback服务端收到Commit 状态则将半事务消息标记为可投递订阅⽅最终将收到该消息服务端收到 Rollback 状态则删除半事务消息订阅⽅将不会接受该消息。
事务消息回查步骤如下
在断⽹或者是应⽤重启的特殊情况下上述步骤 4 提交的⼆次确认最终未到达服务端经过固定时间后服务端将对该消息发起消息回查。发送⽅收到消息回查后需要检查对应消息的本地事务执⾏的最终结果。发送⽅根据检查得到的本地事务的最终状态再次提交⼆次确认服务端仍按照步骤 4 对半事务消息进⾏操作。
注意事项
事务消息不⽀持延时消息和批量消息。为了避免单个消息被检查太多次⽽导致半队列消息累积我们默认将单个消息的检查次数限制为15 次但是⽤户可以通过 Broker 配置⽂件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话 N transactionCheckMax 则 Broker 将丢弃此消息并在默认情况下同时打印错误⽇志。⽤户可以通过重写AbstractTransactionalMessageCheckListener 类来修改这个⾏为。事务消息将在 Broker 配置⽂件中的参数 transactionTimeout 这样的特定时间⻓度之后被检查。当发送事务消息时⽤户还可以通过设置⽤户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制该参数优先于 transactionTimeout 参数。事务性消息可能不⽌⼀次被检查或消费。做好幂等性的检查提交给⽤户的⽬标主题消息可能会失败⽬前这依⽇志的记录⽽定。它的⾼可⽤性通过 RocketMQ本身的⾼可⽤性机制来保证如果希望确保事务消息不丢失、并且事务完整性得到保证建议使⽤同步的双重写⼊机制。事务消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同事务消息允许反向查询、MQ服务器能通过它们的⽣产者 ID 查询到消费者。
事务消息原理
HALF消息:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息) 事务消息替换主题保存原主题和队列信息 半消息对Consumer不可⻅不会被投递 OP消息: RMQ_SYS_TRANS_OP_HALF_TOPIC(记录⼆阶段操作) Rollback:只做记录 Commit:根据备份信息重新构造消息并投递 回查: 对⽐HALF消息和OP消息进⾏回查
TransactionListener接⼝
要使⽤RocketMQ的事务消息要实现⼀个TransactionListener的接⼝这个接⼝中有两个⽅法如下
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public interface TransactionListener {/*** When send transactional prepare(half) message succeed, this method will* be invoked to execute local transaction.* 执⾏本地事务** param msg Half(prepare) message* param arg Custom business parameter* return Transaction state*/LocalTransactionState executeLocalTransaction(final Message msg, finalObject arg);/*** When no response to prepare(half) message. broker will send check* message to check the transaction status, and this* method will be invoked to get local transaction status.* 消息回查后需要检查对应消息的本地事务执⾏的最终结果** param msg Check message* return Transaction state*/LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
RocketMQ的事务消息是基于两阶段提交实现的也就是说消息有两个状态prepared和commited。当消息执⾏完send⽅法后进⼊的prepared状态进⼊prepared状态以后就要执⾏executeLocalTransaction⽅法这个⽅法的返回值有3个也决定着这个消息的命运1.LocalTransactionState.COMMIT_MESSAGE提交消息这个消息由prepared状态进⼊到commited状态消费者可以消费这个消息 2.LocalTransactionState.ROLLBACK_MESSAGE回滚这个消息将被删除消费者不能消费这个消息 3.LocalTransactionState.UNKNOW未知这个状态有点意思如果返回这个状态这个消息既不提交也不回滚还是保持prepared状态⽽最终决定这个消息命运的是checkLocalTransaction这个⽅法。 当executeLocalTransaction⽅法返回UNKNOW以后RocketMQ会每隔⼀段时间调⽤⼀次checkLocalTransaction这个⽅法的返回值决定着这个消息的最终归宿。那么checkLocalTransaction这个⽅法多⻓时间调⽤⼀次呢我们在BrokerConfig类中可以找到
/*** Transaction message check interval.*/ImportantFieldprivate long transactionCheckInterval 60 * 1000;这个值是在brokder.conf中配置的默认值是60*1000也就是1分钟。那么会检查多少次呢如果每次都返回UNKNOW也不能⽆休⽌的检查吧我们在BrokerConfig类中可以找到
/*** The maximum number of times the message was checked, if exceed this
value, this message will be discarded.*/ImportantFieldprivate int transactionCheckMax 15;这个是检查的最⼤次数超过这个次数如果还返回UNKNOW这个消息将被删除。
TransactionProducer.java
package com.example.rocketmq.demo.transaction;import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.TimeUnit;public class TransactionProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer并制定生产者组名TransactionMQProducer producer new TransactionMQProducer(GroupTransaction);//2.指定nameserver地址producer.setNamesrvAddr(localhost:9876);//3.添加事务监听器producer.setTransactionListener(new TransactionListener() {/*** 在该方法执行本地事务* param msg* param arg* return*/Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {//db 本地事务 mq消息 事务消息System.out.println(executeLocal:msg.getTags());if(StringUtils.equals(TAGA,msg.getTags())){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.equals(TAGB,msg.getTags())){//B消息本地事务返回rollbackreturn LocalTransactionState.ROLLBACK_MESSAGE;}else if(StringUtils.equals(TAGC,msg.getTags())){return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}/*** 该方法用于MQ进行消息的回查* param msg* return*/Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println(checkLocalTransaction:msg.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});//4.启动producerproducer.start();String[] tags {TAGA,TAGB,TAGC};//发送三条消息for (int i 0; i 3; i) {// 发送A、B、C三条Message msg new Message(TransactionTopic, tags[i],(Hello RocketMQ: tags[i]).getBytes(RemotingHelper.DEFAULT_CHARSET));//6.发送消息SendResult sendResult producer.sendMessageInTransaction(msg,null);//7.获取发送状态SendStatus sendStatus sendResult.getSendStatus();System.out.printf(发送结果:%s%n, sendStatus);TimeUnit.SECONDS.sleep(1);}//8.关闭生产者producer
// producer.shutdown();}
}
TransactionConsumer.java
package com.example.rocketmq.demo.transaction;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class TransactionConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 1.创建消费者consumer、制定消费者组名DefaultMQPushConsumer consumer new DefaultMQPushConsumer(GroupTransaction);// 2.指定nameserver地址consumer.setNamesrvAddr(localhost:9876);// 3.订阅主题Topic和Tagconsumer.subscribe(TransactionTopic, *);// 4.设置回调函数处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {for(MessageExt msg:msgs){String key msg.getKeys();System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), new String(msg.getBody()));}
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumer.start();System.out.printf(Consumer Started.%n);}
}