建设电子商务网站需要什么,重庆seo全网营销,厦门邮件网站,明天上海封控16个区一、使用场景
削峰、解耦、异步。 基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发#xff0c;并发量12000#xff0c;支持持久化#xff0c;稳定性好#xff0c;集群不支持动态扩展。 RabbitMQ的基本概念 二、组成及工作流…一、使用场景
削峰、解耦、异步。 基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发并发量12000支持持久化稳定性好集群不支持动态扩展。 RabbitMQ的基本概念 二、组成及工作流程
1.主要组成
Broker(消息代理): 消息队列服务进程,一个Broker以开多个虚拟主机(VirtualHost)。
VirtualHost(虚拟主机)虚拟主机用于进行逻辑隔离一个虚拟主机可以有若干个Exchange和Queue
Exchange(交换机)消息队列交换机按一定的规则将消息路由转发到某个队列。
Queue消息队列。2.工作流程 RabbitMQ的工作流程 生产者发送消息流程
1、和Broker建立TCP连接。
2、和Broker建立通道。
3、通过通道消息发送给Broker由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue队列。消费者接收消息流程
1、和Broker建立TCP连接
2、和Broker建立通道
3、监听指定的Queue队列
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、接收到消息。
6、ack回复。
三、交换机Exchange(默认direct)
交换机接受消息根据路由键发送消息到绑定的队列不具备消息存储的能力。
1.交换机种类
Direct: 单播直连交换机Exchange将消息完全匹配路由键(routing key)的方式绑定消息获取信息时也要匹配Exchange和路由键。直连交换机 fanout: 广播式交换机(Publish/subscribe)不管消息的路由键(routing key)Exchange都会将消息转发给所有绑定的Queue。广播/扇形交换机 topic: 主题交换机工作方式类似于组播Exchange会将消息转发和路由键(routing key)符合匹配模式的所有队列如: routing_key为user.stock的Message会转发给绑定匹配模式为 *.stock 、user.stock* 、 #.user.stock.#的队列。(*表是匹配一个任意词组#表示匹配0个或多个词组)。主题交换机 headers: 头交换机无Binding Key当然也无Routing Key。根据发送的消息内容中的headers属性进行匹配。2.交换机属性
Name交换机名称
Durability持久化标志表明此交换机是否是持久化的
Auto-delete删除标志表明当所有队列在完成使用此exchange时是否删除
Arguments依赖代理本身。3.交换机状态
持久durable 暂存transient
4.消息确认机制ACK
自动ACK消息一旦被接收消费者自动发送ACK。
手动ACK消息接收后不会发送ACK需要手动调用。
四、rabbitmq 客户端的使用
1.引入依赖 !-- rabbitmq 客户端依赖 --dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.8.0/version/dependency
2.创建连接工具
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MyRabbitMQUtils {public static Connection getConnel() throws Exception{//1 创建 ConnectionFactoryConnectionFactory factory new ConnectionFactory() ;factory.setHost(127.0.0.1);//端口factory.setPort(5672);//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mqfactory.setVirtualHost(/);factory.setUsername(guest);factory.setPassword(guest);factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);Connection connection factory.newConnection();// Channel channel connection.createChannel();return connection;}
}
3.生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 交换机名称private final static String EXCHANGE_NAME simple_exchange;// 队列名称private final static String QUEUE_NAME simple_queue;public static void main(String[]args) throws Exception{Connection produceConnection MyRabbitMQUtils.getConnel();Channel produceChannel produceConnection.createChannel();// 建立交换机(广播)produceChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);/** 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数如可设置存活时间*/produceChannel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 1、exchange交换机如果不指定将使用mq的默认交换机设置为* 2、routingKey路由key交换机根据路由key来将消息转发到指定的队列如果使用默认交换机routingKey设置为队列的名称* 3、props消息的属性* 4、body消息内容*/for(int i0;i10;i){String message 生产者发布的消息---!;message messagei;produceChannel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());System.out.println( Producer 发布 message );}//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)produceChannel.close();produceConnection.close();}
}
4.消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Comsumer {// 队列名称private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {Connection comsumerConnection MyRabbitMQUtils.getConnel();Channel comsumerChannel comsumerConnection.createChannel();/** 参数明细* 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数如可设置存活时间*/comsumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer new DefaultConsumer(comsumerChannel){/** 当接收到消息后此方法将被调用* param consumerTag 消费者标签用来标识消费者的在监听队列时设置channel.basicConsume* param envelope 信封通过envelope* param properties 消息属性* param body 消息内容*/Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange envelope.getExchange();//消息idmq在channel中用来标识消息的id可用于确认消息已接收long deliveryTag envelope.getDeliveryTag();// body 即消息体String msg new String(body,utf-8);System.out.println(Comsumer 获得: msg !);// 手动 ACKcomsumerChannel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列第二个参数是否自动进行消息确认。//参数String queue, boolean autoAck, Consumer callback/*** 参数明细* 1、queue 队列名称* 2、autoAck 自动回复当消费者接收到消息后要告诉mq消息已接收如果将此参数设置为tru表示会自动回复mq如果设置为false要通过编程实现回复。* 3、callback消费方法当消费者接收到消息要执行的方法。*/comsumerChannel.basicConsume(QUEUE_NAME, false, consumer);}
}
五、Spring中使用RabbitMQ
1.引入依赖 !-- AMQP 依赖 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion2.2.7.RELEASE/version/dependency!--springboot测试依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdversion2.5.6/version/dependency
2.更改配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
3.把交换机、和队列加入IOC容器中
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQConfig {// email队列public static final String QUEUE_EMAIL queue_email;// sms队列public static final String QUEUE_SMS queue_sms;// topics类型交换机public static final String EXCHANGE_NAMEtopic.exchange;public static final String ROUTINGKEY_EMAILtopic.#.email.#;public static final String ROUTINGKEY_SMStopic.#.sms.#;//声明交换机Bean(EXCHANGE_NAME)public Exchange exchange(){//durable(true) 持久化mq重启之后交换机还在return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//声明email队列/*** new Queue(QUEUE_EMAIL,true,false,false)* durabletrue 持久化 rabbitmq重启的时候不需要创建新的队列* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false* exclusive 表示该消息队列是否只在当前connection生效,默认是false*/Bean(QUEUE_EMAIL)public Queue emailQueue(){return new Queue(QUEUE_EMAIL);}//声明sms队列Bean(QUEUE_SMS)public Queue smsQueue(){return new Queue(QUEUE_SMS);}//ROUTINGKEY_EMAIL队列绑定交换机指定routingKeyBeanpublic Binding bindingEmail(Qualifier(QUEUE_EMAIL) Queue queue, Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS队列绑定交换机指定routingKeyBeanpublic Binding bindingSMS(Qualifier(QUEUE_SMS) Queue queue, Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}}
4.模拟业务发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class Send {AutowiredRabbitTemplate rabbitTemplate;Testpublic void sendMsgByTopics(){/*** 参数* 1、交换机名称* 2、routingKey* 3、消息内容*/for (int i0;i5;i){String message 恭喜您注册成功useridi;rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,topic.sms.email,message);System.out.println( [x] Sent message );}}
}
5.消息的监听及处理
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class Receive {//监听邮件队列RabbitListener(bindings QueueBinding(value Queue(value queue_email, durable true),exchange Exchange(value topic.exchange, ignoreDeclarationExceptions true, type ExchangeTypes.TOPIC),key {topic.#.email.#,email.*}))public void rece_email(String msg){System.out.println( [邮件服务] received : msg !);}//监听短信队列RabbitListener(bindings QueueBinding(value Queue(value queue_sms, durable true),exchange Exchange(value topic.exchange, ignoreDeclarationExceptions true, type ExchangeTypes.TOPIC),key {topic.#.sms.#}))public void rece_sms(String msg){System.out.println( [短信服务] received : msg !);}}