顺德做网站公司,雄安邯郸网站制作多少钱,代做毕业设计找哪个网站好,怎样加强文化建设问题
前面谈到基于死信的延迟队列#xff0c;存在的问题#xff1a;如果第一个消息延时时间很长#xff0c;而第二个消息延时时间很短#xff0c;第二个消息并不会优先得到执行。
下载插件
地址#xff1a;https://github.com/rabbitmq/rabbitmq-delayed-message-excha…问题
前面谈到基于死信的延迟队列存在的问题如果第一个消息延时时间很长而第二个消息延时时间很短第二个消息并不会优先得到执行。
下载插件
地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases rabbitmq_delayed_message_exchange-3.8.0.ez 说明rabbitmq安装后会生成这个目录 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/ 拷贝插件到上面这个目录 安装插件 需要重启rabbitmq
监测插件是否安装成功 可以看出不再使用延迟队列而是使用延迟交换机。 代码
配置代码
package com.xkj.org.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class DelayExchangeConfig {//队列public static final String DELAYED_QUEUE_NAME delayed.queue;//交换机public static final String DELAYED_EXCHANGE_NAME delayed.exchange;//RoutingKeypublic static final String DELYAED_ROUTING_KEY delayed.routingkey;Beanpublic CustomExchange delayedExchange() {MapString, Object arguments new HashMap();arguments.put(x-delayed-type, direct);//第一个参数交换机的名称//第二个参数交换机的类型//第三个参数是否持久化//第四个参数是否删除//第五个参数其他参数return new CustomExchange(DELAYED_EXCHANGE_NAME, x-delayed-message, true, false,arguments);}Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}Beanpublic Binding delayedQueueBindingDelayedExchange(Qualifier(delayedExchange)CustomExchange delayedExchange,Qualifier(delayedQueue)Queue delayedQueue) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELYAED_ROUTING_KEY).noargs();}}生产者
ApiOperation(基于插件的延迟消息)GetMapping(/sendDelayedMsg/{msg}/{delayedTime})public void sendDelayedMsg(ApiParam(value 消息内容, required true)PathVariable(msg) String message,ApiParam(value 延迟时间, required true)PathVariable(delayedTime)Integer delayedTime) {log.info(当前时间{}发送一条消息给延迟交换机{}delayedTime{}, new Date().toString(), message, delayedTime);rabbitTemplate.convertAndSend(delayed.exchange, delayed.routingkey, message, msg - {msg.getMessageProperties().setDelay(delayedTime);return msg;});}
消费者
package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.Date;/*** 基于插件的延迟消息队列监听*/
Slf4j
Component
public class DelayedQueueConsumer {RabbitListener(queues delayed.queue)public void receiver(Message message, Channel channel) throws UnsupportedEncodingException {String msg new String(message.getBody(), UTF-8);log.info(当前时间{}收到延迟队列的消息{}, new Date().toString(), msg);}}总结
延迟队列可以保证消息可靠发送消息可靠投递死信队列保证消息至少被消费一次已经未被处理的消息不会被丢弃。