Springboot项目使用原生Websocket

目录

  • 1.启用Websocket功能
  • 2.封装操作websocket session的工具
  • 3.保存websocket session的接口
  • 4.保存websocket session的类
  • 5.定义websocket 端点
  • 6.创建定时任务 ping websocket 客户端

1.启用Websocket功能

package com.xxx.robot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
@EnableWebSocket
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpoint() {return new ServerEndpointExporter();}}

2.封装操作websocket session的工具

package com.xxx.robot.websocket.util;import java.util.Map;import javax.websocket.Session;import org.apache.tomcat.websocket.Constants;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;import com.xxx.framework.security.config.MyUserDetails;
import com.xxx.framework.security.entity.LoginUser;
import com.xxx.user.entity.User;public final class WebSocketSessionUtils {private WebSocketSessionUtils() {}public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;/*** websocket block 发送超时 毫秒*/public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;/*** 从 websocket session 中找到登录用户* 其中 MyUserDetails 继承自 org.springframework.security.core.userdetails.User* LoginUser、User 从业务层自定义的类* 项目中使用了spring security框架*/public static User findUser (Session session) {UsernamePasswordAuthenticationToken uToken = (UsernamePasswordAuthenticationToken) session.getUserPrincipal();MyUserDetails userDetails = (MyUserDetails) uToken.getPrincipal();LoginUser loginUser = (LoginUser) userDetails.getUserData();return (User) loginUser.getAdditionalInfo();}/*** 给 websocket session 设置参数*/public static void setProperties(Session session) {//设置websocket文本消息的长度为8M,默认为8ksession.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);//设置websocket二进制消息的长度为8M,默认为8ksession.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);Map<String, Object> userProperties = session.getUserProperties();//设置websocket发送消息的超时时长为10秒,默认为20秒userProperties.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);}
}

3.保存websocket session的接口

package com.xxx.robot.websocket;import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;import javax.websocket.Session;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public interface WebSocketSessionManager {Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);String PING = "ping";String PONG = "pong";Session get (String key);List<String> keys();void add (String key, Session session);Session remove (String key);/*** ping每一个websocket客户端,如果ping超时,则触发由@OnError注释的方法*/default void pingBatch () {List<String> keyList = keys();log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());for (String key : keyList) {if (key != null) {Session session = get(key);if (session != null) {try {session.getBasicRemote().sendPing(ByteBuffer.wrap(PING.getBytes()));try {Thread.sleep(10);} catch (InterruptedException e1) {}} catch (Exception e) {log.error("WebSocket-ping异常", e);}}}}}/*** 消除所有websocket客户端*/default void clearAllSession () {List<String> keyList = keys();int i = 0;for (String key : keyList) {if (key != null) {Session session = get(key);if (session != null) {try {remove(key);i++;session.close();} catch (IOException e1) {log.error("WebSocket-移除并关闭session异常", e1);}if (i % 10 == 0) {try {Thread.sleep(0);} catch (InterruptedException e1) {}}}}}log.info("WebSocket-移除并关闭session数量为:{}", i);}
}

4.保存websocket session的类

