asp+sql server典型网站建设案例 光盘,官网订机票,高端网站制作怎么样,链接买卖是什么意思1. mq简介 消息队列是分布式系统中的异步通信中间件#xff0c;采用生产者-消费者模型实现服务间解耦通信
核心作用
服务解耦异步处理流量削峰数据同步最终一致性
消息队列模式
发布/订阅模式#xff1a;一对多广播工作队列模式#xff1a;竞争消费死信队列…1. mq简介 消息队列是分布式系统中的异步通信中间件采用生产者-消费者模型实现服务间解耦通信
核心作用
服务解耦异步处理流量削峰数据同步最终一致性
消息队列模式
发布/订阅模式一对多广播工作队列模式竞争消费死信队列处理失败消息延迟队列定时任务处理消息回溯Kafka按offset重新消费
2. mq入门 使用SpringAMQP实现HelloWorld中的基础消息队列功能一个生产者一个队列一个消费者
2.1 启动mq 打开mq下载目录输入命令rabbitmq-server start启动 网址localhost:15672访问账号密码均为guest 2.2 导入依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.gaohe/groupIdartifactIdclouddemo/artifactIdpackagingpom/packagingversion0.0.1-SNAPSHOT/versionmodulesmodulepublisher/modulemoduleconsumer/module/modulesnameclouddemo/namedescriptionclouddemo/descriptionpropertiesjava.version17/java.versionproject.build.sourceEncodingUTF-8/project.build.sourceEncodingproject.reporting.outputEncodingUTF-8/project.reporting.outputEncodingspring-boot.version3.3.3/spring-boot.versionmaven.compiler.source17/maven.compiler.sourcemaven.compiler.target17/maven.compiler.target/propertiesdependencies!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!--lombok--dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId/dependencydependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId/dependency!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependenciesdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion${spring-boot.version}/versiontypepom/typescopeimport/scope/dependency/dependencies/dependencyManagementbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion${spring-boot.version}/versionconfigurationmainClasscom.gaohe.clouddemo.ClouddemoApplication/mainClassskiptrue/skip/configurationexecutionsexecutionidrepackage/idgoalsgoalrepackage/goal/goals/execution/executions/plugin/plugins/build/project2.3 在yml配置文件中配置连接信息
spring:rabbitmq:host: localhost # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: guest # 用户名password: guest # 密码2.4 在publisher中利用RabbitTemplate发送信息到simple.queue队列
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;Slf4j
SpringBootTest(classes PublisherApplication.class)
public class PublisherTest {Autowiredpublic RabbitTemplate rabbitTemplate;// 发送消息
Test
public void test1(){
// 1.发送的队列String queueName1 hello.queue;
// 2.发送的消息String msg 你好我哟一个帽衫;
// 3.发送rabbitTemplate.convertAndSend(queueName1,msg);
}}2.5 在consumer服务中编写消费逻辑绑定simple.queue这个队列
package com.gaohe.consumer.lisenner;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class HelloLisenner {RabbitListener(queues hello.queue)public void helloQueueLisenner(String msg){System.out.println(helloQueueLisennermsg);}RabbitListener(queues hello.queue)public void helloQueueLisenner2(String msg){System.out.println(helloQueueLisenner2msg);}}3.交换机 Exchange是消息队列系统中的消息路由中枢负责接收生产者发送的消息并根据特定规则将消息路由到一个或多个队列中。
常见exchange类型包括
Fanout广播Direct路由Topic话题
3.1 路由交换机FanoutExchange
在consumer服务创建一个类添加注解声明交换机队列以及绑定关系对象
package com.gaohe.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;Component
public class FanoutConfig {// 交换机Beanpublic FanoutExchange fanout1(){return new FanoutExchange(itgaohe.fanout);}// 定义队列Beanpublic Queue queue1(){return new Queue(fanout.queue1);}// 队列绑定交换机Beanpublic Binding binding1(FanoutExchange fanout1){return BindingBuilder.bind(queue1()).to(fanout1);}// 定义队列Beanpublic Queue queue2(){return new Queue(fanout.queue2);}// 队列绑定交换机Beanpublic Binding binding2(FanoutExchange fanout1){return BindingBuilder.bind(queue2()).to(fanout1);}
}在consumer服务中的监听类中添加方法进行监听
package com.gaohe.consumer.lisenner;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class FanoutLisenner {RabbitListener(queues fanout.queue1)public void fanoutQueueLisenner(String msg){System.out.println(fanoutQueueLisenner:msg);}RabbitListener(queues fanout.queue2)public void fanoutQueueLisenner2(String msg){System.out.println(fanoutQueueLisenner2:msg);}
}在publisher服务创建测试类进行测试 import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;Slf4j
SpringBootTest(classes PublisherApplication.class)
public class PublisherTest {Autowiredpublic RabbitTemplate rabbitTemplate;Testpublic void test3(){
// 1.发送的队列String exName itgaohe.fanout;
// 2.发送的消息String msg 你好;
// 3.发送rabbitTemplate.convertAndSend(exName,,msg);}}3.2 路由交换机DirectExchange 交换机队列不仅可以单独配置也可以在监听类使用注解进行配置
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class DirectLisenner {RabbitListener(bindings QueueBinding(value Queue(direct.queue1),exchange Exchange(value itgaohe.direct,type ExchangeTypes.DIRECT),key {blue,red}))public void directQueueLisenner(String msg){System.out.println(directQueueLisennermsg);}RabbitListener(bindings QueueBinding(value Queue(direct.queue2),exchange Exchange(value itgaohe.direct,type ExchangeTypes.DIRECT),key {yellow,red}))public void directQueueLisenner2(String msg){System.out.println(directQueueLisenner2msg);}}publisher测试类进行测试
package com.gaohe.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;Slf4j
SpringBootTest(classes PublisherApplication.class)
public class PublisherTest {Autowiredpublic RabbitTemplate rabbitTemplate;Testpublic void test3(){
// 1.发送的队列String exName itgaohe.direct;
// 2.发送的消息String msg I LOVE YOU ;
// 3.发送rabbitTemplate.convertAndSend(exName,yellow,msg);}}3.3 广播交换机TopicExchange
监听类
package com.gaohe.consumer.lisenner;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;Component
public class TopicLisenner {RabbitListener(bindings QueueBinding(value Queue(topic.queue1),exchange Exchange(value itgaohe.topic,type ExchangeTypes.TOPIC),key {china.#,#.weather}))public void directQueueLisenner(String msg){System.out.println(directQueueLisennermsg);}RabbitListener(bindings QueueBinding(value Queue(topic.queue2),exchange Exchange(value itgaohe.topic,type ExchangeTypes.TOPIC),key {us.#,#.weather}))public void directQueueLisenner2(String msg){System.out.println(directQueueLisenner2msg);}}测试类
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;Slf4j
SpringBootTest(classes PublisherApplication.class)
public class PublisherTest {Autowiredpublic RabbitTemplate rabbitTemplate;Testpublic void test4(){
// 1.发送的String exName itgaohe.topic;
// 2.发送的消息String msg hello world6666;
// 3.发送rabbitTemplate.convertAndSend(exName,aa.weather,msg);}
}用的最多的是路由交换机和广播交换机
4. mq消息转换器 消息转换器是消息中间件中的数据格式转换层负责在消息生产/消费过程中实现 Java对象 ↔ 消息体序列化/反序列化 消息属性(headers/properties)的自动处理 不同数据格式间的相互转换
配置消息转换器
父工程导入依赖
dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId
/dependency
给提供者和消费者配置消息转换器Bean对象
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;Component
public class mqConfig {Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}定义消费者监听队列并消费消息 测试