当前位置: 首页 > news >正文

广东品牌网站建设968推广引流工具

广东品牌网站建设968,推广引流工具,小程序如何推广,文创产品有哪些RocketMQ 存储基础回顾: 源码分析RocketMQ之CommitLog消息存储机制 本文主要从源码的角度分析 Rocketmq 消费队列 ConsumeQueue 物理文件的构建与存储结构,同时分析 RocketMQ 索引文件IndexFile 文件的存储原理、存储格式以及检索方式。RocketMQ 的存储…

RocketMQ 存储基础回顾: 源码分析RocketMQ之CommitLog消息存储机制

本文主要从源码的角度分析 Rocketmq 消费队列 ConsumeQueue 物理文件的构建与存储结构,同时分析 RocketMQ 索引文件IndexFile 文件的存储原理、存储格式以及检索方式。RocketMQ 的存储机制是所有的主题消息都存储在 CommitLog 文件中,也就是消息发送是完全的顺序 IO 操作,加上利用内存文件映射机制,极大的提供的 IO 性能。消息的全量信息存放在 commitlog 文件中,并且每条消息的长度是不一样的,消息的具体存储格式如下:

如果消费者直接基于commitlog 进行消费的话,简直就是一个恶梦,因为不同的主题的消息完全顺序的存储在 commitlog 文件中,根据主题去查询消息,不得不遍历整个 commitlog 文件,显然作为一款消息中间件这是绝不允许的。RocketMQ 的ConsumeQueue 文件就是来解决消息消费的。首先我们知道,一个主题,在 broker 上可以分成多个消费对列,默认为4个,也就是消费队列是基于主题+broker。那 ConsumeQueue 中当然不会再存储全量消息了,而是存储为定长(20字节,8字节commitlog 偏移量+4字节消息长度+8字节tag hashcode),消息消费时,首先根据 commitlog offset 去 commitlog 文件组(commitlog每个文件1G,填满了,另外创建一个文件),找到消息的起始位置,然后根据消息长度,读取整条消息。但问题又来了,如果我们需要根据消息ID,来查找消息,consumequeue 中没有存储消息ID,如果不采取其他措施,又得遍历 commitlog文件了,为了解决这个问题,rocketmq 的 index 文件又派上了用场。

接下来,本文重点关注 ConsumeQueue、Index 文件是如何基于 Commitlog 构建的,并且根据 ConsumeQueue、Index 文件如何查找消息。

根据 commitlog 文件生成 consumequeue、index 文件,主要同运作于两种情况:

1、运行中,发送端发送消息到 commitlog文件,此时如何及时传达到 consume文件、Index文件呢?

2、broker 启动时,检测 commitlog 文件与 consumequeue、index 文件中信息是否一致,如果不一致,需要根据 commitlog 文件重新恢复 consumequeue 文件和 index 文件。

1、commitlog、consumequeue、index 文件同步问题

RocketMQ 采用专门的线程来根据 comitlog offset 来将 commitlog 转发给ConsumeQueue、Index。其线程为DefaultMessageStore$ReputMessageService

1.1 核心属性

  • private volatile long reputFromOffset = 0
    reputFromOffset ,从 commitlog 开始拉取的初始偏移量。

1.2 run方法

每处理一次 doReput 方法,休眠1毫秒,基本上是马不停蹄的在转发 commitlog 中的内容到 consumequeue、index。

接下来重点查看 doReput 方法。

private void doReput() {for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);        // @1if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);    // @2 int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {if (size > 0) {DefaultMessageStore.this.doDispatch(dispatchRequest);                                                                       // @3 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}this.reputFromOffset += size;readSize += size;if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());}} else if (size == 0) {this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}

代码@1,根据 offset 从 commitlog 找到一条消息,如果找不到,退出此次循环,doReput方法跳出,此处从 commitlog 文件中取出消息的逻辑,在下文会重点分析,故在此暂时跳过。

先浏览一下 SelectMappedBufferResult

代码@2:尝试构建转发请求对象 DispatchRequest ,我大概浏览了一下 commitLog.checkMessageAndReturnSize,主要是从Nio ByteBuffer中,根据 commitlog 消息存储格式,解析出消息的核心属性:

// 消息主题
private final String topic; 
// 消息队列
private final int queueId; 
// commitlog中的偏移量
private final long commitLogOffset;
// 消息大小
private final int msgSize; // tagsCode
private final long tagsCode;
// 消息存储时间
private final long storeTimestamp; 
//消息在消费队列的offset
private final long consumeQueueOffset; 
// 存放在消息属性中的keys: PROPERTY_KEYS = "KEYS"
private final String keys; 
// 是否成功
private final boolean success; 
// 消息唯一键 "UNIQ_KEY"
private final String uniqKey; 
// 系统标志
private final int sysFlag;
// 事务pre消息偏移量
private final long preparedTransactionOffset; 
// 属性
private final Map<String, String> propertiesMap; 

代码@3:转发DistpachRequest。

根据实现类,consumequeue,index 分别对应 CommitLogDispatcherBuildConsumeQueue 与 CommitlogDispatcherBuildIndex。

http://www.hkea.cn/news/260561/

相关文章:

  • 如何看网站是用什么语言做的关键词排名是由什么决定的
  • 政府网站建设招标书百度网站收录
  • 已经有了网站怎么做推广哈尔滨关键词优化报价
  • 网站建设与管理作业镇江推广公司
  • 域名申请好后 如何建设网站网站权重划分
  • 佛山百度网站快速优化网络营销推广工具
  • 建一个网站需要哪些人广州seo网站推广公司
  • 建设银行etc官方网站搜索引擎优化的七个步骤
  • 做网站需要花钱吗海南百度推广运营中心
  • 做的网站显示图片很慢百度运营公司
  • 青州哪里做网站公司推广渠道
  • 网站面包屑导航怎么做的网推接单平台有哪些
  • 宜昌网站建设兼职百度关键词排名软件
  • 如何让百度快照找到自己的网站营销策划方案ppt模板
  • php网站超市广告软文是什么意思
  • b2c跨境电商宣城网站seo
  • 网站建设一流公司免费网站开发平台
  • 网站开发模式名词外贸谷歌优化
  • 网站素材 下载产品推广渠道
  • 网站后台维护怎么做seo专员工资一般多少
  • 中国网站推广黄页名录微商推广哪家好
  • 哈尔滨网站开发电话电商培训基地
  • 如何用php数据库做网站搜索seo优化托管
  • 中国城乡建设部人力网站首页优化落实疫情防控
  • 做网站到底能不能赚钱网络优化工程师前景
  • 乌镇网站建设标书百度站长工具域名查询
  • 制作公司网站价格腾讯广告代理商加盟
  • 大学生活动网站开发文案苏州seo门户网
  • 阿里云认证网站建设题库seo助理
  • 凤岗网站仿做靠谱seo外包定制