政务公开网站建设的亮点和建议,产品全网营销推广,天津定制开发网站,贵州 跨境电商网站建设2. 看门狗调度机制 背景与问题 面临的挑战 在传统的微服务集群部署中#xff0c;每个服务实例都可能需要连接MQTT服务器处理设备消息。这会带来几个问题#xff1a; 消息重复处理#xff1a;多个节点同时订阅同一个Topic#xff0c;导致同一条消息被处理多次资源浪费#…
2. 看门狗调度机制 背景与问题 面临的挑战 在传统的微服务集群部署中每个服务实例都可能需要连接MQTT服务器处理设备消息。这会带来几个问题 消息重复处理多个节点同时订阅同一个Topic导致同一条消息被处理多次资源浪费每个节点都维护MQTT连接占用不必要的网络和内存资源状态不一致多个节点并发处理设备指令可能导致设备状态混乱 业务需求 对于设备管理服务我们需要确保 每条MQTT消息只被处理一次服务具备高可用性单节点故障不影响消息处理系统能够自动进行故障恢复 解决方案设计 核心思想 通过分布式锁 看门狗的机制确保在任意时刻只有一个节点负责MQTT连接和消息处理同时保证服务的高可用性。 Component
public class MqttClientStart implements ApplicationRunner, DisposableBean {private static final String MQTT_LOCK_KEY Service:Mqtt:Consumers:Client:Watchdog:Lock;private static final Long LOCK_TIMEOUT 120L; // 锁超时时间private static final int LOCK_RENEW_INTERVAL 100; // 续期间隔private final String nodeId RequestUtils.getHostname(); // 节点唯一标识private final AtomicBoolean watchdogRunning new AtomicBoolean(false);private final AtomicBoolean mqttInitialized new AtomicBoolean(false);
} 设计要点 使用主机名作为节点唯一标识锁超时时间120秒续期间隔100秒避免网络抖动导致的锁丢失通过AtomicBoolean确保状态的线程安全 private void startWatchdog() {watchdogExecutor Executors.newSingleThreadScheduledExecutor(r - {Thread thread new Thread(r, mqtt-watchdog- nodeId);thread.setDaemon(true);// 关键设置未捕获异常处理器thread.setUncaughtExceptionHandler((t, ex) - {log.error(Uncaught exception in watchdog thread {}: {}, t.getName(), ex.getMessage(), ex);handleWatchdogFailure(ex);});return thread;});watchdogExecutor.scheduleAtFixedRate(this::watchdogTask, 1, LOCK_RENEW_INTERVAL, TimeUnit.SECONDS);
}
设计亮点
单线程调度器避免并发问题守护线程确保不阻塞应用关闭完善的异常处理机制
3. 核心业务逻辑 private void watchdogTask() {try {boolean hasLock myRedisLock.tryReentrantLock(MQTT_LOCK_KEY, nodeId, LOCK_TIMEOUT);if (hasLock) {// 获得锁且未初始化 - 初始化MQTT客户端if (!mqttInitialized.get()) {log.info(Node {} acquired lock. Initializing MQTT client..., nodeId);initializeMqttClient();}} else {// 失去锁且已初始化 - 关闭MQTT客户端if (mqttInitialized.get()) {log.info(Node {} lost lock. Shutting down MQTT client..., nodeId);shutdownMqttClient();}}} catch (Exception e) {log.error(Error in MQTT watchdog task:, e);if (mqttInitialized.get()) {shutdownMqttClient(); // 异常时确保资源清理}}
}
核心逻辑
持有锁 未初始化 → 启动MQTT客户端失去锁 已初始化 → 关闭MQTT客户端异常情况下确保资源清理
4. 故障恢复机制
private void handleWatchdogFailure(Throwable ex) {watchdogRunning.set(false);// 异步延迟重启CompletableFuture.runAsync(() - {try {Thread.sleep(5000); // 延迟5秒重启if (watchdogExecutor ! null) {watchdogExecutor.shutdown();}startWatchdog();} catch (InterruptedException e) {Thread.currentThread().interrupt();}});
}
容错设计
异常发生时自动重启看门狗延迟重启避免频繁失败异步处理不阻塞当前线程
运行流程
正常运行流程
应用启动各节点启动看门狗线程锁竞争各节点尝试获取Redis分布式锁角色确定获得锁的节点成为Active其他为StandbyMQTT管理Active节点初始化MQTT客户端开始处理消息锁续期Active节点定期续期锁Standby节点继续尝试获取锁
故障切换流程
故障检测Active节点故障停止锁续期锁释放Redis锁超时自动释放120秒后角色切换Standby节点获得锁升级为Active服务恢复新Active节点初始化MQTT客户端恢复消息处理
优势与权衡
主要优势
高可用性
单节点故障时自动切换服务不中断故障恢复时间可控最多120秒
数据一致性
确保消息唯一性处理避免重复操作和状态冲突
运维友好
自动故障检测和恢复完善的日志记录便于问题排查
设计权衡
性能方面
牺牲了并发处理能力MQTT处理能力无法水平扩展
资源利用
其他节点的MQTT处理资源闲置可能造成负载不均 适用场景 这种设计适合以下场景 对消息处理一致性要求较高MQTT消息量不大单节点可以处理更重视可用性而非性能