查看网站是否做百度推广,网络棋牌推广平台有哪些,以下哪些不属于h5制作软件,重庆安全建设工程信息网1、业务介绍
消息源服务的消息不能直接推给用户侧#xff0c;用户与中间服务建立websocket连接#xff0c;中间服务再与源服务建立websocket连接#xff0c;源服务的消息推给中间服务#xff0c;中间服务再将消息推送给用户。流程如下图#xff1a; 此例中我们定义中间服…1、业务介绍
消息源服务的消息不能直接推给用户侧用户与中间服务建立websocket连接中间服务再与源服务建立websocket连接源服务的消息推给中间服务中间服务再将消息推送给用户。流程如下图 此例中我们定义中间服务A的端口为8082消息源头服务B的端口为8081方便阅读下面代码。 说明此例子只实现了中间服务的转发连接的关闭等其他逻辑并没有完善如需要请自行完善
2、中间服务实现
中间服务即为上图的中间服务A由于中间服务既要发送发给用户端消息又要接收从消息源服务B接收消息故服务A分为服务端与客户端。 服务A的websocket服务端我们使用springboot websocket实现客户端使用okhttp实现会话缓存暂使用内存缓存实际项目中可置于其他缓存中 中间服务所需依赖为
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-websocket/artifactId
/dependency
dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId
/dependency
dependencygroupIdcom.squareup.okhttp3/groupIdartifactIdokhttp/artifactIdversion4.2.2/version
/dependency缓存类
public class WSCache {//存储客户端session信息, {会话idws_session}public static MapString, Session clients new ConcurrentHashMap();//存储把不同用户的客户端session信息集合 {userId, [会话id1,会话id2,会话id3,会话id4]}public static MapString, SetString connection new ConcurrentHashMap();
}自定义消息类
Accessors(chain true)
Data
public class MsgInfo {private String massage;//为userId用于从缓存中获取对应用户的websocket sessionprivate String userKey;
}2.1 中间服务A的客户端
客户端也可以使用springboot websocket当下我们选择使用okhttp实现。
Slf4j
public class CommonWSClient extends WebSocketListener {/*** websocket连接建立** param webSocket* param response*/Overridepublic void onOpen(WebSocket webSocket, Response response) {super.onOpen(webSocket, response);log.info(客户端连接建立{}, response.body().string());}/*** 收到消息* param webSocket* param text*/Overridepublic void onMessage(WebSocket webSocket, String text) {super.onMessage(webSocket, text);log.info(okhttp receive{}, text);//todo 收到源8081的消息取到对应userId的消息并将消息通过本地server发送给用户ObjectMapper mapper new ObjectMapper();try {MsgInfo msgInfo mapper.readValue(text, MsgInfo.class);SetString strings WSCache.connection.get(msgInfo.getUserKey());if(!CollectionUtils.isEmpty(strings)){for (String sid : strings) {Session session WSCache.clients.get(sid);session.getBasicRemote().sendText(msgInfo.getMassage());}}} catch (Exception e) {e.printStackTrace();//throw new RuntimeException(e);}}Overridepublic void onMessage(WebSocket webSocket, ByteString bytes) {super.onMessage(webSocket, bytes);}Overridepublic void onClosing(WebSocket webSocket, int code, String reason) {super.onClosing(webSocket, code, reason);log.info(okhttp socket closing.);}Overridepublic void onClosed(WebSocket webSocket, int code, String reason) {super.onClosed(webSocket, code, reason);log.info(okhttp socket closed.);}Overridepublic void onFailure(WebSocket webSocket, Throwable t, Response response) {super.onFailure(webSocket, t, response);if (response null) {log.error(okhttp onFailure, response is null.);return;}try {log.error(okhttp onFailure, code: {}, errmsg: {}, response.code(), response.body().string());} catch (IOException e) {log.warn(okhttp onFailure failed, error: {}, e.getMessage());}}}2.2 中间服务A的服务端
websocket服务
Slf4j
Component
ServerEndpoint(/notice/{userId})
public class WebSocketServer {//会话idprivate String sid null;//建立连接的用户idprivate String userId;/*** description: 当与用户端连接成功时执行该方法* PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 PathVariable注解**/OnOpenpublic String onOpen(Session session, PathParam(userId) String userId){this.sid UUID.randomUUID().toString();this.userId userId;WSCache.clients.put(this.sid,session);//判断该用户是否存在会话信息不存在则添加SetString clientSet WSCache.connection.get(userId);if (CollectionUtils.isEmpty(clientSet)){clientSet new HashSet();clientSet.add(this.sid);}else {clientSet.add(this.sid);}WSCache.connection.put(userId,clientSet);log.info(用户{}与本地8082server建立连接, this.userId);//todo 本地client与源server8081连接Request requestRemote new Request.Builder().url(ws://127.0.0.1:8081/api/notice/ userId).build();OkHttpClient webSocketClientRemote new OkHttpClient.Builder().build();WebSocket localClientRemote webSocketClientRemote.newWebSocket(requestRemote, new CommonWSClient());log.info(本地server创建本地client且本地client与远程8082server连接成功);return userId 与本地server连接;}/*** description: 当连接失败时执行该方法**/OnClosepublic void onClose(){WSCache.clients.remove(this.sid);System.out.println(this.sid连接断开);}/*** description: 当收到client发送的消息时执行该方法**/OnMessagepublic void onMessage(String message, Session session) {System.out.println(-----------收到来自用户 this.userId 的信息 message);}/*** description: 当连接发生错误时执行该方法**/OnErrorpublic void onError(Throwable error){System.out.println(error--------系统错误);error.printStackTrace();}
}websocket配置类
Configuration
public class WebSocketConfig {Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}
}3、消息源服务
消息源服务B只需要websocket服务用来发送消息即可其实现与中间服务A的服务端相同。 服务
Slf4j
Component
ServerEndpoint(/notice/{userId})
public class WebSocketServer {//存储客户端session信息, {会话idws_session}public static MapString, Session clients new ConcurrentHashMap();//存储把不同用户的客户端session信息集合 {userId, [会话id1,会话id2,会话id3,会话id4]}public static MapString, SetString connection new ConcurrentHashMap();//会话idprivate String sid null;//建立连接的用户idprivate String userId;/*** description: 当与客户端的websocket连接成功时执行该方法* PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 PathVariable注解**/OnOpenpublic void onOpen(Session session, PathParam(userId) String userId){log.info(onOpen--session.getRequestParameterMap():{}, session.getRequestParameterMap());this.sid UUID.randomUUID().toString();this.userId userId;clients.put(this.sid,session);//判断该用户是否存在会话信息不存在则添加SetString clientSet connection.get(userId);if (clientSet null){clientSet new HashSet();connection.put(userId,clientSet);}clientSet.add(this.sid);System.out.println(this.userId 用户建立连接 this.sid连接开启);}/*** description: 当连接失败时执行该方法**/OnClosepublic void onClose(){clients.remove(this.sid);System.out.println(this.sid连接断开);}/*** description: 当收到客户端发送的消息时执行该方法**/OnMessagepublic void onMessage(String message, Session session) {System.out.println(-----------收到来自用户 this.userId 的信息 message);//自定义消息实体MsgInfo msgInfo new MsgInfo().setUserKey(this.userId).setMassage(服务端- System.currentTimeMillis() :已收到用户 this.userId 的信息: message);sendMessageByUserId(this.userId, msgInfo);}/*** description: 当连接发生错误时执行该方法**/OnErrorpublic void onError(Throwable error){System.out.println(error--------系统错误);error.printStackTrace();}/*** description: 通过userId向用户发送信息* 该类定义成静态可以配合定时任务实现定时推送**/public static void sendMessageByUserId(String userId, MsgInfo msgInfo){if (!StringUtils.isEmpty(userId)) {SetString clientSet connection.get(userId);//用户是否存在客户端连接if (Objects.nonNull(clientSet)) {IteratorString iterator clientSet.iterator();while (iterator.hasNext()) {String sid iterator.next();Session session clients.get(sid);//向每个会话发送消息if (Objects.nonNull(session)){try {//同步发送数据需要等上一个sendText发送完成才执行下一个发送ObjectMapper mapper new ObjectMapper();session.getBasicRemote().sendText(mapper.writeValueAsString(msgInfo));} catch (Exception e) {e.printStackTrace();}}}}}}Scheduled(cron 0/10 * * * * ?)public void testSendMessageByCron(){log.info(-----------模拟消息开始发送--------------);//模拟两个用户100和200MsgInfo msg100 new MsgInfo().setUserKey(100).setMassage(这是8081发给用户100的消息 System.currentTimeMillis());sendMessageByUserId(100, msg100);MsgInfo msg200 new MsgInfo().setUserKey(200).setMassage(这是8081发给用户200的消息 System.currentTimeMillis());sendMessageByUserId(200, msg200);}
}4、测试
我们使用: wss在线测试工具进行测试 1、 打开两个该工具窗口分别模拟用户100和用户200这两个用户都连接中间服务A端口8082的服务 2、分别启动消息源服务B和中间服务A 此时在服务B控制台我们可以看到 我们模拟的消息发送已经在给用户100和用户200发送因为我们的用户100和用户200均没有与中间服务A建立连接故此时测试界面看不到消息 当我们在用户100的模拟界面点击“开启连接”后可以在右侧看到发给用户100的模拟消息
之后我们再打开用户200的连接
好了到这里就结束了有任何问题请积极指出此例子只是个例子并未经受任何生产的测试欢迎讨论沟通