外贸手机网站模板,定制衣服的网站,专业进出口贸易网站,网站结构是什么 怎么做目录 背景遇到的问题 RocketMQ 基础基础消息模型扩展后的消息模型部署模型相关概念点 方案对比影子Topic的方案Tag的方案UserProperty的方案影子Group的方案灰度分区的方案方案对比 灰度分区方案设计适配只有部分灰度的情况所做的功能扩展消费者#xff08;无灰度#xff09;… 目录 背景遇到的问题 RocketMQ 基础基础消息模型扩展后的消息模型部署模型相关概念点 方案对比影子Topic的方案Tag的方案UserProperty的方案影子Group的方案灰度分区的方案方案对比 灰度分区方案设计适配只有部分灰度的情况所做的功能扩展消费者无灰度消费者有灰度 改造流程生产者改造逻辑消费者改造逻辑消费者自定义负载均衡 MQ注册ClientID修改常量配置类链路信息传递BasicMDC类MessageStorage类拦截配置消费者拦截生产者拦截请求拦截 验证过程消费者未启动灰度订阅消费者灰度订阅验证消息链路发送端消费端发送灰度消息 背景
我们公司团队为了更好地控制版本发布的影响范围自研了灰度发布流程目前已经支持http、gRPC等接口调用方式进行灰度流量转发但是消息队列基于业务实现展示不支持。参考了网上很多灰度方案博文比较热门的vivo鲁班RocketMQ的文章但是涉及到的改造范围都是比较大最后在看到关于Asyncer关于灰度分区 的博文整体比较巧妙的利用MQ的原有机制改动相对少了很多
遇到的问题 如上图所示普通的业务灰度流程保证了RPC服务之间的调用灰度但是当消息从服务发出到消息队列后队列分区是被均分到正常服务和灰度服务监听的这样会导致正常服务消费到灰度消息同时灰度服务消费到正常的消息所以MQ灰度是目前需要解决的问题这样才能完成整个灰度链路
RocketMQ 基础
基础消息模型 扩展后的消息模型 相同的ConsumerGroup下的消费者主要有两种负载均衡模式即广播模式和集群模式图中是最常用的集群模式。在集群模式下同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费如图中 ConsumerGroupA 订阅 TopicATopicA 对应 3个队列则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息Consumer2是消费的是MessageQueue2的消息。在广播模式下同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是广播模式下因为每个 Consumer 实例都需要处理全部的消息因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。
部署模型
Producer、Consumer又是如何找到Topic和Broker的地址呢消息的具体发送和接收又是怎么进行的呢 RocketMQ 部署架构上主要分为四部分: 生产者 Producer 发布消息的角色。Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递投递的过程支持快速失败和重试。 消费者 Consumer 消息消费的角色支持以推push拉pull两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。提供实时消息订阅机制可以满足大多数用户的需求。 名字服务器 NameServer NameServer是一个简单的 Topic 路由注册中心支持 Topic、Broker 的动态注册与发现。 Broker管理NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制检查Broker是否还存活路由信息管理每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息从而进行消息的投递和消费。 代理服务器 Broker Broker主要负责消息的存储、投递和查询以及服务高可用保证。NameServer几乎无状态节点因此可集群部署节点之间无任何信息同步。Broker部署相对复杂。 每个 Broker 与 NameServer 集群中的所有节点建立长连接定时注册 Topic 信息到所有 NameServer。Producer 与 NameServer 集群中的其中一个节点建立长连接定期从 NameServer 获取Topic路由信息并向提供 Topic 服务的 Master 建立长连接且定时向 Master 发送心跳。Producer 完全无状态。Consumer 与 NameServer 集群中的其中一个节点建立长连接定期从 NameServer 获取 Topic 路由信息并向提供 Topic 服务的 Master、Slave 建立长连接且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息也可以从Slave订阅消息。
相关概念点
CommitLog消息体实际存储的地方当我们发送的任一业务消息的时候它最终会存储在commitLog上。MQ在Broker进行集群部署这里为也简洁不涉及主从部分时同一业务消息只会落到集群的某一个Broker节点上。而这个Broker上的commitLog就会存储所有Topic路由到它的消息当消息数据量到达1个G后会重新生成一个新的commitLog。Topic消息主题表示一类消息的逻辑集合。每条消息只属于一个TopicTopic中包含多条消息是MQ进行消息发送订阅的基本单位。属于一级消息类型偏重于业务逻辑设计。Tag消息标签二级消息类型每一个具体的消息都可以选择性地附带一个Tag用于区分同一个Topic中的消息类型Queue实际上Topic更像是一个逻辑概念供我们使用在源码层级看Topic以Queue的形式分布在多个Broker上一个topic往往包含多条Queue消费组及其ID表示一类Producer或Consumer这类Producer或Consumer通常生产或消费同应用域的消息且消息生产与消费的逻辑一致。每个消费组可以定义全局维一的GroupID来标识由它来代表消费组。不同的消费组在消费时互相隔离不会影响彼此的消费位点计算。
方案对比
影子Topic的方案
新建一系列新的Topic来处理隔离灰度的消息 例如对于TOPIC_ORDER会创建TOPIC_ORDER_GRAY来给灰度环境使用。 发送方在发送时对灰度的消息写入到影子Topic中。消费时灰度环境只使用灰度的分组订阅灰度的Topic。 Tag的方案
发送方在发送时对灰度环境产生的消息的Tag加灰度标识。消费方每次灰度版本发布时只订阅灰度Tag的消息正常的版本订阅非灰度的Tag。 UserProperty的方案
发送方在发送时对灰度环境产生的消息的UserProperty加灰度标识。消费方的客户端需要进行改写根据UserProperty来过滤正常的版本会跳过这类消息灰度的版本会处理灰度消息。流程与灰度Tag差不错
影子Group的方案
发送消息灰度的服务发送带有灰度标识的消息消费消息灰度服务只消费灰度标识的消息
灰度分区的方案
发送者注册时会检测当前将是不是灰度节点灰度节点MQ注册的clientId会添加标识消费者订阅时会检测当前节点是不是灰度节点灰度节点会走灰度的Queue分配策略 方案对比
方案优点缺点成本影子Topic的方案使用独立的两个TOPIC,订阅关系一致改造比较容易有临界问题灰度切换生产时有可能会漏掉消息同时需要根据生产、消费者关系维护对应的订阅关系改造数据需求需要维护两套消费组和TOPC,有维护成本影子Group的方案使用独立的两个GROUP订阅关系一致有临界问题需要修改生产者、订阅者、DevOps改动范围较大正常节点和灰度节点都需要知道当前服务的灰度状态来做出对应的处理需要维护两套GROUP的关系维护成本高Tag的方案通过MQ提供的tag机制过滤可以保证灰度的消息只会被灰度节点消费改造简单如果Tag参与的业务过滤不适合该方案如果没有灰度节点订阅关系不一致会出现消息丢失无UserProperty的方案同上同上同上灰度分区的方案订阅关系一致不需要额外维护多一套TOPIC、GROUP;无比较适合公司的一个运营模式无
灰度分区方案设计
灰度分区主要基于以下几点
DevOps灰度服务的Pod容器内部会有 CANARY_RELEASEtrue 的环境变量。MQ客户端心跳上报源码中RocketMQ客户端启动时会想向所有产生订阅关系的broker发送心跳心跳中带有clientId,该值主要由实例名、容器ip等组成可以利用canary环境变量做一层额外的注入MQ客户端重平衡源码中每隔20秒/客户端上下线都会触发一次客户端重平衡我们可以自定义该策略加入灰度分区平衡逻辑来实现灰度分区和正常分区的订阅MQ客户端发送方源码中RocketMQ发送方每次发送消息都会轮询队列发送同时加入重试和故障规避的策略可以通过重写该类来做扩展。
适配只有部分灰度的情况所做的功能扩展
基于ThreadLocal来实现消费和发送的链路标识传递 拦截HTTP请求通过拦截http请求识别请求是否灰度基于ThreadLocal实现线程传递拦截MQ消息消费通过拦截消费者消息识别UserProperty识别存在灰度标识更新到ThreadLocal中拦截MQ生产者发送消息获取ThreadLocal存储的当前线程的UserProperty标识,写入到发送消息的UserProperty中 MQ客户端发送方检测到环境和线程链路变量做出对应的发送策略从而来实现灰度消息通过正常应用后发送MQ消息能被下级监听的灰度应用接收
消费者无灰度 消费者有灰度 改造流程
生产者改造逻辑
无论何时只要发送方自己当前环境是灰度(CANARY_RELEASEtrue)或者当前是灰度链路则会最后两个分区作为灰度队列,否则选取其他分区发送根据RocketMQ的源码,自定义发送策略即可实现。 import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.latency.LatencyFaultTolerance;
import org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** Custom fault strategy*/
public class CustomMQFaultStrategy {private final InternalLogger log ClientLogger.getLog();private final LatencyFaultToleranceString latencyFaultTolerance new LatencyFaultToleranceImpl();private boolean sendLatencyFaultEnable false;private long[] latencyMax {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};private long[] notAvailableDuration {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};private final ConcurrentHashMapTopicPublishInfo, TopicPublishInfoCache topicPublishInfoCacheTable new ConcurrentHashMap();public long[] getNotAvailableDuration() {return notAvailableDuration;}public void setNotAvailableDuration(final long[] notAvailableDuration) {this.notAvailableDuration notAvailableDuration;}public long[] getLatencyMax() {return latencyMax;}public void setLatencyMax(final long[] latencyMax) {this.latencyMax latencyMax;}public boolean isSendLatencyFaultEnable() {return sendLatencyFaultEnable;}public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {this.sendLatencyFaultEnable sendLatencyFaultEnable;}private TopicPublishInfoCache checkCacheChanged(TopicPublishInfo topicPublishInfo) {if (topicPublishInfoCacheTable.containsKey(topicPublishInfo)) {return topicPublishInfoCacheTable.get(topicPublishInfo);}synchronized (this) {TopicPublishInfoCache cache new TopicPublishInfoCache();ListMessageQueue canaryQueues MessageStorage.getCanaryQueues(topicPublishInfo.getMessageQueueList());ListMessageQueue normalQueues MessageStorage.getNormalQueues(topicPublishInfo.getMessageQueueList());Collections.sort(canaryQueues);Collections.sort(normalQueues);cache.setCanaryQueueList(canaryQueues);cache.setNormalQueueList(normalQueues);topicPublishInfoCacheTable.putIfAbsent(topicPublishInfo, cache);}return topicPublishInfoCacheTable.get(topicPublishInfo);}/*** Queue selection strategy*/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {ListMessageQueue messageQueueList tpInfo.getMessageQueueList();TopicPublishInfoCache topicPublishInfoCache checkCacheChanged(tpInfo);if (MessageStorage.isCanaryRelease() || MessageStorage.isThreadCanaryRelease()) {MessageQueue messageQueue selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getCanaryQueueList());log.debug(canary context,send message to canary queue:{}, messageQueue.getBrokerName() messageQueue.getQueueId());return messageQueue;} else {if (this.sendLatencyFaultEnable) {try {int index tpInfo.getSendWhichQueue().incrementAndGet();int size topicPublishInfoCache.getNormalQueueList().size();for (int i 0; i size; i) {int pos Math.max(Math.abs(index) % size, 0);MessageQueue mq topicPublishInfoCache.getNormalQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {return mq;}}final String notBestBroker latencyFaultTolerance.pickOneAtLeast();int writeQueueNums tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums 0) {final MessageQueue mq tpInfo.selectOneMessageQueue();if (!topicPublishInfoCache.getCanaryQueueList().contains(mq)) {if (notBestBroker ! null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;}} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error(Error occurred when selecting message queue, e);}}return selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getNormalQueueList());}}/*** Default message queue selection strategy** param topicPublishInfo* param lastBrokerName* param queues* return {link MessageQueue }*/private MessageQueue selectDefaultMessageQueue(final TopicPublishInfo topicPublishInfo, final String lastBrokerName,ListMessageQueue queues) {ThreadLocalIndex sendWhichQueue topicPublishInfo.getSendWhichQueue();int size queues.size();if (lastBrokerName ! null) {for (int i 0; i size; i) {int index sendWhichQueue.incrementAndGet();int pos Math.max(Math.abs(index) % size, 0);MessageQueue mq queues.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}}int i sendWhichQueue.incrementAndGet();int res Math.max(Math.abs(i) % size, 0);log.debug(selectDefaultMessageQueue, lastBrokerName:{}, res:{}, lastBrokerName, topicPublishInfo.getMessageQueueList().get(res));return queues.get(res);}public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {long duration computeNotAvailableDuration(isolation ? 30000 : currentLatency);this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}}private long computeNotAvailableDuration(final long currentLatency) {for (int i latencyMax.length - 1; i 0; i--) {if (currentLatency latencyMax[i]) {return this.notAvailableDuration[i];}}return 0;}/*** Cache of topic publish info*/private static class TopicPublishInfoCache {/*** Grayscale message queue list*/private ListMessageQueue canaryQueueList;private ListMessageQueue normalQueueList;public ListMessageQueue getCanaryQueueList() {return canaryQueueList;}public void setCanaryQueueList(ListMessageQueue canaryQueueList) {this.canaryQueueList canaryQueueList;}public ListMessageQueue getNormalQueueList() {return normalQueueList;}public void setNormalQueueList(ListMessageQueue normalQueueList) {this.normalQueueList normalQueueList;}}
}消费者改造逻辑 其实最大的问题在于消费方如何动态的感知灰度的状态流转这也是产生之前灰度分区方案的临界问题的根本原因。但是通过源码的深入探索发现其实我们可以通过改造ClientId和自定义负载均衡策略来实现 RocketMQ客户端启动的时候会构建本地客户端id(包括实例名、ip名等)然后向broker注册自己。我们可以通过DevOps注入的环境变量CANARY_RELEASE来做改造即灰度服务clientId后面追加canary表示default服务后面追加default标识
消费者自定义负载均衡 import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;public class CanaryAllocateMessageQueueStrategyImpl implements AllocateMessageQueueStrategy {private final InternalLogger log ClientLogger.getLog();Overridepublic ListMessageQueue allocate(String consumerGroup, String currentCID, ListMessageQueue mqAll, ListString cidAll) {log.info(consumerGroup:{} currentCID:{} cidAll:{},consumerGroup,currentCID, JSON.toJSONString(cidAll));if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return Collections.emptyList();}if (mqAll.stream().anyMatch(mq - mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {return allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);}if (!MessageStorage.hasCanaryRelease(cidAll)) {ListMessageQueue allocate allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);return allocate;}if (MessageStorage.allCanaryRelease(cidAll)) {ListMessageQueue messageQueues this.balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);log.info([canary allocate]: group:{} sub topic:{} has all canary release client,maybe the sub is new,use the default avg strategy.\n current cid:{}\n allocate total {} message queue\n result:\n{},consumerGroup,mqAll.get(0).getTopic(),messageQueues.size(),currentCID,MessageStorage.joinMessageQueue(messageQueues));return messageQueues;}ListString canaryCids MessageStorage.getCanaryCids(cidAll);ListString normalCids MessageStorage.getNormalCids(cidAll);ListMessageQueue canaryQueues MessageStorage.getCanaryQueues(mqAll);ListMessageQueue normalQueues MessageStorage.getNormalQueues(mqAll);Collections.sort(canaryCids);Collections.sort(normalCids);Collections.sort(normalQueues);Collections.sort(canaryQueues);ListMessageQueue result null;if (canaryCids.contains(currentCID)) {result allocateByAvg(consumerGroup, currentCID, canaryQueues, canaryCids);} else {result allocateByAvg(consumerGroup, currentCID, normalQueues, normalCids);}return result;}/*** param consumerGroup* param currentCID* param mqAll* param cidAll* return {link List }{link MessageQueue }*/private ListMessageQueue allocateByAvg(String consumerGroup, String currentCID, ListMessageQueue mqAll,ListString cidAll) {ListMessageQueue result new ArrayListMessageQueue();if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}int index cidAll.indexOf(currentCID);int mod mqAll.size() % cidAll.size();int averageSize mqAll.size() cidAll.size() ? 1 : (mod 0 index mod ? mqAll.size() / cidAll.size() 1 : mqAll.size() / cidAll.size());int startIndex (mod 0 index mod) ? index * averageSize : index * averageSize mod;int range Math.min(averageSize, mqAll.size() - startIndex);for (int i 0; i range; i) {result.add(mqAll.get((startIndex i) % mqAll.size()));}return result;}Overridepublic String getName() {return CANARY;}public boolean check(String consumerGroup, String currentCID, ListMessageQueue mqAll,ListString cidAll) {if (StringUtils.isEmpty(currentCID)) {throw new IllegalArgumentException(currentCID is empty);}if (CollectionUtils.isEmpty(mqAll)) {throw new IllegalArgumentException(mqAll is null or mqAll empty);}if (CollectionUtils.isEmpty(cidAll)) {throw new IllegalArgumentException(cidAll is null or cidAll empty);}if (!cidAll.contains(currentCID)) {log.info([BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {},consumerGroup,currentCID,cidAll);return false;}return true;}public ListMessageQueue balanceAllocate(String consumerGroup, String currentCID, ListMessageQueue mqAll, ListString cidAll) {ListMessageQueue result new ArrayListMessageQueue();if (!check(consumerGroup, currentCID, mqAll, cidAll)) {return result;}int index cidAll.indexOf(currentCID);int mod mqAll.size() % cidAll.size();int averageSize mqAll.size() cidAll.size() ? 1 : (mod 0 index mod ? mqAll.size() / cidAll.size() 1 : mqAll.size() / cidAll.size());int startIndex (mod 0 index mod) ? index * averageSize : index * averageSize mod;int range Math.min(averageSize, mqAll.size() - startIndex);for (int i 0; i range; i) {result.add(mqAll.get((startIndex i) % mqAll.size()));}return result;}
}MQ注册ClientID修改 public String buildMQClientId() {StringBuilder sb new StringBuilder();sb.append(this.getClientIP());sb.append();sb.append(this.getInstanceName());if (!UtilAll.isBlank(this.unitName)) {sb.append();sb.append(this.unitName);}//The key is hereif (MessageStorage.isCanaryRelease()) {sb.append(CustomCommonConstant.CANARY_RELEASE_PREFIX);} else {sb.append(default);}if (this.enableStreamRequestType) {sb.append();sb.append(RequestType.STREAM);}return sb.toString();}常量配置类
public class CustomCommonConstant {/*** Number of grayscale queues*/public static final Integer GRAYSCALE_QUEUE_SIZE 2;/*** Grayscale client identification*/public static final String CANARY_RELEASE_PREFIX canary;/*** Thread identification*/public static final String THREAD_CANARY_RELEASE canary.release;
}链路信息传递BasicMDC类 import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;public class BasicMDC {final ThreadLocalMapString, String copyOnThreadLocal new ThreadLocal();private static final int WRITE_OPERATION 1;private static final int MAP_COPY_OPERATION 2;final ThreadLocalInteger lastOperation new ThreadLocal();private Integer getAndSetLastOperation(int op) {Integer lastOp (Integer)this.lastOperation.get();this.lastOperation.set(op);return lastOp;}private boolean wasLastOpReadOrNull(Integer lastOp) {return lastOp null || lastOp 2;}private MapString, String duplicateAndInsertNewMap(MapString, String oldMap) {MapString, String newMap Collections.synchronizedMap(new HashMap());if (oldMap ! null) {synchronized(oldMap) {newMap.putAll(oldMap);}}this.copyOnThreadLocal.set(newMap);return newMap;}public void put(String key, String val) throws IllegalArgumentException {if (key null) {throw new IllegalArgumentException(key cannot be null);} else {MapString, String oldMap (Map)this.copyOnThreadLocal.get();Integer lastOp this.getAndSetLastOperation(1);if (!this.wasLastOpReadOrNull(lastOp) oldMap ! null) {oldMap.put(key, val);} else {MapString, String newMap this.duplicateAndInsertNewMap(oldMap);newMap.put(key, val);}}}public void remove(String key) {if (key ! null) {MapString, String oldMap (Map)this.copyOnThreadLocal.get();if (oldMap ! null) {Integer lastOp this.getAndSetLastOperation(1);if (this.wasLastOpReadOrNull(lastOp)) {MapString, String newMap this.duplicateAndInsertNewMap(oldMap);newMap.remove(key);} else {oldMap.remove(key);}}}}public void clear() {this.lastOperation.set(1);this.copyOnThreadLocal.remove();}public String get(String key) {MapString, String map (Map)this.copyOnThreadLocal.get();return map ! null key ! null ? (String)map.get(key) : null;}public MapString, String getPropertyMap() {this.lastOperation.set(2);return (Map)this.copyOnThreadLocal.get();}public SetString getKeys() {MapString, String map this.getPropertyMap();return map ! null ? map.keySet() : null;}public MapString, String getCopyOfContextMap() {MapString, String hashMap (Map)this.copyOnThreadLocal.get();return hashMap null ? null : new HashMap(hashMap);}public void setContextMap(MapString, String contextMap) {this.lastOperation.set(1);MapString, String newMap Collections.synchronizedMap(new HashMap());newMap.putAll(contextMap);this.copyOnThreadLocal.set(newMap);}
}MessageStorage类 import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;/*** Message storage*/
public class MessageStorage {private static final InternalLogger log ClientLogger.getLog();private static BasicMDC mdcUtils;static {mdcUtils new BasicMDC();}/*** Determine whether there is a grayscale client** param cidAll* return*/public static boolean hasCanaryRelease(ListString cidAll) {return !getCanaryCids(cidAll).isEmpty();}/*** Determine if all are grayscale clients** param cidAll* return*/public static boolean allCanaryRelease(ListString cidAll) {ListString canaryCids getCanaryCids(cidAll);return canaryCids.size() cidAll.size();}/*** Connect the message queue into a string** param messageQueues* return*/public static String joinMessageQueue(ListMessageQueue messageQueues) {return messageQueues.stream().map(mq - mq.getBrokerName() : mq.getQueueId()).collect(Collectors.joining(, ));}/*** Get the list of grayscale clients** param cidAll* return*/public static ListString getCanaryCids(ListString cidAll) {return cidAll.stream().filter(cid - cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX)).collect(Collectors.toList());}/*** Get a list of non grayscale clients** param cidAll* return*/public static ListString getNormalCids(ListString cidAll) {return cidAll.stream().filter(cid - !cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX)).collect(Collectors.toList());}/*** Get the list of grayscale queues** param mqAll* return*/public static ListMessageQueue getCanaryQueues(ListMessageQueue mqAll) {Collections.sort(mqAll);log.info(topic:{} reBalance, has canary release client, allocate {} message queue by canary release strategy.\n, JSON.toJSONString(mqAll));int size mqAll.size();if (size CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {ListMessageQueue lastTwo mqAll.subList(size - CustomCommonConstant.GRAYSCALE_QUEUE_SIZE, size);return new ArrayList(lastTwo);} else {return new ArrayList();}}/*** Get non grayscale queue list** param mqAll* return*/public static ListMessageQueue getNormalQueues(ListMessageQueue mqAll) {Collections.sort(mqAll);log.info(topic:{} reBalance, has normal release client, allocate {} message queue by canary release strategy.\n, JSON.toJSONString(mqAll));int size mqAll.size();if (size CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {ListMessageQueue lastTwo mqAll.subList(0, size - 2);return new ArrayList(lastTwo);} else {return new ArrayList(mqAll);}}private static volatile String canaryRelease null;/*** Determine whether it is a grayscale client** return*/public static boolean isCanaryRelease() {return Boolean.parseBoolean(getCanaryRelease());}/*** Determine whether it is a grayscale client** return boolean*/public static boolean isThreadCanaryRelease() {String data mdcUtils.get(CustomCommonConstant.THREAD_CANARY_RELEASE);return Boolean.parseBoolean(data);}/*** Set MDC** param key* param value*/public static void setMDC(String key, String value) {mdcUtils.put(key, value);}/*** param key* return {link String }*/public static String getMdcKey(String key) {return mdcUtils.get(key);}/*** Clear MDC*/public static void clearMDC() {mdcUtils.clear();}/*** Replacement strategy** return {link AllocateMessageQueueStrategy }*/public static AllocateMessageQueueStrategy getAllocateMessageQueueAveragely(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
// if (isCanaryRelease()) {
// return new CanaryAllocateMessageQueueStrategyImpl();
// } else {
// return allocateMessageQueueStrategy;
// }return new CanaryAllocateMessageQueueStrategyImpl();}private static String getCanaryRelease() {if (Objects.isNull(canaryRelease)) {synchronized (MessageStorage.class) {if (Objects.nonNull(canaryRelease)) {return canaryRelease;}//CANARY_RELEASEString tmpCanaryRelease System.getProperty(canary.release);if (Objects.isNull(tmpCanaryRelease)) {tmpCanaryRelease false;}canaryRelease tmpCanaryRelease;return canaryRelease;}}return canaryRelease;}
}拦截配置
消费者拦截
Component
Aspect
public class ConsumerAOP {Pointcut(execution(public * org.apache.rocketmq.client.consumer.listener.*.*(..)))public void aspectConsumer() {}Before(aspectConsumer())public void doBefore(JoinPoint joinPoint) {Object[] args joinPoint.getArgs();for (Object arg : args) {if (arg instanceof List?) {ListMessageExt messageExtList (ListMessageExt) arg;for (MessageExt messageExt : messageExtList) {String threadCanaryRelease messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease false;}MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}}
}Component
Aspect
public class ListenerAOP {Pointcut(execution(public * org.apache.rocketmq.spring.core.*.*(..)))public void aspectListener(){}Before(aspectListener())public void doBefore(JoinPoint joinPoint){Object[] args joinPoint.getArgs();for (Object arg : args) {if (arg instanceof MessageExt){MessageExt messageExt (MessageExt) arg;String threadCanaryRelease messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease false;}MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}
}生产者拦截
Component
Aspect
public class ProducerAOP {Pointcut(execution(public * org.apache.rocketmq.client.producer.*.*(..)))public void aspectProducer() {}Before(aspectProducer())public void doBefore(JoinPoint joinPoint) {Object[] args joinPoint.getArgs();for (Object arg : args) {if (arg instanceof Message) {Message message (Message) arg;String threadCanaryRelease MessageStorage.getMdcKey(CustomCommonConstant.THREAD_CANARY_RELEASE);if(StringUtils.isEmpty(threadCanaryRelease)){threadCanaryRelease false;}message.putUserProperty(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);}}}
}请求拦截
Slf4j
Component
WebFilter(urlPatterns /*, filterName requestFilter)
Order(Integer.MIN_VALUE)
RequiredArgsConstructor(onConstructor __(Autowired))
public class ForwardFilter implements Filter {SneakyThrowsOverridepublic void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) {HttpServletRequest req (HttpServletRequest) servletRequest;String threadCanaryRelease req.getHeader(THREAD_CANARY_RELEASE);log.info(attributeNames:{}, JSON.toJSONString(req.getHeaderNames()));if(Objects.nonNull(threadCanaryRelease)){MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, true);}filterChain.doFilter(servletRequest, servletResponse);}
}验证过程
消费者未启动灰度订阅 消费者灰度订阅 验证消息链路
发送端
Slf4j
RestController
RequestMapping(/producer)
public class ProducerMessageContrlller {Autowiredprivate RocketMQTemplate rocketMQTemplate;PostMapping(/send)public void send(){boolean canaryRelease MessageStorage.isCanaryRelease();String body String.format(发送消息,环境:%s, canaryRelease ? 灰度 : 正式);
// for (Integer i 0; i 100; i) {log.info(发送消息,环境:{}, canaryRelease ? 灰度 : 正式);rocketMQTemplate.convertAndSend(TEST-DATA-MSG, body);
// }}
}消费端
Slf4j
Service
RocketMQMessageListener(topic TEST-DATA-MSG, consumerGroup test-consumer-group)
public class MyConsumer1 implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt message) {boolean canaryRelease MessageStorage.isCanaryRelease();MapString, String properties message.getProperties();log.info(received message: 【{}】 环境:【{}】 配置参数【{}】 , new String(message.getBody()),canaryRelease ? 灰度 : 正式,JSON.toJSONString(properties));}
}
发送灰度消息