宽带动态ip如何做网站访问,readmore wordpress,徐州网络科技有限公司,吴江公司注册这是本人学习的总结#xff0c;主要学习资料如下
马士兵教育rocketMq官方文档 目录 1、Overview2、验证消息3、查找路由4、选择消息发送队列4.1、选择队列的策略4.2、源码阅读4.2.1、轮询规避4.2.2、故障延迟规避4.2.2.1、计算规避时间4.2.2.2、选择队列 4.2.3、ThreadLocal的…这是本人学习的总结主要学习资料如下
马士兵教育rocketMq官方文档 目录 1、Overview2、验证消息3、查找路由4、选择消息发送队列4.1、选择队列的策略4.2、源码阅读4.2.1、轮询规避4.2.2、故障延迟规避4.2.2.1、计算规避时间4.2.2.2、选择队列 4.2.3、ThreadLocal的使用 5、发送消息5.1、客户端建立的时间 1、Overview
消息发送主要可以分成下面四个步骤。
验证消息查找路由选择队列消息发送
之后从源码查看四个步骤的具体内容。
我们建立一个DefaultMQProducer之后调用DefaultMQProducer#send()方法就可发送信息。
查看send()的代码最终会来到DefaultMQProducerImpl#sendDefaultImpl()我们从这里开始看源码。
2、验证消息
发送前必然验证一下消息。
主要是检验消息的状态一些必要的值不能为空等。
this.makeSureStateOK();
// 1、检查消息
Validators.checkMessage(msg, this.defaultMQProducer);公司内部想设置一些新的规则用来发送前拦截信息就适合放在checkMessage()里。
这部分没有太多内容。 3、查找路由
所谓的路由是指可用的Broker的信息包括地址具体的消息队列等。
下面这一句获取到路由。
TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic());在DefaultMQProducer内部缓存这路由信息维护在ConcurrentHashMapString, TopicPublishInfo中
private final ConcurrentMapString/* topic */, TopicPublishInfo topicPublishInfoTable new ConcurrentHashMapString, TopicPublishInfo();在tryToFindTopicPublishInfo()中会先检查路由信息是否存在不存在还需要从NameServer中获取路由列表。
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);更新的时候会加上一个ReentrantLock更新结束后释放。
获取到路由信息后开始选择队列发送消息。 4、选择消息发送队列
4.1、选择队列的策略
得到路由信息后就开始选择消息队列发送信息。
选择队列有两种策略
轮询规避轮询选择队列。如果上次发送消息失败那就消息需要重新发送这时就需要规避掉上次发送失败的队列寻找下一个队列发送。故障延迟策略在选择队列发送时根据以往发送时长判断该队列的Broker是否可用。对于发送失败的BrokerProducer会规避该Broker一段时间。
这是发送消息的流程图。
假设我们的Broker是集群有两个Broker。消息会选择其中一个Broker发送消息如果失败就重试直到发送成功或者超过重试次数。 4.2、源码阅读
这里会探索源码如何实现这两种队列选择策略。
选择队列的入口在DefaultMQProducerImpl#sendDefaultImpl - this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); 从入口进入查看代码最终在MQFaultStrategy#selectOneMessageQueue代码通过sendLatencyFaultEnable这个字段来选择不同的选择策略。 4.2.1、轮询规避
这是轮询规避的源码。 其中lastBrokerName是上一次消息发送时选择的broker。这代表该消息上一次发送失败了所以记录着上一次失败的broker以在这次选择Broker时规避他。
所以lastBrokerNamenull时该消息是第一次发送不需要规避直接随机选择一个队列发送。
如果上一次发送失败则开始轮询选择一个队列保证这个新选出的队列和上一个不同后就可以返回。 4.2.2、故障延迟规避
4.2.2.1、计算规避时间
故障延迟规避策略需要记录发送时间并计算。在看选择Broker的代码时需要看看源码如何记录发送时间并计算出规避时间的。
计算规避时间的代码在this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);。这时消息正常发送会调用一次这个方法如果出现异常在catch快也会调用这个方法计算规避时间。
进入这个方法代码如下。
需要注意isolationtrue时表示消息发送出现异常这时便认为延迟时长是30000ms。
同时也可以看到sendLatencyFaultEnabletrue表示开启故障规避策略这种情况才需要计算规避时间。选择Broker时也是通过这个属性判断使用过故障规避还是轮询规避。
规避时间的计算比较简单阿里根据自己的经验设置了一个对照表来计算时间如下图所示。比如延迟是550ms以内的Broker不用规避延迟在550~1000ms的需要规避30s。 这里跳过计算规避时间的代码细节进入下一行代码this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);。
代码如下它其实就是将不可用的Broker维护在faultItemTable中并且记录着解禁时间。以后选择Broker会通过这个集合查看Broker是否可用。
4.2.2.2、选择队列
我们回到选择Broker的代码MQFaultStrategy#selectOneMessageQueue下图是相关代码。 它的大概流程是轮询队列如果可用就返回。实在找不到可用的就随机选择一个Broker发送。
它通过latencyFaultTolerance.isAvailable(mq.getBrokerName())判断队列是否可用里面实际就是通过前面讲到的faultItemTable来查看队列是否可用。 4.2.3、ThreadLocal的使用
在选择队列时无论是轮询规避还是故障延迟规避都需要循环遍历messageQueue找到适合的queue发送信息。
获取下标的方式用到了ThreadLocal。如下图所示sendWhichQueue本质上就是一个ThreadLocalInteger对象。 生产者发送信息时可能会有多个线程同时发信息。
这些线程发送信息时应该各自维护一个消息队列的下标这样每个线程发送信息时才会比较均匀地向每个队列都发送信息。
另外这些线程发送信息时可能会指定消息队列的id所以线程各自维护一个消息队列的下标是很有必要的。
这个场景就很适合ThreadLocal选择消息队列时用ThreadLocal来维护下标。 5、发送消息
5.1、客户端建立的时间
客户端发送消息时建立HTTP连接是在send()方法中而不是在start()方法中。
站在设计者的角度需要考虑到开发者在start()方法后可能还需要过一段时间才会真正发送信息甚至不发信息。
那么建立HTTP连接放在start()就比较浪费资源所以建立HTTP连接放在了send()方法中。