官方网站数据如何做脚注,云网站建设 优帮云,东莞整站优化火速公司,seo优化思路大纲 用户登录创建聊天室监听Stream#xff08;聊天室#xff09;发送消息实验登录Tom侧Jerry侧 创建聊天室Jerry侧Tom侧 进入聊天室Jerry侧Tom侧 发送消息Jerry发送消息Jerry侧聊天室Tom侧聊天室 Tom发送消息Jerry侧聊天室Tom侧聊天室 代码工程参考资料 在《RabbitMQ实践——… 大纲 用户登录创建聊天室监听Stream聊天室发送消息实验登录Tom侧Jerry侧 创建聊天室Jerry侧Tom侧 进入聊天室Jerry侧Tom侧 发送消息Jerry发送消息Jerry侧聊天室Tom侧聊天室 Tom发送消息Jerry侧聊天室Tom侧聊天室 代码工程参考资料 在《RabbitMQ实践——搭建单人聊天服务》一文中我们搭建了Tom和Jerry两人的聊天服务。在这个服务中它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。 如果是多人聊天比如10个人聊天按上述方案需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。 这是因为Classic类型队列在消费者确认读取消息后会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息以供不同消费者消费。如果多个消费者消费同一个队列则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。 但是我们可以使用Stream类型队列来解决这个问题。 Stream类型队列和之前的Classic队列的不同点是Stream队列并不会清除消息。消息会一直存在于Stream队列中消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息所有消费者都从队列中读取消息即可。
用户登录
关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。
创建聊天室
我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。 需要注意的是Stream类型队列创建方案和Classic类型类似只需要多指定x-queue-type“stream”。但是对于Durable持久化只能设置为Trueexclusive只能设置为FalseautoDelete只能设置为False。
package com.rabbitmq.chat.service;import java.util.Collections;
import java.util.Date;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import reactor.core.publisher.Flux;Service
public class ChatRoomV2 {Autowiredprivate RabbitTemplate rabbitTemplate;public void createChatRoom(String admin, String roomName) {createChatRoom(roomName);}private void createChatRoom(String roomName) {rabbitTemplate.execute(action - {action.exchangeDeclare(roomName, fanout, false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap(x-queue-type, stream));action.queueBind(roomName, roomName, );return null;});}聊天室创建完毕后会通知所有登录的用户。 PostMapping(/create)public void create(RequestParam String admin, RequestParam String roomName) {chatRoomV2.createChatRoom(admin, roomName);core.notifyEveryone(roomName is created);}监听Stream聊天室 public FluxString receive(String username, String roomName) {return Flux.create(emitter - {rabbitTemplate.execute(channel - {channel.basicQos(100);Date timestamp new Date(System.currentTimeMillis());channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap(x-stream-offset, timestamp),(consumerTag, message) - {String senderOfMessage message.getProperties().getHeaders().get(username).toString();String show You Said: ;if (!senderOfMessage.equals(username)) {show senderOfMessage Said: ;}show new String(message.getBody());System.out.println(show);emitter.next(show);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag - { });return null;});});}我们将x-stream-offset设置为当前毫秒数是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点不能读取历史消息。 当我们收到消息后我们会获取消息Header中的自定义字段username它标志了消息的发布者。如果发布者和读取者是同一人我们将展示内容前面新增“You Said:”如果是别人说的则标记发布者的名称。 由于我们使用了WebFlux响应式编程所以Controller层要做特殊处理 GetMapping(value /receive, produces text/event-stream)public FluxString receive(RequestParam String username, RequestParam String roomName) {return chatRoomV2.receive(username, roomName);}发送消息
每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步我们给他们发送的消息Header中新增了字段username以标记是谁发送的。 public void send(String username, String roomName, String message) {Message msg MessageBuilder.withBody(message.getBytes()).setHeader(username, username).build();rabbitTemplate.send(roomName, , msg);}实验
登录
Tom侧 Jerry侧 创建聊天室
Jerry侧
Jerry申请创建一个聊天室 在管理后台我们看到对应的交换器和Stream都创建出来了。 同时在刚才的登录接口界面Jerry收到了通知
Tom侧
Tom也会收到通知
进入聊天室
Tom和Jerry在收到通知后可以通过receive接口进入聊天室监听聊天室内容变化。
Jerry侧 Tom侧 发送消息
Jerry发送消息 Jerry侧聊天室 Tom侧聊天室 Tom发送消息 Jerry侧聊天室 Tom侧聊天室 代码工程
https://github.com/f304646673/RabbitMQDemo
参考资料
https://www.rabbitmq.com/docs/streams