山西住房建设部网站,西部数码网站管理助手 301,沈阳建设工程交易中心,张家口市网站建设初识MQ
1.1.同步和异步通讯 微服务间通讯有同步和异步两种⽅式#xff1a; 同步通讯#xff1a;就像打电话#xff0c;需要实时响应。 异步通讯#xff1a;就像发邮件#xff0c;不需要⻢上回复。 两种⽅式各有优劣#xff0c;打电话可以⽴即得到响应#xff0c;但…初识MQ
1.1.同步和异步通讯 微服务间通讯有同步和异步两种⽅式 同步通讯就像打电话需要实时响应。 异步通讯就像发邮件不需要⻢上回复。 两种⽅式各有优劣打电话可以⽴即得到响应但是你却不能跟多个⼈同时通话。发送邮件可以同 时与多个⼈收发邮件但是往往响应会有延迟。 1.1.1.同步通讯 Feign调⽤就属于同步⽅式虽然调⽤可以实时得到结果但存在下⾯的问题 总结 同步调⽤的优点 时效性较强可以⽴即得到结果 同步调⽤的问题 耦合度⾼ 性能和吞吐能⼒下降 有额外的资源消耗 有级联失败问题由于⼀个故障导致了连锁反应使得系统中的其他组件或节点也相继失败 1.1.2.异步通讯 异步调⽤则可以避免上述问题 我们以购买商品为例⽤户⽀付后需要调⽤订单服务完成订单状态修改调⽤物流服务从仓库分配响应的库存并准备发货。 在事件模式中⽀付服务是事件发布者publisher在⽀付完成后只需要发布⼀个⽀付成功的事件 event事件中带上订单id。 订单服务和物流服务是事件订阅者Consumer订阅⽀付成功的事件监听到事件后完成⾃⼰业务即可。 为了解除事件发布者与订阅者之间的耦合两者并不是直接通信⽽是有⼀个中间⼈Broker。发布者发布事件到Broker不关⼼谁来订阅事件。订阅者从Broker订阅事件不关⼼谁发来的消息 Broker 是⼀个像数据总线⼀样的东⻄所有的服务要接收数据和发送数据都发到这个总线上这 个总线就像协议⼀样让服务间的通讯变得标准和可控。 好处 吞吐量提升⽆需等待订阅者处理完成响应更快速 故障隔离服务没有直接调⽤不存在级联失败问题 调⽤间没有阻塞不会造成⽆效的资源占⽤ 耦合度极低每个服务都可以灵活插拔可替换 流量削峰不管发布事件的流量波动多⼤都由Broker接收订阅者可以按照⾃⼰的速度去处理 事件 缺点 架构复杂了业务没有明显的流程线不好管理 需要依赖于Broker的可靠、安全、性能 好在现在开源软件或云平台上 Broker 的软件是⾮常成熟的⽐较常⻅的⼀种就是我们今天要学习 的MQ技术。 1.2.技术对⽐ MQ中⽂是消息队列MessageQueue字⾯来看就是存放消息的队列。也就是事件驱动架构 中的Broker。 ⼏种常⻅MQ的对⽐ 追求可⽤性Kafka、 RocketMQ 、RabbitMQ 追求可靠性RabbitMQ、RocketMQ 追求吞吐能⼒RocketMQ、Kafka 追求消息低延迟RabbitMQ、Kafka 不同的消息队列系统在不同场景下有各⾃的优势和适⽤性。以下是各个消息队列系统在不同场合下的最佳选择 Kafka
最佳场合⼤规模数据处理、实时⽇志收集和分析、流式处理。 优势⾼吞吐量、低延迟、⽔平扩展能⼒强、⻓期消息存储适合构建⼤规模的实时数据流处理平台如实时⽇志收集和分析、事件流处理等。 RabbitMQ 最佳场合传统的企业级应⽤、轻量级的消息传递场景。 优势简单易⽤、⽀持多种消息协议、适合点对点和发布/订阅模式对于传统的企业应⽤和中⼩规模的消息传递需求是⼀种可靠的选择 ActiveMQ 最佳场合中⼩规模的企业应⽤、Java⽣态系统中的集成需求。 优势Java开发环境友好、⽀持多种消息协议适合与Java⽣态系统的其他组件集成如Spring框架等。 RocketMQ 最佳场合⼤规模的分布式系统、互联⽹应⽤、⾦融领域的消息处理。 优势⾼吞吐量、低延迟、丰富的消息存储模式适⽤于处理⼤规模的消息传递场景特别是在互联⽹ 和⾦融领域。 综合考虑以上因素可以做如下简单总结 如果需要处理⼤规模的实时数据流、⽇志收集和分析等⾼吞吐量场景⾸选Kafka。 如果对于消息传递的简单性和易⽤性有较⾼要求适合中⼩规模的企业应⽤和轻量级消息传递需求可以选择RabbitMQ或ActiveMQ。 如果在⼤规模的分布式系统、互联⽹应⽤或⾦融领域需要处理消息传递RocketMQ是⼀个较好的选择。 2.RocketMQ简介 官⽹ http://rocketmq.apache.org/ RocketMQ是阿⾥巴巴2016年MQ中间件使⽤Java语⾔开发RocketMQ 是⼀款开源的分布式消息系 统基于⾼可⽤分布式集群技术提供低延时的、⾼可靠的消息发布与订阅服务。同时⼴泛应⽤于多个领域包括异步通信解耦、企业解决⽅案、⾦融⽀付、电信、电⼦商务、快递物流、⼴告营销、社交、即时通信、移动应⽤、⼿游、视频、物联⽹、⻋联⽹等。 RocketMQ的设计⽬标是⽀持⼤规模消息处理具有⾼并发、⾼可⽤和容错能⼒。它在多个⽅⾯提供了强⼤的功能和特性 分布式架构RocketMQ采⽤分布式架构⽀持在多个节点之间进⾏消息的发送和接收实现了⽔平扩展能⼒。 ⾼吞吐量RocketMQ可以在⼤规模并发场景下实现⾼吞吐量的消息处理适⽤于⾼并发的业务场景。 低延迟RocketMQ具有较低的消息传递延迟适⽤于需要实时性的应⽤场景。 消息可靠性RocketMQ提供了多种消息存储模式可以确保消息的可靠传递包括同步刷盘和异步刷盘等⽅式。 消息顺序性RocketMQ⽀持消息的顺序传递可以确保同⼀消息队列中的消息按照发送顺序被消费。 ⽀持多种消息模式RocketMQ⽀持发布/订阅模式和点对点模式可以根据业务需求选择合适的消息模式。 灵活的部署⽅式RocketMQ⽀持多种部署⽅式可以在单机上运⾏也可以搭建集群部署。 丰富的监控和管理⼯具RocketMQ提供了丰富的监控和管理⼯具⽅便管理员对消息队列进⾏监控和管理。 核⼼概念 Producer消息的发送者⽣产者举例发件⼈。 Consumer消息接收者消费者举例收件⼈。 Broker消息队列的中间服务器负责存储消息并将消息传递给消费者举例快递。 NameServer可以理解为是⼀个注册中⼼主要是⽤来保存topic路由信息管理Broker。在 NameServer的集群中NameServer与NameServer之间是没有任何通信的举例各个快递公司的管理机构相当于broker的注册中⼼保留了broker的信息。 Queue队列消息存放的位置⼀个Broker中可以有多个队列。 Topic消息的逻辑分类⽣产者发送消息到指定的Topic消费者从指定的Topic订阅消息。⼀个Topic可以有多个Producer和多个Consumer。 ProducerGroup⽣产者组 。 ConsumerGroup消费者组多个消费者组可以同时消费⼀个主题的消息。 ⼯作流程 Broker启动的时候会往每台NameServer因为NameServer之间不通信所以每台都得注册注册⾃⼰的信息这些信息包括⾃⼰的ip和端⼝号⾃⼰这台Broker有哪些topic等信息。 Producer在启动之后会跟会NameServer建⽴连接定期从NameServer中获取Broker的信息当发送消息的时候会根据消息需要发送到哪个topic去找对应的Broker地址如果有的话就向这台Broker发送请求没有找到的话就看根据是否允许⾃动创建topic来决定是否发送消息。 Broker在接收到Producer的消息之后会将消息存起来持久化如果有从节点的话也会主动同步给从节点实现数据的备份 Consumer启动之后也会跟会NameServer建⽴连接定期从NameServer中获取Broker和对应topic的信息然后根据⾃⼰需要订阅的topic信息找到对应的Broker的地址然后跟Broker建⽴连接获取消息进⾏消费 3.RocketMQ安装 本⽂档所涉及的是单机版的RocketMQ安装教程能够满⾜基本的学习使⽤属于⼊⻔级的教程 如果想要搭集群部署可以参考其他资料进⾏配置即可 进⼊[RocketMQ官⽹下载](下载 | RocketMQ (apache.org)) 1、选择Binary 下载 2、将压缩包解压⾄⾃定路径 3、配置系统中的环境变量 4.启动RocketMQ
4.启动RocketMQ 若出现如上图所示的命令框说明启动成功保留窗⼝切勿关闭 继续启动broker 与上述同样的路径下呼出cmd执⾏如下命令 Start-Process mqbroker.cmd -ArgumentList -n 127.0.0.1:9876, autoCreateTopicEnabletrue 5.配置可视化⻚⾯ 下载可视化插件源码 github下载地址https://github.com/apache/rocketmq-dashboard 复制下载链接后使⽤git下载 可⾃建⽂件夹进⼊后使⽤git bash下载 git clone https://github.com/apache/rocketmq-dashboard.git 下载完成后进⼊ application.yml 中查看配置 yarn-v1.22.10.tar.gz 下载超时了 Downloading https://github.com/yarnpkg/yarn/releases/download/v1.22.10/yarn-v1.22.10.tar.gz to D:\Maven\mvn_resp\com\github\eirslett\yarn\1.22.10\yarn-1.22.10.tar.gz [INFO] No proxies configured [INFO] No proxy was configured, downloading directly 这⾥直接去github拉去就⾏存⼊你的maven仓库 在该⽬录下打开cmd输⼊指令请保证已经运⾏NameServer和broker java -jar rocketmq-dashboard-2.0.1-SNAPSHOT.jar 6.集成springboot SpringBoot集成RocketMQ ⾸先在pom.xml中添加RocketMQ的依赖具体如下所示 dependency groupId org . apache . rocketmq /groupId artifactId rocketmq - spring - boot - starter /artifactId version 2.1 . 1 /version /dependency 然后在application.yml中添加RocketMQ的基本配置 # RocketMq rocketmq: name - server : 127.0.0 . 1 : 9876 producer: group: producer - group consumer: group: consumer - group 然后在application.yml中添加RocketMQ的基本配置 # RocketMq rocketmq: name - server : 127.0.0 . 1 : 9876 producer: group: producer - group consumer: group: consumer - group 创建消息⽣产者
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
Service
public class RocketMQProducer {Autowiredprivate RocketMQTemplate rocketMQTemplate;private final String topic demo-topic;// 1.同步发送消息// 同步发送是指发送⽅发送⼀条消息后会等待服务器返回确认信息后再进⾏后续操作。这种⽅式适⽤于需要可靠性保证的场景。public void sendSyncMessage(String message){rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());System.out.printf(同步发送结果: %s\n, message);}// 2.异步发送消息// 异步发送是指发送⽅发送消息后不等待服务器返回确认信息⽽是通过回调接⼝处理返回结果。这种⽅式适⽤于对响应时间要求较⾼的场景。public void sendAsyncMessage(String message){rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.printf(异步发送成功: %s\n, sendResult);}Overridepublic void onException(Throwable throwable) {System.out.printf(异步发送失败: %s\n, throwable.getMessage());}});}// 3.单向发送消息// 单向发送是指发送⽅只负责发送消息不关⼼服务器的响应。该⽅式适⽤于对可靠性要求不⾼的场景如⽇志收集。public void sendOneWayMessage(String message){rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());System.out.println(单向消息发送成功);}
} 创建消息消费者
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
Service
RocketMQMessageListener(topic demo-topic, consumerGroup consumer-g
roup, messageModel MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListenerString {Overridepublic void onMessage(String s) {System.out.printf(收到消息: %s\n, s);}
}