童装 技术支持 东莞网站建设,上海发布官网app下载,网站运营顾问,个人网站设计论文题目目录
成为死信的条件
消息TTL过期 队列达到最大长度 消息被拒
延迟队列 延迟队列使用场景 消息设置 TTL
队列设置 TTL 两者区别 producer 将消息投递到 broker 或者直接到 queue 里了#xff0c; consumer 从 queue 取出消息 进行消费#xff0c;但某些时候由…目录
成为死信的条件
消息TTL过期 队列达到最大长度 消息被拒
延迟队列 延迟队列使用场景 消息设置 TTL
队列设置 TTL 两者区别 producer 将消息投递到 broker 或者直接到 queue 里了 consumer 从 queue 取出消息 进行消费但某些时候由于特定的 原因导致 queue 中的某些消息无法被消费 这样的消息如果没有后续的处理就变成了死信有死信自然就有了死信队列。 成为死信的条件 超过消息的存活时间TTL可以为消息设置一个存活时间在该时间段之后如果消息还未被消费或者被重新投递到其他队列该消息将成为死信。 消息被拒绝Reject当消息被消费者拒绝接收时可以选择将该消息重新投递到另一个队列或将其标记为死信。 消息达到最大重试次数可以通过在消费者端设置重试次数限制当消息达到一定的重试次数而仍然无法被消费时该消息将成为死信。 队列满溢Queue Overflow当一个队列的消息数量已经超过队列的最大容量限制时新消息无法进入队列而被视为死信。 消息TTL过期
生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] argv) throws Exception {try (Channel channel RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间AMQP.BasicProperties properties new AMQP.BasicProperties().builder().expiration(10000).build();//该信息是用作演示队列个数限制for (int i 1; i 11 ; i) {String messageinfoi;channel.basicPublish(NORMAL_EXCHANGE, zhangsan, properties,message.getBytes());System.out.println(生产者发送消息:message);}}}
} 消费者1
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE normal_exchange;//死信交换机名称private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] argv) throws Exception {Channel channel RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);//正常队列绑定死信队列信息MapString, Object params new HashMap();//正常队列设置死信交换机 参数 key 是固定值params.put(x-dead-letter-exchange, DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put(x-dead-letter-routing-key, lisi);String normalQueue normal-queue;channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, zhangsan);System.out.println(等待接收消息.....);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println(Consumer01 接收到消息message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag - {});}
}
消费者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] argv) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);System.out.println(等待接收死信队列消息.....);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println(Consumer02 接收死信队列的消息 message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag - {});}
}
关闭消费者1模拟出故障 正常队列 消息超时进入死信队列 死信队列接收信息 队列达到最大长度
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] argv) throws Exception {try (Channel channel RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//该信息是用作演示队列个数限制for (int i 1; i 11 ; i) {String messageinfoi;channel.basicPublish(NORMAL_EXCHANGE,zhangsan,null, message.getBytes());System.out.println(生产者发送消息:message);}}}
} 消费者1
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE normal_exchange;//死信交换机名称private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] argv) throws Exception {Channel channel RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);//正常队列绑定死信队列信息MapString, Object params new HashMap();//正常队列设置死信交换机 参数 key 是固定值params.put(x-dead-letter-exchange, DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put(x-dead-letter-routing-key, lisi);// 设置正常队列长度的限制params.put(x-max-length,6);String normalQueue normal-queue;channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, zhangsan);System.out.println(等待接收消息.....);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println(Consumer01 接收到消息message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag - {});}
} 消费者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] argv) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);System.out.println(等待接收死信队列消息.....);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println(Consumer02 接收死信队列的消息 message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag - {});}
}
关闭消费者1模拟接收不到信息 死信队列消费了四个消息 分析生产者产生10个消息正常队列只能接受6个消息多的消息便被转移到死信队列去了 消息被拒
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] argv) throws Exception {try (Channel channel RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//该信息是用作演示队列个数限制for (int i 1; i 11 ; i) {String messageinfoi;channel.basicPublish(NORMAL_EXCHANGE,zhangsan,null, message.getBytes());System.out.println(生产者发送消息:message);}}}
}
消费者1 import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE normal_exchange;//死信交换机名称private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] argv) throws Exception {Channel channel RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);//正常队列绑定死信队列信息MapString, Object params new HashMap();//正常队列设置死信交换机 参数 key 是固定值params.put(x-dead-letter-exchange, DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put(x-dead-letter-routing-key, lisi);String normalQueue normal-queue;channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, zhangsan);System.out.println(等待接收消息.....);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);if(message.equals(info5)){System.out.println(Consumer01 接收到消息 message 并拒绝签收该消息);//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}else {System.out.println(Consumer01 接收到消息message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};boolean autoAck false;channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag - {});}
} 消费者2
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumer02 {private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] argv) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);System.out.println(等待接收死信队列消息.....);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println(Consumer02 接收死信队列的消息 message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag - {});}
} 结果
消费者1拒绝接受消息info5info5进入死信队列 消费者2死信队列接受到info5 延迟队列 延时队列 , 队列内部是有序的最重要的特性就体现在它的延时属性上延时队列中的元素是希望 在指定时间到了以后或之前取出和处理简单来说延时队列就是用来存放需要在指定时间被处理的 元素的队列。 延迟队列使用场景 1. 订单在十分钟之内未支付则自动取消 2. 新创建的店铺如果在十天内都没有上传过商品则自动发送消息提醒。 3. 用户注册成功后如果三天内没有登陆则进行短信提醒。 4. 用户发起退款如果三天内没有得到处理则通知相关运营人员。 5. 预定会议后需要在预定的时间点前十分钟通知各个与会人员参加会议 消息设置 TTL import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;public class Producer {private static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] argv) throws Exception {try (Channel channel RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间AMQP.BasicProperties properties new AMQP.BasicProperties().builder().expiration(10000).build();//该信息是用作演示队列个数限制for (int i 1; i 11 ; i) {String messageinfoi;channel.basicPublish(NORMAL_EXCHANGE, zhangsan, properties,message.getBytes());System.out.println(生产者发送消息:message);}}}
} 队列设置 TTL
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;
import java.util.Map;public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE normal_exchange;//死信交换机名称private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] argv) throws Exception {Channel channel RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定死信交换机与 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);//正常队列绑定死信队列信息MapString, Object params new HashMap();//正常队列设置死信交换机 参数 key 是固定值params.put(x-dead-letter-exchange, DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put(x-dead-letter-routing-key, lisi);// 设置 TTL 值为 5000 毫秒5 秒params.put(x-message-ttl, 5000);String normalQueue normal-queue;channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, zhangsan);System.out.println(等待接收消息.....);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);if(message.equals(info5)){System.out.println(Consumer01 接收到消息 message 并拒绝签收该消息);//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}else {System.out.println(Consumer01 接收到消息message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};boolean autoAck false;channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag - {});}
} 两者区别 如果设置了队列的 TTL 属性那么一旦消息过期就会被队列丢弃 ( 如果配置了死信队列被丢到死信队列中) 而第二种方式消息即使过期也不一定会被马上丢弃因为 消息是否过期是在即将投递到消费者 之前判定的 如果当前队列有严重的消息积压情况则已过期的消息也许还能存活较长时间另外还需 要注意的一点是如果不设置 TTL 表示消息永远不会过期如果将 TTL 设置为 0 则表示除非此时可以 直接投递该消息到消费者否则该消息将会被丢弃。