12380网站建设,久久建筑网碗扣式钢管脚手架安全技术规范,现在做微信开发网站多少钱,拼多多网上购物入口RocketMQ源码深入剖析
1 RocketMQ介绍
RocketMQ 是阿里巴巴集团基于高可用分布式集群技术#xff0c;自主研发的云正式商用的专业消息中间件#xff0c;既可为分布式应用系统提供异步解耦和削峰填谷的能力#xff0c;同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠…RocketMQ源码深入剖析
1 RocketMQ介绍
RocketMQ 是阿里巴巴集团基于高可用分布式集群技术自主研发的云正式商用的专业消息中间件既可为分布式应用系统提供异步解耦和削峰填谷的能力同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性是阿里巴巴双 11 使用的核心产品。
1.1 RocketMQ版本发展
如果想要了解RocketMQ的历史则需了解阿里巴巴中间件团队中的历史。
2011年Linkin(领英全球知名的职场社交平台)推出Kafka消息引擎阿里巴巴中间件团队在研究了Kafka的整体机制和架构设计之后基于Kafka(Scala语言编写)的设计使用Java进行了完全重写并推出了MetaQ 1.0版本主要是用于解决顺序消息和海量堆积的问题由开源社区killme2008维护。课程重点不在此版本具体见https://github.com/killme2008/Metamorphosis
2012年阿里巴巴发现MetaQ原本基于Kafka的架构在阿里巴巴如此庞大的体系下很难进行水平扩展于是对MetaQ进行了架构重组升级开发出了MetaQ 2.0同年阿里把Meta2.0从阿里内部开源出来取名RocketMQ为了命名上的规范以及版本上的延续对外称为RocketMQ3.0。因为RocketMQ3只是RocketMQ的一个过渡版本课程重点也不在此。
2016年11月28日阿里巴巴宣布将开源分布式消息中间件RocketMQ捐赠给Apache成为Apache 孵化项目。在孵化期间RocketMQ完成编码规约、分支模型、持续交付、发布规约等方面的产品规范化同时RocketMQ3也升级为RocketMQ4。现在RocketMQ主要维护的是4.x的版本也是大家使用得最多的版本,所以本书重点将围绕此版本进行详细的讲解项目地址https://github.com/apache/rocketmq/
2015年阿里基于RocketMQ开发了阿里云上的Aliware MQAliware MQ(Message Queue)是RocketMQ的商业版本是阿里云商用的专业消息中间件是企业级互联网架构的核心产品基于高可用分布式集群技术搭建了包括发布订阅、消息轨迹、资源统计、定时延时、监控报警等一套完整的消息云服务。因为Aliware MQ是商业版本课程也不对此产品进行讲述产品地址https://www.aliyun.com/product/rocketmq
2021年伴随众多企业全面上云以及云原生的兴起RocketMQ也在github上发布5.0版本。目前来说还只是一个预览版不过RocketMQ5的改动非常大同时也明确了版本定位RocketMQ 5.0定义为云原生的消息、事件、流的超融合平台。本课程也将会根据目前所发布的版本进行针对性的讲述。
1.1.1 RocketMQ4.X版本更新概要
在 RocketMQ 4.3.0 版本之后正式发布事务消息通过类似于两阶段的方式去解决上下游数据不一致问题。在 RocketMQ 4.4.0 版本中RocketMQ 增加了消息轨迹的功能使用户可以更好定位每一条消息的投放接收路径帮助问题排查另外还增加 ACL 权限控制提高了 RocketMQ 的管控能力和安全性。在 4.5.0 版本中RocketMQ 推出了多副本也就是 Raft 模式。在 Raft 模式下一组 Broker 如果 Master 挂了那么 Broker 中其他 Slave 就会重新选出主。因此 Broker 组内就拥有了自动故障转移的能力也解决了像高可用顺序消息这样的问题进一步提高了 RocketMQ 的可用性。在 4.6.0 版本中我们推出了轻量级 Pull Consumer用户可以使用更加适合于流计算的 API这一版本也开始支持全新的 Request-Reply 消息使得 RocketMQ 具备了同步调用 RPC 的能力RocketMQ 可以更好的打破网络隔离网络之间的调用这个版本中 RocketMQ 也开始支持 IPV6并且是首个支持 IPV6 的消息中间件。在 4.7.0 版本中RocketMQ 重构了主备同步复制流程通过线程异步化将同步复制和刷盘的过程 Pipeline 化同步双写性能有将近数倍提升。在 4.8.0 版本中RocketMQ Raft 模式有了一个质的提升包括通过异步化、批量复制等手段将性能提升了数倍在稳定性上利用 OpenChaos 完成包括宕机、杀死进程OOM、各种各样的网络分区和延迟的测试修复了重要 Bug。在功能上支持 Preferred Leader从而 Broker 组内可以优先选主也支持了批量消息等功能。在 4.9.0 版本主要是提升了可观测性包括支持 OpenTracing事务消息和 Pull Consumer 支持 Trace 等功能。
1.2 为什么要学RocketMQ源码 编写优雅、高效的代码经验 RocketMQ作为阿里双十一交易核心链路产品支撑千万级并发、万亿级数据洪峰。读源码可以积累编写高效、优雅代码的经验。 提升微观的架构设计能力重点在思维和理念 Apache RocketMQ作为Apache顶级项目它的架构设计是值得大家借鉴的。 解决工作中、学习中的各种疑难杂症 在使用RocketMQ过程中遇到消费卡死、卡顿等问题可以通过阅读源码的方式找到问题并给予解决。 在公司面试中展现优秀的自己 大厂面试中尤其是阿里系的公司你有RocketMQ源码体系化知识必定是一个很大的加分项。
1.3 RocketMQ源码中的技术亮点
读写锁原子操作类文件存储设计零拷贝MMAP线程池ConcurrentHashMap写时复制容器负载均衡策略故障延迟机制堆外内存
2 RocketMQ核心组件 NameServer
命名服务更新和路由发现 broker服务。NameServer的作用是为消息生产者、消息消费者提供关于主题 Topic 的路由信息NameServer除了要存储路由的基础信息还要能够管理 Broker节点包括路由注册、路由删除等功能。
Producer/Consumer
java版本的MQ客户端实现包括生产者和消费者。
Broker
它能接收producer和consumer的请求并调用store层服务对消息进行处理。HA服务的基本单元支持同步双写异步双写等模式。
Store
存储层实现同时包括了索引服务高可用HA服务实现。
Netty Remoting Server/Netty Remoting Client
基于netty的底层通信实现所有服务间的交互都基于此模块。也区分服务端和客户端
3 RocketMQ源码下载及安装
3.1 下载地址
官方下载地址http://rocketmq.apache.org/dowloading/releases/
本课程使用的是4.8.0的版本 3.2 环境要求
Linux64位系统JDK1.8(64位)Maven 3.2.x
3.3 使用IntelliJ IDEA导入安装源码
1使用IDEA导入已经下载且已经解压后的代码 2下载且已经解压后的代码导入后执行Maven命令install
mvn install -Dmaven.test.skiptrue使用Maven验证下没问题
3.4 配置与运行RocketMQ
3.4.1 启动NameServer
RocketMQ启动必须先启动NameServer启动类是namesrv/目录下的NamesrvStartup类不过在运行这个类之前必须要配置环境变量ROCKETMQ_HOME,变量值为RocketMQ的运行主目录。 RocketMQ的运行主目录一般使用新建的方式同时在运行主目录中创建conf、logs、store三个文件夹然后从源码目录中distribution目录下的中将broker.conf、logback_broker.xml、logback_namesrv.xml复制到conf目录中。 最后运行namesrv/目录下的NamesrvStartup类的main方法NameServer启动成功 3.4.2 启动Broker
在broker模块找到broker模块同时找到启动类BrokerStartup.java 源码启动前需要修改配置文件broker.conf 修改RocketMQ的消息存储路径 配置环境变量同时启动时需要加入参数指定启动的配置文件 broker启动成功后的控制台如下 3.5 控制台安装及部署
3.5.1 环境要求
运行前确保已经有jdk1.8Maven(打包需要安装Maven3.2.x)
3.5.2 下载
老版本地址下载https://codeload.github.com/apache/rocketmq-externals/zip/master
新版本地址https://github.com/apache/rocketmq-dashboard
解压后如图(以下使用的是老版本新版本参考老版本即可) 3.5.3 配置
后端管理界面是一个Java工程独立部署同时也需要根据不同的环境进行相关的配置。
控制台端口及服务地址配置
下载完成之后进入‘\rocketmq-console\src\main\resources’文件夹打开‘application.properties’进行配置。 进入‘\rocketmq-externals\rocketmq-console’文件夹执行‘mvn clean package -Dmaven.test.skiptrue’编译生成运行jar包。 编译成功之后cmd命令进入‘target’文件夹执行‘java -jar rocketmq-console-ng-2.0.0.jar’启动‘rocketmq-console-ng-2.0.0.jar’。 浏览器中输入‘127.0.0.1:8089’成功后即可进行管理端查看。
4 RocketMQ的核心三流程 整体模块如下 1.rocketmq-namesrv 命名服务更新和路由发现 broker服务。 NameServer 要作用是为消息生产者、消息消费者提供关于主题 Topic 的路由信息NameServer除了要存储路由的基础信息还要能够管理 Broker节点包括路由注册、路由删除等功能 2.rocketmq-broker mq的核心。 它能接收producer和consumer的请求并调用store层服务对消息进行处理。HA服务的基本单元支持同步双写异步双写等模式。 3.rocketmq-store 存储层实现同时包括了索引服务高可用HA服务实现。 4.rocketmq-remoting 基于netty的底层通信实现所有服务间的交互都基于此模块。 5.rocketmq-common 一些模块间通用的功能类比如一些配置文件、常量。 6.rocketmq-client java版本的mq客户端实现 7.rocketmq-filter 消息过滤服务相当于在broker和consumer中间加入了一个filter代理。 8.rocketmq-srvutil 解析命令行的工具类ServerUtil。 9.rocketmq-tools mq集群管理工具提供了消息查询等功能
RocketMQ的源码是非常的多我们没有必要把RocketMQ所有的源码都读完所以我们把核心、重点的源码进行解读RocketMQ核心流程如下
启动流程 RocketMQ服务端由两部分组成NameServer和BrokerNameServer是服务的注册中心Broker会把自己的地址注册到NameServer生产者和消费者启动的时候会先从NameServer获取Broker的地址再去从Broker发送和接受消息。消息生产流程 Producer将消息写入到RocketMQ集群中Broker中具体的Queue。消息消费流程 Comsumer从RocketMQ集群中拉取对应的消息并进行消费确认。
5 NameServer源码分析
5.1 NameServer整体流程
NameServer是整个RocketMQ的“大脑”它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Rocket中的Broker。 NameServer启动 启动监听等待Broker、Producer、Comsumer连接。Broker在启动时向所有NameServer注册生产者在发送消息之前先从NameServer获取Broker服务器地址列表然后根据负载均衡算法从列表中选择一台服务器进行消息发送。消费者在订阅某个主题的消息之前从NamerServer获取Broker服务器地址列表有可能是集群但是消费者选择从Broker中订阅消息订阅规则由 Broker 配置决定。路由注册 Broker启动后向所有NameServer发送路由及心跳信息。路由剔除 移除心跳超时的Broker相关路由信息。NameServer与每台Broker服务保持长连接并间隔10S检查Broker是否存活如果检测到Broker宕机则从路由注册表中将其移除。这样就可以实现RocketMQ的高可用。
5.2 NameServer启动流程
从源码的启动可知NameServer单独启动。 入口类NamesrvController 核心方法NamesrvController 类中main()-main0- createNamesrvController()-start() - initialize()
流程图如下 5.2.1 加载KV配置
核心解读NamesrvController类中createNamesrvController() 在源码中发现还有一个p的参数直接在启动个参数中送入 -p 就可以打印这个NameServer的所有的参数信息不过NameServer会自动终止说明这个-p是一个测试参数。 正常启动时也可以在启动日志中一定可以找到所有的参数 5.2.2 构建NRS通讯接收路由、心跳信息 5.2.3 定时任务剔除超时Broker
核心控制器会启动定时任务 每隔10s扫描一次Broker,移除不活跃的Broker。
Broker每隔30s向NameServer发送一个心跳包心跳包包含BrokerIdBroker地址Broker名称Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机NameServer无法收到心跳包此时NameServer如何来剔除这些失效的Broker呢NameServer会每隔10s扫描brokerLiveTable状态表如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s则认为Broker失效移除该Broker关闭与Broker连接同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。 路由剔除机制中Borker每隔30S向NameServer发送一次心跳而NameServer是每隔10S扫描确定有没有不可用的主机120S没心跳那么问题就来了这种设计是存在问题的就是NameServer中认为可用的Broker实际上已经宕机了那么某一时间段从NameServer中读到的路由中包含了不可用的主机会导致消息的生产/消费异常不过不用担心在生产和消费端有故障规避策略及重试机制可以解决以上问题原理后续源码解读。这个设计符合RocketMQ的设计理念整体设计追求简单与性能同时这样设计NameServer是可以做到无状态化的可以随意的部署多台其代码也非常简单非常轻量。 RocketMQ有两个触发点来删除路由信息 NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差如果时间超过120s则需要移除broker。 Broker在正常关闭的情况下会执行unregisterBroker指令这两种方式路由删除的方法都是一样的都是从相关路由表中删除与该broker相关的信息。 在消费者启动之后第一步都要从NameServer中获取Topic相关信息
5.3 NameServer设计亮点
5.3.1 读写锁
RouteInfoManager类中有一个读写锁的设计 消息发送时客户端会从NameServer获取路由信息同时Broker会定时更新NameServer的路由信息所以路由表会有非常频繁的以下操作
1、 生产者发送消息时需要频繁的获取。对表进行读。
RouteInfoManager类 2、 Broker定时(30s)会更新一个路由表。对表进行写。
RouteInfoManager类 因为Broker每隔30s向NameServer发送一个心跳包这个操作每次都会更新Broker的状态但同时生产者发送消息时也需要Broker的状态要进行频繁的读取操作。所以这个地方就有一个矛盾Broker的状态会被经常性的更新同时也会被更加频繁的读取。这里如何提高并发尤其是生产者进行消息发送时的并发所以这里使用了读写锁机制针对读多写少的场景。
synchronized和ReentrantLock基本都是排他锁排他锁在同一时刻只允许一个线程进行访问而读写锁在同一时刻可以允许多个读线程访问但是在写线程访问时所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁一个读锁和一个写锁通过分离读锁和写锁使得并发性相比一般的排他锁有了很大提升。
5.3.2 存储基于内存
NameServer存储以下信息
topicQueueTableTopic消息队列路由信息消息发送时根据路由表进行负载均衡
brokerAddrTableBroker基础信息包括brokerName、所属集群名称、主备Broker地址
clusterAddrTableBroker集群信息存储集群中所有Broker名称
brokerLiveTableBroker状态信息NameServer每次收到心跳包是会替换该信息
filterServerTableBroker上的FilterServer列表用于类模式消息过滤。 NameServer的实现基于内存NameServer并不会持久化路由信息持久化的重任是交给Broker来完成。这样设计可以提高NameServer的处理能力。
5.3.3 NameServer无状态化 NameServer集群中它们相互之间是不通讯主从架构中Broker都会向所有NameServer注册路由、心跳信息生产者/消费者同一时间与NameServer集群中其中一台建立长连接
项目实战部署分析
假设一个RocketMQ集群部署在两个机房每个机房都有一些NameServer、Broker和客户端节点当两个机房的链路中断时所有的NameServer都可以提供服务客户端只能在本机房的NameServer中找到本机房的Broker。
RocetMQ集群中NameSever之间是不需要互相通信的所以网络分区对NameSever本身的可用性是没有影响的如果NameSever检测到与Broker的连接中断了NameServer会认为这个Broker不再能提供服务NameServer会立即把这个Broker从路由信息中移除掉避免客户端连接到一个不可用的Broker上去。 网络分区后NameSever 收不到对端机房那些Broker的心跳这时候每个Namesever上都只有本机房的Broker信息。
6 Broker源码分析
1、Broker启动流程分析
2、消息存储设计
3、消息写入流程
4、亮点分析NRS与NRC的功能号设计
5、亮点分析同步双写数倍性能提升的CompletableFuture
6、亮点分析Commitlog写入时使用可重入锁还是自旋锁
7、亮点分析零拷贝技术之MMAP提升文件读写性能
8、亮点分析堆外内存机制 6.1 Broker启动流程分析
在RocketMQ中Broker的处理是最多的所以我们先分析Broker的启动流程。核心流程图如下 6.2 消息存储设计 Kafka 中文件的布局是以 Topic/partition 每一个分区一个物理文件夹在分区文件级别实现文件顺序写如果一个Kafka集群中拥有成百上千个主题每一个主题拥有上百个分区消息在高并发写入时其IO操作就会显得零散消息分散的落盘策略会导致磁盘IO竞争激烈成为瓶颈其操作相当于随机IO即 Kafka 在消息写入时的IO性能会随着 topic 、分区数量的增长其写入性能会先上升然后下降。而 RocketMQ在消息写入时追求极致的顺序写所有的消息不分主题一律顺序写入 commitlog 文件并不会随着 topic 和 分区数量的增加而影响其顺序性。在消息发送端消费端共存的场景下随着Topic数的增加Kafka吞吐量会急剧下降而RocketMQ则表现稳定。因此Kafka适合Topic和消费端都比较少的业务场景而RocketMQ更适合多Topic多消费端的业务场景。
6.3 存储文件设计
RocketMQ 主要存储的文件包括 Commitlog 文件、 ConsumeQueue 文件、 IndexFile。RocketMQ 将所有主题的消息存储在同一文件确保消息发送时顺序写文件尽最大的能力确保消息发送的高性能与高吞吐量。但由于一般的消息中间件是基于消息主题的订阅机制这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率 RocketMQ 引入了 ConsumeQueue 消息队列文件每个消息主题包含多个消息消费队列每个消息队列有一个消息文件。RocketMQ 还引入了IndexFile 索引文件其主要设计理念就是为了加速消息的检索性能可以根据消息的属性快速从 Commitlog 文件中检索消息。整体如下 1 ) CommitLog 消息存储文件所有消息主题的消息都存储在 CommitLog 文件中2 ) ConsumeQueue 消息消费队列消息到达 CommitLog 文件后将异步转发到消息消费队列供消息消费者消费3 ) IndexFile 消息索引文件主要存储消息 Key与Offset 的对应关系
6.3.1消息存储结构
CommitLog 以物理文件的方式存放每台 Broker 上的 CommitLog 被本机器所有 ConsumeQueue 共享在CommitLog 中一个消息的存储长度是不固定的 RocketMQ采取一些机制尽量向CommitLog 中顺序写 但是随机读。commitlog 文件默认大小为lG 可通过在 broker 置文件中设置 mapedFileSizeCommitLog 属性来改变默认大小。 Commitlog文件存储的逻辑视图如下每条消息的前面4个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。 ConsumeQueue
ConsumeQueue 是消息的逻辑队列类似数据库的索引文件存储的是指向物理存储的地址。每个Topic下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。 ConsumeQueue中存储的是消息条目为了加速 ConsumeQueue 消息条目的检索速度与节省磁盘空间每一个 Consumequeue条目不会存储消息的全量信息消息条目如下 ConsumeQueue 即为Commitlog 文件的索引文件 其构建机制是 当消息到达 Commitlog 文件后 由专门的线程产生消息转发任务从而构建消息消费队列文件ConsumeQueue 与下文提到的索引文件。存储机制这样设计有以下几个好处1 ) CommitLog 顺序写 可以大大提高写入效率。实际上磁盘有时候会比你想象的快很多有时候也比你想象的慢很多关键在如何使用使用得当磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘顺序写速度可以达到600MB/s 超过了一般网卡的传输速度这是磁盘比想象的快的地方 但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差 6000 倍2 )虽然是随机读但是利用操作系统的 pagecache 机制可以批量地从磁盘读取作为 cache 存到内存中加速后续的读取速度。同时因为ConsumeQueue中每一条消息的索引是固定长度所以也能够确保消息消费时的时间复杂度保持在O(1)。3 为了保证完全的顺序写需要 ConsumeQueue 这个中间结构 因为ConsumeQueue 里只存偏移量信息所以尺寸是有限的在实际情况中大部分的 ConsumeQueue 能够被全部读入内存所以这个中间结构的操作速度很快可以认为是内存读取的速度。此外为了保证 CommitLog和ConsumeQueue 的一致性 CommitLog 里存储了 Consume Queues 、Message Key、 Tag 等所有信息即使 ConsumeQueue 丢失也可以通过 commitLog 完全恢复出来。
IndexFile
index 存的是索引文件这个文件用来加快消息查询的速度。消息消费队列 RocketMQ 专门为消息订阅构建的索引文件 提高根据主题与消息检索消息的速度 使用Hash索引机制具体是Hash槽与Hash冲突的链表结构 Config
config 文件夹中 存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。topics.json : topic 配置属性subscriptionGroup.json :消息消费组配置信息。delayOffset.json 延时消息队列拉取进度。consumerOffset.json 集群消费模式消息消进度。consumerFilter.json 主题消息过滤信息。 6.3.2 消息存储结构源码对应 6.4 消息写入流程
RocketMQ使用Netty处理网络broker收到消息写入的请求就会进入SendMessageProcessor类中processRequest方法。 最终进入DefaultMessageStore类中asyncPutMessage方法进行消息的存储 然后消息进入commitlog类中的asyncPutMessage方法进行消息的存储 整个存储设计层次非常清晰大致的层次如下图 业务层也可以称之为网络层就是收到消息之后一般交给SendMessageProcessor来分配交给哪个业务来处理。DefaultMessageStore这个是存储层最核心的入口。
存储逻辑层主要负责各种存储的逻辑里面有很多跟存储同名的类。
存储I/O层主要负责存储的具体的消息与I/O处理。
6.5 源码分析中亮点
6.5.1 NRS与NRC的功能号设计
RocketMQ的通讯使用的是Netty,作为客户端核心类有两种RemotingCommand与NettyRemotingClient。
RemotingCommand主要处理消息的组装包括消息头、消息序列化与反序列化。
NettyRemotingClient主要处理消息的发送包括同步、异步、单向、注册等操作。 因为RocketMQ消息种类比较众多所以对于消息的发送使用了一个类似于功能号的设计。
客户端发送消息时定义一个code对应一个功能服务端注册一个业务处理对应一个code的业务处理。
code对应码表RequestCode类。
例如从生产者客户端代码跳入到NRC的代码NettyRemotingClient MQClientAPIImpl类中的sendMessage()中 NettyRemotingClient类 而在NRS中只需要将服务端需要处理的ExecutorService注册到NRS组件中即可。
在启动流程中BrokerController类中的initialize()中 注意功能号的设计并对客户端和服务端不是一对一的在服务端往往处理是可以将不同的功能号对应到一个处理的任务中。
6.5.2 同步双写数倍性能提升的CompletableFuture
在RocketMQ4.7.0之后RocketMQ大量使用Java中的异步编程接口CompletableFuture。尤其是在Broker端进行消息接收处理时。
比如DefaultMessageStore类中asyncPutMessage方法 Future接口正是设计模式中Future模式的一种实现如果一个请求或任务比较耗时可以将方法调用改为异步方法立即返回任务则使用主线程外的其他线程异步执行主线程继续执行。当需要获取计算结果时再去获取数据。
在Master-Slave主从架构下Master 节点与 Slave 节点之间数据同步/复制的方式有同步双写和异步复制两种模式。同步双写是指Master将消息成功落盘后需要等待Slave节点复制成功(如果有多个Slave,成功复制一个就可以)后再告诉客户端消息发送成功。 RocketMQ 4.7.0 以后合理使用CompletableFuture对同步双写进行性能优化使得对消息的处理流式化大大提高了Broker的接收消息的处理能力。
6.5.3 Commitlog写入时使用可重入锁还是自旋锁
RocketMQ在写入消息到CommitLog中时使用了锁机制即同一时刻只有一个线程可以写CommitLog文件。CommitLog 中使用了两种锁一个是自旋锁另一个是重入锁。源码如下 这里注意lock锁的标准用法是try-finally处理防止死锁问题
另外这里锁的类型可以自主配置。
RocketMQ 官方文档优化建议异步刷盘建议使用自旋锁同步刷盘建议使用重入锁调整Broker配置项useReentrantLockWhenPutMessage默认为false 同步刷盘时锁竞争激烈会有较多的线程处于等待阻塞等待锁的状态如果采用自旋锁会浪费很多的CPU时间所以“同步刷盘建议使用重入锁”。
异步刷盘是间隔一定的时间刷一次盘锁竞争不激烈不会存在大量阻塞等待锁的线程偶尔锁等待就自旋等待一下很短的时间不要进行上下文切换了所以采用自旋锁更合适。
6.5.4 零拷贝技术之MMAP提升文件读写性能
RocketMQ底层对commitLog、consumeQueue之类的磁盘文件的读写操作都采用了mmap技术。具体到代码里面就是利用JDK里面NIO的MapperByteBuffer的map()函数来先将磁盘文件CommitLog文件、consumeQueue文件映射到内存里来。
假如没有使用mmap技术的时候使用最传统和基本普通文件进行io操作会产生数据多拷贝问题。比如从磁盘上把数据读取到内核IO缓冲区里面然后再从内核IO缓冲区中读取到用户进程私有空间里去然后我们才能拿到这个数据。
MMAP内存映射是在硬盘上文件的位置和应用程序缓冲区(application buffers)进行映射建立一种一一对应关系由于mmap()将文件直接映射到用户空间所以实际文件读取时根据这个映射关系直接将文件从硬盘拷贝到用户空间只进行了一次数据拷贝不再有文件内容从硬盘拷贝到内核空间的一个缓冲区。 MMAP属于零拷贝技术的一种。
零拷贝(英语: Zero-copy) 技术是指计算机执行操作时CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。
mmap技术在地址映射的过程中对文件的大小是有限制的在1.5G2G之间所以RocketMQ就会把单个的commitLog文件大小控制在1GBconsumeQueue文件大小控制在5.72MB这样就在读写的时候方便的进行内存映射了。
Broker启动时MMAP相关源码如下
MappedFile类的init方法 生产者发送消息时MMAP相关消息写入源码如下 CommitLog类中的doAppend方法:具体进行消息格式的排列 CommitLog之Message格式可做参考 6.5.5 堆外内存机制
一般情况下RocketMQ是通过MMAP内存映射生产时消息写入内存映射文件然后消费的时候再读。但是RocketMQ还提供了一种机制。堆外内存机制TransientStorePool,短暂的存储池(堆外内存)。
6.5.5.1 开启条件及限制
开启堆外内存需要修改配置文件brokertransientStorePoolEnabletrue 同时如果开启了堆外内存缓冲区的话集群模式必须是异步刷盘的模式同时该Broker必须为主节点通过查看源码我们可以可以看到这一限制
DefaultMessageStore. DefaultMessageStore() 从堆外内存的流程图也可以看出堆外内存的消息写入明显要多一个步骤所以堆外内存缓冲区的设置一定要求是异步才行。
6.5.5.2 堆外缓冲区流程 RocketMQ单独创建一个ByteBuffer内存缓存池用来临时存储数据数据先写入该内存映射中然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定将当前堆外内存一直锁定在内存中避免被进程将内存交换到磁盘。同时因为是堆外内存这么设计可以避免频繁的GC。
6.5.5.3 源码分析
在DefaultMessageStore类中开启堆外缓冲配置则进行堆外内存池初始化TransientStorePool 在创建MappedFile时如果检测到有堆外内存配置。这里就会把堆外内存通过borrowBuffer()赋给writeBuffer 从6.4章节中的消息写入流程继续消息最终会进入mappedFile中处理。 经过几个转跳后进入appendMessagesInner方法中 以上就可知如果有堆外内存缓冲区的话ByteBuffer的来源是不同的。不过这里要注意如果是堆外内存缓冲区消息写入到ByteBuffer的话还只是写入一个临时区域不像默认模式本身就是mmap映射的内存直接写入就进入了磁盘和内存的映射所以这里还需要一个步骤就是从临时区域到正式区域。RocketMQ这里使用的是定时任务处理并且是借用异步刷盘的定时任务来处理。
这里需要回到CommitLog的构造方法在CommitLog构造的时候会选择启动一个定时任务来处理堆外内存 上述跳转比较麻烦但是记住一个点就是堆外内存的数据写入本质上是分成两个阶段
一个阶段先写入堆外内存另外一个阶段通过定时任务再写入文件。
6.5.5.4 堆外内存缓冲的意义 从图中可以发现默认方式MmapPageCache的方式读写消息都走的是pageCache(MappedByteBuffer类)这样子读写都在pagecache里面不可避免会有锁的问题在并发的读写操作情况下会出现缺页中断降低内存加锁污染页的回写脏页面。
而如果采用堆外缓冲区DirectByteBuffer(堆外内存)PageCache的两层架构方式这样子可以实现读写消息分离写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(MappedByteBuffer类)带来的好处就是避免了内存操作的很多容易堵的地方降低了时延比如说缺页中断降低内存加锁污染页的回写。
所以使用堆外缓冲区的方式相对来说会比较好但是肯定的是需要消耗一定的内存如果服务器内存吃紧就不推荐这种模式同时的话堆外缓冲区的话也需要配合异步刷盘才能使用(因为写数据分成了两步同步刷盘延迟就会比较大)。
7 Producer源码分析
7.1 消息发送整体流程
下面是一个生产者发送消息的demo同步发送 主要做了几件事
初始化一个生产者DefaultMQProducer对象设置 NameServer 的地址启动生产者发送消息
7.2 消息发送者启动流程 DefaultMQProducerImpl类start() 7.2.1 检查
DefaultMQProducerImpl类 7.2.2 获得MQ客户端实例
整个JVM中只存在一个MQClientManager实例维护一个MQClientInstance缓存表
DefaultMQProducerImpl类start() 一个clientId只会创建一个MQClientInstance clientId生成规则IPinstanceNameunitName
ClientConfig类 RocketMQ中消息发送者、消息消费者都属于”客户端“
每一个客户端就是一个MQClientInstance每一个ClientConfig对应一个实例。
故不同的生产者、消费端如果引用同一个客户端配置(ClientConfig)则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题生产者和消费者如果分组名相同容易导致这个问题 7.2.3 启动实例
MQClientInstance类start() 7.2.4 定时任务
MQClientInstance类startScheduledTask() 7.3 Producer消息发送流程
我们从一个生产者案例的代码进入代码可知DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法 从核心方法可知消息发送就是4个步骤验证消息、查找路由、选择队列、消息发送。 7.4 消息发送队列选择 7.4.1 默认选择队列策略
采用了最简单的轮询算法这种算法有个很好的特性就是保证每一个Queue队列的消息投递数量尽可能均匀。这种算法只要消息投递过程中没有发生重试的话基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。当然如果投递中发生问题比如第一次投递就失败那么很大的可能性是集群状态下的一台Broker挂了所以在重试发送中进行规避。这样设置也是比较合理的。
7.4.2 故障延迟机制策略
采用此策略后每次向Broker成功或者异常的发送RocketMQ都会计算出该Borker的可用时间发送结束时间-发送开始时间失败的按照30S计算并且保存方便下次发送时做筛选。 除了记录Broker的发送消息时长之外还要计算一个Broker的不可用时长。这里采用一个经验值
如果消息时长在550ms之内不可用时长为0。
达到550ms不可用时长为30S
达到1000ms不可用时长为60S
达到2000ms不可用时长为120S
达到3000ms不可用时长为180S
达到15000ms不可用时长为600S
以最大的计算。 有了以上的Broker规避信息后发送消息就非常简单了。
在开启故障延迟机制策略步骤如下
1、根据消息队列表时做轮训
2、选好一个队列
3、判断该队列所在Broker是否可用
4、如果是可用则返回该队列队列选择逻辑结束
5、如果不可用则接着步骤2继续
6、如果都不可用则随机选一个
代码如下 7.4.3 两种策略的选择
从这种策略上可以很明显看到默认队列选择是轮训策略而故障延迟选择队列则是优先考虑消息的发送时长短的队列。那么如何选择呢
首先RocketMQ默认的发送失败有重试策略默认是2也就是如果向不同的Broker发送三次都失败了那么这条消息的发送就失败了作为RocketMQ肯定是尽力要确保消息发送成功。所以给出以下建议。
如果是网络比较好的环境推荐默认策略毕竟网络问题导致的发送失败几率比较小。
如果是网络不太好的环境推荐故障延迟机制消息队列选择时会在一段时间内过滤掉RocketMQ认为不可用的broker以此来避免不断向宕机的broker发送消息从而实现消息发送高可用。
当然以上成立的条件是一个Topic创建在2个Broker以上的的基础上。
7.4.4 技术亮点:ThreadLocal 7.5 客户端建立连接的时机
Producer、Consumer连接的建立时机有何关系 源码分析一波
DefaultMQProducerImpl类中sendKernelImpl方法 根据源码分析客户端(MQClientInstance)中连接的建立时机为按需创建也就是在需要与对端进行数据交互时才建立的。建立的是长连接。
8 Consumer源码分析
8.1 消息发送时数据在ConsumeQueue的落地 连续发送5条消息消息是不定长首先所有信息先放入 Commitlog中每一条消息放入Commitlog的时候都需要上锁确保顺序的写入。
当Commitlog写成功了之后。数据通过ReputMessageService类定时同步到ConsunmeQueue中写入Consume Queue的内容是定长的固定是20个Bytesoffset 8个、size 4个、Hashcode of Tag 8个。
这种设计非常的巧妙
查找消息的时候可以直按根据队列的消息序号计算出索引的全局位置比如序号2就知道偏移量是20然后直接读取这条索引再根据索引中记录的消息的全局位置找到消息。这两次查找是差不多的第一次在通过序号在consumer Queue中获取数据的时间复杂度是O(1)第二次查找commitlog文件的时间复杂度也是O(1)所以消费时查找数据的时间复杂度也是O(1)。
8.1.1 ReputMessageService.doReput源码分析
DefaultMessageStore. start() maxPhysicalPosInLogicQueue 就是commitlog的文件名这个文件记录的最小偏移量
ReputMessageService.run()
ReputMessageService线程每执行一次任务推送休息1毫秒就继续尝试推送消息 1返回reputFromOffset偏移量开始的全部有效数据(commitlog文件)。然后循环读取每一条消息。
2从result返回的ByteBuffer中循环读取消息一次读取一条创建DispatchRequest对象。如果消息长度大于0则调用doDispatch方法。最终将分别调用CommitLogDispatcherBuildConsumeQueue(构建消息消费队列)CommitLogDispatcherBuildIndex(构建索引文件)。 3构建消息消费队列 8.2 消费者启动流程 DefaultMQPushConsumerImpl类是核心类 8.3 消费者模式
8.3.1 集群消费 消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息即一条消息只会投递到一个Consumer Group下面的一个实例。
实际上每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q其中一个Consumer Group 有 3 个实例可能是 3 个进程或者 3 台机器那么每个实例只消费其中的1条Q。
而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下消费进度 (Consumer Offset) 的存储会持久化到Broker 。
代码演示
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ARadJII4-1677492460948)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image004.jpg)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fEoQyoof-1677492460949)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image006.jpg)]
8.3.2 广播消费 消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group消息也会被Consumer Group 中的每个Consumer都消费一次。
实际上是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
这种模式下消费进度 (Consumer Offset) 会存储持久化到实例本地 。
代码演示 8.4 Consumer负载均衡
8.4.1 集群模式
在集群消费模式下每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更都会触发一次所有实例的负载均衡这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle也是平均分摊每一条queue只是以环状轮流分queue的形式
如下图 需要注意的是集群模式下queue都是只允许分配只一个实例这是由于如果多个实例同时消费一个queue的消息由于拉取哪些消息是consumer主动控制的那样会导致同一个消息在不同的实例下被消费多次所以算法上都是一个queue只分给一个consumer实例一个consumer实例可以允许同时分到不同的queue。
通过增加consumer实例去分摊queue的消费可以起到水平扩展的消费能力的作用。而有实例下线的时候会重新触发负载均衡这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话多出来的consumer实例将无法分到queue也就无法消费到消息也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。 8.4.2 广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例所以也就没有消息被分摊消费的说法。
在实现上其中一个不同就是在consumer分配queue的时候所有consumer都分到所有的queue。
8.5 并发消费流程
一般我们在消费时使用回调函数的方式使用得最多的是并发消费消费者客户端代码如下 参考RocketMQ核心流程 在RocketMQ的消费时整体流程如下 8.5.1 获取topic配置信息
在消费者启动之后第一步都要从NameServer中获取Topic相关信息。
这一步设计到组件之间的交互RocketMQ使用功能号来设计的。GET_ROUTEINFO_BY_TOPIC在idea上使用ctrlH 查找功能。很快就定位这段代码
MQClientInstance类中 最终在MQClientAPIImpl类中完成调用 具体这里是30S定时执行一次。
8.5.2 获取Group的ConsumerList
在消费消息前需要获取当前分组已经消费的相关信息ConsumerList
MQClientInstance类的start() 这里就是每间隔20S就执行一个doRebalance方法 进入RebalanceImpl类 再进入具体的类,如果是广播消费模式则不需要从服务器获取消费进度广播消费模式把进度在本地消费端进行存储 而广播消费模式则需要从服务器获取消费进度相关信息具体如下 8.5.3 获取Queue的消费Offset
在分配完消费者对应的Queue之后如果是集群模式的话需要获取这个消费者对应Queue的消费Offset,便于后续拉取未消费完的消息。
RebalanceImpl类中rebalanceByTopic方法 进入RebalancePushImpl类 8.5.4 拉取Queue的消息 最终进入DefaultMQPushConsumerImpl类的pullMessage方法 8.5.5 更新Queue的消费Offset
这里要注意因为RocketMQ的推模式是基于拉模式实现的因为拉消息是一批批拉所以不能做到拉一批提交一次偏移量所以这里使用定时任务进行偏移量的更新。
MQClientInstance类中的start方法 8.5.6 注销Consumer 8.6 顺序消费流程
顺序消费代码顺序消费的流程和并发消费流程整体差不多唯一的多的就是使用锁机制来确保一个队列同时只能被一个消费者消费从而确保消费的顺序性
ConsumeMessageOrderlyService类 这里有一个定时任务是每个20秒运行一次周期性的去续锁锁的有效期是60S 8.7 消费卡死
之前在消费的流程中尤其是针对顺序消息感觉上会有卡死的现象由于顺序消息中需要到Broker中加锁如果消费者某一个挂了那么在Broker层是维护了60s的时间才能释放锁所以在这段时间只能消费者是消费不了的在等待锁。
另外如果还有Broker层面也挂了如果是主从机构获取锁都是走的Master节点如果Master节点挂了走Slave消费但是slave节点上没有锁所以顺序消息如果发生了这样的情况也是会有卡死的现象。 8.8 启动之后较长时间才消费
在并发消费的时候当我们启动了非常多的消费者维护了非常多的topic的时候、或者queue比较多的时候你可以看到消费的流程的交互是比较多的5~6步要启动多线程也要做相当多的事情所以你会感觉要启动较长的时间才能消费。
还有顺序消费的时候如果是之前的消费者挂了这个锁要60秒才会释放也会导致下一个消费者启动的时候需要等60s才能消费。
8.9 消费端整体流程预览
这个流程比较复杂建议有兴趣的同学可以根据这张图研究下 9 分布式事务消息源码分析
9.1 什么是分布式事务
业务场景用户A转账100元给用户B这个业务比较简单具体的步骤 1、用户A的账户先扣除100元 2、再把用户B的账户加100元 如果在同一个数据库中进行事务可以保证这两步操作要么同时成功要么同时不成功。这样就保证了转账的数据一致性。 但是在微服务架构中因为各个服务都是独立的模块都是远程调用都没法在同一个事务中都会遇到分布式事务问题。
9.2 RocketMQ的解决方案 RocketMQ采用两阶段提交把扣款业务和加钱业务异步化在A系统扣款成功后发送“扣款成功消息”到消息中间件B系统中加钱业务订阅“扣款成功消息”再对用户进行加钱。
9.2.1 解决方案的问题
在哪个阶段向RocketMQ发送消息
1、先扣款后再向RocketMQ发消息
先扣款再发送消息万一发送消息超时了(MQ中有可能成功有可能失败)那这个状态就很难判断了
2、先向RocketMQ发消息后再扣款
扣款成功消息发送成功但是如果本地扣款业务失败了那消息已经发给MQ了第二阶段的加钱就会执行成功。
所以我们发现无论是哪种方案处理起来都会有问题。
其实仔细分析下问题的关键点就是RocketMQ改变不了消息发送者的事务状态。所以RocketMQ的分布式事务方案进行了优化。
9.3 RocketMQ的分布式事务方案 所以RocketMQ在分布式事务中引入了半事务及事务回查机制。
半事务 发一个消息到rocketmq但该消息只储存在commitlog中但consumeQueue中不可见也就是消费端订阅端无法看到此消息。
事务回查
RocketMq会定时遍历commitlog中的半事务消息这个事务回查机制就可以站在 RocketMQ的角度参与消息发送者的事务中。
9.4 RocketMQ的分布式事务案例代码 这个是分布式事务的生产者完成了半事务的发送。
通过事务回查如果在TransactionListenerImpl类executeLocalTransaction方法中如果本地事务执行成功则提交commit_message,消费端即可消费消息 如果有一些比较耗时的操作导致不能在这个步骤确认的话可以提交UNKNOW交给定时的任务回查来处理 另外一点如果担心生产者发生故障导致分布式事务的问题话定时事务回查是可以在生产者群组中做的。
我们可以做一个这样的案例一个生产者1一个生产者2消费发送时的分组是一样都使用分布式事务消息。
生产者1发生故障了消息状态还是一个UNKNOW状态只要生产者2还存活生产者2就可以帮助生产者1完成事务回查的确认从而不会有单点故障问题。 9.5 分布式事务源码分析
从分布式事务的流程上我们分析源码可以从消息发送确认/回滚 回查三个方面。 9.5.1 消息发送源码分析
Producer Broker
RocketMQ使用Netty处理网络broker收到消息写入的请求就会进入SendMessageProcessor类中processRequest方法。 最终进入DefaultMessageStore类中asyncPutMessage方法进行消息的存储 结合图同时结合代码我们可以看到在事务消息发送时消息实际存储的主题是一个系统主题RMQ_SYS_TRANS_HALF_TOPIC
同时消息中保存着消息的原有主题相关的信息与队列 9.5.2 确认/回滚源码分析
Producer
DefaultMQProducerImpl类sendMessageInTransaction方法 Broker EndTransactionProcessor类 9.5.3 回查源码分析
Producer
事务回查中Producer是服务端所以需要注册服务处理 DefaultMQProducerImpl类checkTransactionState方法 DefaultMQProducerImpl类processTransactionState方法 Broker
在Broker启动的时候是要作为客户端定期的访问客户端做事务回查。
回顾到之前讲到《6.1 Broker启动流程分析》 事务回查是Broker发起的一次定时的网络调用每隔60s所以事务回查在客户端启动的时候第一次不一定是60s的间隔一般会小于60s因为事务回查是broker发起的并不是client端定时发起