玉溪市网站建设,网站建设朱宁,内容展示类网站,南京广告公司户外广告一、初见MQ
#xff08;一#xff09;什么是MQ#xff1f;
MQ#xff08;MessageQueue#xff09;#xff0c;意思是消息队列#xff0c;也就是事件驱动架构中的Broker。
#xff08;二#xff09;同步调用
1、概念#xff1a; 同步调用是指#xff0c;某一服务…一、初见MQ
一什么是MQ
MQMessageQueue意思是消息队列也就是事件驱动架构中的Broker。
二同步调用
1、概念 同步调用是指某一服务需要多个服务共同参与但多个服务之间有一定的执行顺序当每一个服务都需要等待前面一个服务完成才能继续执行。
2、存在的问题
耦合度高 新需求需要改动原代码性能下降 调用者需要等待服务提供者相应如果调用链过长则响应时间等于每次调用的时间之和。资源浪费 调用链的每个服务在等待响应过程中不会释放请求资源高并发场景下会浪费系统资源。级联失败 若服务提供者出现宕机所有调用者都会因故障而导致整个服务集群故障。
三异步调用
1、实现模式 异步调用常见实现的就是事件驱动模式。
2、事件驱动的优势
服务解耦 只需要将请求交付给事件管理器进行管理即可完成服务。性能提升 与客户交互的服务短时间就能完成并不需要等待后续服务完成。服务弱依赖 其它服务宕机不影响服务集群的使用流量缓冲 事件管理器通过任务队列的方式使得订阅的服务按照自身速度进行执行。
3、事件驱动的缺点
高度依赖Broker的可靠性、安全性、吞吐能力架构复杂时业务没有明显的流程线不便于跟踪管理
四MQ常见框架
RabbitMQ中小企业ActiveMQRocketMQ大型企业Kafka公司/社区RabbitApacheAlibabaApache开发语言ErlangJavaJavaJava协议支持AMQPXMPPSMTPSTOMPOpenWireSTOMPRESTXMPPAMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高极高消息延迟微妙级毫秒级毫秒级毫秒以内消息可靠高一般高一般
二、使用MQ
一RabbitMQ概述
RqbbitMQ是基于Erlang语言开发的开源消息通讯中间件官方地址https://rabbitmq.com/
二安装MQ
docker pull rabbitmq:3-management三运行RabbitMQ
#配置 MQ的用户名和密码容器名和主机名端口镜像名 注意15672端口是MQ的控制台访问端口5672是对外暴露的消息通信端口
docker run -e RABBITMQ_DEFAULT_USERxxx -e RABBITMQ_DEFAULT_PASSxxxx --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management访问MQ的控制台 4RabbitMQ的整体结构 5RabbitMQ中的几个概念
channel 操作MQ的工具exchange 路由消息到队列queue 缓存消息Virtual Host 虚拟主机是对queueexchange等资源进行逻辑分组
6常见的MQ模型
基本消息队列BasicQueue Publisher —1:1— Queue —1:1— Customer工作消息队列WorkQueue Publisher —1:1— Queue —1:n— Customer发布/订阅Publish、Subscribe 根据交换机类型又有三种模型 Fanout Exchange 广播Publisher—1:1—Exchange—1:n—Queue—1:1—CustomerDirect Exchange 路由Publisher—1:1—Exchange—1:n—Queue—1:1—CustomerTopic Exchange 主题 RPC发布者确认
第一种基本消息队列的基本使用
包含三种角色publisher、queue、consumer
publisher 消费发布者将消息发布到队列queuequeue 消息队列负责接受并缓存消息consumer 订阅队列处理队列中的消息
收发消息的过程 获取连接 》 建立通信通道 》 创建消息队列 》 收发消息 》 释放资源
1、publisher和consumer引入依赖 dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactId
/dependency2、Publisher创建发送消息通道
SpringBootTest
class PublisherApplicationTests {Testvoid testSendMessage() throws IOException, TimeoutException {
// 1、建立连接ConnectionFactory connectionFactory new ConnectionFactory();
// 2、设置连接参数connectionFactory.setHost(192.168.92.131);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(root);connectionFactory.setPassword(root);
// 3、建立连接Connection connection connectionFactory.newConnection();
// 4、建立通信通道ChannelChannel channel connection.createChannel();
// 5、创建队列String queueName simple.queue;channel.queueDeclare(queueName,false,false,false,null);
// 6、发送信息String message hellorabbitmq!;channel.basicPublish(,queueName,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(发送消息成功【message】);
// 7、关闭通道和连接channel.close();connection.close();}
}2、Consumer创建订阅通道
class ConsumerApplicationTests {public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接ConnectionFactory connectionFactory new ConnectionFactory();// 2、设置连接参数connectionFactory.setHost(192.168.92.131);connectionFactory.setPort(5672);connectionFactory.setVirtualHost(/);connectionFactory.setUsername(root);connectionFactory.setPassword(root);
// 3、建立连接Connection connection connectionFactory.newConnection();
// 4、建立通信通道ChannelChannel channel connection.createChannel();
// 5、创建队列String queueName simple.queue;channel.queueDeclare(queueName,false,false,false,null);
// 6、订阅消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 7、处理消息String message new String(body);System.out.println(接收到消息【message】);}});System.out.println(等待接收消息....);}
}第二种Work Queue 工作队列
与基本队列的区别在于它能使用多个订阅队列进行高效的处理请求。因为一个订阅队列的处理速度是有限的
使用过程与基本队列几乎一致只是开启了多个订阅队列。
在使用过程中我们会发现多个订阅队列对任务的分配是平均的这就是预取机制。
我们需要的是快速处理的订阅队列获取更多的请求慢速处理的订阅队列获取少量的请求它如何实现呢
通过修改配置文件设置一个 preFetch 值。
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672 #端口virtual-host: / #虚拟主机username: root #用户名password: root #密码listener:simple:prefetch: 1 # 每次取 1 个请求处理完才能取下一个。第三种FanoutQueue 广播消息队列
SpringAMQP提供声明交换机、队列、绑定关系的API
主要使用的是Exchange.FanoutExchange类。
实现思路 1、在consumer服务声明队列交换机并将两者绑定。
Configuration
public class FanoutConfig{//交换机Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(com.fanout);}//队列Beanpublic Queue fanoutQueue1(){return new Queue(com.queue1);}//绑定关系Beanpublic Binding bindingQueue(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}//...以相同方式声明第2个队列并完成绑定}2、在consumer服务编写两个消费者方法分别监听fanout.queue1和fanout.queue2
Component
public class SpringRabbitListener {RabbitListener(queues com.queue1)public void listenFanoutQueue1(String msg) throws InterruptedException {//...处理结果}RabbitListener(queues com.queue2)public void listenFanoutQueue2(String msg) throws InterruptedException {//...处理结果}
}3、在publisher编写测试方法向交换机发送信息
Test
public void sendFanoutExchange() {//1、交换机String exchangeName com.fanout;//2、消息String message Hello Fanout;//3、发送消息rabbitTemplate.covertAndSend(exchangeName, , message);
}第四种路由信息队列
路由模式的流程 即设置密钥的绑定关系只有携带相应的密钥才能进入相应的队列
每一个 Queue 与 Exchange 设置一个 BindingKey发布者发送消息时需要指定消息的 RoutingKeyExchange根据消息路由到 BindingKey 与 RoutingKey 一致的队列
实现思路 1、利用 RabbitListener 声明Exchange、Queue、RoutingKey
RabbitListener(bindings QueueBinding(value Queue(name direct.queue1), exchange Exchange(name com.exchange, type ExcahngeTypes.DIRECT), key {red,blue}))
public void listenRoutingQueue1(String msg) throws InterruptedException {//...处理结果
}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2), exchange Exchange(name com.exchange, type ExcahngeTypes.DIRECT), key {red,green}))
public void listenRoutingQueue2(String msg) throws InterruptedException {//...处理结果2、发送消息实现
//指定队列处理
Test
public void sendRoutingExchange1(){//交换机消息String exchangeName com.exchange;String message Hello,RoutingMQ;//发送消息rabbitTemplate.covertAndSend(exchangeName, blue, message);
}//多队列处理
Test
public void sendRoutingExchange2(){//交换机消息String exchangeName com.exchange;String message Hello,RoutingMQ;//发送消息rabbitTemplate.covertAndSend(exchangeName, red, message);
}第五种主题信息队列通配key
TopicExchange 与 DirectExchange 的区别 routingkey必须是多个单词的列表并且以,分割。并且Queue与Exchange指定的BindingKey时可使用通配符
**#**代指 0 / n 个单词* 代指一个单词
实现思路 1、通过 RabbitListener 声明Exchange、Queue、RoutingKey
RabbitListener(bingdings QueueBinding(exchange Exchange(name com.exchange, type ExchangeTypes.TOPIC), queue Queue(name com.queue1), key {china.#}))
public void listenTopicQueue1(String msg) {//处理代码....
}RabbitListener(bingdings QueueBinding(exchange Exchange(name com.exchange, type ExchangeTypes.TOPIC), queue Queue(name com.queue2), key {#.news}))
public void listenTopicQueue2(String msg) {//处理代码....
}2、在publisher服务中向交换机发送消息
Test
public void sendTopicMessage(){//交换机消息String exchangeName com.exchange;String message Hello,Topic;rabbitTemplate.convertAndSend(exchangeName,china.call,message);
}四、SpringAMQP
一概念
AMQP Advanced Message Queuing Protocol 传递消息队列协议是用于在应用程序或之间传递业务消息的开放标准。该协议与语言及平台无关更符合为服务中独立性的要求。Spring AMQP Spring AMQP是基于AMQP协议定义的一套API规范提供了模板来发送和接收消息。其中 spring-amqp是基础抽象spring-rabbit是底层的默认实现。
二实现基础消息队列
1、引入spring-amqp依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency2、publisher服务中利用RabbitTemplate发送消息到任务队列
配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root编写发送方法 Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void sendMessage(){String queueName simple.queue;String message Hello World;rabbitTemplate.convertAndSend(queueName,message);}3、在consumer服务中编写消费逻辑绑定simple.queue队列
配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root编写发送方法1 Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void getMessage(){String queueName simple.queue;// receive 表示接收方法接收到的信息会封装到Message可以看receive的返回值Message message rabbitTemplate.receive(queueName);// Message.getBody 是 byte[]System.out.println(new String(message.getBody()));}编写发送方法2 创建一个监听类
// 注册成 Bean 对象
Component
public class SpringRabbitListener {// 监听器注释queues 订阅队列并将返回值注入参数列表中 RabbitListener(queues simple.queue)public void ListenSimpleQueueMessage(String msg){System.out.println(Spring 消费者接收到消息【 msg 】);}
}三消息转换器
为了让我们能够自由识别consumer发送的消息则需要使用的是消息转换器。
消息转换器如何使用
Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理默认实现的是SimpleMessageConverter基于ObjectObjectOutputStream完成序列化。
我们只需要定义一个 MessageConverter 类型的Bean即可推荐使用JSON序列化
1、publisher引入依赖 !-- 接收消息需要使用jackson的转换依赖 --
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency!-- 发送消息需要使用jackson的核心依赖 --
dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId
/dependency
2、publisher启动类声明MessageConverter
Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}3、consumer启动类声明MessageConverter
Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}4、监听队列消息
RabbitListener(queues object.queue)
public void listenObjectMessage(Object msg) {//处理数据....
}