建设我们的网站,政务网站建设原则,分页网站,品牌建设岗位职责一、介绍 RabbitMQ消息传递模型的核心思想是#xff1a;生产者生产的消息从不会直接发送到队列。实际上#xff0c;通常生产者甚至不知道这些消息传递到了哪些队列中。相反#xff0c;生产者只能将消息发送到交换机#xff0c;交换机工作的内容非常简单#xff0c;一方… 一、介绍 RabbitMQ消息传递模型的核心思想是生产者生产的消息从不会直接发送到队列。实际上通常生产者甚至不知道这些消息传递到了哪些队列中。 相反生产者只能将消息发送到交换机交换机工作的内容非常简单一方面他接受来自生产者的消息另一方面他将他们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们放到许多队列还是说应该丢弃他们。这就由交换机来决定。 二、类型 1、类型 总共有以下类型直接direct[路由],主题topic标题headers,扇出fanout[发布订阅] 默认类型[无名类型] 通过()进行标识 channel.basicPublish(,TASK_QUEUE_NAME,null,message.getBytes()); 第一个参数是交换机名称空字符串表示默认或无名的交换机消息能路由发送到队列中其实是由routingKey(bindingKey)绑定key指定的如果它存在的话。 2、 临时队列 每当我们连接到Rabbit时我们都需要一个全新的空队列为此我们可以创建一个具有随机名称的队列或者能让服务器为我们选择一个随机队列名称。其次一旦我们断开了消费者连接队列将被自动删除。 创建临时队列的方式如下 String queueName channel.queueDeclare().getQueue(); 3、绑定bingings binding其实时exchange和queue之间的桥梁他告诉我们exchange和哪个队列进行了绑定关系 4、fanout 他是将接收到的所有消息广播到他知道的所有队列中 消费者另一个复制即可public class ReceiveLogs01 {//交换机的名称public static final String EXCHANGE_NAME logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtil.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME,fanout);//声明一个队列 临时队列/*** 队列的名称是随机的* 当消费者断开与队列的连接的时候队列就自动删除*/String queueName channel.queueDeclare().getQueue();/*** 绑定队列与交换机*/channel.queueBind(queueName,EXCHANGE_NAME,);System.out.println(等待接收消息把接收到的消息打印在屏幕上......);//接收消息DeliverCallback deliverCallback (consumerTag,message) -{System.out.println(01控制台打印接收到的消息:new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) -{};channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}
} 生产者public class EmitLog {//交换机的名称public static final String EXCHANGE_NAME logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtil.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,fanout);Scanner scanner new Scanner(System.in);while (scanner.hasNext()){String message scanner.next();channel.basicPublish(EXCHANGE_NAME,,null,message.getBytes());System.out.println(生产者发出消息message);}}
} 结果 5、direct 消息只去到他绑定的routingKey队列中支持多重绑定当exchange的绑定类型是direct但是他绑定的多个队列的key如果都相同在这种情况下虽然绑定类型是direct但是他表现的就和fanout有点类似了。 生产者public class DirectLogs {//交换机的名称public static final String EXCHANGE_NAME direct_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtil.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);Scanner scanner new Scanner(System.in);while (scanner.hasNext()){String message scanner.next();channel.basicPublish(EXCHANGE_NAME,warning,null,message.getBytes());System.out.println(生产者发出消息message);}}
} 消费者1public class ReceiveLogs01 {//交换机名称public static final String EXCHANGE_NAME direct_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtil.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个队列channel.queueDeclare(console,false,false,false,null);//绑定队列与交换机channel.queueBind(console,EXCHANGE_NAME,info);channel.queueBind(console,EXCHANGE_NAME,warning);DeliverCallback deliverCallback (consumerTag,message) -{System.out.println(direct01控制台打印接收到的消息:new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) -{};channel.basicConsume(console,true,deliverCallback,cancelCallback);}
}消费者2public class ReceiveLogs02 {//交换机名称public static final String EXCHANGE_NAME direct_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtil.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个队列channel.queueDeclare(disk,false,false,false,null);//绑定队列与交换机channel.queueBind(disk,EXCHANGE_NAME,error);DeliverCallback deliverCallback (consumerTag,message) -{System.out.println(direct02控制台打印接收到的消息:new String(message.getBody()));};CancelCallback cancelCallback (consumerTag) -{};channel.basicConsume(disk,true,deliverCallback,cancelCallback);}
} 5、topic 发送到类型是topic交换机的消息的routing_key不能随意写必须满足一定的要求它必须是一个单词列表以点号分隔开。这些单词可以是任意单词比如说“stock.usd.nyse”,nyse.vmw,quick.orange.rabbit这种类型的。但是这个单词列表最多不能超过255个字节。【* 可以代替一个单词#可以替代零个或多个单词】 例如Q1-绑定的是orange带三个单词的字符串*.orange.* Q2-绑定的是最后一个是rabbit的3个单词*.*.rabbit 第一个单词是lazy的多个单词lazy.# 当一个队列绑定键是#,那么这个队列将接收所有数据有点像fanout如果队列绑定键当中没有#h和*出现那么该队列绑定类型就是direct了。 生产者public class EmitLogTopic {//交换机名称public static final String EXCHANGE_NAME topic_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtil.getChannel();MapString, String bindingKeyMap new HashMap();bindingKeyMap.put(quick.orange.rabbit,被队列Q1Q2接收到);bindingKeyMap.put(lazy.orange.eleplant,被队列Q1Q2接收到);bindingKeyMap.put(quick.orange.fox,被队列Q1接收到);bindingKeyMap.put(lazy.brown.fox,被队列Q2接收到);bindingKeyMap.put(lazy.pink.rabbit,虽然满足两个绑定但只被队列Q2接收一次);bindingKeyMap.put(quick.brown.fox,不匹配任何绑定不会被任何队列接收到会被丢弃);bindingKeyMap.put(quick.orange.male.rabbit,是四个单词不匹配任何绑定会被丢弃);bindingKeyMap.put(lazy.orange.male.rabbit,是四个单词但匹配Q2);for (Map.EntryString, String bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey bindingKeyEntry.getKey();String message bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());System.out.println(生产者发出消息message);}}
} 消费者1public class ReceiveLogsTopic01 {//交换机名称public static final String EXCHANGE_NAME topic_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtil.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//声明队列String queueName Q1;channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,*.orange.*);System.out.println(Q1等待接收消息。。。。。。);//接收消息channel.basicConsume(queueName,true,(consumeTag,message)-{System.out.println(new String(message.getBody()));System.out.println( 接收队列queueName 绑定键message.getEnvelope().getRoutingKey());},(message)-{});}
} 消费者2public class ReceiveLogsTopic02 {//交换机名称public static final String EXCHANGE_NAME topic_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtil.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//声明队列String queueName Q2;channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,*.*.rabbit);channel.queueBind(queueName,EXCHANGE_NAME,lazy.#);System.out.println(Q2等待接收消息。。。。。。);//接收消息channel.basicConsume(queueName,true,(consumeTag,message)-{System.out.println(new String(message.getBody()));System.out.println( 接收队列queueName 绑定键message.getEnvelope().getRoutingKey());},(message)-{});}
}