电商类网站建设需要多少钱,做彩票网站违法的吗,做苗木选择哪个网站,全面的河南网站建设1. RabbitMQ高级-消息确认机制的配置
NONE值是禁用发布确认模式#xff0c;是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法#xff0c;如1示例SIMPLE值经测试有两种效果#xff0c;其一效果和CORRELATED值一样会触发回调方法#xff0c;其二在发布消息成功…1. RabbitMQ高级-消息确认机制的配置
NONE值是禁用发布确认模式是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法如1示例SIMPLE值经测试有两种效果其一效果和CORRELATED值一样会触发回调方法其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel则接下来无法发送消息到broker;
# 服务端口
server:port: 8080
# 配置rabbitmq服务
spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 47.104.141.27port: 5672publisher-confirm-type: correlated创建一个自己的消息确认类
package com.xuexiangban.rabbitmq.springbootorderrabbitmqproducer.callback;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println(消息确认成功!!!!);}else{System.out.println(消息确认失败!!!!);}}
}public void makeOrderTopic(String userId,String productId,int num){// 1: 根据商品id查询库存是否充足// 2: 保存订单String orderId UUID.randomUUID().toString();System.out.println(保存订单成功id是 orderId);// 3: 发送消息//com.# duanxin//#.email.* email//#.sms.# sms// 设置消息确认机制rabbitTemplate.setConfirmCallback(new MessageConfirmCallback());rabbitTemplate.convertAndSend(topic_order_ex,com.email.sms.xxx,orderId);
}2. 分布式事务的解决方式
2.1 两阶段提交
2.1.1 概念
两阶段提交Two-phase Commit2PC通过引入协调者Coordinator来协调参与者的行为并最终决定这些参与者是否要真正执行事务。
准备阶段 协调者询问参与者事务是否执行成功参与者发回事务执行结果。 提交阶段 如果事务在每个参与者上都执行成功事务协调者发送通知让参与者提交事务否则协调者发送通知让参与者回滚事务。需要注意的是在准备阶段参与者执行了事务但是还未提交。只有在提交阶段接收到协调者发来的通知后才进行提交或者回滚。
2.1.2 优缺点
同步阻塞 所有事物参与者在等待其他参与者响应的时候都处于同步阻塞状态无法进行其他操作。 单点问题 协调者在2PC中起到非常大的作用发生故障将会造成很大影响。特别是在阶段二发生故障所有参与者会一直等待状态无法完成其它操作。 数据不一致 在阶段二如果协调者只发送了部分 Commit 消息此时网络发生异常那么只有部分参与者接收到 Commit 消息也就是说只有部分参与者提交了事务使得系统数据不一致。 太过保守 任意一个节点失败就会导致整个事务失败没有完善的容错机制
2.2 补偿事务TCC
2.2.1 概念
TCC 其实就是采用的补偿机制其核心思想是针对每个操作都要注册一个与其对应的确认和补偿撤销操作。它分为三个阶段
Try阶段主要是对业务系统做检测及资源预留Confirm阶段主要是对业务系统做确认提交Try阶段执行成功并开始执行 Confirm阶段时默认 - - - Confirm阶段是不会出错的。即只要Try成功Confirm一定成功。Cancel阶段主要是在业务执行错误需要回滚的状态下执行的业务取消预留资源释放。
举个例子假入 Bob 要向 Smith 转账思路大概是 我们有一个本地方法里面依次调用 1首先在 Try 阶段要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。 2在 Confirm 阶段执行远程调用的转账的操作转账成功进行解冻。 3如果第2步执行成功那么转账成功如果第二步执行失败则调用远程冻结接口对应的解冻方法 (Cancel)。
2.2.2 优缺点
优点跟2PC比起来实现以及流程相对简单了一些但数据的一致性比2PC也要差一些 缺点缺点还是比较明显的在2,3步中都有可能失败。TCC属于应用层的一种补偿方式所以需要程序员在实现的时候多写很多补偿的代码在一些场景中一些业务流程可能用TCC不太好定义及处理。
2.3 本地消息表异步确认
2.3.1 概念
本地消息表与业务数据表处于同一个数据库中这样就能利用本地事务来保证在对这两个表的操作满足事务特性并且使用了消息队列来保证最终一致性。
在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息本地事务能保证这个消息一定会被写入本地消息表中。之后将本地消息表中的消息转发到 Kafka 等消息队列中如果转发成功则将消息从本地消息表中删除否则继续重新转发。在分布式事务操作的另一方从消息队列中读取一个消息并执行消息中的操作。
2.3.2 优缺点
优点 一种非常经典的实现避免了分布式事务实现了最终一致性。 缺点 消息表会耦合到业务系统中如果没有封装好的解决方案会有很多杂活需要处理。
2.4 MQ 事务消息
2.4.1 概念
异步场景通用性较强拓展性较高 有一些第三方的MQ是支持事务消息的比如RocketMQ他们支持事务消息的方式也是类似于采用的二阶段提交但是市面上一些主流的MQ都是不支持事务消息的比如 Kafka 不支持。
以阿里的 RabbitMQ 中间件为例其思路大致为
第一阶段Prepared消息会拿到消息的地址。 第二阶段执行本地事务第三阶段通过第一阶段拿到的地址去访问消息并修改状态。也就是说在业务方法内要想消息队列提交两次请求一次发送消息和一次确认消息。如果确认消息发送失败了RabbitMQ会定期扫描消息集群中的事务消息这时候发现了Prepared消息它会向消息发送者确认所以生产方需要实现一个check接口RabbitMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
2.4.2 优缺点
优点 实现了最终一致性不需要依赖本地数据库事务。 缺点 实现难度大主流MQ不支持RocketMQ事务消息部分代码也未开源。
3. 具体实现
3.1 可靠的生产流程
通过冗余的本地消息等待确认机制修改本地消息的标志位。定期循环扫描未被发送的消息。 3.2 基于MQ的分布式事务消息的可靠消费 当消息消费失败时我们需要对这些消息进行一定的处理
3.2.1 基于MQ的分布式事务消息的消费失败处理
设置重试次数一定要进行控制try/catchbasicNack(tag,false,false)死信队列