当前位置: 首页 > news >正文

温江做网站的公司中国销售网

温江做网站的公司,中国销售网,马鞍山网站制作公司,天津市建设教育培训中心网站在分布式系统中#xff0c;消息队列通常用于解耦服务#xff0c;RabbitMQ是一个广泛使用的消息队列服务。延迟消息#xff08;也称为延时队列或TTL消息#xff09;是一种常见的场景应用#xff0c;特别适合处理某些任务在一段时间后执行的需求#xff0c;如订单超时处理、…在分布式系统中消息队列通常用于解耦服务RabbitMQ是一个广泛使用的消息队列服务。延迟消息也称为延时队列或TTL消息是一种常见的场景应用特别适合处理某些任务在一段时间后执行的需求如订单超时处理、延时通知等。 本文将以具体代码为例展示如何使用RabbitMQ来实现延迟消息处理涵盖队列和交换机的配置、消息的发送与接收以及死信队列的处理。 什么是延迟消息 延迟消息是指消息在发送到队列后经过设定的时间延迟再被消费。RabbitMQ 本身没有直接支持延迟队列的功能但可以通过 TTLTime To Live 死信队列Dead Letter Queue, DLQ 的组合来实现。当消息超过TTL消息存活时间后不会被立即消费而是会被转发到绑定的死信队列从而实现延迟处理。 RabbitMQ中的延迟消息原理 在RabbitMQ中我们可以通过以下几个概念来实现延迟消息 TTLTime To Live可以为队列设置TTL消息超过该时间后会被标记为“死信”。死信队列Dead Letter Queue当消息在正常队列中过期或处理失败时RabbitMQ可以将它们路由到一个死信队列死信队列可以用来处理这些过期或未处理的消息。x-dead-letter-exchange 和 x-dead-letter-routing-key可以通过配置队列的参数将过期消息发送到一个专门的死信交换器并根据指定的路由键转发到死信队列。 消息来到ttl.queue消息队列过期时间内无人消费消息来到死信交换机hmall.direct在direct.queue消息队列无需等待。 1. RabbitMQ的配置 首先我们需要配置两个队列和两个交换机一个用于存放延时消息另一个用于处理超时的死信消息。 package com.heima.stroke.configuration;import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;Configuration public class RabbitConfig {// 延迟时间 单位毫秒 (这里设为30秒)private static final long DELAY_TIME 1000 * 30;// 行程超时队列public static final String STROKE_OVER_QUEUE STROKE_OVER_QUEUE;// 行程死信队列public static final String STROKE_DEAD_QUEUE STROKE_DEAD_QUEUE;// 行程超时队列交换机public static final String STROKE_OVER_QUEUE_EXCHANGE STROKE_OVER_QUEUE_EXCHANGE;// 行程死信队列交换机public static final String STROKE_DEAD_QUEUE_EXCHANGE STROKE_DEAD_QUEUE_EXCHANGE;// 行程超时交换机 Routing Keypublic static final String STROKE_OVER_KEY STROKE_OVER_KEY;// 行程死信交换机 Routing Keypublic static final String STROKE_DEAD_KEY STROKE_DEAD_KEY;/*** 声明行程超时队列并设置其参数* x-dead-letter-exchange绑定的死信交换机* x-dead-letter-routing-key死信路由Key* x-message-ttl消息的过期时间*/Beanpublic Queue strokeOverQueue() {MapString, Object args new HashMap(3);args.put(x-dead-letter-exchange, STROKE_DEAD_QUEUE_EXCHANGE);args.put(x-dead-letter-routing-key, STROKE_DEAD_KEY);args.put(x-message-ttl, DELAY_TIME); // 设置TTL为30秒return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();}Beanpublic DirectExchange strokeOverQueueExchange() {return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);}Beanpublic Binding bindingStrokeOverDirect() {return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);} }解释 TTL设置我们通过x-message-ttl设置消息的过期时间为30秒。 死信队列绑定通过x-dead-letter-exchange和x-dead-letter-routing-key设置当消息过期时它会被转发到死信交换机再路由到死信队列。 2. 生产者发送延迟消息 接下来我们通过生产者向超时队列发送消息这些消息将在TTL过期后转发到死信队列。 package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON; import com.heima.modules.vo.StrokeVO; import com.heima.stroke.configuration.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;Component public class MQProducer {private final static Logger logger LoggerFactory.getLogger(MQProducer.class);AutowiredRabbitTemplate rabbitTemplate;/*** 发送延时消息到行程超时队列** param strokeVO 消息体*/public void sendOver(StrokeVO strokeVO) {String mqMessage JSON.toJSONString(strokeVO);logger.info(send timeout msg:{}, mqMessage);rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);} }解释 sendOver 方法将消息发送到超时队列消息将在超时后进入死信队列。生产者不需要额外处理TTL或死信的配置只需发送消息即可。 3. 消费者监听死信队列 当消息超过TTL后将会被转发到死信队列。消费者需要监听死信队列并处理这些消息。 j package com.heima.stroke.rabbitmq;import com.alibaba.fastjson.JSON; import com.heima.modules.vo.StrokeVO; import com.heima.stroke.configuration.RabbitConfig; import com.heima.stroke.handler.StrokeHandler; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;Component public class MQConsumer {private final static Logger logger LoggerFactory.getLogger(MQConsumer.class);Autowiredprivate StrokeHandler strokeHandler;/*** 监听死信队列** param message 消息体* param channel RabbitMQ的Channel* param tag 消息的Delivery Tag*/RabbitListener(bindings {QueueBinding(value Queue(value RabbitConfig.STROKE_DEAD_QUEUE, durable true),exchange Exchange(value RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),key RabbitConfig.STROKE_DEAD_KEY)})RabbitHandlerpublic void processStroke(Message message, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long tag) {StrokeVO strokeVO JSON.parseObject(message.getBody(), StrokeVO.class);logger.info(get dead msg:{}, message.getBody());if (strokeVO null) {return;}try {// 处理超时的行程消息strokeHandler.timeoutHandel(strokeVO);// 手动确认消息channel.basicAck(tag, false);} catch (Exception e) {e.printStackTrace();}} }解释 RabbitListener 注解绑定了死信队列的监听器。当消息被转发到死信队列时该消费者会接收到消息。 使用 channel.basicAck(tag, false) 手动确认消息处理成功确保消息不会重复消费。 4. 处理超时业务逻辑 在我们的业务中当消息超时未处理时将其状态设置为超时。 public void timeoutHandel(StrokeVO strokeVO) {// 获取司机行程ID和乘客行程IDString inviterTripId strokeVO.getInviterTripId();String inviteeTripId strokeVO.getInviteeTripId();// 检查邀请状态是否为未确认String inviteeStatus redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);String inviterStatus redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {// 更新为超时状态redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));} }
http://www.hkea.cn/news/14389779/

