找企业网站制作,建筑企业网站模板,整站多关键词优化,购物网页html代码消息中间件在现代分布式系统中起着关键作用#xff0c;它们提供了一种可靠且高效的方法来进行异步通信和解耦。在这篇博客中#xff0c;我们将重点介绍 RabbitMQ#xff0c;一个广泛使用的开源消息中间件。我们将深入探讨 RabbitMQ 的特性、工作原理以及如何在应用程序中使用… 消息中间件在现代分布式系统中起着关键作用它们提供了一种可靠且高效的方法来进行异步通信和解耦。在这篇博客中我们将重点介绍 RabbitMQ一个广泛使用的开源消息中间件。我们将深入探讨 RabbitMQ 的特性、工作原理以及如何在应用程序中使用它来实现可靠的消息传递。 一、RabbitMQ 简介
RabbitMQ 是基于 AMQP高级消息队列协议的开源消息中间件。它提供了一个可靠的、灵活的、可扩展的消息传递机制广泛应用于各行各业。RabbitMQ 的核心思想是生产者将消息发送到交换机交换机根据路由规则将消息传递给队列然后消费者从队列中获取并处理消息。
二、相关概念
RabbitMQ 是一个开源的消息中间件它是由 Erlang 语言编写的并且实现了高级消息队列协议AMQP。作为一种可靠、灵活和可扩展的消息传递系统RabbitMQ 提供了在应用程序之间传输数据的可靠机制。
下面是 RabbitMQ 的一些关键特性和概念的详解 消息队列RabbitMQ 使用消息队列来存储和传递消息。消息队列是一种先进先出FIFO的数据结构它将消息暂时存储在其中直到消费者准备好接收并处理它们。 生产者和消费者消息的发送者称为生产者而消息的接收者称为消费者。生产者将消息发送到队列中而消费者从队列中获取消息并进行处理。 队列队列是 RabbitMQ 的核心部分它是消息的存储和传递载体。生产者将消息发送到队列中而消费者则从队列中获取消息。队列可以持久化这意味着即使 RabbitMQ 服务器关闭消息也不会丢失。 交换机Exchange交换机是消息的路由器它将消息从生产者发送到队列。它根据特定的规则将消息路由到一个或多个队列。RabbitMQ 提供了不同类型的交换机包括直连交换机、主题交换机、广播交换机等。 绑定Binding绑定将交换机和队列关联起来以定义消息在交换机和队列之间的路由规则。绑定规定了消息应该如何从交换机路由到队列。 路由键Routing Key路由键是生产者在发送消息时与消息一起指定的属性。交换机根据路由键来确定将消息路由到哪个队列。 持久化当队列或消息被标记为持久化时它们会被保存到磁盘上以防止在 RabbitMQ 重启后丢失数据。 发布-订阅模式RabbitMQ 支持发布-订阅模式其中一个生产者发送消息到交换机交换机将消息广播给所有与其绑定的队列然后每个队列的消费者都可以接收并处理消息。 ACK 机制消费者可以使用 ACK 机制告知 RabbitMQ 已经成功接收和处理了消息。只有在消费者明确确认之后RabbitMQ 才会将消息从队列中删除。 消息确认和持久化RabbitMQ 允许生产者在发送消息时请求确认。如果消息无法成功路由到队列或者在路由过程中发生错误RabbitMQ 将通知生产者。此外消息和队列的持久化可以确保即使在 RabbitMQ 重启后也不会丢失数据。
总之RabbitMQ 是一个可靠和灵活的消息中间件它以消息队列作为核心使用交换机、队列、绑定等概念来进行消息的路由和传递。它提供了高度可靠的消息传递机制和丰富的特性广泛用于分布式系统、微服务架构、任务队列等场景中帮助应用程序实现解耦、异步通信和可靠性保证。
三、RabbitMQ 的工作原理
RabbitMQ 的工作原理可以简单地概括为以下几个步骤
1、生产者发送消息到交换机
生产者通过连接到 RabbitMQ并将消息发送到预定义的交换机。消息可以包含任何结构化数据如 JSON、XML 等格式。
String message Hello, RabbitMQ!;
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());2、交换机将消息路由到队列
交换机根据预定义的路由规则将消息路由到一个或多个绑定到它上面的队列。路由规则可以根据完全匹配、模式匹配等方式进行。
channel.queueBind(queueName, exchangeName, routingKey);3、消费者从队列中获取消息
消费者通过连接到 RabbitMQ并订阅感兴趣的队列。一旦有消息到达队列RabbitMQ 将立即将该消息推送给订阅的消费者进行处理。
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message new String(body, UTF-8);System.out.println(Received message: message);}
});4、消费者处理消息并发送确认
消费者获取到消息后可以进行相应的业务逻辑处理。一旦消息被成功处理消费者将发送确认给 RabbitMQ告知消息已经被消费RabbitMQ 可以安全删除该消息。
生产者将消息发送到名为 my_exchange 的交换机并通过路由键 my_routing_key 将消息路由到名为 my_queue 的队列。消费者订阅了 my_queue 队列并在收到消息时调用回调函数进行处理。
四、RabbitMQ 的特性
RabbitMQ 提供了许多强大的特性使其成为一个广泛使用的消息中间件
持久化RabbitMQ 可以将消息和队列持久化到磁盘即使在服务器重启后也不会丢失消息。灵活的路由规则通过使用不同的交换机类型和路由键可以实现精确的消息路由策略。可靠性和可恢复性RabbitMQ 提供了多种保证消息可靠传递的机制如消息确认acknowledgement、事务、发布者确认等。可扩展性RabbitMQ 支持分布式部署和集群模式可以实现高吞吐量和高可用性。多语言客户端RabbitMQ 提供了多种官方支持的客户端库如 Java、Python、Ruby 等方便开发者在不同的语言环境下使用。 五、RabbitMQ 在实际应用中的应用场景
参考文章
【RabbitMQ】什么是RabbitMQRabbitMQ有什么用应用场景有那些_路遥叶子的博客-CSDN博客
RabbitMQ 在各种场景中都有广泛的应用包括但不限于
异步任务处理将需要耗时较长的任务发送到 RabbitMQ并由消费者异步执行以提高系统的响应性能和可伸缩性。事件驱动架构通过使用 RabbitMQ 来实现事件的发布与订阅不同的组件可以通过订阅感兴趣的事件来解耦。应用解耦与流量控制通过引入消息中间件不同的应用程序可以进行解耦并实现流量控制、服务降级等机制。日志收集、分析使用 RabbitMQ 将分布式系统中的日志消息发送到中央日志服务器进行集中管理和分析。 5.1 服务间解耦
用户订单库存处理。【服务间解耦】 使用MQ前
系统正常时用户下单订单系统调用库存系统进行删减操作操作成功将成返回消息提醒下单成功。系统异常时库存系统将无法访问导致订单删减操作无法执行最终导致下单失败。
使用MQ后
订单系统和库存系统之间不在互相影响独立运行达到了应用解耦的目的。订单系统只需要将下单消息写入MQ,就可以直接执行下一步操作。这时即使库存系统出现异常也不会影响订单系统的操作且下单的库存删减记录将会被永久保存到MQ中直到库存系统恢复正常从MQ中订阅下单消息进行消费成功为止。
使用MQ前
使用MQ后 5.2 实现异步通信
用户注册发送手机短信邮件。【实现异步通信】
使用MQ前
整个操作流程全部在主线程完成。点击用户注册 --》 入库添加用户 --》发送邮件 --》发送短信。每一步都需要等待上一步完成后才能执行。且每一步操作的响应时间不固定如果请求过多会导致主线程请求耗时很长响应慢甚至会导致死机的情况出现严重影响了用户的体验。
使用MQ后
主线程只需要处理耗时较低的入库操作然后把需要处理的消息写进MQ消息队列中然后由不同的独立的邮件系统和发短信系统同时订阅消息队列中的消息进行消费。这样通过消息队列作为一个中间人去保存和传递消息不仅仅耗时低消耗的资源也很少且单个服务器能够承受的并发请求将更多。 5.3 流量削峰
商品秒杀和抢购。【流量削峰】
流量削峰是消息队列中常用的场景 一般在秒杀或团购活动中使用广泛。
使用MQ前对于秒杀、抢购活动用户访问所产生的流量会很大甚至会在同一时间段出现上万上亿条请求这股瞬间的流量暴涨我们的应用系统配置是无法承受的会导致系统直接崩溃死机。
例如A系统平时每秒请求100个系统稳定运行 但是晚上8点有秒杀活动 每秒并发增至1万条 系统最大处理每秒1000条 于是导致系统崩溃。
使用MQ后我们在大量用户进行秒杀请求时将那个巨大的流量请求拒在系统业务处理的上层并将其转移至MQ中而不是直接涌入我们的接口。在这里MQ消息队列起到了缓存作用。
例如100万用户在高峰期每秒请求5000个将这5000个请求写入MQ系统每秒只能处理2000请求因为MySQL只能处理2000个请求 ; 系统每秒拉取2000个请求 不超过自己的处理能力即可。
使用MQ前 使用MQ后 5.4 其他应用场景
1、订单处理系统
在一个电子商务平台中可以使用 RabbitMQ 来处理订单。当用户下单时订单信息被发布到 RabbitMQ 的交换机中然后相关的消费者从队列中获取订单消息并进行处理如验证订单、库存管理、支付等。
2、日志收集与分发
假设有多个应用程序生成日志并希望将它们集中处理和存储。每个应用程序可以将日志消息发布到一个名为 log_exchange 的交换机中然后有不同的消费者订阅该交换机并将日志消息写入数据库或发送到日志分析系统。
3、实时数据传输
如果有一个实时监控系统需要将传感器数据实时传输到监控平台进行处理和可视化展示。传感器将数据发布到 RabbitMQ 的交换机中监控平台的消费者订阅交换机并处理数据从而实现实时监控和报警功能。
4、异步任务处理
假设有一个应用程序需要处理大量耗时的任务如图像处理、PDF 转换等。应用程序将任务发布到 RabbitMQ 的队列中然后有多个工作节点作为消费者从队列中获取任务并进行处理以实现任务的并行处理和减轻主应用程序的压力。
5、消息通知系统 假设在一个订阅系统中用户可以订阅不同的主题或事件。当有新的消息发布时RabbitMQ 会将消息路由到对应的队列然后订阅该队列的用户会收到相应的通知。这种方式可以用于实现邮件订阅、新闻推送等功能。
6、微服务架构
在一个微服务架构中不同的服务之间可能需要进行消息传递和协作。使用 RabbitMQ 可以实现服务之间的解耦和异步通信每个服务通过交换机和队列收发消息从而实现微服务之间的松耦合。
六、 RabbitMQ 安装
官网地址
RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
6.1 brew安装 brew update #更新一下homebrew
brew install rabbitmq #安装rabbitMQ 安装结果 Caveats rabbitmq Management Plugin enabled by default at http://localhost:15672 To restart rabbitmq after an upgrade: brew services restart rabbitmq Or, if you dont want/need a background service you can just run: CONF_ENV_FILE/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server rabbitmq 的安装路径
/opt/homebrew/opt/rabbitmq 6.2 、配置环境变量
1、 vi ~/.bash_profile 2、 export RABBIT_HOME${PATH}:/opt/homebrew/opt/rabbitmq
export PATH${PATH}:$RABBIT_HOME/sbin 3、 source ~/.bash_profile 6.3 、启动RabbitMQ 1、前台运行 rabbitmq-server
2、后台运行 rabbitmq-server -detached
3、查看运行状态 rabbitmqctl status
4、开始 Web插件 rabbitmq-plugins enable rabbitmq_management
5、重启 rabbitmq-server restart
5、关闭 rabbitmqctl stop
6.4、访问MQ
1、浏览器地址 http://localhost:15672/ 默认用户名和密码为guest 添加用户 rabbitmqctl add_user miaojiang 123
设置用户为管理员 rabbitmqctl set_user_tags miaojiang administrator
配置用户可以远程登录 rabbitmqctl set_permissions -p / miaojaing .* .* .*
查看新添加的账户
rabbitmqctl list_users
查看用于的权限 rabbitmqctl list_permissions -p /
七、Spring Boot 项目应用RabbitMQ
7.1、添加Maven依赖
在你的项目的pom.xml文件中添加RabbitMQ客户端库的依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency7.2、配置RabbitMQ连接
在Spring Boot的配置文件(application.properties 或 application.yml)中添加RabbitMQ的连接信息。
application.properties:
spring.rabbitmq.hostlocalhost
spring.rabbitmq.port5672
spring.rabbitmq.usernameguest
spring.rabbitmq.passwordguest在application.yml配置mq的参数
spring:rabbitmq:#设置RabbitMQ的IP地址host: localhost#设置rabbitmq服务器用户名username: guest#设置rabbitmq服务器密码password: guest#设置rabbitmq服务器连接端口port: 56727.3 创建交换机
自定义交换机名称
创建名为“myExchange”的交换机
package com.example.usermanagement.mq;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQConfig {/*使用 Configuration 注解创建一个配置类并通过 Bean 注解创建了一个名为 declareExchange 的方法用于声明创建交换机。请根据实际情况修改交换机名称、类型和持久化设置。*/public static final String EXCHANGE_NAME myExchange;Beanpublic Exchange declareExchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();}
} 7.4 创建消息发送者
创建消息发送者创建一个消息发送者的类用于发送消息到RabbitMQ
package com.example.usermanagement.mq;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;Component
public class MessageSender{private final AmqpTemplate amqpTemplate;private final String exchangeName myExchange; // 自定义交换机名称Autowiredpublic MessageSender(AmqpTemplate amqpTemplate) {this.amqpTemplate amqpTemplate;}public void sendMessage(Object message) {amqpTemplate.convertAndSend(exchangeName, , message); // 发送消息到默认交换机和空路由键}
}
注意 sendMessage 类型使用的是Object类型 7.5 RabbitMQ管理后台添加对列
步骤 打开浏览器输入RabbitMQ管理后台的URL。默认情况下该URL为http://localhost:15672/。请确保你的RabbitMQ服务器正在运行并且端口号正确。 输入用户名和密码以登录到RabbitMQ管理后台。默认情况下用户名为guest密码也为guest。如果你修改过用户名和密码请使用你的自定义凭据进行登录。 成功登录后你将看到RabbitMQ管理后台的主界面。在顶部导航栏中选择Queues选项卡。 在Queues页面上你将看到已经存在的队列列表。如果你想要创建一个新队列请点击Add a new queue按钮。 在添加队列的页面上填写以下信息 Name队列的名称。为队列提供一个唯一的名称。如myQueue)Durability队列的持久性。选择是或否以指定队列是否应该在RabbitMQ服务重启后保留。Auto delete队列的自动删除。选择是或否以指定当最后一个消费者断开连接后是否删除队列。Arguments队列的其他参数。这是可选的你可以为队列设置一些特定的参数。 填写完队列信息后点击Add queue按钮以创建队列。 创建成功后你将在Queues页面上看到新添加的队列。你可以在该页面上查看队列的详细信息包括消息数量、消费者数量等。
http://localhost:15672/#/queues
只需要添加队列名称就可以 7.6 调用生产者
1、注入MessageSender实例
Autowired
private MessageSender messageSender;
2、在需要发送消息的地方调用messageSender.sendMessage方法。根据你的业务逻辑你可以在合适的位置调用该方法。例如在订单创建成功后你可以添加以下代码
messageSender.sendMessage(订单已创建 order.getOrderId());
7.7 创建消息接收者
创建消息接收者创建一个消息接收者的类用于处理接收到的RabbitMQ消息。
这里就直接写处理RabbitMQ消息的逻辑。
package com.example.usermanagement.mq;import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class MessageReceiver {RabbitListener(bindings QueueBinding(value Queue(your_queue_name),exchange Exchange(value RabbitMQConfig.EXCHANGE_NAME)
// key your_routing_key))public void receiveMessage(Object message) {System.out.println(Received message: message);// 处理消息逻辑}
}注意 sendMessage 类型使用的是Object类型
your_queue_name 替换为你要监听的队列的名称,如myQueue)
将 your_routing_key 替换为适当的路由键如果使用 八、MQ界面介绍
Overview概览提供了一份概览报告包括服务器和集群信息、节点状态、队列和连接摘要、以及最近的相关日志条目。 Connections连接显示当前连接到 RabbitMQ 服务器的客户端应用程序包括连接的名称、协议、虚拟主机等信息。 Channels通道显示每个连接上的活动通道以及与每个通道相关的一些指标如消费者数量、未确认的消息数量等。 Exchanges交换机列出了所有的交换机包括名称、类型、绑定的队列和绑定的数量。 Queues队列显示了所有的队列包括名称、消息数量、消费者数量等信息。您还可以通过队列进行一些操作如创建、删除、清空等。 Admin管理员提供了一些高级管理功能如用户和权限管理、虚拟主机管理、插件管理等。
8.1 Overview概览
Overview概览提供了一份概览报告包括服务器和集群信息、节点状态、队列和连接摘要、以及最近的相关日志条目。
8.2 Connections连接
Connections连接显示当前连接到 RabbitMQ 服务器的客户端应用程序包括连接的名称、协议、虚拟主机等信息。
8.3 Channels通道 Channels通道显示每个连接上的活动通道以及与每个通道相关的一些指标如消费者数量、未确认的消息数量等。
8.4 Exchanges交换机 Exchanges交换机列出了所有的交换机包括名称、类型、绑定的队列和绑定的数量。
8.5 Queues队列 Queues队列显示了所有的队列包括名称、消息数量、消费者数量等信息。您还可以通过队列进行一些操作如创建、删除、清空等。
8.6 Admin管理员 Admin管理员提供了一些高级管理功能如用户和权限管理、虚拟主机管理、插件管理等