网站开发项目流程书,电商ui设计是什么意思,1688网页登录,电商网站可维护性一、发送事务消息案例 事务消息共有三种状态#xff0c;提交状态、回滚状态、中间状态#xff1a;
TransactionStatus.CommitTransaction: 提交事务#xff0c;它允许消费者消费此消息。TransactionStatus.RollbackTransaction: 回滚事务#xff0c;它代表该消息将被删除…一、发送事务消息案例 事务消息共有三种状态提交状态、回滚状态、中间状态
TransactionStatus.CommitTransaction: 提交事务它允许消费者消费此消息。TransactionStatus.RollbackTransaction: 回滚事务它代表该消息将被删除不允许被消费。TransactionStatus.Unknown: 中间状态它代表需要检查消息队列来确定状态。 1.1创建事务性生产者 使用 TransactionMQProducer类创建生产者并指定唯一的 ProducerGroup就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在上面说的。 /*** 发送事务消息* throws Exception*/Testpublic void testTransactionProduce() throws Exception {TransactionListener transactionListener new TransactionListenerImpl();TransactionMQProducer producer new TransactionMQProducer(please_rename_unique_group_name);ExecutorService executorService new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueueRunnable(2000), new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags new String[] {TagA, TagB, TagC, TagD, TagE};for (int i 0; i 10; i) {try {Message msg new Message(TopicTest1234, tags[i % tags.length], KEY i,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.sendMessageInTransaction(msg, null);System.out.printf(%s%n, sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i 0; i 100000; i) {Thread.sleep(1000);}producer.shutdown();}
2.事务监听接口 当发送半消息成功时我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。 class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex new AtomicInteger(0);private ConcurrentHashMapString, Integer localTrans new ConcurrentHashMap();/*** 本地事务*/Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value transactionIndex.getAndIncrement();int status value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}/*** 状态回查*/Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status localTrans.get(msg.getTransactionId());if (null ! status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}
1.3事务消息使用上的限制
事务消息不支持延时消息和批量消息。为了避免单个消息被检查太多次而导致半队列消息累积我们默认将单个消息的检查次数限制为 15 次但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话 N transactionCheckMax 则 Broker 将丢弃此消息并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制该参数优先于 transactionTimeout 参数。事务性消息可能不止一次被检查或消费提交给用户的目标主题消息可能会失败目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证如果希望确保事务消息不丢失、并且事务完整性得到保证建议使用同步的双重写入机制。事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。