新建网站费用,深圳网络推广有几种方法,网站建设方案规划书,像wordpressRabbitMQ消息队列原理与应用
一、消息队列概述
#xff08;一#xff09;概念 消息队列#xff08;Message Queue#xff0c;简称MQ#xff09;是一种应用程序间的通信方式#xff0c;它允许应用程序通过将消息放入队列中#xff0c;而不是直接调用其他应用程序的接口…RabbitMQ消息队列原理与应用
一、消息队列概述
一概念 消息队列Message Queue简称MQ是一种应用程序间的通信方式它允许应用程序通过将消息放入队列中而不是直接调用其他应用程序的接口实现应用程序间的解耦。发送消息的应用程序称为生产者Producer接收消息的应用程序称为消费者Consumer消息在队列Queue中暂存直到被消费者消费。 二作用
1. 解耦
在分布式系统中不同服务之间可能存在复杂的依赖关系使用消息队列可以将这些服务解耦。例如在一个电商系统中订单服务产生订单后需要通知库存服务和物流服务若不使用消息队列订单服务可能需要直接调用库存服务和物流服务的接口。而使用消息队列后订单服务只需将消息发送到队列库存服务和物流服务作为消费者从队列中获取消息这样它们之间就不需要直接调用减少了服务间的耦合使得系统更易于维护和扩展。从源码角度来看生产者只关心将消息发送到队列消费者只关心从队列接收消息它们通过队列这个中介实现了间接通信而无需知晓对方的具体实现。
2. 异步处理
一些操作可能比较耗时如发送邮件、短信通知等使用消息队列可以将这些操作异步化。生产者将消息发送到队列后即可继续处理后续业务而不必等待这些耗时操作完成。在 RabbitMQ 的 Channel 类中生产者使用 basicPublish() 方法将消息发送到队列此过程是异步的发送完消息后生产者的线程可以立即处理其他任务而耗时的消息处理工作则由消费者在其他时间处理。
3. 流量削峰
在高并发场景下如电商平台的秒杀活动可能会瞬间产生大量请求使用消息队列可以将这些请求暂存到队列中消费者按照自己的处理能力从队列中取出消息进行处理避免系统因瞬间流量过大而崩溃。在 RabbitMQ 的架构中队列起到缓冲作用将高峰流量存储下来再由消费者慢慢消化避免后端服务直接承受高并发压力。
三应用场景
1. 分布式系统集成
不同服务间通过消息队列进行通信和协作实现系统的分布式部署和松耦合如在微服务架构中各个微服务可通过消息队列进行事件通知、数据传递等。例如用户注册服务完成用户注册后可将消息发送到消息队列其他服务如积分服务、通知服务等可根据这些消息进行相应操作。
2. 日志处理
将日志信息发送到消息队列然后由专门的日志处理服务进行处理和存储避免日志处理影响主要业务逻辑。多个服务作为生产者将日志消息发送到队列日志处理服务作为消费者从队列中拉取消息将日志存储到数据库或文件系统中。
二、RabbitMQ 的安装与配置
一安装过程
使用 Docker 安装
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management这个命令启动了一个 RabbitMQ 容器同时开启了管理界面端口 15672和消息队列服务端口5672方便管理和使用 RabbitMQ。
详细安装过程请参考这篇文章 RabbitMQ实战
二代码示例核心概念与操作
1. 发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQProducer {private final static String QUEUE_NAME hello;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message Hello, RabbitMQ!;channel.basicPublish(, QUEUE_NAME, null, message.getBytes(UTF-8));System.out.println( [x] Sent message );}}
}代码解释
ConnectionFactory 用于创建连接 RabbitMQ 的工厂设置了 RabbitMQ 服务器的主机地址、端口以及用户名和密码。 connection.createChannel() 创建了一个通信通道通过该通道可以进行消息的发送和接收操作。channel.queueDeclare(QUEUE_NAME, false, false, false, null) 声明了一个队列这里的参数依次表示队列名称、是否持久化、是否独占、是否自动删除、队列的其他属性。channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes(“UTF-8”)) 将消息发送到队列第一个参数是交换机名称这里为空表示使用默认交换机第二个参数是队列名称第三个参数是消息属性最后是消息的字节数组表示。
2. 接收消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQConsumer {private final static String QUEUE_NAME hello;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}代码解释
同样使用 ConnectionFactory 创建连接和通道。channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {}) 用于消费消息第一个参数是队列名称第二个参数表示是否自动确认消费这里设置为 true第三个参数是消息到达时的回调函数最后是取消消费的回调函数。
三核心概念
交换机Exchange 交换机是消息发送的入口根据路由键Routing Key将消息路由到不同的队列。在 RabbitMQ 的 Exchange 类中负责消息的分发不同类型的交换机有不同的路由策略。队列Queue 存储消息的地方在 Queue 类中负责消息的存储和管理队列可以设置持久化、独占、自动删除等属性不同属性会影响队列的存储和使用方式。绑定Binding 绑定将交换机和队列联系起来通过路由键将消息从交换机路由到队列在 Binding 类中存储了交换机、队列和路由键的绑定关系根据这个关系实现消息的路由。
四不同类型的交换机及应用场景
1. 直连交换机Direct Exchange
特点根据消息的路由键将消息路由到相应的队列消息的路由键必须完全匹配队列绑定的路由键。在 DirectExchange 类中会根据路由键进行精确匹配找到对应的队列。应用场景适用于需要精确路由的场景如订单处理系统中根据订单类型如普通订单、团购订单等将消息路由到不同队列由不同的服务处理不同类型的订单。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class DirectExchangeProducer {private final static String EXCHANGE_NAME direct_exchange;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, direct);String message Direct Exchange Message;String routingKey order.normal;channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(UTF-8));System.out.println( [x] Sent message with routing key routingKey );}}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class DirectExchangeConsumer {private final static String EXCHANGE_NAME direct_exchange;private final static String QUEUE_NAME normal_order_queue;private final static String ROUTING_KEY order.normal;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, direct);channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}2. 主题交换机Topic Exchange
特点使用通配符* 表示一个单词# 表示零个或多个单词的路由键更灵活地路由消息。在 TopicExchange 类中会根据通配符匹配路由键将消息路由到多个队列。应用场景适用于更灵活的路由场景如日志系统根据日志级别如 *.info、#.error 等将不同级别的日志消息路由到不同队列。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class TopicExchangeProducer {private final static String EXCHANGE_NAME topic_exchange;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, topic);String message Topic Exchange Message;String routingKey log.info;channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(UTF-8));System.out.println( [x] Sent message with routing key routingKey );}}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class TopicExchangeConsumer {private final static String EXCHANGE_NAME topic_exchange;private final static String QUEUE_NAME info_log_queue;private final static String ROUTING_KEY log.info;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, topic);channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}3. 扇出交换机Fanout Exchange
特点将消息广播到所有绑定的队列不考虑路由键。在 FanoutExchange 类中会将收到的消息复制到所有绑定的队列中。应用场景适用于需要广播消息的场景如消息通知系统将一条消息同时发送给多个服务或用户如在一个在线教育平台当课程更新时将更新消息发送给所有订阅该课程的用户。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class FanoutExchangeProducer {private final static String EXCHANGE_NAME fanout_exchange;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, fanout);String message Fanout Exchange Message;channel.basicPublish(EXCHANGE_NAME, , null, message.getBytes(UTF-8));System.out.println( [x] Sent message );}}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class FanoutExchangeConsumer {private final static String EXCHANGE_NAME fanout_exchange;private final static String QUEUE_NAME fanout_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, fanout);channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}
}三、RabbitMQ 的消息持久化机制与可靠性保证
一消息持久化机制
1. 队列持久化
在声明队列时将 durable 参数设置为 true可使队列持久化在 RabbitMQ 重启后队列依然存在。在 Queue 类的 queueDeclare 方法中会根据 durable 参数将队列信息存储到磁盘上确保队列在服务器重启后不会丢失。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);2. 消息持久化
在发送消息时设置消息的 deliveryMode 属性为 2表示消息是持久化的会将消息存储在磁盘上。在 BasicProperties 类中设置 deliveryMode 可以将消息持久化这样即使 RabbitMQ 服务器重启消息也不会丢失。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PersistentProducer {private final static String QUEUE_NAME persistent_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message Persistent Message;AMQP.BasicProperties properties new AMQP.BasicProperties().builder().deliveryMode(2).build();channel.basicPublish(, QUEUE_NAME, properties, message.getBytes(UTF-8));System.out.println( [x] Sent message );}}
}代码解释
ConnectionFactory 用于创建与 RabbitMQ 的连接通过设置 host、port、username 和 password 来指定连接的相关信息。channel.queueDeclare(QUEUE_NAME, true, false, false, null) 声明了一个持久化队列这里将 durable 参数设为 true表示该队列会在 RabbitMQ 服务器重启后仍然存在确保队列的持久性。AMQP.BasicProperties 类用于设置消息的属性通过 builder().deliveryMode(2).build() 为消息设置 deliveryMode 为 2使消息持久化确保即使在 RabbitMQ 服务器重启或意外关闭的情况下消息不会丢失。channel.basicPublish(“”, QUEUE_NAME, properties, message.getBytes(“UTF-8”)) 方法将带有持久化属性的消息发送到指定队列确保消息被存储在磁盘上。
二确认机制
1. 生产者确认
生产者可以使用确认机制确保消息已被正确发送到 RabbitMQ 服务器。在 Channel 类中使用 confirmSelect() 方法开启确认模式然后通过 waitForConfirms() 或 waitForConfirmsOrDie() 方法等待确认消息。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;public class ConfirmedProducer {private final static String QUEUE_NAME confirmed_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.confirmSelect();String message Confirmed Message;channel.basicPublish(, QUEUE_NAME, null, message.getBytes(UTF-8));if (channel.waitForConfirms()) {System.out.println( [x] Sent message and confirmed);} else {System.out.println( [x] Sent message but not confirmed);}}}
}代码解释
channel.confirmSelect() 开启确认模式使得生产者可以确认消息是否已被服务器接收。channel.basicPublish() 发送消息。channel.waitForConfirms() 等待服务器确认消息如果收到确认则消息成功发送否则可能出现发送失败的情况。
2. 消费者确认
消费者可以使用确认机制告知 RabbitMQ 消息是否已被成功处理。在 Channel 类中通过 basicAck() 方法手动确认消息已被成功消费使用 basicNack() 或 basicReject() 方法表示消息处理失败。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class ConfirmedConsumer {private final static String QUEUE_NAME confirmed_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );try {// 模拟消息处理过程Thread.sleep(1000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println( [x] Acknowledged);} catch (Exception e) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);System.out.println( [x] Not acknowledged);}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag - {});}
}代码解释
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag - {}) 中将自动确认设置为 false需要手动确认消息的消费。deliverCallback 是消息到达时的回调函数收到消息后会对消息进行处理这里模拟处理过程使用 Thread.sleep(1000)。channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false) 手动确认消息已成功处理通知 RabbitMQ 可以将消息从队列中移除。channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true) 表示消息处理失败第三个参数 true 表示重新入队将消息重新放回队列中以便再次消费。
三消息的可靠性保证策略
1. 持久化与确认机制结合
结合队列和消息的持久化以及生产者和消费者的确认机制可以大大提高消息传递的可靠性。生产者发送持久化消息并等待确认消费者手动确认消息确保消息在整个生命周期内的可靠性。
2. 备份交换机Alternate Exchange
当消息无法路由到任何队列时可以使用备份交换机将消息路由到备用队列。在 Exchange 类中设置 alternate-exchange 属性可以将无法路由的消息发送到备份交换机。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class AlternateExchangeProducer {private final static String MAIN_EXCHANGE_NAME main_exchange;private final static String BACKUP_EXCHANGE_NAME backup_exchange;private final static String QUEUE_NAME backup_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.exchangeDeclare(MAIN_EXCHANGE_NAME, direct, true, false, null);channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, fanout, true, false, null);channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, BACKUP_EXCHANGE_NAME, );channel.exchangeBind(MAIN_EXCHANGE_NAME, BACKUP_EXCHANGE_NAME, );String message Message for Alternate Exchange;channel.basicPublish(MAIN_EXCHANGE_NAME, non_existing_routing_key, null, message.getBytes(UTF-8));System.out.println( [x] Sent message );}}
}代码解释
channel.exchangeDeclare(MAIN_EXCHANGE_NAME, “direct”, true, false, null) 声明主交换机设置为持久化。channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, “fanout”, true, false, null) 声明备份交换机这里使用扇出交换机将消息广播到所有绑定队列。channel.queueDeclare(QUEUE_NAME, true, false, false, null) 声明备份队列设置为持久化。channel.queueBind(QUEUE_NAME, BACKUP_EXCHANGE_NAME, “”) 将备份队列绑定到备份交换机。channel.exchangeBind(MAIN_EXCHANGE_NAME, BACKUP_EXCHANGE_NAME, “”) 将备份交换机绑定到主交换机作为备用路由。channel.basicPublish(MAIN_EXCHANGE_NAME, “non_existing_routing_key”, null, message.getBytes(“UTF-8”)) 发送消息到主交换机但使用了一个不存在的路由键会导致消息无法路由到正常队列从而被路由到备份交换机最终到达备份队列。
3. 死信队列Dead Letter Queue
当消息在队列中满足一定条件如过期、被拒绝、超出队列长度等时会被发送到死信队列方便对这些消息进行后续处理。在 Queue 类中通过设置 x-dead-letter-exchange 和 x-dead-letter-routing-key 参数将队列关联到死信队列。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class DeadLetterQueueProducer {private final static String QUEUE_NAME normal_queue;private final static String DEAD_LETTER_QUEUE_NAME dead_letter_queue;private final static String EXCHANGE_NAME dead_letter_exchange;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, direct);channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null);channel.queueBind(DEAD_LETTER_QUEUE_NAME, EXCHANGE_NAME, dead_letter_key);MapString, Object args new HashMap();args.put(x-dead-letter-exchange, EXCHANGE_NAME);args.put(x-dead-letter-routing-key, dead_letter_key);channel.queueDeclare(QUEUE_NAME, true, false, false, args);String message Message for Dead Letter Queue;channel.basicPublish(, QUEUE_NAME, null, message.getBytes(UTF-8));System.out.println( [x] Sent message );}}
}代码解释
channel.exchangeDeclare(EXCHANGE_NAME, “direct”) 声明一个用于死信队列的交换机。channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null) 声明死信队列并绑定到该交换机。channel.queueBind(DEAD_LETTER_QUEUE_NAME, EXCHANGE_NAME, “dead_letter_key”) 完成绑定。args.put(“x-dead-letter-exchange”, EXCHANGE_NAME) 和 args.put(“x-dead-letter-routing-key”, “dead_letter_key”) 为正常队列设置死信队列相关属性当正常队列中的消息满足死信条件时会被路由到死信队列。channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes(“UTF-8”)) 发送消息到正常队列。
四、实际项目案例展示
一案例背景
考虑一个在线票务系统包括用户下单、库存管理、支付处理等服务这些服务之间需要高效、可靠的消息传递。
二架构设计
生产者服务用户下单
用户下单后订单服务作为生产者将订单消息发送到 RabbitMQ 的队列或交换机。使用直连交换机将不同类型的订单如电影票订单、演出票订单路由到不同队列。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class TicketOrderProducer {private final static String EXCHANGE_NAME ticket_order_exchange;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, direct);String message New Ticket Order;String routingKey movie_ticket_order;channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(UTF-8));System.out.println( [x] Sent message with routing key routingKey );}}
}消费者服务库存管理
库存管理服务作为消费者从相应队列接收订单消息处理订单的库存扣减操作。使用确认机制确保消息可靠处理。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class InventoryConsumer {private final static String EXCHANGE_NAME ticket_order_exchange;private final static String QUEUE_NAME movie_ticket_order_queue;private final static String ROUTING_KEY movie_ticket_order;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, direct);channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );try {// 模拟库存扣减操作Thread.sleep(2000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println( [x] Inventory deducted and acknowledged);} catch (Exception e) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);System.out.println( [x] Inventory deduction failed, not acknowledged);}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag - {});}
}消费者服务支付处理
支付服务作为另一个消费者从相应队列接收订单消息处理订单的支付操作同样使用确认机制保证消息的可靠处理。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class PaymentConsumer {private final static String EXCHANGE_NAME ticket_order_exchange;private final static String QUEUE_NAME payment_order_queue;private final static String ROUTING_KEY payment_order;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, direct);channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );try {// 模拟支付操作Thread.sleep(1500);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println( [x] Payment processed and acknowledged);} catch (Exception e) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);System.out.println( [x] Payment processing failed, not acknowledged);}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag - {});}
}代码解释
ConnectionFactory 类用于创建与 RabbitMQ 的连接通过设置 host、port、username 和 password 来配置连接信息。channel.exchangeDeclare(EXCHANGE_NAME, “direct”) 声明一个直连类型的交换机用于路由消息。channel.queueDeclare(QUEUE_NAME, true, false, false, null) 声明一个持久化的队列该队列在 RabbitMQ 重启后不会丢失。channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY) 将队列绑定到交换机上使用 ROUTING_KEY 作为路由键确保消息能正确路由到该队列。DeliverCallback 是一个回调函数当消息被接收时会被调用。在这个回调函数中 String message new String(delivery.getBody(), “UTF-8”); 将接收到的消息体转换为字符串。Thread.sleep(1500); 模拟支付操作的耗时。channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 表示成功处理消息后手动确认通知 RabbitMQ 可以将该消息从队列中移除。channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 表示处理消息失败将消息重新放回队列第三个参数 true 表示重新入队以便后续再次处理。channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag - {}); 开始消费消息false 表示需要手动确认消息消费deliverCallback 是消息接收的回调函数consumerTag - {} 是取消消费的回调函数这里未实现具体逻辑。
三性能优化与可靠性保障
1. 性能优化
连接池的使用
可以使用连接池来管理与 RabbitMQ 的连接避免频繁创建和关闭连接提高性能。例如使用 Apache Commons Pool 等库创建连接池。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;public class RabbitMQConnectionPool {private final static String QUEUE_NAME pooled_queue;private final ObjectPoolChannel channelPool;public RabbitMQConnectionPool() {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection;try {connection factory.newConnection();} catch (Exception e) {throw new RuntimeException(Failed to create connection, e);}GenericObjectPoolConfigChannel poolConfig new GenericObjectPoolConfig();poolConfig.setMaxTotal(10);channelPool new GenericObjectPool(new RabbitMQChannelFactory(connection), poolConfig);}public void sendMessage(String message) throws Exception {try (Channel channel channelPool.borrowObject()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicPublish(, QUEUE_NAME, null, message.getBytes(UTF-8));System.out.println( [x] Sent message );}}private static class RabbitMQChannelFactory extends BasePooledObjectFactoryChannel {private final Connection connection;public RabbitMQChannelFactory(Connection connection) {this.connection connection;}Overridepublic Channel create() throws Exception {return connection.createChannel();}Overridepublic PooledObjectChannel wrap(Channel channel) {return new DefaultPooledObject(channel);}}public static void main(String[] args) throws Exception {RabbitMQConnectionPool pool new RabbitMQConnectionPool();pool.sendMessage(Message from connection pool);}
}代码解释
GenericObjectPoolConfig 用于配置连接池的参数如 setMaxTotal(10) 设置连接池的最大连接数。RabbitMQChannelFactory 是一个自定义的连接工厂用于创建 Channel 对象。channelPool 是一个 ObjectPool 类型的连接池通过 GenericObjectPool 实现。sendMessage 方法从连接池获取 Channel 对象发送消息后将 Channel 对象归还给连接池避免频繁创建和关闭 Channel提高性能。
批量发送和接收消息
生产者可以将多个消息批量发送减少网络开销。消费者也可以批量处理消息提高处理效率。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;import java.util.ArrayList;
import java.util.List;public class RabbitMQBatchProducer {private final static String QUEUE_NAME batch_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);Listbyte[] messages new ArrayList();for (int i 0; i 10; i) {messages.add((Batch Message i).getBytes(UTF-8));}channel.confirmSelect();channel.basicPublish(, QUEUE_NAME, new BasicProperties().builder().deliveryMode(2).build(), messages.get(0));for (int i 1; i messages.size(); i) {channel.basicPublish(, QUEUE_NAME, null, messages.get(i));}if (channel.waitForConfirms()) {System.out.println( [x] Sent batch messages and confirmed);} else {System.out.println( [x] Failed to send batch messages);}}}
}import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList;
import java.util.List;public class RabbitMQBatchConsumer {private final static String QUEUE_NAME batch_queue;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);ListString receivedMessages new ArrayList();DeliverCallback deliverCallback (consumerTag, delivery) - {receivedMessages.add(new String(delivery.getBody(), UTF-8));if (receivedMessages.size() 5) {processBatchMessages(receivedMessages);receivedMessages.clear();}};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - {});}private static void processBatchMessages(ListString messages) {for (String message : messages) {System.out.println( [x] Received message );// 在这里可以进行批量消息的处理操作}}
}代码解释
在 RabbitMQBatchProducer 中
messages 列表存储要发送的多个消息。channel.confirmSelect() 开启确认模式。先发送第一个消息并设置属性后续消息使用 channel.basicPublish(“”, QUEUE_NAME, null, messages.get(i)) 发送减少属性设置次数。channel.waitForConfirms() 等待确认。在 RabbitMQBatchConsumer 中receivedMessages 列表存储接收到的消息当达到一定数量这里是 5 条时调用 processBatchMessages 进行批量处理。
2. 可靠性保障
持久化、确认机制和重试机制的综合运用 确保队列和消息的持久化生产者和消费者都使用确认机制并结合重试机制处理异常情况提高系统的可靠性。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConfirmListener;
import java.util.concurrent.atomic.AtomicInteger;public class ReliableMessageSystem {private final static String QUEUE_NAME reliable_queue;private static final int MAX_RETRIES 3;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);factory.setPort(5672);factory.setUsername(guest);factory.setPassword(guest);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.confirmSelect();AtomicInteger retries new AtomicInteger(0);channel.addConfirmListener(new ConfirmListener() {Overridepublic void handleAck(long deliveryTag, boolean multiple) {System.out.println( [x] Message acknowledged);retries.set(0);}Overridepublic void handleNack(long deliveryTag, boolean multiple) {if (retries.getAndIncrement() MAX_RETRIES) {try {System.out.println( [x] Message not acknowledged, retrying...);// 重新发送消息String message Retry Message;channel.basicPublish(, QUEUE_NAME, null, message.getBytes(UTF-8));} catch (Exception e) {System.out.println( [x] Failed to retry);}} else {System.out.println( [x] Exceeded max retries);}}});String message Reliable Message;channel.basicPublish(, QUEUE_NAME, null, message.getBytes(UTF-8));System.out.println( [x] Sent message );}}
}代码解释
channel.queueDeclare(QUEUE_NAME, true, false, false, null) 确保队列持久化。channel.confirmSelect() 开启生产者确认模式。AtomicInteger retries 用于记录重试次数。channel.addConfirmListener 添加确认监听器根据 handleAck 和 handleNack 处理确认和未确认的情况未确认时根据重试次数进行消息的重新发送。
四总结 在分布式系统中RabbitMQ 作为一种强大的消息队列通过其丰富的功能和特性可以帮助我们实现系统的解耦、异步处理和流量削峰提升系统的性能和可靠性。通过深入理解其核心概念如交换机、队列、绑定以及使用不同类型的交换机满足不同的路由需求我们可以构建出灵活多样的消息传递架构。同时通过消息持久化、确认机制、备份交换机、死信队列等手段可以保证消息的可靠性。在实际项目中我们可以根据不同的业务场景和性能要求结合性能优化技术如连接池、批量处理和可靠性保障策略如持久化、确认和重试机制构建出高效、可靠的消息传递系统。 RabbitMQ 的源码实现细节展现了其在消息存储、路由、消费和确认等方面的精细设计例如在 Channel 类中对消息的发送、接收和确认操作的实现以及 Queue 类对队列的管理和存储等。通过上述的代码示例和实际案例希望你能更好地掌握 RabbitMQ 的使用在分布式系统开发中灵活运用消息队列技术提高系统的可维护性和可扩展性满足不同业务场景下的消息传递需求。 需要注意的是在实际使用 RabbitMQ 时要根据系统的负载、性能要求和可靠性要求合理配置和优化各项参数确保系统在不同的场景下都能稳定、高效地运行。同时持续关注 RabbitMQ 的官方文档和社区资源以便更好地应对可能出现的问题和利用新的特性。 相关资料已更新 关注公众号搜 架构研究站回复资料领取即可获取全部面试题以及1000份学习资料