diff --git a/xjs-business/xjs-business-warning/pom.xml b/xjs-business/xjs-business-warning/pom.xml index 8e6e8acf..b8d64745 100644 --- a/xjs-business/xjs-business-warning/pom.xml +++ b/xjs-business/xjs-business-warning/pom.xml @@ -23,6 +23,12 @@ xjs-business-common 3.3.0 + + + + org.springframework.boot + spring-boot-starter-websocket + \ No newline at end of file diff --git a/xjs-business/xjs-business-warning/src/main/java/com/xjs/config/WebSocketConfig.java b/xjs-business/xjs-business-warning/src/main/java/com/xjs/config/WebSocketConfig.java new file mode 100644 index 00000000..544cf8e0 --- /dev/null +++ b/xjs-business/xjs-business-warning/src/main/java/com/xjs/config/WebSocketConfig.java @@ -0,0 +1,21 @@ +package com.xjs.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +/** + * websocket配置 + * @author xiejs + * @since 2022-01-13 + */ +@Configuration +public class WebSocketConfig { + /** + * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/xjs-business/xjs-business-warning/src/main/java/com/xjs/server/WebSocketServer.java b/xjs-business/xjs-business-warning/src/main/java/com/xjs/server/WebSocketServer.java new file mode 100644 index 00000000..ea5c1ccc --- /dev/null +++ b/xjs-business/xjs-business-warning/src/main/java/com/xjs/server/WebSocketServer.java @@ -0,0 +1,145 @@ +package com.xjs.server; + +import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +/** + * api预警websocket + * + * @author xiejs + * @since 2022-01-13 + */ +@Log4j2 +@ServerEndpoint("warning/api/{userId}") +@Component +public class WebSocketServer { + /** + * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 + */ + private static int onlineCount = 0; + /** + * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 + */ + private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); + /** + * 与某个客户端的连接会话,需要通过它来给客户端发送数据 + */ + private Session session; + /** + * 接收userId + */ + private String userId = ""; + + /** + * 连接建立成功调用的方法 + */ + @OnOpen + public void onOpen(Session session, @PathParam("userId") String userId) { + this.session = session; + this.userId = userId; + if (webSocketMap.containsKey(userId)) { + webSocketMap.remove(userId); + webSocketMap.put(userId, this); + //加入set中 + } else { + webSocketMap.put(userId, this); + //加入set中 + addOnlineCount(); + //在线数加1 + } + + log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount()); + + try { + sendMessage("连接成功"); + } catch (IOException e) { + log.error("用户:" + userId + ",网络异常!!!!!!"); + } + } + + /** + * 连接关闭调用的方法 + */ + @OnClose + public void onClose() { + if (webSocketMap.containsKey(userId)) { + webSocketMap.remove(userId); + //从set中删除 + subOnlineCount(); + } + log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount()); + } + + /** + * 收到客户端消息后调用的方法 + * + * @param message 客户端发送过来的消息 + */ + @OnMessage + public void onMessage(String message, Session session) { + log.info("用户消息:" + userId + ",报文:" + message); + //可以群发消息 + //消息保存到数据库、redis + if (StringUtils.isNotBlank(message)) { + + + try { + session.getBasicRemote().sendText("回复的消息"); + } catch (IOException e) { + e.printStackTrace(); + } + + + } + } + + /** + * @param session + * @param error + */ + @OnError + public void onError(Session session, Throwable error) { + log.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); + error.printStackTrace(); + } + + /** + * 实现服务器主动推送 + */ + public void sendMessage(String message) throws IOException { + this.session.getBasicRemote().sendText(message); + } + + + /** + * 发送自定义消息 + */ + public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { + log.info("发送消息到:" + userId + ",报文:" + message); + if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { + webSocketMap.get(userId).sendMessage(message); + } else { + log.error("用户" + userId + ",不在线!"); + } + } + + public static synchronized int getOnlineCount() { + return onlineCount; + } + + public static synchronized void addOnlineCount() { + WebSocketServer.onlineCount++; + } + + public static synchronized void subOnlineCount() { + WebSocketServer.onlineCount--; + } + +}