加强网站建设与管理的通知,中国十大室内设计公司,网站开发典型,东莞网站建设制作目录
一、概念
二、使用场景
三、RabbitMQ 中的 TTL
#xff08;一#xff09;队列设置 TTL
#xff08;二#xff09;消息设置 TTL
#xff08;三#xff09;两者的区别
四、整合SpringBoot实现延迟队列
#xff08;一#xff09;创建项目
#xff08;二一队列设置 TTL
二消息设置 TTL
三两者的区别
四、整合SpringBoot实现延迟队列
一创建项目
二添加依赖
三修改配置文件
四添加Swagger配置类
五、队列TTL
一代码架构图
二配置文件类
三消息生产者
四消息消费者
六、延迟队列优化
一代码架构图
二配置文件类
三消息生产者
七、Rabbitmq 插件实现延迟队列
一代码架构图
二配置文件类
三消息生产者
四消息消费者
八、总结 一、概念 延时队列队列内部是有序的最重要的特性就体现在它的延时属性上延时队列中的元素是希望在指定时间到了以后或之前取出和处理简单来说延时队列就是用来存放需要在指定时间被处理的元素的队列。 二、使用场景
订单在十分钟之内未支付则自动取消 新创建的店铺如果在十天内都没有上传过商品则自动发送消息提醒。 用户注册成功后如果三天内没有登陆则进行短信提醒。 用户发起退款如果三天内没有得到处理则通知相关运营人员。 预定会议后需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点需要在某个事件发生之后或者之前的指定时间点完成某一项任务如 发生订单生成事件在十分钟之后检查该订单支付状态然后将未支付的订单进行关闭看起来似乎使用定时任务一直轮询数据每秒查一次取出需要被处理的数据然后处理不就完事了吗如果数据量比较少确实可以这样做比如对于“如果账单一周内未支付则进行自动结算”这样的需求如果对于时间不是严格限制而是宽松意义上的一周那么每天晚上跑个定时任务检查一下所有未支付的账单确实也是一个可行的方案。但对于数据量比较大并且时效性较强的场景如“订单十分钟内未支付则关闭“短期内未支付的订单数据可能会有很多活动期间甚至会达到百万甚至千万级别对这么庞大的数据量仍旧使用轮询的方式显然是不可取的很可能在一秒内无法完成所有订单的检查同时会给数据库带来很大压力无法满足业务要求而且性能低下。 三、RabbitMQ 中的 TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性表明一条消息或者该队列中的所有消息的最大存活时间单位是毫秒。换句话说如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列那么这条消息如果在 TTL 设置的时间内没有被消费则会成为死信。如果同时配置了队列的 TTL 和消息的TTL那么较小的那个值将会被使用有两种方式设置 TTL。 一队列设置 TTL 第一种是在创建队列的时候设置队列的“x-message-ttl”属性MapString, Object arguments new HashMap();
// 声明队列的TTL
arguments.put(x-message-ttl, 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); 二消息设置 TTL
另一种方式便是针对每条消息设置 TTL
rabbitTemplate.convertAndSend(X, XC, message, msg - {msg.getMessageProperties().setExpiration(ttl);return msg;
});
三两者的区别
如果设置了队列的 TTL 属性那么一旦消息过期就会被队列丢弃(如果配置了死信队列被丢到死信队列中)而第二种方式消息即使过期也不一定会被马上丢弃因为消息是否过期是在即将投递到消费者之前判定的如果当前队列有严重的消息积压情况则已过期的消息也许还能存活较长时间另外还需要注意的一点是如果不设置 TTL表示消息永远不会过期如果将 TTL 设置为 0则表示除非此时可以直接投递该消息到消费者否则该消息将会被丢弃。 前一小节我们介绍了死信队列刚刚又介绍了 TTL至此利用 RabbitMQ 实现延时队列的两大要素已经集齐接下来只需要将它们进行融合再加入一点点调味料延时队列就可以新鲜出炉了。想想看延时队列不就是想要消息延迟多久被处理吗TTL 则刚好能让消息在延迟多久之后成为死信另一方面成为死信的消息都会被投递到死信队列里这样只需要消费者一直消费死信队列里的消息就完事了因为里面的消息都是希望被立即处理的消息。四、整合SpringBoot实现延迟队列
一创建项目 二添加依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependency!--RabbitMQ 依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.47/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--swagger--dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependency!--RabbitMQ 测试依赖--dependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependency
三修改配置文件
spring.rabbitmq.host192.168.23.100
spring.rabbitmq.port5672
spring.rabbitmq.usernameguest
spring.rabbitmq.passwordguest
四添加Swagger配置类
Configuration
EnableSwagger2
public class SwaggerConfig {Beanpublic Docket webApiConfig() {return new Docket(DocumentationType.SWAGGER_2).groupName(webapi).apiInfo(webApiInfo()).select().build();}public ApiInfo webApiInfo() {return new ApiInfoBuilder().title(rabbitmq 接口文档).description(本文档描述了 rabbitmq 微服务接口定义).version(1.0).contact(new Contact(enjoy6288, http://atguigu.com,1551388580qq.com)).build();}
}
五、队列TTL
一代码架构图 创建两个队列 QA 和 QB两者队列 TTL 分别设置为 10S 和 40S然后在创建一个交换机 X 和死信交换机 Y它们的类型都是 direct创建一个死信队列 QD它们的绑定关系如下 二配置文件类
Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE X;public static final String QUEUE_A QA;public static final String QUEUE_B QB;public static final String Y_DEAD_LETTER_EXCHANGE Y;public static final String DEAD_LETTER_QUEUE QD;// 声明xExchangeBean(xExchange)public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchangeBean(yExchange)public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列ABean(queueA)public Queue queueA() {MapString, Object arguments new HashMap();// 当前队列的死信交换机arguments.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put(x-dead-letter-routing-key, YD);// 声明队列的TTLarguments.put(x-message-ttl, 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机XBeanpublic Binding queueABindingX(Qualifier(queueA) Queue queueA,Qualifier(xExchange)DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with(XA);}// 声明队列BBean(queueB)public Queue queueB() {MapString, Object arguments new HashMap();arguments.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put(x-dead-letter-routing-key, YD);// 声明队列的TTLarguments.put(x-message-ttl, 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机XBeanpublic Binding queueBBindingX(Qualifier(queueB) Queue queueB,Qualifier(xExchange)DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with(XB);}// 声明死信队列Bean(queueD)public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(Qualifier(queueD)Queue queueD,Qualifier(yExchange)DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with(YD);}}
三消息生产者 GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message) {log.info(当前时间是{}发送一条信息给两个 TTL 队列:{}, new Date().toString(), message);rabbitTemplate.convertAndSend(X, XA, 消息来自ttl为10s的队列 message);rabbitTemplate.convertAndSend(X, XB, 消息来自ttl为40s的队列 message);}
四消息消费者
Component
Slf4j
public class DeadLetterQueueConsumer {RabbitListener(queues QD)public void receiveD(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(当前时间{},收到死信队列信息{}, new Date().toString(), msg);}
}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻 第一条消息在 10S 后变成了死信消息然后被消费者消费掉第二条消息在 40S 之后变成了死信消息然后被消费掉这样一个延时队列就打造完成了。不过如果这样使用的话岂不是每增加一个新的时间需求就要新增一个队列这里只有 10S 和 40S 两个时间选项如果需要一个小时后处理那么就需要增加 TTL 为一个小时的队列如果是预定会议室然后提前通知这样的场景岂不是要增加无数个队列才能满足需求 六、延迟队列优化 一代码架构图 在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间而是由生产者设置过期时间 二配置文件类 Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE X;public static final String QUEUE_A QA;public static final String QUEUE_B QB;public static final String Y_DEAD_LETTER_EXCHANGE Y;public static final String DEAD_LETTER_QUEUE QD;public static final String QUEUE_C QC;// 声明xExchangeBean(xExchange)public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 声明yExchangeBean(yExchange)public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}// 声明队列ABean(queueA)public Queue queueA() {MapString, Object arguments new HashMap();// 当前队列的死信交换机arguments.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put(x-dead-letter-routing-key, YD);// 声明队列的TTLarguments.put(x-message-ttl, 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}// 声明队列A绑定交换机XBeanpublic Binding queueABindingX(Qualifier(queueA) Queue queueA,Qualifier(xExchange)DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with(XA);}// 声明队列BBean(queueB)public Queue queueB() {MapString, Object arguments new HashMap();arguments.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put(x-dead-letter-routing-key, YD);// 声明队列的TTLarguments.put(x-message-ttl, 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}// 声明队列B绑定交换机XBeanpublic Binding queueBBindingX(Qualifier(queueB) Queue queueB,Qualifier(xExchange)DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with(XB);}// 声明队列CBean(queueC)public Queue queueC() {MapString, Object arguments new HashMap();arguments.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);// 当前队列的死信路由keyarguments.put(x-dead-letter-routing-key, YD);return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}// 声明队列C绑定交换机XBeanpublic Binding queueCBindingX(Qualifier(queueC) Queue queueC,Qualifier(xExchange)DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with(XC);}// 声明死信队列Bean(queueD)public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}Bean// 声明死信队列 QD 绑定关系public Binding queuedBindingY(Qualifier(queueD)Queue queueD,Qualifier(yExchange)DirectExchange exchange) {return BindingBuilder.bind(queueD).to(exchange).with(YD);}}三消息生产者 GetMapping(/sendExpirationMsg/{message}/{ttl})public void sendMsg(PathVariable String message, PathVariable String ttl) {log.info(当前时间是{}发送一条过期信息给两个 TTL 队列:{}, new Date().toString(), message);rabbitTemplate.convertAndSend(X, XC, message, msg - {msg.getMessageProperties().setExpiration(ttl);return msg;});} 发起请求 http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000 看起来似乎没什么问题但是在最开始的时候就介绍过如果使用在消息属性上设置 TTL 的方式消息可能并不会按时“死亡“因为 RabbitMQ 只会检查第一个消息是否过期如果过期则丢到死信队列如果第一个消息的延时时长很长而第二个消息的延时时长很短第二个消息并不会优先得到执行。 七、Rabbitmq 插件实现延迟队列 关于插件的安装可以查看这篇文章Docker安装RabbitMq延迟队列插件 一代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange绑定关系如下: 二配置文件类
在我们自定义的交换机中这是一种新的交换类型该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中而是存储在 mnesia(一个分布式数据系统)表中当达到投递时间时才投递到目标队列中。 三消息生产者 /** 基于插件的延迟队列和延迟交换机*/
Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME delayed.queue;public static final String DELAYED_EXCHANGE_NAME delayed.exchange;public static final String DELAYED_ROUTING_KEY delayed.routingkey;// 声明队列Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}// 声明自定义交换机Beanpublic CustomExchange delayedExchange() {MapString, Object args new HashMap();args.put(x-delayed-type, direct);return new CustomExchange(DELAYED_EXCHANGE_NAME, x-delayed-message, true, false, args);}// 声明队列和延迟交换机的绑定Beanpublic Binding bindingDelayedQueue(Qualifier(delayedQueue)Queue delayedQueue,Qualifier(delayedExchange)CustomExchange exchange) {return BindingBuilder.bind(delayedQueue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();}
}
四消息消费者
Component
Slf4j
public class DelayedQueueConsumer {RabbitListener(queues DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(String message) {log.info(当前时间{} 接收到消息: {}, new Date().toString(), message);}
}
发起请求 http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000 http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000第二个消息被先消费掉了符合预期 八、总结 延时队列在需要延时处理的场景下非常有用使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外通过 RabbitMQ 集群的特性可以很好的解决单点故障问题不会因为单个节点挂掉导致延时队列不可用或者消息丢失。 当然延时队列还有很多其它选择比如利用 Java 的 DelayQueue利用 Redis 的 zset利用 Quartz或者利用 kafka 的时间轮这些方式各有特点,看需要适用的场景。