html后缀的网站,河北省沧州建设厅网站,wordpress 局域网 慢,开发菏泽网站建设一#xff1a;RocketMq 整体文件存储介绍 存储⽂件主要分为三个部分#xff1a; CommitLog#xff1a;存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成#xff0c;每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。 ConsumerQue…一RocketMq 整体文件存储介绍 存储⽂件主要分为三个部分 CommitLog存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成每个⽂件固定⼤⼩1G。以第⼀条消 息的偏移量为⽂件名。 ConsumerQueue存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。 IndexFile为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程。
这篇文章主要介绍IndexFile的研究以rocketmq5.3.0版本作为研究。
二IndexFile的文件结构
文件整理格式如下图2-1所示 图2-1 IndexFile 文件结构图 IndexFile 文件格式 文件名以时间戳命名例如 20240301120000000表示该文件索引的消息的时间范围。 文件大小默认为 400MB可通过 maxIndexSize 配置调整。 存储路径默认在 ~/store/index 目录下。
每个 IndexFile 文件由三部分组成 1. 文件头部Header 2. 哈希槽Hash Slot区域 3. 索引条目Index Entry区域 1. 文件头部Header 字段名 长度字节 说明 beginTimestamp 8 索引文件覆盖的最小时间戳消息存储时间 endTimestamp 8 索引文件覆盖的最大时间戳消息存储时间 beginPhyOffset 8 索引文件对应的最小物理偏移量CommitLog 中的起始位置 endPhyOffset 8 索引文件对应的最大物理偏移量CommitLog 中的结束位置 hashSlotCount 4 哈希槽数量固定为 5,000,000 indexCount 4 当前已写入的索引条目数量 2. 哈希槽Hash Slot区域 哈希槽数量固定为 500 万个5,000,000每个哈希槽占 4 字节。 哈希函数对消息的 Key如 UNIQ_KEY 或 KEYS进行哈希计算得到槽位索引 slotPos abs(hash(key)) % 5000000
每个哈希槽存储的是 索引条目区域 的起始位置索引条目链表的头节点。 3. 索引条目Index Entry区域
每个索引条目占 20 字节包含以下字段 字段名 长度字节 说明 keyHash 4 消息 Key 的哈希值用于快速比对 phyOffset 8 消息在 CommitLog 中的物理偏移量 timeDiff 4 消息存储时间与文件头部 beginTimestamp 的时间差秒级 slotValue 4 下一个索引条目的位置用于解决哈希冲突的链表结构 三IndexFile 写入和查询流程
IndexFile 写入流程
---------------------
| Producer 发送消息 |
---------------------|v
---------------------
| 提取消息的 Key | -- 如 UNIQ_KEY 或 KEYS 属性
---------------------|v
---------------------
| 检查 IndexFile 容量 | -- 是否已满(indexCount indexNum)
---------------------| 是v
---------------------
| 返回 false写入失败 |
---------------------| 否v
---------------------
| 计算 Key 的哈希值 | -- keyHash indexKeyHashMethod(key)
---------------------|v
---------------------
| 计算哈希槽位置 | -- slotPos keyHash % hashSlotNum
---------------------|v
---------------------
| 计算哈希槽绝对位置 | -- absSlotPos IndexHeader.INDEX_HEADER_SIZE slotPos * hashSlotSize
---------------------|v
---------------------
| 读取哈希槽的当前值 | -- slotValue mappedByteBuffer.getInt(absSlotPos)
---------------------|v
---------------------
| 校验 slotValue 有效性 | -- 是否无效(slotValue invalidIndex || slotValue indexCount)
---------------------| 是v
---------------------
| 将 slotValue 设为无效 | -- slotValue invalidIndex
---------------------| 否v
---------------------
| 计算时间差 (timeDiff) | -- timeDiff (storeTimestamp - beginTimestamp) / 1000
---------------------|v
---------------------
| 处理 timeDiff 边界值 | -- 确保 0 timeDiff Integer.MAX_VALUE
---------------------|v
---------------------
| 计算索引条目绝对位置 | -- absIndexPos IndexHeader.INDEX_HEADER_SIZE hashSlotNum * hashSlotSize indexCount * indexSize
---------------------|v
---------------------
| 写入索引条目内容 |
| - keyHash |
| - phyOffset |
| - timeDiff |
| - slotValue (nextIndex)|
---------------------|v
---------------------
| 更新哈希槽指向新条目 | -- mappedByteBuffer.putInt(absSlotPos, indexCount)
---------------------|v
---------------------
| 更新 IndexFile 头部信息 |
| - 若 indexCount 1更新 beginPhyOffset 和 beginTimestamp |
| - 若 slotValue 无效增加 hashSlotCount |
| - 增加 indexCount |
| - 更新 endPhyOffset 和 endTimestamp |
---------------------|v
---------------------
| 返回 true写入成功 |
---------------------|v
---------------------
| IndexFile 是否已满 | -- 是 -- 创建新 IndexFile
| (文件大小 ≥ 400MB) |
---------------------
源码入口org.apache.rocketmq.store.index.IndexFile#putKey
IndexFile 查询流程
---------------------
| Consumer 根据 Key 查询 |
---------------------|v
---------------------
| 计算 Key 的哈希值 | -- keyHash Math.abs(key.hashCode())
---------------------|v
---------------------
| 计算哈希槽位置 | -- slotPos keyHash % 5,000,000
---------------------|v
---------------------
| 读取哈希槽的链表头位置 | -- slotValue mappedByteBuffer.getInt(slotPos * 4)
---------------------|v
---------------------
| 遍历链表条目 |
| while (slotValue 0)|
---------------------|v
---------------------
| 读取索引条目 |
| - keyHashRead |
| - phyOffset |
| - timeDiff |
| - nextIndex |
---------------------|v
---------------------
| 检查时间范围是否匹配 | -- storeTime beginTimestamp timeDiff * 1000
| (storeTime ∈ [begin, end]?)|
---------------------| 否|------------------ 跳过继续下一个条目| 是v
---------------------
| 比对 keyHashRead 和 keyHash |
| (是否相等) |
---------------------| 否|------------------ 跳过继续下一个条目| 是v
---------------------
| 从 CommitLog 读取实际 Key |
| (检查 Key 是否一致) |
---------------------| 否|------------------ 跳过继续下一个条目| 是v
---------------------
| 返回 phyOffset | -- 添加到结果列表
---------------------|v
---------------------
| slotValue nextIndex| -- 继续遍历下一个条目
---------------------|v
---------------------
| 遍历结束返回结果列表 |
---------------------
源码入口org.apache.rocketmq.store.index.IndexService#queryOffset
四IndexFile解决hash冲突问题思想
RocketMQ 的 IndexFile 通过 链地址法Chaining 解决哈希冲突问题其核心思想是将哈希到同一槽位的多个索引条目组织成链表结构并通过哈希槽Hash Slot与索引条目Index Entry的关联实现高效写入和查询。以下是具体实现思想及关键设计 1. 哈希冲突的背景 哈希冲突不同 Key 经过哈希函数计算后可能得到相同的哈希值导致被分配到同一个哈希槽。 问题若不处理冲突后续 Key 的索引会覆盖已有数据导致查询结果错误。 2. 解决冲突的核心思想链地址法
RocketMQ 的 IndexFile 采用 单链表 结构管理同一哈希槽下的所有冲突条目具体流程如下
(1) 写入时的链表插入 新条目插入链表头部 当新 Key 的哈希值与某槽位已有条目冲突时新条目会被插入链表头部并更新哈希槽指针指向新条目。 // 新条目的 nextIndex 指向原头节点
this.mappedByteBuffer.putInt(absIndexPos 16, slotValue);
// 更新哈希槽指针为新条目位置
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); 优势插入时间复杂度为 O(1)无需遍历链表。
(2) 查询时的链表遍历 遍历链表比对 Key 查询时从哈希槽指向的链表头节点开始依次遍历所有条目通过两次比对哈希值 实际 Key过滤冲突。 while (nextIndexToRead 0) {// 1. 读取条目内容int keyHashRead this.mappedByteBuffer.getInt(absIndexPos);long phyOffsetRead this.mappedByteBuffer.getLong(absIndexPos 4);// 2. 比对哈希值if (keyHashRead keyHash) {// 3. 从 CommitLog 读取实际 Key 比对String keyStored readKeyFromCommitLog(phyOffsetRead);if (key.equals(keyStored)) {phyOffsets.add(phyOffsetRead);}}// 4. 移动到下一个节点nextIndexToRead prevIndexRead;
} 3. 关键设计优化
(1) 哈希槽数量固定 默认 500 万个哈希槽 private static final int HASH_SLOT_NUM 5000000; // 默认槽数 目的通过大量槽位减少哈希冲突的概率使冲突链表尽可能短。 权衡槽数过多会占用更多内存但查询效率更高。
(2) 时间范围过滤 索引条目存储时间差timeDiff 每个索引条目记录消息存储时间与 IndexFile 起始时间的差值秒级查询时快速过滤掉不满足时间范围的条目。 long timeRead this.indexHeader.getBeginTimestamp() timeDiff * 1000L;
if (timeRead begin || timeRead end) {continue; // 跳过不符合时间条件的条目
} 优势减少无效条目的遍历提升查询性能。
(3) 文件滚动Rolling 按时间或大小滚动 IndexFile 文件默认大小上限为 400MB或时间跨度超过阈值时创建新文件。 目的避免单个文件过大导致链表过长同时支持按时间范围快速定位文件。
4. 示例场景
写入冲突场景 Key1: Ea#20231001123456 → 哈希值 19583063 → 槽位 18332292 Key2: FB#20231001123456 → 哈希值 19583063 → 槽位 18332292冲突 处理流程 Key1 写入槽位 18332292链表头指向 Key1。 Key2 写入时插入链表头部槽位指针更新为 Key2Key2 的 nextIndex 指向 Key1。
查询冲突场景 查询 Key: Ea#20231001123456 哈希计算定位到槽位 18332292。 遍历链表 先读取 Key2哈希值匹配但 Key 不匹配跳过。 再读取 Key1哈希值 Key 均匹配返回 phyOffset。
hash冲突代码调试示例 public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(127.0.0.1:9876);producer.start();Message msg new Message(Ea, TagA , (消息1).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setKeys(20231001123456);producer.sendOneway(msg);Message msg2 new Message(FB, TagA , (消息3).getBytes(RemotingHelper.DEFAULT_CHARSET));msg2.setKeys(20231001123456);producer.sendOneway(msg2);producer.shutdown();}