如何构建企业网站,网站加速cdn,郑州装修公司口碑哪家好,东莞市建设工程监督网RocketMQ笔记 文章目录 一、引言⼆、RocketMQ介绍RocketMQ的由来 三、RocketMQ的基本概念1 技术架构2 部署架构 四、快速开始1.下载RocketMQ2.安装RocketMQ3.启动NameServer4.启动Broker5.使⽤发送和接收消息验证MQ6.关闭服务器 五、搭建RocketMQ集群1.RocketMQ集群模式2.搭建主…RocketMQ笔记 文章目录 一、引言⼆、RocketMQ介绍RocketMQ的由来 三、RocketMQ的基本概念1 技术架构2 部署架构 四、快速开始1.下载RocketMQ2.安装RocketMQ3.启动NameServer4.启动Broker5.使⽤发送和接收消息验证MQ6.关闭服务器 五、搭建RocketMQ集群1.RocketMQ集群模式2.搭建主从异步集群3.验证集群4.安装可视化管理控制平台 六、消息示例1.构建Java基础环境2.简单消息示例3.顺序消息4.⼴播消息5.延迟消息6.批量消息7.过滤消息8.事务消息 六、SpringBoot整合RocketMQ1.引⼊依赖2.编写配置⽂件3.编写⽣产者发送普通消息4.编写JUnit单元测试发送消息5.创建消费者程序6.发送事务消息 七、Spring Cloud Stream整合RocketMQ1.Spring Cloud Stream介绍2.编写⽣产者2.编写消费者 ⼋、RocketMQ核⼼概念1.消息模型Message Model2.消息⽣产者Producer3.消息消费者Consumer4.主题Topic5.代理服务器Broker Server6.名字服务Name Server7.拉取式消费Pull Consumer8.推动式消费Push Consumer9.⽣产者组Producer Group10.消费者组Consumer Group11.集群消费Clustering12.⼴播消费Broadcasting13.消息Message14.标签Tag 九、消息存储机制1.消息存储整体架构2.⻚缓存与内存映射3.消息刷盘 ⼗、集群核⼼概念1.消息主从复制2.负载均衡3.消息重试4.死信队列5.幂等消息 ⼗⼀、RocketMQ最佳实践1.保证消息顺序消费2.快速处理积压消息3.保证消息可靠性投递 一、引言 这是B站上千峰的MQ学习笔记详细可以对照B站视频进行学习个人觉得本视频与笔记还是偏向于基础实战部分基本没有。实战需要结合项目学习。 Message Queue消息 队列从字⾯上理解⾸先它是⼀个队列。FIFO先进先出 的数据结构——队列。消息队列就是所谓的存放消息的队列。
消息队列解决的不是存放消息的队列的⽬的解决的是通信问题。 ⽐如以电商订单系统为例如果各服务之间使⽤同步通信不仅耗时较久且过程中受到⽹络波动的影响不能保证⾼成功率。因此使⽤异步的通信⽅式对架构进⾏改造。 使⽤异步的通信⽅式对模块间的调⽤进⾏解耦可以快速的提升系统的吞吐量。上游执⾏完消息的发送业务后⽴即获得结果下游多个服务订阅到消息后各⾃消费。 通过消息队列屏蔽底层的通信协议使得解藕和并⾏消费得以实现。
⼆、RocketMQ介绍
RocketMQ的由来
随着使⽤中队列和虚拟主题的增加阿⾥巴巴团队使⽤的ActiveMQ IO 模块达到了 瓶颈。为了尽⼒通过节流、断路器或降级来解决这个问题但效果不佳。所以开始 关注当时流⾏的消息传递解决⽅案Kafka。不幸的是Kafka ⽆法满⾜要求尤其是 在低延迟和⾼可靠性⽅⾯。在这种情况下决定发明⼀种新的消息传递引擎来处理 更⼴泛的⽤例从传统的发布/订阅场景到⼤容量实时零丢失交易系统。⽬前 RocketMQ已经开源给Apache基⾦会。如今已有 100 多家公司在其业务中使⽤开源版本的 RocketMQ。
三、RocketMQ的基本概念
1 技术架构 RocketMQ架构上主要分为四部分如上图所示:
Producer消息发布的⻆⾊⽀持分布式集群⽅式部署。Producer通过MQ的负 载均衡模块选择相应的Broker集群队列进⾏消息投递投递的过程⽀持快速失 败并且低延迟。
Consumer消息消费的⻆⾊⽀持分布式集群⽅式部署。⽀持以push推pull 拉两种模式对消息进⾏消费。同时也⽀持集群⽅式和⼴播⽅式的消费它提供 实时消息订阅机制可以满⾜⼤多数⽤户的需求。
NameServerNameServer是⼀个⾮常简单的Topic路由注册中⼼其⻆⾊类似 Dubbo中的zookeeper⽀持Broker的动态注册与发现。主要包括两个功能 Broker管理NameServer接受Broker集群的注册信息并且保存下来作为路由信 息的基本数据。然后提供⼼跳检测机制检查Broker是否还存活路由信息管 理每个NameServer将保存关于Broker集群的整个路由信息和⽤于客户端查询 的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker 集群的路由信息从⽽进⾏消息的投递和消费。NameServer通常也是集群的⽅式部署各实例间相互不进⾏信息通讯。Broker是向每⼀台NameServer注册⾃ ⼰的路由信息所以每⼀个NameServer实例上⾯都保存⼀份完整的路由信息。 当某个NameServer因某种原因下线了Broker仍然可以向其它NameServer同步其路由信息Producer,Consumer仍然可以动态感知Broker的路由的信息。
BrokerServerBroker主要负责消息的存储、投递和查询以及服务⾼可⽤保证为了实现这些功能Broker包含了以下⼏个重要⼦模块。
Remoting Module整个Broker的实体负责处理来⾃clients端的请求。
Client Manager负责管理客户端(Producer/Consumer)和维护Consumer的 Topic订阅信息
Store Service提供⽅便简单的API接⼝处理消息存储到物理硬盘和查询功 能。
HA Service⾼可⽤服务提供Master Broker 和 Slave Broker之间的数据同步 功能。
Index Service根据特定的Message key对投递到Broker的消息进⾏索引服 务以提供消息的快速查询。 2 部署架构 RocketMQ ⽹络部署特点
NameServer是⼀个⼏乎⽆状态节点可集群部署节点之间⽆任何信息同步。 Broker部署相对复杂Broker分为Master与Slave⼀个Master可以对应多个 Slave但是⼀个Slave只能对应⼀个MasterMaster与Slave 的对应关系通过指定 相同的BrokerName不同的BrokerId 来定义BrokerId为0表示Master⾮0表示 Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建 ⽴⻓连接定时注册Topic信息到所有NameServer。 注意当前RocketMQ版本 在部署架构上⽀持⼀Master多Slave但只有BrokerId1的从服务器才会参与消 息的读负载。Producer与NameServer集群中的其中⼀个节点随机选择建⽴⻓连接定期 从NameServer获取Topic路由信息并向提供Topic 服务的Master建⽴⻓连接 且定时向Master发送⼼跳。Producer完全⽆状态可集群部署。Consumer与NameServer集群中的其中⼀个节点随机选择建⽴⻓连接定期 从NameServer获取Topic路由信息并向提供Topic服务的Master、Slave建⽴⻓ 连接且定时向Master、Slave发送⼼跳。Consumer既可以从Master订阅消息 也可以从Slave订阅消息消费者在向Master拉取消息时Master服务器会根据 拉取偏移量与最⼤偏移量的距离判断是否读⽼消息产⽣读I/O以及从服 务器是否可读等因素建议下⼀次是从Master还是Slave拉取。
结合部署架构图描述集群⼯作流程
启动NameServerNameServer起来后监听端⼝等待Broker、Producer、Consumer连上来相当于⼀个路由控制中⼼。Broker启动跟所有的NameServer保持⻓连接定时发送⼼跳包。⼼跳包中包 含当前Broker信息(IP端⼝等)以及存储所有Topic信息。注册成功后 NameServer集群中就有Topic跟Broker的映射关系。收发消息前先创建Topic创建Topic时需要指定该Topic要存储在哪些Broker 上也可以在发送消息时⾃动创建Topic。Producer发送消息启动时先跟NameServer集群中的其中⼀台建⽴⻓连接并 从NameServer中获取当前发送的Topic存在哪些Broker上轮询从队列列表中选 择⼀个队列然后与队列所在的Broker建⽴⻓连接从⽽向Broker发消息。Consumer跟Producer类似跟其中⼀台NameServer建⽴⻓连接获取当前订阅 Topic存在哪些Broker上然后直接跟Broker建⽴连接通道开始消费消息。
四、快速开始
1.下载RocketMQ
本教程使⽤的是RocketMQ4.7.1版本建议使⽤该版本进⾏之后的demo训练
运⾏版https://www.apache.org/dyn/closer.cgi?pathrocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
源码https://www.apache.org/dyn/closer.cgi?pathrocketmq/4.7.1/rocketmqall-4.7.1-source-release.zip
2.安装RocketMQ
准备⼀台装有Linux系统的虚拟机。本教程使⽤的是Ubuntu16.04版本。安装jdk上传jdk-8u191安装包并解压缩在 /usr/local/java ⽬录下。安装rocketmq上传rocketmq安装包并使⽤unzip命令解压缩在 /usr/local/rocketmq ⽬录下。配置jdk和rocketmq的环境变量
export JAVA_HOME/usr/local/java/jdk1.8.0_191
export JRE_HOME/usr/local/java/jdk1.8.0_191/jre
export ROCKETMQ_HOME/usr/local/rocketmq/rocketmq-all-4.7.1-binrelease
export CLASSPATH$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export
PATH$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$ROCKETMQ_HOME/bin:$PATH:$HO
ME/bin注意RocketMQ的环境变量⽤来加载 ROCKETMQ_HOME/conf 下的配置⽂件 如果不配置则⽆法启动NameServer和Broker。 完成后执⾏命令让环境变量⽣效
source /etc/profile修改bin/runserver.sh⽂件由于RocketMQ默认设置的JVM内存为4G但虚拟机 ⼀般没有这么4G内存因此调整为512mb。
JAVA_OPT${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m在runserver.sh⽂件中找到上⾯这段内容改为下⾯的参数。
JAVA_OPT${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m3.启动NameServer
在上⼀章中介绍了RocketMQ的架构启动RocketMQ服务需要先启动NameServer。
在bin⽬录内使⽤静默⽅式启动。
nohup ./mqnamesrv 查看bin/nohup.out显示如下内容表示启动成功
rootubuntu:/usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin#
cat nohup.out
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young
collector with the CMS collector is deprecated and will likely be
removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning:
UseCMSCompactAtFullCollection is deprecated and will likely be
removed in a future release.
The Name Server boot success. serializeTypeJSON4.启动Broker
修改broker的JVM参数配置将默认8G内存修改为512m。修 改 bin/runbroker.sh ⽂件
JAVA_OPT${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m在 conf/broker.conf ⽂件中加⼊如下配置开启⾃动创建Topic功能
autoCreateTopicEnabletrue以静默⽅式启动broker
nohup ./mqbroker -n localhost:9876 查看 bin/nohup.out ⽇志显示如下内容表示启动成功
The broker[ubuntu, 172.17.0.1:10911] boot success.
serializeTypeJSON5.使⽤发送和接收消息验证MQ
配置nameserver的环境变量
在发送/接收消息之前需要告诉客户端nameserver的位置。配置环境变量
NAMESRV_ADDR (这里就是在安装了MQ的服务器配置环境变量并且把localhost换成服务器对应的ip)
export NAMESRV_ADDRlocalhost:9876使⽤bin/tools.sh⼯具验证消息的发送默认会发1000条消息
./tools.sh org.apache.rocketmq.example.quickstart.Producer发送的消息⽇志
...
SendResult [sendStatusSEND_OK,
msgIdFD154BA55A2B1008020C29FFFED6A0855CFC12A3A380885CB70A0235,
offsetMsgIdAC11000100002A9F000000000001F491,
messageQueueMessageQueue [topicTopicTest, brokerNameubuntu,
queueId0], queueOffset141]使⽤bin/tools.sh⼯具验证消息的接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer看到接收到的消息
...
ConsumeMessageThread_12 Receive New Messages: [MessageExt
[brokerNameubuntu, queueId1, storeSize227, queueOffset245,
sysFlag0, bornTimestamp1658892578234,
bornHost/172.16.253.100:48524, storeTimestamp1658892578235,
storeHost/172.17.0.1:10911,
msgIdAC11000100002A9F0000000000036654, commitLogOffset222804,
bodyCRC683694034, reconsumeTimes0, preparedTransactionOffset0,
toString()Message{topicTopicTest, flag0, properties
{MIN_OFFSET0, MAX_OFFSET250, CONSUME_START_TIME1658892813497,
UNIQ_KEYFD154BA55A2B1008020C29FFFED6A0855CFC12A3A380885CB9BA03D6,
CLUSTERDefaultCluster, WAITtrue, TAGSTagA}, body[72, 101, 108,
108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 50],
transactionIdnull}]]6.关闭服务器
关闭broker
./mqshutdown broker 1关闭nameserver
./mqshutdown namesrv五、搭建RocketMQ集群
1.RocketMQ集群模式
为了追求更好的性能RocketMQ的最佳实践⽅式都是在集群模式下完成。
RocketMQ官⽅提供了三种集群搭建⽅式。
2主2从异步通信⽅式
使⽤异步⽅式进⾏主从之间的数据复制吞吐量⼤但可能会丢消息。
使⽤ conf/2m-2s-async ⽂件夹内的配置⽂件做集群配置。
2主2从同步通信⽅式
使⽤同步⽅式进⾏主从之间的数据复制保证消息安全投递不会丢失但影响吞 吐量
使⽤ conf/2m-2s-sync ⽂件夹内的配置⽂件做集群配置。
2主⽆从⽅式
会存在单点故障且读的性能没有前两种⽅式好。
使⽤ conf/2m-noslave ⽂件夹内的配置⽂件做集群配置。
Dledger⾼可⽤集群
上述三种官⽅提供的集群没办法实现⾼可⽤即在master节点挂掉后slave节点没 办法⾃动被选举为新的master⽽需要⼈⼯实现。
RocketMQ在4.5版本之后引⼊了第三⽅的Dleger⾼可⽤集群。
2.搭建主从异步集群
1准备三台Linux服务器
三台Linux服务器中nameserver和broker之间的关系如下 三台服务器都需要安装jdk和rocketmq安装步骤参考上⼀章节。
注意记得修改每一台的环境变量
2启动三台nameserver
nameserver是⼀个轻量级的注册中⼼broker把⾃⼰的信息注册到nameserver上。 ⽽且nameserver是⽆状态的直接启动即可。三台nameserver之间不需要通信 ⽽是被请求⽅来关联三台nameserver的地址。
修改三台服务器的的runserver.sh⽂件修改JVM内存默认的4g为512m。
JAVA_OPT${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m-Xms512m -Xmx512m -Xmn256m 这里的三个参数分别是指最小堆内存最大堆内存年轻代堆内存。 在每台服务器的bin⽬录下执⾏如下命令
服务器1:
nohup ./mqnamesrv -n 172.16.253.103:9876 服务器2:
nohup ./mqnamesrv -n 172.16.253.101:9876 服务器3:
nohup ./mqnamesrv -n 172.16.253.102:9876 3)配置broker
broker-a,broker-b-s这两台broker是配置在服务器2上
broker-b,broker-a-s这两台 broker是配置在服务器3上。
这两对主从节点在不同的服务器上服务器1上没有部 署broker。
需要修改每台broker的配置⽂件。注意同⼀台服务器上的两个broker保存路径不 能⼀样。
broker-a的master节点
在服务器2上进⼊到conf/2m-2s-async⽂件夹内修改broker-a.properties⽂件。
# 所属集群名称
brokerClusterNameDefaultCluster
# broker名字
brokerNamebroker-a
# broker所在服务器的ip
brokerIP1172.16.253.101
# broker的id0表示master0表示slave
brokerId0
# 删除⽂件时间点默认在凌晨4点
deleteWhen04
# ⽂件保留时间为48⼩时
fileReservedTime48
# broker的⻆⾊为master
brokerRoleASYNC_MASTER
# 使⽤异步刷盘的⽅式
flushDiskTypeASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr172.16.253.103:9876;172.16.253.101:9876;172.16.253.102
:9876
# 在发送消息⾃动创建不存在的topic时默认创建的队列数为4个
defaultTopicQueueNums4
# 是否允许 Broker ⾃动创建Topic建议线下开启线上关闭
autoCreateTopicEnabletrue
# 是否允许 Broker ⾃动创建订阅组建议线下开启线上关闭
autoCreateSubscriptionGrouptrue
# broker对外服务的监听端⼝
listenPort10911
# abort⽂件存储路径
abortFile/usr/local/rocketmq/store/abort
# 消息存储路径
storePathRootDir/usr/local/rocketmq/store
# commitLog存储路径
storePathCommitLog/usr/local/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex/usr/local/rocketmq/store/index
# checkpoint⽂件存储路径
storeCheckpoint/usr/local/rocketmq/store/checkpoint
# 限制的消息⼤⼩
maxMessageSize65536
# commitLog每个⽂件的⼤⼩默认1G
mapedFileSizeCommitLog1073741824
# ConsumeQueue每个⽂件默认存30W条根据业务情况调整
mapedFileSizeConsumeQueue300000broker-a的slave节点
在服务器3上进⼊到conf/2m-2s-async⽂件夹内修改broker-a-s.properties⽂件。
brokerClusterNameDefaultCluster
brokerNamebroker-a
brokerIP1172.16.253.102
brokerId1
deleteWhen04
fileReservedTime48
brokerRoleSLAVE
flushDiskTypeASYNC_FLUSH
namesrvAddr172.16.253.103:9876;172.16.253.101:9876;172.16.253.102
:9876
defaultTopicQueueNums4
autoCreateTopicEnabletrue
autoCreateSubscriptionGrouptrue
listenPort11011
abortFile/usr/local/rocketmq/store-slave/abort
storePathRootDir/usr/local/rocketmq/store-slave
storePathCommitLog/usr/local/rocketmq/store-slave/commitlog
storePathConsumeQueue/usr/local/rocketmq/store-slave/consumequeue
storePathIndex/usr/local/rocketmq/store-slave/index
storeCheckpoint/usr/local/rocketmq/store-slave/checkpoint
maxMessageSize65536broker-b的master节点
**在服务器3上**进⼊到conf/2m-2s-async⽂件夹内修改broker-b.properties⽂件。
brokerClusterNameDefaultCluster
brokerNamebroker-b
brokerIP1172.16.253.102
brokerId0
deleteWhen04
fileReservedTime48
brokerRoleASYNC_MASTER
flushDiskTypeASYNC_FLUSH
namesrvAddr172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876
defaultTopicQueueNums4
autoCreateTopicEnabletrue
autoCreateSubscriptionGrouptrue
listenPort10911
abortFile/usr/local/rocketmq/store/abort
storePathRootDir/usr/local/rocketmq/store
storePathCommitLog/usr/local/rocketmq/store/commitlog
storePathConsumeQueue/usr/local/rocketmq/store/consumequeue
storePathIndex/usr/local/rocketmq/store/index
storeCheckpoint/usr/local/rocketmq/store/checkpoint
maxMessageSize65536broker-b的slave节点
在服务器2上进⼊到conf/2m-2s-async⽂件夹内修改broker-b-s.properties⽂件。
brokerClusterNameDefaultCluster
brokerNamebroker-b
brokerIP1172.16.253.101
brokerId1
deleteWhen04
fileReservedTime48
brokerRoleSLAVE
flushDiskTypeASYNC_FLUSH
namesrvAddr172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876
defaultTopicQueueNums4
autoCreateTopicEnabletrue
autoCreateSubscriptionGrouptrue
listenPort11011
abortFile/usr/local/rocketmq/store-slave/abort
storePathRootDir/usr/local/rocketmq/store-slave
storePathCommitLog/usr/local/rocketmq/store-slave/commitlog
storePathConsumeQueue/usr/local/rocketmq/store-slave/consumequeue
storePathIndex/usr/local/rocketmq/store-slave/index
storeCheckpoint/usr/local/rocketmq/store-slave/checkpoint
maxMessageSize65536修改服务器2和服务器3的runbroker.sh⽂件
修改JVM内存默认的8g为512m。
JAVA_OPT${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m 4启动broker
在服务器2中启动broker-amaster和broker-b-sslave
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties 在服务器3中启动broker-bmaster,broker-a-sslave
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties 3.验证集群
使⽤RocketMQ提供的tools⼯具验证集群是否正常⼯作。
在服务器2上配置环境变量
⽤于被tools中的⽣产者和消费者程序读取该变量。
export
NAMESRV_ADDR172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876启动⽣产者
./tools.sh org.apache.rocketmq.example.quickstart.Producer 1./tools.sh org.apache.rocketmq.example.quickstart.Producer 启动消费者
./tools.sh org.apache.rocketmq.example.quickstart.Consumer4.安装可视化管理控制平台
RocketMQ没有提供可视化管理控制平台可以使⽤第三⽅管理控制平台https://github.com/apache/rocketmq-externals/tree/rocketmq-console-1.0.0/rocketmq-console
下载管理控制平台解压缩在linux服务器上
可以安装在服务器1上
给服务器安装maven环境
apt install maven修改 rocketmq-externals/rocketmq-externals-master/rocketmq console/src/main/resources/application.properties 配置⽂件中的 nameserver地址
rocketmq.config.namesrvAddr172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876回到 rocketmq-externals/rocketmq-externals-master/rocketmq console 路径下执⾏maven命令进⾏打包
mvn clean package -Dmaven.test.skiptrue运⾏jar包。进⼊到 rocketmq-externals/rocketmq-externals master/rocketmq-console/target ⽬录内执⾏如下命令
nohup java -jar rocketmq-console-ng-1.0.1.jar访问所在服务器的8080端⼝查看集群界⾯可以看到之前部署的集群 六、消息示例
在掌握RocketMQ的基本实现逻辑之后接下来通过Java程序来学习RocketMQ的多种消息示例它们拥有各⾃的应⽤场景。
1.构建Java基础环境
在maven项⽬中构建出RocketMQ消息示例的基础环境即创建⽣产者程序和消费者程序。通过⽣产者和消费者了解RocketMQ操作消息的原⽣API。 引⼊依赖
dependenciesdependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.7.1/version/dependency/dependencies编写⽣产者程序
/*** author Thor* 公众号 Java架构栈*/
public class SyncProducer {public static void main(String[] args) throwsMQClientException, UnsupportedEncodingException,RemotingException, InterruptedException, MQBrokerException {//Instantiate with a producer group name.DefaultMQProducer producer new DefaultMQProducer(producerGroup1);// Specify name server addresses.producer.setNamesrvAddr(172.16.253.101:9876);//Launch the instance.producer.start();for (int i 0; i 100; i) {//Create a message instance, specifying topic, tag and message body.Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//Call send message to deliver message to one of brokers.SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}编写消费者程序
package com.qf.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** author Thor* 公众号 Java架构栈*/
public class MyConsumer {public static void main(String[] args) throwsMQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name);// Specify name server addresses.consumer.setNamesrvAddr(172.16.253.101:9876);// Subscribe one more more topics to consume.consumer.subscribe(TopicTest, *);// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatusconsumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s % n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf(Consumer Started.%n);}
}启动消费者和⽣产者验证消息的收发。 2.简单消息示例
简单消息分成三种同步消息、异步消息、单向消息。
同步消息
⽣产者发送消息后必须等待broker返回信息后才继续之后的业务逻辑在broker 返回信息之前⽣产者阻塞等待。
package com.qf.producer.simple;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;/*** author Thor* 公众号 Java架构栈*/
public class SyncProducer {public static void main(String[] args) throwsMQClientException, UnsupportedEncodingException,RemotingException, InterruptedException, MQBrokerException {//Instantiate with a producer group name.DefaultMQProducer producer new DefaultMQProducer(producerGroup1);// Specify name server addresses.producer.setNamesrvAddr(172.16.253.101:9876);//Launch the instance.producer.start();for (int i 0; i 100; i) {//Create a message instance, specifying topic, tag andmessage body.Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//Call send message to deliver message to one ofbrokers.SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}//Shut down once the producer instance is not longer inuse.producer.shutdown();}
}同步消息的应⽤场景如重要通知消息、短信通知、短信营销系统等。
异步消息
⽣产者发完消息后不需要等待broker的回信可以直接执⾏之后的业务逻辑。⽣ 产者提供⼀个回调函数供broker调⽤体现了异步的⽅式。
package com.qf.producer.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** 异步消息** author Thor* 公众号 Java架构栈*/
public class AsyncProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// Specify name server addresses.producer.setNamesrvAddr(172.16.253.101:9876);//Launch the instance.producer.start();//设置消息生产者在异步发送消息失败时的重试次数producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount 100;//这里是计数器代码会在 countDownLatch.await(5, TimeUnit.SECONDS);阻塞或者直到超时代码才会继续执行final CountDownLatch countDownLatch newCountDownLatch(messageCount);for (int i 0; i messageCount; i) {try {final int index i;Message msg new Message(Jodie_topic_1023, TagA, OrderID188, Helloworld.getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf(%-10d OK %s %n, index,sendResult.getMsgId());}Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf(%-10d Exception %s %n, index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}//代码在此处阻塞countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}异步传输⼀般⽤于响应时间敏感的业务场景。
单向消息
⽣产者发送完消息后不需要等待任何回复直接进⾏之后的业务逻辑单向传输⽤ 于需要中等可靠性的情况例如⽇志收集。
package com.qf.producer.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** 单向消息** author Thor* 公众号 Java架构栈*/
public class OnewayProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);// Specify name server addresses.producer.setNamesrvAddr(172.16.253.101:9876);//Launch the instance.producer.start();for (int i 0; i 100; i) {//Create a message instance, specifying topic, tag andmessage body.Message msg new Message(TopicTest /* Topic */,TagA /* Tag */,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);//Call send message to deliver message to one ofbrokers.producer.sendOneway(msg);}//Wait for sending to completeThread.sleep(5000);producer.shutdown();}
}3.顺序消息
顺序消息指的是消费者消费消息的顺序按照发送者发送消息的顺序执⾏。顺序消息 分成两种局部顺序和全局顺序。
局部顺序
局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的。消费者使⽤ MessageListenerOrderly类做消息监听实现局部顺序。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.client.exception.MQClientException;public class OrderConsumer {public static void main(String[] args) throws MQClientException {// 创建一个消费者实例指定消费者组名DefaultMQPushConsumer consumer new DefaultMQPushConsumer(example_group_name);// 设置从最早的消息开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅一个主题* 表示订阅该主题下的所有标签consumer.subscribe(OrderTopicTest, *);// 注册消息监听器处理顺序消息consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {// 设置自动提交消费进度context.setAutoCommit(true);// 遍历接收到的消息列表for (MessageExt msg : msgs) {// 打印消息内容System.out.println(消息内容 new String(msg.getBody()));}// 返回消费状态表示消息已成功消费return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}全局顺序
消费者消费全部消息都是顺序的只能通过⼀个某个topic只有⼀个队列才能实现 这种应⽤场景较少且性能较差。
乱序消费
消费者消费消息不需要关注消息的顺序。消费者使⽤MessageListenerConcurrently 类做消息监听。
package com.qf.producer.order;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;
import java.util.concurrent.atomic.AtomicLong;/*** 顺序消息** author Thor* 公众号 Java架构栈*/
public class OrderConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(example_group_name);//设置从最早的消息开始消费(通过设置偏移量实现)consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(OrderTopicTest, *);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(消息内容 newString(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}4.⼴播消息
⼴播是向主题topic的所有订阅者发送消息。订阅同⼀个topic的多个消费者能全量收到⽣产者发送的所有消息。
消费者
package com.qf.producer.broadcast;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** ⼴播消息* author Thor* 公众号 Java架构栈*/
public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(example_group_name);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//设置消费者的消费模式为广播模式Broadcastingconsumer.setMessageModel(MessageModel.BROADCASTING);consumer.subscribe(TopicTest, *);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(消息内容 newString(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Broadcast Consumer Started.%n);}
}⽣产者
package com.qf.producer.broadcast;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** author Thor* 公众号 Java架构栈*/
public class BroadcastProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName);producer.start();for (int i 0; i 100; i) {Message msg new Message(TopicTest, // 消息所属的主题TagA, // 消息的标签OrderID188, // 消息的键 (Hello worldi).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}producer.shutdown();}
}5.延迟消息
延迟消息与普通消息的不同之处在于它们要等到指定的时间之后才会被传递。
消息⽣产者
public class ScheduledProducer {public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer new DefaultMQProducer(ExampleProducerGroup);// Launch producerproducer.start();int totalMessagesToSend 100;for (int i 0; i totalMessagesToSend; i) {Message message new Message(TestTopic, (Hello scheduled message i).getBytes());// 设置消息延时十秒消费message.setDelayTimeLevel(3);// Send the messageproducer.send(message);}producer.shutdown();}
}RocketMQ设计了18个延迟等级分别是 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 等级3对应的是10s。系统为这18个等级配置了18个topic⽤于实现延迟队列的效 果。 消息消费者
package com.qf.producer.scheduled;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** author Thor* 公众号 Java架构栈*/
public class ScheduledConsumer {public static void main(String[] args) throws MQClientException {// Instantiate message consumerDefaultMQPushConsumer consumer new DefaultMQPushConsumer(ExampleConsumer);// Subscribe topicsconsumer.subscribe(TestTopic, *);// Register message listenerconsumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println(Receive message[msgId message.getMsgId() ] (System.currentTimeMillis() - message.getStoreTimestamp()) ms later);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// Launch consumerconsumer.start();}
}在商业版RocketMQ中不仅可以设置延迟等级还可以设置具体的延迟时间但是 在社区版RocketMQ中只能设置延迟等级。
6.批量消息
批量发送消息提⾼了传递⼩消息的性能。
使⽤批量消息
package com.qf.producer.batch;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
/*** 批量消息* author Thor* 公众号 Java架构栈*/
public class BatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName);producer.start();String topic BatchTest;ListMessage messages new ArrayList();//参数分别是消息主题、消息标签、消息的键、消息内容messages.add(new Message(topic, TagA, OrderID001, Hello world 0.getBytes()));messages.add(new Message(topic, TagA, OrderID002, Hello world 1.getBytes()));messages.add(new Message(topic, TagA, OrderID003, Hello world 2.getBytes()));producer.send(messages);producer.shutdown();}
}超出限制的批量消息
官⽅建议批量消息的总⼤⼩不应超过1m实际不应超过4m。如果超过4m的批量消 息需要进⾏分批处理同时设置broker的配置参数为4m在broker的配置⽂件中修改 maxMessageSize4194304
public class MaxBatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(BatchProducerGroupName);producer.start();String topic BatchTest;ListMessage messages new ArrayList(100);for (int i 0; i 100; i) {messages.add(new Message(topic, Tag, OrderID i, (Hello world i).getBytes()));}// producer.send(messages);//split the large batch into small ones:ListSplitter splitter new ListSplitter(messages);while (splitter.hasNext()) {ListMessage listItem splitter.next();producer.send(listItem);}producer.shutdown();}
}使用 ListSplitter 将大批次的消息拆分成多个小批次。
通过 while 循环遍历每个小批次并调用 producer.send(listItem) 发送每个小批次的消息。
ListSplitter
package com.qf.producer.batch;import org.apache.rocketmq.common.message.Message;import java.util.Iterator;
import java.util.List;
import java.util.Map;/*** 批量消息** author Thor* 公众号 Java架构栈*/
public class ListSplitter implements IteratorListMessage {private int sizeLimit 1000 * 1000;private final ListMessage messages;private int currIndex;public ListSplitter(ListMessage messages) {this.messages messages;}Overridepublic boolean hasNext() {return currIndex messages.size();}Overridepublic ListMessage next() {int nextIndex currIndex;int totalSize 0;for (; nextIndex messages.size(); nextIndex) {Message message messages.get(nextIndex);int tmpSize message.getTopic().length() message.getBody().length;MapString, String properties message.getProperties();for (Map.EntryString, String entry : properties.entrySet()) {tmpSize entry.getKey().length() entry.getValue().length();}tmpSize tmpSize 20; //for log overheadif (tmpSize sizeLimit) {//it is unexpected that single message exceeds the sizeLimit//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex 0) {nextIndex;}break;}if (tmpSize totalSize sizeLimit) {break;} else {totalSize tmpSize;}}ListMessage subList messages.subList(currIndex, nextIndex);currIndex nextIndex;return subList;}
}使⽤限制
同⼀批次的消息应该具有相同的主题、相同的 waitStoreMsgOK 并且不⽀持延迟消息和事务消息
waitStoreMsgOK 参数的含义
true生产者在发送消息后会等待消息存储成功的确认。这意味着生产者会确保消息已经成功存储在 Broker 上然后再返回发送结果。这种模式更加可靠但可能会增加消息发送的延迟。 false生产者在发送消息后不会等待消息存储成功的确认。这意味着生产者会立即返回发送结果而不关心消息是否已经成功存储在 Broker 上。这种模式性能更高但可靠性较低。
7.过滤消息
在⼤多数情况下标签是⼀种简单⽽有⽤的设计可以⽤来选择您想要的消息。
tag过滤的⽣产者
package com.qf.rocketmq.filter;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** author Thor* 公众号 Java架构栈*/
public class TagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);producer.start();String[] tags new String[]{TagA, TagB, TagC};for (int i 0; i 15; i) {Message msg new Message(TagFilterTest, tags[i % tags.length],Hello world.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}producer.shutdown();}
}tag过滤的消费者
package com.qf.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** author Thor* 公众号 Java架构栈*/
public class TagConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name);//这里与之前相比较就是多了个管道符的过滤指定只接收TagA 或者是TagC的消息consumer.subscribe(TagFilterTest, TagA || TagC);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}消费者将收到包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是⼀条消息只能有⼀个标签这可能不适⽤于复杂的场景。在这种情况下您可以使⽤ SQL 表达式来过滤掉消息。
使⽤SQL过滤
SQL 功能可以通过您在发送消息时输⼊的属性进⾏⼀些计算。在 RocketMQ 定义的语法下可以实现⼀些有趣的逻辑。这是⼀个例⼦ 语法
RocketMQ 只定义了⼀些基本的语法来⽀持这个特性也可以轻松扩展它。
1. 数值⽐较如, , , , BETWEEN, ;
2. 字符⽐较如, , IN;
3. IS NULL或IS NOT NULL
4. 逻辑AND, OR, NOT;常量类型有
1. 数字如 123、3.1415
2. 字符如abc必须⽤单引号
3. NULL特殊常数
4. 布尔值TRUE或FALSE**使⽤注意**只有推模式的消费者可以使⽤SQL过滤。拉模式是⽤不了的。
SQL过滤的⽣产者示例
package com.qf.rocketmq.filter;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** author Thor* 公众号 Java架构栈*/
public class SQLProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(please_rename_unique_group_name);producer.start();String[] tags new String[]{TagA, TagB, TagC};for (int i 0; i 15; i) {Message msg new Message(SqlFilterTest, tags[i % tags.length],(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 给消息设置用户属性键值对msg.putUserProperty(a, String.valueOf(i));SendResult sendResult producer.send(msg);System.out.printf(%s%n, sendResult);}producer.shutdown();}
} SQL过滤的消费者示例Don’t forget to set enablePropertyFiltertrue in broker
package com.qf.rocketmq.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/*** author Thor* 公众号 Java架构栈*/
public class SQLConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name);// Dont forget to set enablePropertyFiltertrue in brokerconsumer.subscribe(SqlFilterTest, MessageSelector.bySql((TAGS is not null and TAGS in (TagA, TagB)) and (a is not null and a between 0 and 3)));consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}
}8.事务消息
事务消息的定义
它可以被认为是⼀个两阶段的提交消息实现以确保分布式系统的最终⼀致性。事务性消息确保本地事务的执⾏和消息的发送可以原⼦地执⾏。
事务消息有三种状态
a.TransactionStatus.CommitTransaction提交事务表示允许消费者消费该消息。
b.TransactionStatus.RollbackTransaction回滚事务表示该消息将被删除不允许消费。
c.TransactionStatus.Unknown中间状态表示需要MQ回查才能确定状态。
事务消息的实现流程 ⽣产者
package com.qf.rocketmq.transaction;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;/*** author Thor* 公众号 Java架构栈*/
public class TransactionProducer {public static void main(String[] args) throws Exception {//创建生产者的事务监听器TransactionListener transactionListener new TransactionListenerImpl();TransactionMQProducer producer new TransactionMQProducer(please_rename_unique_group_name);producer.setNamesrvAddr(172.16.253.101:9876);//创建一个线程池ExecutorService executorService new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueueRunnable(2000), new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;}});//为生产者设置线程池producer.setExecutorService(executorService);//为生产者注入事务监听器producer.setTransactionListener(transactionListener);producer.start();String[] tags new String[] {TagA, TagB, TagC, TagD, TagE};for (int i 0; i 10; i) {try {Message msg new Message(TopicTest, tags[i % tags.length], KEY i,(Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送事务消息会与transactionListener进行配对SendResult sendResult producer.sendMessageInTransaction(msg, null);System.out.printf(%s%n, sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i 0; i 100000; i) {Thread.sleep(1000);}producer.shutdown();}
}
本地事务处理-TransactionListener
package com.qf.rocketmq.transaction;import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;/*** author Thor* 公众号 Java架构栈*/
public class TransactionListenerImpl implements TransactionListener {/*** 执行本地事务根据标签来决定消息事务的状态** param msg Half(prepare) message* param arg Custom business parameter* return Transaction state*/Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {String tags msg.getTags();if(StringUtils.contains(tags,TagA)){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.contains(tags,TagB)){return LocalTransactionState.ROLLBACK_MESSAGE;}else{//中间状态表示需要MQ回查才能确定状态。执行下面的checkLocalTransaction方法return LocalTransactionState.UNKNOW;}}/*** 检查本地事务* method will be invoked to get local transaction status.** param msg Check message* return Transaction state*/Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String tags msg.getTags();if(StringUtils.contains(tags,TagC)){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.contains(tags,TagD)){return LocalTransactionState.ROLLBACK_MESSAGE;}else{//对于TagE又会变成UNKNOW在这里就会对TagE再次回查回查超过15次消息就会丢弃return LocalTransactionState.UNKNOW;}}
}消费者
package com.qf.rocketmq.transaction;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** author Thor* 公众号 Java架构栈*/
public class TransactionConsumer {public static void main(String[] args) throws MQClientException {//1.创建消费者对象DefaultMQPushConsumer consumer new DefaultMQPushConsumer(my-consumer-group1);//2.指明nameserver的地址consumer.setNamesrvAddr(172.16.253.101:9876);//3.订阅主题:topic 和过滤消息用的tag表达式consumer.subscribe(TopicTest,*);//4.创建一个监听器当broker把消息推过来时调用consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
// System.out.println(收到的消息new String(msg.getBody()));System.out.println(收到的消息msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumer.start();System.out.println(消费者已启动);}
}使⽤限制
事务性消息没有调度和批处理⽀持。为避免单条消息被检查次数过多导致半队列消息堆积我们默认将单条 消息的检查次数限制为15次但⽤户可以通过更改“transactionCheckMax”来 更改此限制”参数在broker的配置中如果⼀条消息的检查次数超过 “transactionCheckMax”次broker默认会丢弃这条消息同时打印错误⽇ 志。⽤户可以通过重写“AbstractTransactionCheckListener”类来改变这种⾏ 为。事务消息将在⼀定时间后检查该时间由代理配置中的参数“transactionTimeout”确定。并且⽤户也可以在发送事务消息时通过设置⽤ 户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来改变这个限制这个参数 优先于“transactionMsgTimeout”参数。⼀个事务性消息可能会被检查或消费不⽌⼀次。提交给⽤户⽬标主题的消息reput可能会失败。⽬前它取决于⽇志记录。 ⾼可⽤是由 RocketMQ 本身的⾼可⽤机制来保证的。如果要保证事务消息不丢失保证事务完整性推荐使⽤同步双写机制。事务性消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型 的消息不同事务性消息允许向后查询。MQ 服务器通过其⽣产者 ID 查询客户端。
六、SpringBoot整合RocketMQ
Springboot提供了快捷操作RocketMQ的RocketMQTemplate对象。
1.引⼊依赖
注意依赖的版本需要和RocketMQ的版本相同。
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.1.1/version/dependency2.编写配置⽂件
# 应⽤名称
spring.application.namemy-boot-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port8080
# nameserver地址
rocketmq.name-server172.16.253.101:9876
# 配置⽣产者组
rocketmq.producer.groupmy-producer-boot-group3.编写⽣产者发送普通消息
package com.qf.my.boot.producer.demo.producer;import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** author Thor* 公众号 Java架构栈*/
Component
public class MyProducer {Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic,String message){rocketMQTemplate.convertAndSend(topic,message);}}4.编写JUnit单元测试发送消息
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
SpringBootTest
class MySpringCloudRocketmqProducerApplicationTests {Autowiredprivate MyProducer producer;Testvoid testSendMessage(){String topic my-boot-topic;String message hello spring boot rocketmq;producer.sendMessage(message,topic);}
}
5.创建消费者程序
package com.qf.my.boot.consumer.demo.consumer;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** author Thor* 公众号 Java架构栈*/
Component
RocketMQMessageListener(consumerGroup my-boot-consumer-group,topic my-boot-topic)
public class MyConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(收到的消息:message);}
}6.发送事务消息 import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** author Thor* 公众号 Java架构栈*/
Component
public class MyProducer {Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送事务消息* param topic* param msg* throws InterruptedException*/public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {String[] tags new String[]{TagA,TagB,TagC,TagD,TagE};for (int i 0; i 10; i) {MessageString message MessageBuilder.withPayload(msg).build();//topic和tag整合在一起String destination topic:tags[ i % tags.length];//第一个destination是消息要发送的目的地topic第二个destination消息携带的业务数据,可以通过业务数据对消息进行过滤这里因为destination已经包含了tags把tags当成业务数据可以用tags进行过滤TransactionSendResult sendResult rocketMQTemplate.sendMessageInTransaction(destination, message, destination);System.out.println(sendResult);Thread.sleep(10);}}
}编写事务监听器类
package com.qf.my.boot.producer.demo.listener;import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;/*** author Thor* 公众号 Java架构栈*/
//找到对应发送消息的bean
RocketMQTransactionListener(rocketMQTemplateBeanName rocketMQTemplate)
public class MyTransactionListener implements RocketMQLocalTransactionListener {/*** 执行本地事务* param msg* param arg 业务参数* return*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { //业务参数String destination (String) arg;//把spring的message转换成Rocketmq的messageorg.apache.rocketmq.common.message.Message message RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),utf-8,destination,msg);//得到message上的tag的内容String tags message.getTags();if(StringUtils.contains(tags,TagA)){return RocketMQLocalTransactionState.COMMIT;}else if(StringUtils.contains(tags,TagB)){return RocketMQLocalTransactionState.ROLLBACK;}else{return RocketMQLocalTransactionState.UNKNOWN;}}Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {return null;}
}编写单元测试发送事务消息
package com.qf.my.boot.producer.demo;import com.qf.my.boot.producer.demo.producer.MyProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
class MyBootProducerDemoApplicationTests {Autowiredprivate MyProducer producer;Testvoid testSendMessageInTransaction() throws InterruptedException {String topic my-boot-topic;String message hello rocket mq transaction springboot message;producer.sendMessageInTransaction(topic,message);System.out.println(事务消息发送成功);}
}至于消费者就和之前的写法一样。
七、Spring Cloud Stream整合RocketMQ
1.Spring Cloud Stream介绍
Spring Cloud Stream 是⼀个框架⽤于构建与共享消息系统连接的⾼度可扩展的事 件驱动微服务。
该框架提供了⼀个灵活的编程模型该模型基于已经建⽴和熟悉的 Spring 习惯⽤法 和最佳实践包括对持久 pub/sub 语义、消费者组和有状态分区的⽀持。 Spring Cloud Stream 的核⼼构建块是
Destination Binders负责提供与外部消息传递系统集成的组件。Destination Bindings外部消息系统和最终⽤户提供的应⽤程序代码⽣产者/ 消费者之间的桥梁。Message⽣产者和消费者⽤来与⽬标绑定器以及通过外部消息系统的其他 应⽤程序进⾏通信的规范数据结构。主要就是屏蔽不同的消息中间件并对此进行兼容。
2.编写⽣产者
引⼊依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.qf/groupIdartifactIdmy-spring-cloud-rocketmq-producer/artifactIdversion0.0.1-SNAPSHOT/versionnamemy-spring-cloud-rocketmq-producer/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version1.8/java.versionproject.build.sourceEncodingUTF-8/project.build.sourceEncodingproject.reporting.outputEncodingUTF-8/project.reporting.outputEncodingspring-boot.version2.3.7.RELEASE/spring-boot.versionspring-cloud-alibaba.version2.2.2.RELEASE/spring-cloud-alibaba.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactIdexclusionsexclusiongroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactId/exclusionexclusiongroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-acl/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.7.1/version/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-acl/artifactIdversion4.7.1/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scopeexclusionsexclusiongroupIdorg.junit.vintage/groupIdartifactIdjunit-vintage-engine/artifactId/exclusion/exclusions/dependency/dependenciesdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion${spring-boot.version}/versiontypepom/typescopeimport/scope/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-alibaba-dependencies/artifactIdversion${spring-cloud-alibaba.version}/versiontypepom/typescopeimport/scope/dependency/dependencies/dependencyManagementbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.8.1/versionconfigurationsource1.8/sourcetarget1.8/targetencodingUTF-8/encoding/configuration/pluginplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion2.3.7.RELEASE/versionconfigurationmainClasscom.qf.my.spring.cloud.rocketmq.producer.MySpringCloudRocketmqProducerApplication/mainClass/configurationexecutionsexecutionidrepackage/idgoalsgoalrepackage/goal/goals/execution/executions/plugin/plugins/build/project注意Rocket官⽅维护的spring-cloud-stream依赖中rocket⽤的版本为4.4需要排除 后加⼊4.7.1的依赖。
编写配置⽂件
# 应⽤名称
spring.application.namemy-s-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port8080
# output ⽣产者的目的地topic
spring.cloud.stream.bindings.output.destinationTopicTest
# 配置rocketMQ
spring.cloud.stream.rocketmq.binder.name-server172.16.253.101:9876启动类上打上注解
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;//该注解会拿到配置的output的topic
EnableBinding(Source.class)
SpringBootApplication
public class MySpringCloudRocketmqProducerApplication {public static void main(String[] args) {SpringApplication.run(MySpringCloudRocketmqProducerApplication.class, args);}
}其中 EnableBinding(Source.class) 指向配置⽂件的output参数。
编写⽣产者程序
package com.qf.my.spring.cloud.rocketmq.producer;import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import sun.misc.Contended;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** author Thor* 公众号 Java架构栈*/
Component
public class MyProducer {Resourceprivate Source source;public void sendMessage(String msg){//封装消息头MapString, Object headers new HashMap();headers.put(MessageConst.PROPERTY_TAGS,TagA);MessageHeaders messageHeaders new MessageHeaders(headers);//创建消息对象MessageString message MessageBuilder.createMessage(msg, messageHeaders);//发送消息到配置文件的目的地topicsource.output().send(message);}}编写单元测试发送消息
package com.qf.my.spring.cloud.rocketmq.producer;import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
class MySpringCloudRocketmqProducerApplicationTests {Autowiredprivate MyProducer producer;Testvoid testSendMessage(){producer.sendMessage(hello spring cloud stream message);}}2.编写消费者
引⼊依赖
与⽣产者相同
编写配置⽂件
# 应⽤名称
spring.application.namemy-s-rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port8081
# input 消费者指定需要消费的topic
spring.cloud.stream.bindings.input.destinationTopicTest
spring.cloud.stream.bindings.input.groupspring-cloud-strema-group
# 配置rocketMQ
spring.cloud.stream.rocketmq.binder.name-server172.16.253.101:9876启动类上打上注解
package com.qf.my.spring.cloud.rocketmq.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;//针对于消费者读取目的地
EnableBinding(Sink.class)
SpringBootApplication
public class MySpringCloudRocketmqConsumerApplication {public static void main(String[] args) {SpringApplication.run(MySpringCloudRocketmqConsumerApplication.class, args);}
}其中 EnableBinding(Sink.class) 指向配置⽂件的input参数。
编写消费者程序
package com.qf.my.spring.cloud.rocketmq.consumer;import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;/*** author Thor* 公众号 Java架构栈*/
Component
public class MyConsumer {//直接可以找到配置文件指定的topic进行消费StreamListener(Sink.INPUT)public void processMessage(String message){System.out.println(收到的消息message);}
}⼋、RocketMQ核⼼概念
1.消息模型Message Model
RocketMQ主要由 Producer、Broker、Consumer 三部分组成其中Producer 负责⽣ 产消息Consumer 负责消费消息Broker 负责存储消息。Broker 在实际部署过程 中对应⼀台服务器每个 Broker 可以存储多个Topic的消息每个Topic的消息也可 以分⽚存储于不同的 Broker。Message Queue ⽤于存储消息的物理地址每个Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实 例构成。
2.消息⽣产者Producer
负责⽣产消息⼀般由业务系统负责⽣产消息。⼀个消息⽣产者会把业务应⽤系统 ⾥产⽣的消息发送到broker服务器。RocketMQ提供多种发送⽅式同步发送、异步 发送、顺序发送、单向发送。同步和异步⽅式均需要Broker返回确认信息单向发送不需要。⽣产者组将多个⽣产者归为⼀组。⽤于保证⽣产者的⾼可⽤⽐如在事务消息中回 查本地事务状态需要⽣产者具备⾼可⽤的特性才能完成整个任务。
3.消息消费者Consumer
负责消费消息⼀般是后台系统负责异步消费。⼀个消息消费者会从Broker服务器 拉取消息、并将其提供给应⽤程序。从⽤户应⽤的⻆度⽽⾔提供了两种消费形式 拉取式消费、推动式消费。 消费者组将多个消息消费者归为⼀组⽤于保证消费者的⾼可⽤和⾼性能。
4.主题Topic
表示⼀类消息的集合每个主题包含若⼲条消息每条消息只能属于⼀个主题 是RocketMQ进⾏消息订阅的基本单位。
5.代理服务器Broker Server
消息中转⻆⾊负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收 从⽣产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存 储消息相关的元数据包括消费者组、消费进度偏移和主题和队列消息等。
6.名字服务Name Server
名称服务充当路由消息的提供者。⽣产者或消费者能够通过名字服务查找各主题相 应的Broker IP列表。多个Namesrv实例组成集群但相互独⽴没有信息交换。
7.拉取式消费Pull Consumer
Consumer消费的⼀种类型应⽤通常主动调⽤Consumer的拉消息⽅法从Broker服 务器拉消息、主动权由应⽤控制。⼀旦获取了批量消息应⽤就会启动消费过程。
8.推动式消费Push Consumer
Consumer消费的⼀种类型该模式下Broker收到数据后会主动推送给消费端该 消费模式⼀般实时性较⾼。
9.⽣产者组Producer Group
同⼀类Producer的集合这类Producer发送同⼀类消息且发送逻辑⼀致。如果发送 的是事务消息且原始⽣产者在发送之后崩溃则Broker服务器会联系同⼀⽣产者组 的其他⽣产者实例以提交或回溯消费。
10.消费者组Consumer Group
同⼀类Consumer的集合这类Consumer通常消费同⼀类消息且消费逻辑⼀致。消 费者组使得在消息消费⽅⾯实现负载均衡和容错的⽬标变得⾮常容易。要注意的 是消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ ⽀持两种消息模 式集群消费Clustering和⼴播消费Broadcasting。
11.集群消费Clustering
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。 12.⼴播消费Broadcasting
⼴播消费模式下相同Consumer Group的每个Consumer实例都接收全量的消息。
13.消息Message
消息系统所传输信息的物理载体⽣产和消费数据的最⼩单位每条消息必须属于 ⼀个主题。RocketMQ中每个消息拥有唯⼀的Message ID且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
14.标签Tag
为消息设置的标志⽤于同⼀主题下区分不同类型的消息。来⾃同⼀业务单元的消 息可以根据不同业务⽬的在同⼀主题下设置不同标签。标签能够有效地保持代码 的清晰度和连贯性并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对 不同⼦主题的不同消费逻辑实现更好的扩展性。
九、消息存储机制 消息存储是RocketMQ中最为复杂和最为重要的⼀部分本节将分别从RocketMQ的消息存储整体架构、PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘⽅式三⽅⾯来分别展开叙述。
1.消息存储整体架构 消息存储架构图中主要有下⾯三个跟消息存储相关的⽂件构成。
CommitLog
消息主体以及元数据的存储主体存储Producer端写⼊的消息主体内容,消息内容不 是定⻓的。单个⽂件⼤⼩默认1G ⽂件名⻓度为20位左边补零剩余为起始偏 移量⽐如00000000000000000000代表了第⼀个⽂件起始偏移量为0⽂件⼤⼩ 为1G1073741824当第⼀个⽂件写满了第⼆个⽂件为00000000001073741824 起始偏移量为1073741824以此类推。消息主要是顺序写⼊⽇志⽂件当⽂件满 了写⼊下⼀个⽂件
ConsumeQueue
消息消费队列引⼊的⽬的主要是提⾼消息消费的性能由于RocketMQ是基于主题 topic的订阅模式消息消费是针对主题进⾏的如果要遍历commitlog⽂件中根据 topic检索消息是⾮常低效的。Consumer即可根据ConsumeQueue来查找待消费的消 息。其中ConsumeQueue逻辑消费队列作为消费消息的索引保存了指定 Topic下的队列消息在CommitLog中的起始物理偏移量offset消息⼤⼩size和消息 Tag的HashCode值consumequeue⽂件可以看成是基于topic的commitlog索引⽂件故consumequeue⽂件夹的组织⽅式如下topic/queue/file三层组织结构具体 存储路径为$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue⽂件采取定⻓设计每⼀个条⽬共20个字节分别为8字节的 commitlog物理偏移量、4字节的消息⻓度、8字节tag hashcode单个⽂件由30W个 条⽬组成可以像数组⼀样随机访问每⼀个条⽬每个ConsumeQueue⽂件⼤⼩约 5.72M
IndexFile
IndexFile索引⽂件提供了⼀种可以通过key或时间区间来查询消息的⽅法。 Index⽂件的存储位置是KaTeX parse error: Undefined control sequence: \store at position 6: HOME \̲s̲t̲o̲r̲e̲\index{fileName}⽂件名fileName是以创 建时的时间戳命名的固定的单个IndexFile⽂件⼤⼩约为400M⼀个IndexFile可以 保存 2000W个索引IndexFile的底层存储设计为在⽂件系统中实现HashMap结构 故rocketmq的索引⽂件其底层实现为hash索引。
在上⾯的RocketMQ的消息存储整体架构图中可以看出RocketMQ采⽤的是混合型 的存储结构即为Broker单个实例下所有的队列共⽤⼀个⽇志数据⽂件即为 CommitLog来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存 储于⼀个CommitLog中)针对Producer和Consumer分别采⽤了数据和索引部分相分 离的存储结构Producer发送消息⾄Broker端然后Broker端使⽤同步或者异步的 ⽅式对消息刷盘持久化保存⾄CommitLog中。只要消息被刷盘持久化⾄磁盘⽂件 CommitLog中那么Producer发送的消息就不会丢失。正因为如此Consumer也就 肯定有机会去消费这条消息。当⽆法拉取到消息后可以等下⼀次消息拉取同时 服务端也⽀持⻓轮询模式如果⼀个消息拉取请求未拉取到消息Broker允许等待 30s的时间只要这段时间内有新消息到达将直接返回给消费端。这⾥ RocketMQ的具体做法是使⽤Broker端的后台服务线程—ReputMessageService不停 地分发请求并异步构建ConsumeQueue逻辑消费队列和IndexFile索引⽂件 数据。
2.⻚缓存与内存映射
⻚缓存PageCache)是OS对⽂件的缓存⽤于加速对⽂件的读写。⼀般来说程序 对⽂件进⾏顺序读写的速度⼏乎接近于内存的读写速度主要原因就是由于OS使⽤ PageCache机制对读写访问操作进⾏了性能优化将⼀部分的内存⽤作PageCache。 对于数据的写⼊OS会先写⼊⾄Cache内随后通过异步的⽅式由pdflush内核线程 将Cache内的数据刷盘⾄物理磁盘上。对于数据的读取如果⼀次读取⽂件时出现 未命中PageCache的情况OS从物理磁盘上访问读取⽂件的同时会顺序对其他相 邻块的数据⽂件进⾏预读取。 在RocketMQ中ConsumeQueue逻辑消费队列存储的数据较少并且是顺序读取 在page cache机制的预读取作⽤下Consume Queue⽂件的读性能⼏乎接近读内 存即使在有消息堆积情况下也不会影响性能。⽽对于CommitLog消息存储的⽇志 数据⽂件来说读取消息内容时候会产⽣较多的随机访问读取严重影响性能。如 果选择合适的系统IO调度算法⽐如设置调度算法为“Deadline”此时块存储采⽤ SSD的话随机读的性能也会有所提升。
另外RocketMQ主要通过MappedByteBuffer对⽂件进⾏读写操作。其中利⽤了 NIO中的FileChannel模型将磁盘上的物理⽂件直接映射到⽤户态的内存地址中这 种Mmap的⽅式减少了传统IO将磁盘⽂件数据在操作系统内核地址空间的缓冲区和 ⽤户应⽤程序地址空间的缓冲区之间来回进⾏拷⻉的性能开销将对⽂件的操作 转化为直接对内存地址进⾏操作从⽽极⼤地提⾼了⽂件的读写效率正因为需要 使⽤内存映射机制故RocketMQ的⽂件存储都使⽤定⻓结构来存储⽅便⼀次将整 个⽂件映射⾄内存。
3.消息刷盘 同步刷盘
如上图所示只有在消息真正持久化⾄磁盘后RocketMQ的Broker端才会真正返回给 Producer端⼀个成功的ACK响应。同步刷盘对MQ消息可靠性来说是⼀种不错的保 障但是性能上会有较⼤影响⼀般适⽤于⾦融业务应⽤该模式较多。
异步刷盘
能够充分利⽤OS的PageCache的优势只要消息写⼊PageCache即可将成功的ACK返 回给Producer端。消息刷盘采⽤后台异步线程提交的⽅式进⾏降低了读写延迟 提⾼了MQ的性能和吞吐量。
⼗、集群核⼼概念
1.消息主从复制
RocketMQ官⽅提供了三种集群搭建⽅式。
2主2从异步通信⽅式
使⽤异步⽅式进⾏主从之间的数据复制吞吐量⼤但可能会丢消息。
使⽤ conf/2m-2s-async ⽂件夹内的配置⽂件做集群配置。
2主2从同步通信⽅式
使⽤同步⽅式进⾏主从之间的数据复制保证消息安全投递不会丢失但影响吞
吐量
使⽤ conf/2m-2s-sync ⽂件夹内的配置⽂件做集群配置。
2主⽆从⽅式
不存在复制消息会存在单点故障且读的性能没有前两种⽅式好。
使⽤ conf/2m-noslave ⽂件夹内的配置⽂件做集群配置。
2.负载均衡
RocketMQ中的负载均衡都在Client端完成具体来说的话主要可以分为Producer 端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。
Producer的负载均衡
Producer端在发送消息的时候会先根据Topic找到指定的TopicPublishInfo在获取 了TopicPublishInfo路由信息后RocketMQ的客户端在默认⽅式下 selectOneMessageQueue()⽅法会从TopicPublishInfo中的messageQueueList中选择⼀ 个队列MessageQueue进⾏发送消息。具体的容错策略均在MQFaultStrategy这个 类中定义。这⾥有⼀个sendLatencyFaultEnable开关变量如果开启在随机递增取 模的基础上再过滤掉not available的Broker代理。所谓的latencyFaultTolerance 是指对之前失败的按⼀定的时间做退避。例如如果上次请求的latency超过 550Lms就退避3000Lms超过1000L就退避60000L如果关闭采⽤随机递增 取模的⽅式选择⼀个队列MessageQueue来发送消息latencyFaultTolerance机 制是实现消息发送⾼可⽤的核⼼关键所在。 Consumer的负载均衡
在RocketMQ中Consumer端的两种消费模式Push/Pull都是基于拉模式来获取 消息的⽽在Push模式只是对pull模式的⼀种封装其本质实现为消息拉取线程在 从服务器拉取到⼀批消息后然后提交到消息消费线程池后⼜“⻢不停蹄”的继续 向服务器再次尝试拉取消息。如果未拉取到消息则延迟⼀下⼜继续拉取。在两种 基于拉模式的消费⽅式Push/Pull中均需要Consumer端在知道从Broker端的哪 ⼀个消息队列—队列中去获取消息。因此有必要在Consumer端来做负载均衡即 Broker端中多个MessageQueue分配给同⼀个ConsumerGroup中的哪些Consumer消费。
Consumer的负责均衡可以通过consumer的api进⾏设置
consumer.setAllocateMessageQueueStrategy(new
AllocateMessageQueueAveragelyByCircle());AllocateMessageQueueStrategy接⼝的实现类表达了不同的负载均衡策略
a.AllocateMachineRoomNearby :基于机房近侧优先级的代理分配策略。可以指定实际的分配策略。如果任何使⽤者在机房中活动则部署在同⼀台机器中的代理的消息队列应仅分配给这些使⽤者。否则这些消息队列可以与所有消费者共享因为没有活着的消费者可以垄断它们
b.AllocateMessageQueueAveragely:平均哈希队列算法
c.AllocateMessageQueueAveragelyByCircle:循环平均哈希队列算法
d.AllocateMessageQueueByConfig:不分配通过指定MessageQueue列表来消费
e.AllocateMessageQueueByMachineRoom:机房哈希队列算法如⽀付宝逻辑机房
f.AllocateMessageQueueConsistentHash:⼀致哈希队列算法带有虚拟节点的⼀致性哈希环。 注意在MessageQueue和Consumer之间⼀旦发⽣对应关系的改变就会触发rebalance进⾏重新分配。
3.消息重试
⾮⼴播模式下Consumer消费消息失败后要提供⼀种重试机制令消息再消费 ⼀次。Consumer消费消息失败通常可以认为有以下⼏种情况
由于消息本身的原因例如反序列化失败消息数据本身⽆法处理例如话费 充值当前消息的⼿机号被注销⽆法充值等。这种错误通常需要跳过这条 消息再消费其它消息⽽这条失败的消息即使⽴刻重试消费99%也不成 功所以最好提供⼀种定时重试机制即过10秒后再重试。由于依赖的下游应⽤服务不可⽤例如db连接不可⽤外系统⽹络不可达等。 遇到这种错误即使跳过当前失败的消息消费其他消息同样也会报错。这种 情况建议应⽤sleep 30s再消费下⼀条消息这样可以减轻Broker重试消息的 压⼒。
在代码层⾯如果消费者返回的是以下三种情况则消息会重试消费
consumer.registerMessageListener(newMessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(收到的消息msg);}return null;//returnConsumeConcurrentlyStatus.RECONSUME_LATER;//抛出异常}});消费者返回null或者返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 或者抛出异常都会触发重试。
关于重试次数
RocketMQ会为每个消费组都设置⼀个Topic名称为“%RETRY%consumerGroup”的重试队列这⾥需要注意的是这个Topic的重试队列是针对消费组⽽不是针对每个Topic设置的⽤于暂时保存因为各种异常⽽导致Consumer端⽆法消费的消息。
考虑到异常恢复起来需要⼀些时间会为重试队列设置多个重试级别每个重试级 别都有与之对应的重新投递延时重试次数越多投递延时就越⼤。RocketMQ对于重 试消息的处理是先保存⾄Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中后 台定时任务按照对应的时间进⾏Delay后重新保存⾄“%RETRY%consumerGroup”的 重试队列中。 与延迟队列的设置相同消息默认会重试16次每次重试的时间间隔如
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1重试超过指定次数的消息将会进⼊到死信队列中 %DLQ%my-consumer group1 。
4.死信队列
死信队列⽤于处理⽆法被正常消费的消息。当⼀条消息初次消费失败消息队列会 ⾃动进⾏消息重试达到最⼤重试次数后若消费依然失败则表明消费者在正常情况下⽆法正确地消费该消息此时消息队列 不会⽴刻将消息丢弃⽽是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下⽆法被消费的消息称为死信消息Dead-Letter Message将存储死信消息的特殊队列称为死信队列Dead-Letter Queue。在 RocketMQ中可以通过使⽤console控制台对死信队列中的消息进⾏重发来使得消 费者实例再次进⾏消费。
死信队列具备以下特点
RocketMQ会⾃动为需要死信队列的ConsumerGroup创建死信队列。死信队列与ConsumerGroup对应死信队列中包含该ConsumerGroup所有相关 topic的死信消息。死信队列中消息的有效期与正常消息相同默认48⼩时。若要消费死信队列中的消息需在控制台将死信队列的权限设置为6即可读可 写。
5.幂等消息
幂等性多次操作造成的结果是⼀致的。对于⾮幂等的操作幂等性如何保证
1在请求⽅式中的幂等性的体现
get多次get 结果是⼀致的post添加⾮幂等put修改幂等根据id修改delete根据id删除幂等
对于⾮幂等的请求我们在业务⾥要做幂等性保证。
2在消息队列中的幂等性体现
消息队列中很可能⼀条消息被冗余部署的多个消费者收到对于⾮幂等的操作⽐如⽤户的注册就需要做幂等性保证否则消息将会被重复消费。可以将情况概 括为以下⼏种:
⽣产者重复发送由于⽹络抖动导致⽣产者没有收到broker的ack⽽再次重发 消息实际上broker收到了多条重复的消息造成消息重复 消费者重复消费由于⽹络抖动消费者没有返回ack给broker导致消费者重试消费。
rebalance时的重复消费由于⽹络抖动在rebalance重分配时也可能出现消费者重复消费某条消息。
3如何保证幂等性消费
mysql 插⼊业务id作为主键主键是唯⼀的所以⼀次只能插⼊⼀条 使⽤redis或zk的分布式锁主流的⽅案
⼗⼀、RocketMQ最佳实践
1.保证消息顺序消费
1为什么要保证消息有序
⽐如有这么⼀个物联⽹的应⽤场景IOT中的设备在初始化时需要按顺序接收这样 的消息
设置设备名称设置设备的⽹络重启设备使配置⽣效
如果这个顺序颠倒了可能就没有办法让设备的配置⽣效因为只有重启设备才能 让配置⽣效但重启的消息却在设置设备消息之前被消费。
2如何保证消息顺序消费
全局有序消费的所有消息都严格按照发送消息的顺序进⾏消费局部有序消费的部分消息按照发送消息的顺序进⾏消费 2.快速处理积压消息
在rocketmq中如果消费者消费速度过慢⽽⽣产者⽣产消息的速度⼜远超于消费 者消费消息的速度那么就会造成⼤量消息积压在mq中。
1如何查看消息积压的情况
在console控制台中可以查看 2如何解决消息积压
在这个消费者中使⽤多线程充分利⽤机器的性能进⾏消费消息。通过业务的架构设计提升业务层⾯消费的性能。创建⼀个消费者该消费者在RocketMQ上另建⼀个主题该消费者将poll下来 的消息不进⾏消费直接转发到新建的主题上。新建的主题配上多个 MessageQueue多个MessageQueue再配上多个消费者。此时新的主题的多个 分区的多个消费者就开始⼀起消费了。 3.保证消息可靠性投递 保证消息可靠性投递⽬的是消息不丢失可以顺利抵达消费者并被消费。要想实 现可靠性投递需要完成以下⼏个部分。
1⽣产者发送事务消息
参考第五章事务消息章节的内容
2broker集群使⽤Dledger⾼可⽤集群
dledger集群的数据同步由两阶段完成
第⼀阶段同步消息到follower消息状态是uncommitted。follower在收到消息 以后返回⼀个ack给leaderleader⾃⼰也会返回ack给⾃⼰。leader在收到集群中的半数以上的ack后开始进⼊到第⼆阶段。第⼆阶段leader发送committed命令集群中的所有的broker把消息写⼊到⽇ 志⽂件中此时该消息才表示接收完毕。
3保证消费者的同步消费
消费者使⽤同步的⽅式在消费完后返回ack。
4使⽤基于缓存中间件的MQ降级⽅案 当MQ整个服务不可⽤时为了防⽌服务雪崩消息可以暂存于缓存中间件中⽐如redis。待MQ恢复后将redis中的数据重新刷进MQ中。