建站 哪个网站系统好用,淮安企业网站制作,二维码生成器网站源码,作图软件免费先看有哪些send方法 首先说红圈的
有3个红圈。归类成3种发送方式。假设前提条件#xff0c;发送的topic#xff0c;有3个broker#xff0c;每个broker总共4个write队列#xff0c;总共有12个队列。
普通发送。负载均衡12个队列。指定超时时间指定MessageQueue,发送#…先看有哪些send方法 首先说红圈的
有3个红圈。归类成3种发送方式。假设前提条件发送的topic有3个broker每个broker总共4个write队列总共有12个队列。
普通发送。负载均衡12个队列。指定超时时间指定MessageQueue,发送指定超时时间指定selector器指定特定参数指定超时时间。一般用于局部有序比如相同userId的到同一个队列
默认超时时间时3秒
再说蓝圈
sendDefaultImpl 负载均衡的方式选择队列。然后调sendKernelImplsendSelectImpl 指定队列selector和arg的方式选择队列。然后调sendKernelImplsendKernelImpl 最核心的方式。这里已经明确队列做真实的消息发送
很明显只需要简单解读sendDefaultImpl和sendSelectImpl如何选择队列。然后重点在于查看sendKernelImpl方法实现
sendDefaultImpl选择队列分析
先看源码
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID random.nextLong();long beginTimestampFirst System.currentTimeMillis();long beginTimestampPrev beginTimestampFirst;long endTimestamp beginTimestampFirst;TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo ! null topicPublishInfo.ok()) {boolean callTimeout false;MessageQueue mq null;Exception exception null;SendResult sendResult null;int timesTotal communicationMode CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times 0;String[] brokersSent new String[timesTotal];for (; times timesTotal; times) {String lastBrokerName null mq ? null : mq.getBrokerName();MessageQueue mqSelected this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected ! null) {mq mqSelected;brokersSent[times] mq.getBrokerName();try {beginTimestampPrev System.currentTimeMillis();if (times 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime beginTimestampPrev - beginTimestampFirst;if (timeout costTime) {callTimeout true;break;}sendResult this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() ! SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
第一步通过topic查找路由信息tryToFindTopicPublishInfo 先从内存中获取。内存是DefaultMQProducerImpl#topicPublishInfoTable 如果内存没有则从nameserver获取 org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)
内存是什么时候添加的呢是有定时器任务更新的。详情看我写的文章rocketmq-push模式-消费侧重平衡-类流程图分析
第二步、设定默认重试3次包含首次,选择topic的其中一个队列 org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName null) {return selectOneMessageQueue();} else {for (int i 0; i this.messageQueueList.size(); i) {int index this.sendWhichQueue.incrementAndGet();int pos Math.abs(index) % this.messageQueueList.size();if (pos 0)pos 0;MessageQueue mq this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}可以发现topic对应的TopicPublishInfo维护者一个ThreadLocalIndex对象。 每个线程先会获取一个index然后对index取模得到某一个队列。 这意味着sendDefaultImpl中队列的负载均衡是线程独立的。每个线程维护着自己的index每发送一次index1。
public int incrementAndGet() {Integer index this.threadLocalIndex.get();if (null index) {index Math.abs(random.nextInt());this.threadLocalIndex.set(index);}this.threadLocalIndex.set(index);return Math.abs(index POSITIVE_MASK);}第三步、选择完MessageQueue后调用sendKernelImpl发送消息
sendSelectImpl选择队列分析
先看源码
private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo ! null topicPublishInfo.ok()) {MessageQueue mq null;try {ListMessageQueue messageQueueList mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage MessageAccessor.cloneMessage(msg);String userTopic NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException(select message queue threw exception., e);}long costTime System.currentTimeMillis() - beginStartTime;if (timeout costTime) {throw new RemotingTooMuchRequestException(sendSelectImpl call timeout);}if (mq ! null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException(select message queue return null., null);}}validateNameServerSetting();throw new MQClientException(No route info for this topic, msg.getTopic(), null);}org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
第一步通过topic查找路由信息tryToFindTopicPublishInfo。分析同上 第二步通过MessageQueueSelector找出发送的MessageQueue MessageQueueSelector的实现方式可以自定义。提供了2种 SelectMessageQueueByRandom 随机一个 SelectMessageQueueByHash 根据arg的hashcode取模一个。适合局部有序
public class SelectMessageQueueByHash implements MessageQueueSelector {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {int value arg.hashCode() % mqs.size();if (value 0) {value Math.abs(value);}return mqs.get(value);}
}第三步、选择完MessageQueue后调用sendKernelImpl发送消息
sendKernelImpl发送分析
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl 第一步、通过MessageQueue获取对应的master节点地址 第二步、设置消息的唯一id。详情看以下实现。明显是客户端生成的由于不是分布式唯一ID的创建方式有点怀疑会重复。后续查看 org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID 第三步、对消息body做消息压缩 第四步、判断该消息是否是事务消息。给sysFlag位标志变量加标志 第五步、发送前可做一些自定义的检查CheckForbiddenHook、SendMessageHook 第六步、构建SendMessageRequestHeader requestHeader将msg的一些内容设置到header上 第七部、根据发送模式communicationMode调用不同的sendMessage方法 org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
switch (communicationMode) {case ASYNC:Message tmpMessage msg;boolean messageCloned false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage MessageAccessor.cloneMessage(msg);messageCloned true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage MessageAccessor.cloneMessage(msg);messageCloned true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync System.currentTimeMillis() - beginStartTime;if (timeout costTimeAsync) {throw new RemotingTooMuchRequestException(sendKernelImpl call timeout);}sendResult this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync System.currentTimeMillis() - beginStartTime;if (timeout costTimeSync) {throw new RemotingTooMuchRequestException(sendKernelImpl call timeout);}sendResult this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;
}第八步、最终会调用NettyRemotingClient的发送方法 SYNC org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync ONEWAY: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway ASYNC: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync
总结
product的发送有几种API模式其实目的都是为了选择MessageQueue
默认的发送是根据topic的队列做负载均衡的方式topicPublishInfo内部维护着ThreadLocalIndex对象做线程级别的负载均衡。而且默认都3次重试机会意味可以选择不同队列做发送指定messageQueue是调用方明确知道发送的MessageQueue这种失败不会做重试指定MessageQueueSelector等这种是通过传入的参数计算出对应的MessageQueue这种失败不会做重试适合作为局部有序的发送方式
选择好队列后就会调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl方法主要是构建SendMessageRequestHeader执行自定义的发送before和after的处理。 sendKernelImpl最终会调用NettyRemotingClient提供的接口分别处理SYNC、ONEWAY、ASYNC的三种模式