湖北建站管理系统信息,黑色网站,国内十大知名广告公司,网站使用帮助内容目录 RabbitMQ安装 rabbitmqSpringAMQP 基础队列WorkQueue路由发布订阅 FanoutExchangeDirectExchangeTopicExchange RabbitMQ 安装 rabbitmq
首先确保自己已经安装好了 docker
是 docker 拉取镜像文件#xff1a;docker pull rabbitmq:3-management
拉取完毕#xff0c;打… 目录 RabbitMQ安装 rabbitmqSpringAMQP 基础队列WorkQueue路由发布订阅 FanoutExchangeDirectExchangeTopicExchange RabbitMQ 安装 rabbitmq
首先确保自己已经安装好了 docker
是 docker 拉取镜像文件docker pull rabbitmq:3-management
拉取完毕打开容器
docker run \-e RABBITMQ_DEFAULT_USERitcast \-e RABBITMQ_DEFAULT_PASS123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management浏览器访问虚拟机的 15672 端口即可看见 rabbitmq 管理界面 我们可以在 admin 选项卡内添加新的用户
其中的can access virtual hosts表示当前用户对应的虚拟主机 建议不同用户对应不同的虚拟主机可以实现隔离效果
虚拟主机可以点击上图右侧的 virtual hosts 按钮新建 SpringAMQP 基础队列
由于使用官方原生操作 rabbitmq 的方式太过生草代码巨多不适合日常开发推荐改用 SpringAMQP 来简化操作
导入坐标
dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId
/dependency
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
!--单元测试--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId
/dependency
dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId
/dependency配置 rabbitmq 链接
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.113.146 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /publisher 编写测试类测试 AMQP
由于我们的 rabbitmq 默认没有创建队列 simple.queue 所以你直接发送是接收不到任何信息的必须要先进行判断如果队列不存在那就先创建对应队列后在发送才可以接受得到
Test
public void testSendMessage2SimpleQueue() {RabbitAdmin admin new RabbitAdmin(rabbitTemplate);String queueName simple.queue;String message hello, spring amqp!;// 队列是否存在的判断if (Objects.isNull(admin.getQueueProperties(queueName))) {Queue queue new Queue(queueName);admin.declareQueue(queue);}// 发送消息到消息队列rabbitTemplate.convertAndSend(queueName, message);
}不出意外的话你在 rabbitmq 控制台的 queue 选项内就可以看见新创建的 simple.queue 队列里面包含着我们发送的第一条信息 consumer 消费对应队列中的消息
监听之前也要和 publisher 配置相同的 application.yaml这样才可以连接到 rabbitmq
新建监听消费类 SpringRabbitListener 传入如下代码执行监听
Component
public class SpringRabbitListener {// 设置消费者需要监听的队列RabbitListener(queues simple.queue)public void listenWorkQueue1(String msg) throws InterruptedException {// 获取队列中信息System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);}
}WorkQueue 上图展示了工作队列的流程图实际上就是增加了一个消费者来消费队列中的消息
由于我们上一节已经创建了 simple.queue这里就不用判断了直接往里面每隔 20ms 插入一条信息
Test
public void testSendMessage2WorkQueue() throws InterruptedException {String queueName simple.queue;String message hello, message__;for (int i 1; i 50; i) {rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}同理按照流程图指示为设置两个消费者监听器 注意第一个消费者每隔 20ms 接受一次消息而第二个消费者则是每隔 200ms 接收一次
RabbitListener(queues simple.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);
}RabbitListener(queues simple.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);
}先运行消费者然后使用 publisher 插入 50 条消息
从日志输出我们发现1、2 消费者处理了同样多的数据各自 25 条但很显然第二个消费者慢很多因为它每隔 200ms 才处理一个消息
出现这一情况的原因是消息预取也就是说所有消费者获取同样多的消息而不在乎自己每个多久处理一次消息 解决消息分配不均问题
配置文件添加 prefetch 配置项他表示必须先处理完 1 个消息后才可以取出下一消息进行处理有效规避了一瞬间预取全部消息堆积到一个消费者上的场面
spring:rabbitmq:host: 192.168.113.146 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1此时重复上方操作发现实现了“能者多劳”的效果消费者 1 由于处理速度快故其消费了绝大多数消息而消费者 2 处理消息极少
这样做将整体处理时长压缩到 1s 及以下 路由发布订阅 FanoutExchange 设置路由发布订阅需要分为三步设置路由 Exchange、设置队列 Queue、将队列绑定到路由上
创建 fanout 配置文件 FanoutConfig我们按照以下的代码简单创建 1 个路由以及 2 个队列并实行绑定操作
Configuration
public class FanoutConfig {// 配置路由itcast.fanoutBeanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(itcast.fanout);}// 配置队列fanout.queue1Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}// 绑定队列1到交换机Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// fanout.queue2Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}// 绑定队列2到交换机Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}由于他们都添加了 bean 注解故直接运行 consumer 项目他们会自动装配
此时打开 rabbitmq 控制面板进入 exchange 选项就可以找到我们新创建的路由以及对应的队列绑定关系了 至于后续的发布者发布信息以及消费者消费信息的代码大家可以参照上一小节来自己补全这里就不过多赘述了 DirectExchange 可以将其理解为带规则的路由转发机制通过 bindingkey 和 routingkey 相一致配对来实现转发操作 配置消费者监听
// value 监听队列
// exchange 监听路由
// key bindingkey
RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name itcast.direct, type ExchangeTypes.DIRECT),key {red, blue}
))RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name itcast.direct, type ExchangeTypes.DIRECT),key {red, yellow}
))然后发布者再向对应的路由发送带 routingkey 的消息
Test
public void testSendDirectExchange() {// 交换机名称String exchangeName itcast.direct;// 消息String message hello, red!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, red, message);
}TopicExchange 和 directexchange 类似但是 routingkey 为多个单词的列表具体格式参照上图
至于发布者与消费者的书写方式和 directexchange 基本一致需要注意的就是 routingkey 和 bindingkey 的书写方式而已