相关文章:

  • dw做的网站怎么上传图片wordpress无插件美化
  • 小清新博客网站保定中小企业网站制作
  • 门户网站模板图片北京做网站的外包公司
  • 软装设计网站有哪些asp.net网站开发第一步
  • 西部数码网站管理助手3.0教程长沙软件公司排行榜
  • 搭建电商平台网站wordpress 插件 文本
  • 网站建设实训报告外贸网站建设及推广
  • 做多国语言网站电子商务微网站制作
  • 石家庄百度关键词搜索做seo学网站
  • 免费个人博客网站作文网课哪家好
  • 网站开发备案费用wordpress 去除 栏头
  • 原单手表网站网站建设 400电话 广告语
  • 杭州网站优化推荐好的网站建设商家
  • 网站注册商标h5长图模板
  • 视频门户网站建设方案做网站的软件m开头
  • 企业做网站一般要多少钱h5源码网
  • 手机网站前端设计全国工商信息公示系统
  • 阿里云虚拟主机可以做几个网站建设银行企业网站失败
  • php网站项目wordpress添加文件
  • 濮阳网站建设费用湖南网站营销优化开发
  • 网站超级外链如何建一个个人网站
  • wordpress分享视频网站建设银行梅州分行网站
  • 天津工程建设协会网站音乐网站开发需要什么语言工具
  • 做网站有回扣拿吗网站的代码在哪里设置
  • 个人介绍微电影网站模板免费制作企业微商城
  • 宁德北京网站建设成都网站推广公司排名
  • 如何做网站demowordpress官方模版
  • 网站开发市场哪个浏览器看黄页最快夸克浏览器
  • 成都网站定制中心连云港做网站制作首选公司
  • 百度搜索这个网站为什么这么差小说网站建设方案书ppt模板