做视频网站有什么,wordpress RSS怎么用,申请个网站,wordpress工作室主题同步和异步 异步调用 MQ MQ优势#xff1a;①服务解耦 ②异步调用 ③流量削峰 结构 消息模型 RabbitMQ入门案例#xff0c;实现消息发送和消息接收 生产者#xff1a;
public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutExce… 同步和异步 异步调用 MQ MQ优势①服务解耦 ②异步调用 ③流量削峰 结构 消息模型 RabbitMQ入门案例实现消息发送和消息接收 生产者
public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(192.168.136.132);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(itcast);factory.setPassword(123321);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message hello, rabbitmq!;channel.basicPublish(, queueName, null, message.getBytes());System.out.println(发送消息成功【 message 】);// 5.关闭通道和连接channel.close();connection.close();}
}
消费者
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(192.168.136.132);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(itcast);factory.setPassword(123321);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message new String(body);System.out.println(接收到消息【 message 】);}});System.out.println(等待接收消息。。。。);}
} SpringAMQP 引入依赖 !--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency 普通队列 第一步publisher服务配置文件发消息
spring:rabbitmq:host: 192.168.136.132port: 5672username: itcastpassword: 123321virtual-host: /
RunWith(SpringRunner.class)
SpringBootTest
public class SpringAMQPTest {//获取RabbitTemplateAPIResourceprivate RabbitTemplate rabbitTemplate;Testpublic void test(){String queueName simple.queue;String message hello SpringAMQP;//使用API传入队列名和消息即可直接发送rabbitTemplate.convertAndSend(queueName,message);}}第二步Consumer服务配置信息监听消息
spring:rabbitmq:host: 192.168.136.132port: 5672username: itcastpassword: 123321virtual-host: /
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;//定义一个监听类去监听消息
Component
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void ListenSimpleQueue(String msg){System.out.println(msg msg);}
} Work Queue队列 多个消费者绑定到同一个队列可以通过prefetch来控制消费者消息预取的数量
第一步 生产者发送消息
RunWith(SpringRunner.class)
SpringBootTest
public class SpringAMQPTest {//获取RabbitTemplateAPIResourceprivate RabbitTemplate rabbitTemplate;Testpublic void test01() throws InterruptedException {String queueName simple.queue;String message hello SpringAMQP--;for (int i 0; i 50; i) {//使用API传入队列名和消息即可直接发送rabbitTemplate.convertAndSend(queueName,messagei);Thread.sleep(20);}}}
第二步消费者设置多个监听消息
Component
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void ListenWorkQueue(String msg) throws InterruptedException {System.out.println(消费者一接收到消息---- msg LocalDateTime.now());Thread.sleep(20);}RabbitListener(queues simple.queue)public void ListenWorkQueue01(String msg) throws InterruptedException {System.out.println(消费者二接收到消息---- msg LocalDateTime.now());Thread.sleep(200);}
}第三步消费者可通过prehtch设置消息预取数量
spring:rabbitmq:host: 192.168.136.132port: 5672username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1 发布-订阅模型 Fanout广播交换机 ---多个队列收到交换机的消息
第一步Consumer声明交换机队列并进行绑定。
Configuration
public class FanoutConfig {//声明交换机Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(itcast.fanout);}//声明队列1Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}//绑定队列1到交换机上Beanpublic Binding fanoutBanding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//声明队列2Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}//绑定队列2到交换机上Beanpublic Binding fanoutBanding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
} 第二步Consumer进行监听消息
Component
public class SpringRabbitListener {RabbitListener(queues fanout.queue1)public void ListenSimpleQueue1(String msg){System.out.println(消费者接收到fanout.queue1的消息 msg);}RabbitListener(queues fanout.queue2)public void ListenSimpleQueue2(String msg){System.out.println(消费者接收到fanout.queue2的消息 msg);}
}
第三步Publisher向交换机发送消息
RunWith(SpringRunner.class)
SpringBootTest
public class SpringAMQPTest {//获取RabbitTemplateAPIResourceprivate RabbitTemplate rabbitTemplate;Testpublic void testExchange() {//声明交换机名称String exchangeName itcast.fanout;//消息String message Hello Everyone;//发送消息rabbitTemplate.convertAndSend(exchangeName,,message);}
} Direct路由交换机 ---将消息发给指定key的队列 第一步在Listener中声明队列交换机以及key
Component
public class SpringRabbitListener {//声明队列1交换机以及队列1的bindingKeyRabbitListener(bindings QueueBinding(value Queue(name derict.queue1),exchange Exchange(name itcast.direct,type ExchangeTypes.DIRECT),key {red,blue}))public void ListenDirectQueue1(String msg){System.out.println(消费者接收到direct.queue1的消息 msg);}//声明队列2交换机以及队列2的bindingKeyRabbitListener(bindings QueueBinding(value Queue(name derict.queue2),exchange Exchange(name itcast.direct,type ExchangeTypes.DIRECT),key {red,yellow}))public void ListenDirectQueue2(String msg){System.out.println(消费者接收到direct.queue2的消息 msg);}
}
第二步向指定key的队列发送消息 Testpublic void testDirect() {//声明交换机名称String exchangeName itcast.direct;//消息String message Hello Blue;//发送消息指定交换机队列以及要发送的keyrabbitTemplate.convertAndSend(exchangeName,red,message);} Topic主题交换机 ----key必须是多个单词列表统一主题支持通配符 第一步在Listener中声明队列交换机以及通配符key
Component
public class SpringRabbitListener {//声明队列2的交换机队列以及通配符keyRabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name itcast.topic,type ExchangeTypes.TOPIC),key china.#))public void ListenTopicQueue1(String msg){System.out.println(消费者接收到topic.queue1的消息 msg);}//声明队列2的交换机队列以及通配符keyRabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name itcast.topic,type ExchangeTypes.TOPIC),key #.news))public void ListenTopicQueue2(String msg){System.out.println(消费者接收到topic.queue2的消息 msg);}}
第二步向主题通配符发送消息 RunWith(SpringRunner.class)
SpringBootTest
public class SpringAMQPTest {//获取RabbitTemplateAPIResourceprivate RabbitTemplate rabbitTemplate;Testpublic void testTopic() {//声明交换机名称String exchangeName itcast.topic;//消息String message Hello China;//发送消息rabbitTemplate.convertAndSend(exchangeName,china.news,message);}
} 消息转换器 RabbitMQ发的消息体都是Object类型所有还可以发送对象数据。而且默认的消息转换器是MessageConverter实现的当使用的是Map数据类型时就会序列化成很多字节所以推荐使用JSON的序列化和反序列化直接修改默认的MessageConverter的类型 dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId/dependency
Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();} 对于RabbitMQ高级部分死信队列延迟队列发布确认幂等性优先惰性队列等有时间再学