package com.xxx.robot.websocket.robot.manager;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;import javax.websocket.Session;import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;import com.xxx.robot.websocket.WebSocketSessionManager;/*** 机器人模块WebSocket Session管理器*/
@Component
public class RobotSessionManager implements WebSocketSessionManager {/*** key = userId + '-' + managerId* userId 从当前登录用户中可得到, managerId由客户端连接websocket时按服务端的接口传给服务端* 因为业务中不仅要获取每一个客户端,还要获取同一个用户下的所有客户端,所以由ConcurrentHashMap改为ConcurrentSkipListMap*/private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();public static final String joinKey (String userId, String managerId) {return userId + '-' + managerId;}public static final String joinKey (Long userId, String managerId) {return userId.toString() + '-' + managerId;}public static final String[] splitKey (String key) {return StringUtils.split(key, '-');}@Overridepublic Session get(String key) {return SESSION_POOL.get(key);}/*** 根据用户ID查询所有websocket session的key* @param userId* @param excludeManagerId 排除的key, 可为空* @return*/public List<String> keysByUserId(String userId, String excludeManagerId) {//'-'的ascii码为45, '.'的ascii码为46, 所以下面获得的是key以 userId + '-' 为前缀的map视图ConcurrentNavigableMap<String, Session> subMap = SESSION_POOL.subMap(userId + '-', userId + '.');NavigableSet<String> keySet = subMap.navigableKeySet();List<String> list = new ArrayList<>();if (StringUtils.isBlank(excludeManagerId)) {for (String key : keySet) {if (key != null) {list.add(key);}}} else {for (String key : keySet) {if (key != null && !key.equals(excludeManagerId)) {list.add(key);}}}return list;}@Overridepublic List<String> keys() {NavigableSet<String> keySet = SESSION_POOL.navigableKeySet();List<String> list = new ArrayList<>();for (String key : keySet) {if (key != null) {list.add(key);}}return list;}@Overridepublic synchronized void add(String key, Session session) {removeAndClose(key);SESSION_POOL.put(key, session);}@Overridepublic synchronized Session remove(String key) {return SESSION_POOL.remove(key);}/*** 必须key和value都匹配才能删除*/public synchronized void remove(String key, Session session) {SESSION_POOL.remove(key, session);}private void removeAndClose (String key) {Session session = remove(key);if (session != null) {try {session.close();} catch (IOException e) {}}}}

5.定义websocket 端点

package com.xxx.robot.websocket.robot.endpoint;import java.util.Map;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;import org.springframework.stereotype.Component;import com.fasterxml.jackson.databind.JsonNode;
import com.xxx.framework.util.SpringBeanUtils;
import com.xxx.user.entity.User;
import com.xxx.robot.corefunc.service.RobotCoreService;
import com.xxx.robot.util.serial.BaseJsonUtils;
import com.xxx.robot.websocket.WebSocketSessionManager;
import com.xxx.robot.websocket.robot.manager.RobotSessionManager;
import com.xxx.robot.websocket.util.WebSocketSessionUtils;import lombok.extern.slf4j.Slf4j;/*** 机器人模块WebSocket接口* 每一次websocket请求,RobotWebSocketServer都是一个新的实例,所以成员变量是安全的* 以致虽然类由@Component注释,但不可使用@Autowired等方式注入bean*/
@Slf4j
@Component
@ServerEndpoint(value = "/robot/{id}")
public class RobotWebSocketServer {private volatile User user;private volatile String id;private volatile Session session;private volatile Map<String, RobotCoreService> robotCoreServiceMap;/*** 所有初始化操作都写在@OnOpen注释的方法中* 连接成功* @param session*/@OnOpenpublic void onOpen(@PathParam("id") String id, Session session) {WebSocketSessionUtils.setProperties(session);this.user = WebSocketSessionUtils.findUser(session);this.id = id;this.session = session;log.info("连接成功:{}, {}", id, this.user.getUserCode());//使用BeanUtils代替@Autowired获取bean, //RobotCoreService为业务类,不必关心robotCoreServiceMap = SpringBeanUtils.getApplicationContext().getBeansOfType(RobotCoreService.class);RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//保存websocket sessionrobotSessionManager.add(RobotSessionManager.joinKey(this.user.getId(), id), session);}/*** 连接关闭* @param session*/@OnClosepublic void onClose() {log.info("连接关闭:{}, {}", this.id, this.user.getUserCode());RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//连接关闭时,使用两个参数的remove方法,多线程下安全删除robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);}@OnErrorpublic void onError(Throwable error) {log.error("onError:id = {}, {}, {}", this.id, this.session.getId(), this.user.getUserCode(), error);RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//websocket异常时,使用两个参数的remove方法,多线程下安全删除//比如ping客户端超时,触发此方法,删除该客户端robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);}/*** 接收到消息* @param message*/@OnMessagepublic void onMessage(String message) {log.info("onMessage:id = {}, {}, {}", this.id, this.user.getUserCode(), message);if (WebSocketSessionManager.PING.equals(message)) {//自定义ping接口,收到ping后,响应pong,客户端暂时未使用此接口this.session.getAsyncRemote().sendText(WebSocketSessionManager.PONG);return;}//用 try...catch 包裹防止抛出异常导致websocket关闭try {//业务层,使用jackson反序列化json,不必关心具体的业务JsonNode root = BaseJsonUtils.readTree(message);String apiType = root.at("/apiType").asText();//业务层代码应在子线程中执行,防止wesocket线程执行时间过长导致websocket关闭robotCoreServiceMap.get(apiType + "Service").receiveFrontMessage(this.user, RobotSessionManager.joinKey(this.user.getId(), this.id), root);} catch (Exception e) {log.error("处理消息错误", e);}}}

6.创建定时任务 ping websocket 客户端

package com.xxx.robot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;/*** 启用定时任务功能* 因为websocket session是有状态的,只能保存在各自的服务端,* 所以只能使用单机式的定时任务,而不能使用分布式定时任务,* 因此 springboot自带的定时任务功能成为了首选* springboot定时任务线程池*/
@Configuration
@EnableScheduling
public class TaskExecutorConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(5);executor.setQueueCapacity(10);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("scheduler-executor-");return executor;}}
package com.xxx.robot.websocket;import java.util.List;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;/*** @author Sunzhihua*/
@Slf4j
@Component
public class WebSocketSchedulerTask {/*** 注入所有的 websocket session 管理器*/@Autowiredprivate List<WebSocketSessionManager> webSocketSessionManagers;/*** initialDelay 表示 延迟60秒初始化* fixedDelay 表示 上一次任务结束后,再延迟30秒执行*/@Scheduled(initialDelay = 60000, fixedDelay = 30000)public void clearInvalidSession() {try {log.info("pingBatch 开始。。。");for (WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) {webSocketSessionManager.pingBatch();}log.info("pingBatch 完成。。。");} catch (Exception e) {log.error("pingBatch异常", e);}}
}

本文链接:https://my.lmcjl.com/post/1977.html

展开阅读全文

4 评论

留下您的评论.