中国制造网官网入口,wordpress优化数据库,互联网信息服务,淘宝seo具体优化方法目录 1.前置知识
准备工作
2.导入依赖
3.生产者
4.消费者
5.验证
验证Direct
验证Fanout
验证Topic 1.前置知识
rabbitmq有五种工作模式#xff1b;按照有无交换机分为两大类
无交换机的#xff1a;简单队列(一对一,单生产单消费)、工作队列(工作队列有轮训分发和公…目录 1.前置知识
准备工作
2.导入依赖
3.生产者
4.消费者
5.验证
验证Direct
验证Fanout
验证Topic 1.前置知识
rabbitmq有五种工作模式按照有无交换机分为两大类
无交换机的简单队列(一对一,单生产单消费)、工作队列(工作队列有轮训分发和公平分发两种模式)
有交换机发布-订阅、路由模式、主题模式
准备工作
安装rabbitmq并成功启动
2.导入依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
3.生产者
生产端项目结构 逻辑生产者只对交换机进行生产至于队列绑定等放在消费端进行执行
BusinessConfig
定义了三个不同类型的交换机
direct类型:(当生产者往该交换机发送消息时他必须指定固定的routingkey当routingkey值为空他也会匹配routingkey为空的队列)
fanout类型:(当生产者往该交换机发送消息时他所绑定的队列都会收到消息routingkey即使写了也会忽略一般为空字符串)
Topic类型:(当生产者往该交换机发送消息时他并不像direct指定固定的routingkey可以进行模糊匹配当该routingkey为空时他会匹配routingkey为空的队列)
package com.zsp.quartz.queue;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;/*** Author: ZhangSP* Date: 2023/12/7 14:05*/
public class BusinessConfig {// 声明direct交换机public static final String EXCHANGE_DIRECT exchange_direct_inform;// 声明fanout交换机public static final String EXCHANGE_FANOUT exchange_fanout_inform;// 声明topic交换机public static final String EXCHANGE_TOPIC exchange_topic_inform;
}TestProducer
生产消息
package com.zsp.quartz.queue;import com.alibaba.fastjson.JSON;
import com.zsp.quartz.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;SpringBootTest
RunWith(SpringRunner.class)
public class TestProducer {AutowiredRabbitTemplate rabbitTemplate;Testpublic void Producer_topics_springbootTest() {//使用rabbitTemplate发送消息String message ;User user new User();user.setName(张三);user.setEmail(anjduahsd);message JSON.toJSONString(user);// directrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_DIRECT,,message);// fanoutrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_FANOUT,,message);// topicrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_TOPIC,,message);}
}4.消费者
消费者目录结构: BusinessConfig内容解析
①定义交换机类型
②配置交换机与队列的绑定关系
③通过容器工厂声明队列
package com.zsp.consumer.queue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** Author: ZhangSP* Date: 2023/12/7 14:05*/
Slf4j
Configuration
public class BusinessConfig {// 声明directpublic static final String EXCHANGE_DIRECT exchange_direct_inform;public static final String QUEUE_DIRECT_EMAIL queue_direct_inform_email;public static final String QUEUE_DIRECT_SMS queue_direct_inform_sms;public void BindDirectEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);channel.queueDeclare(QUEUE_DIRECT_EMAIL, true, false, false, null);channel.queueBind(QUEUE_DIRECT_EMAIL, EXCHANGE_DIRECT, );} catch (Exception e) {log.error(声明Direct-email队列时失败, e);}}public void BindDirectSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);channel.queueDeclare(QUEUE_DIRECT_SMS, true, false, false, null);channel.queueBind(QUEUE_DIRECT_SMS, EXCHANGE_DIRECT, 123);} catch (Exception e) {log.error(声明Direct-sms失败, e);}}// 声明fanoutpublic static final String EXCHANGE_FANOUT exchange_fanout_inform;public static final String QUEUE_FANOUT_EMAIL queue_fanout_inform_email;public static final String QUEUE_FANOUT_SMS queue_fanout_inform_sms;public void BindFanoutEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);channel.queueDeclare(QUEUE_FANOUT_EMAIL, true, false, false, null);channel.queueBind(QUEUE_FANOUT_EMAIL, EXCHANGE_FANOUT, );} catch (Exception e) {log.error(声明Fanout-email队列时失败, e);}}public void BindFanoutSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);channel.queueDeclare(QUEUE_FANOUT_SMS, true, false, false, null);channel.queueBind(QUEUE_FANOUT_SMS, EXCHANGE_FANOUT,);} catch (Exception e) {log.error(声明Fanout-sms失败, e);}}// 声明topicpublic static final String EXCHANGE_TOPIC exchange_topic_inform;public static final String QUEUE_TOPIC_EMAIL queue_topic_inform_email;public static final String QUEUE_TOPIC_SMS queue_topic_inform_sms;public static final String ROUTINGKEY_EMAILinform.#.email.#;public static final String ROUTINGKEY_SMSinform.#.sms.#;public void BindTopicEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);channel.queueDeclare(QUEUE_TOPIC_EMAIL, true, false, false, null);channel.queueBind(QUEUE_TOPIC_EMAIL, EXCHANGE_TOPIC, ROUTINGKEY_EMAIL);} catch (Exception e) {log.error(声明Topic-email队列时失败, e);}}public void BindTopicSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);channel.queueDeclare(QUEUE_TOPIC_SMS, true, false, false, null);channel.queueBind(QUEUE_TOPIC_SMS, EXCHANGE_TOPIC,);} catch (Exception e) {log.error(声明Topic-sms失败, e);}}// 声明队列AutowiredQualifier(value zspConnectionFactory)private ConnectionFactory connectionFactory;PostConstructpublic void shengmingQueue() {try {Connection connection connectionFactory.createConnection();Channel channel connection.createChannel(false);BindDirectEmail(channel);BindDirectSms(channel);BindFanoutEmail(channel);BindFanoutSms(channel);BindTopicEmail(channel);BindTopicSms(channel);} catch (Exception e) {log.error(业务实例声明绑定队列报错:,e);}}
}RabbitFactory内容解析
①创建自定义连接工厂
②通过Qualifier准确注入连接工厂创建个性化容器工厂
package com.zsp.consumer.queue;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
EnableRabbit
public class RabbitFactory {Bean(zspConnectionFactory)public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory new CachingConnectionFactory();// 设置RabbitMQ的连接信息如主机名、端口号、用户名和密码等connectionFactory.setHost(localhost);connectionFactory.setPort(5672);connectionFactory.setUsername(root);connectionFactory.setPassword(root);return connectionFactory;}Bean(rabbitListenerContainerFactory)public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(Qualifier(zspConnectionFactory) ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(5);factory.setMaxConcurrentConsumers(10);return factory;}
}ReceiveHandler内容解析
监听绑定的队列消息
package com.zsp.consumer.queue;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class ReceiveHandler {//监听自定义的Direct队列RabbitListener(queues BusinessConfig.QUEUE_DIRECT_SMS, containerFactory rabbitListenerContainerFactory)public void directSMS(String msg, Message message, Channel channel) {JSONObject jsonObject JSONObject.parseObject(msg);System.out.println(Direct队列-sms队列 jsonObject);}RabbitListener(queues BusinessConfig.QUEUE_DIRECT_EMAIL, containerFactory rabbitListenerContainerFactory)public void directEmail(String msg, Message message, Channel channel) {JSONObject jsonObject JSONObject.parseObject(msg);System.out.println(Direct队列-email队列 jsonObject);}//监听自定义的Fanout队列RabbitListener(queues BusinessConfig.QUEUE_FANOUT_SMS, containerFactory rabbitListenerContainerFactory)public void FanoutSMS(String msg, Message message, Channel channel) {JSONObject jsonObject JSONObject.parseObject(msg);System.out.println(Fanout队列-sms队列 jsonObject);}RabbitListener(queues BusinessConfig.QUEUE_FANOUT_EMAIL, containerFactory rabbitListenerContainerFactory)public void FanoutEmail(String msg, Message message, Channel channel) {JSONObject jsonObject JSONObject.parseObject(msg);System.out.println(Fanout队列-email队列 jsonObject);}//监听自定义的Topic队列RabbitListener(queues BusinessConfig.QUEUE_TOPIC_SMS, containerFactory rabbitListenerContainerFactory)public void TopicSMS(String msg, Message message, Channel channel) {JSONObject jsonObject JSONObject.parseObject(msg);System.out.println(Topic队列-sms队列 jsonObject);}RabbitListener(queues BusinessConfig.QUEUE_TOPIC_EMAIL, containerFactory rabbitListenerContainerFactory)public void TopicEmail(String msg, Message message, Channel channel) {JSONObject jsonObject JSONObject.parseObject(msg);System.out.println(Topic队列-email队列 jsonObject);}
}
5.验证
先启动消费者端然后执行TestProducer
验证Direct
1.向routingkey为空的队列发消息 我们在消费者端配置了routingkey为空的队列叫做 QUEUE_DIRECT_EMAIL 因此会打印出下面这条记录 2.向routingkey为123的队列发消息 我们在消费者端配置了routingkey为123的队列叫做 QUEUE_DIRECT_SMS 因此会打出下面这条记录 验证Fanout 谁跟我绑定了我都发 验证Topic
模糊匹配routingkey 匹配sms队列 会把下面这个打印出来 需要注意的是如果我们没有自定义容器工厂的话这个containerFactory可以不写 简单理解就是实例也就是rabbitmq服务地址是在哪里实例包括了域名、端口、账号、密码等。