0


springBoot使用webSocket的几种方式以及在高并发出现的问题及解决

一、第一种方式-原生注解(tomcat内嵌)

1.1、引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

1.2、配置文件

packagecn.jt.config;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.web.socket.server.standard.ServerEndpointExporter;/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年07月06日
 */@ConfigurationpublicclassWebSocketConfig{/**
     * 初始化Bean,它会自动注册使用了 @ServerEndpoint 注解声明的 WebSocket endpoint
     *
     * @return
     */@BeanpublicServerEndpointExporterserverEndpointExporter(){returnnewServerEndpointExporter();}}

1.3、构建安全的WebSocket抽象层

1、该类可以作为一个基础的安全抽象层,后续项目中如果需要做认证的操作,都可以继承该抽象类

ClientUserInfoService 大家可以看作一个 UserService 就是一张用户表的service类

这里认证采用的是 jwt的方式,大家可以换成自己的

2、**大家这里注意,我们使用的是

javax.websocket.Session;

这个是tomcat下的**
在这里插入图片描述

packagecn.jt.websocket;importcn.jt.client.entity.ClientUserInfo;importcn.jt.client.service.ClientUserInfoService;importcn.jt.jwt.JwtUtils;importcn.jt.utils.SpringContextUtils;importlombok.extern.slf4j.Slf4j;importjavax.websocket.Session;importjava.io.IOException;importjava.util.Date;/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年07月06日
 */@Slf4jpublicabstractclassSecureWebSocket{privatestaticfinalClientUserInfoService clientUserInfoService;static{
        clientUserInfoService =SpringContextUtils.getBean(ClientUserInfoService.class);}protectedSession session;protectedString token;protectedLong tokenExpiresAt;protectedClientUserInfo clientUserInfo;/**
     * 验证token是否有效(包含有效期)
     *
     * @param token  token
     * @param isInit 是否对token和userInfo进行初始化赋值
     * @return boolean
     */protectedbooleanisTokenValid(String token,boolean isInit){ClientUserInfo clientUserInfo;try{
            clientUserInfo =JwtUtils.getClientUserInfo(token);}catch(Exception e){
            log.error("ws 认证失败", e);returnfalse;}if(isInit){this.clientUserInfo = clientUserInfo;this.tokenExpiresAt =JwtUtils.getDecodedJWT(token).getExpiresAt().getTime();this.token = token;}returntrue;}/**
     * 认证失败,断开连接
     *
     * @param session session
     */protectedvoidsendAuthFailed(Session session){try{
            session.getBasicRemote().sendText("认证失败");
            session.close();}catch(IOException e){
            e.printStackTrace();}}}

1.4、构建基础的WebSocket

1、代码很简单,大家一看就知道逻辑了,这里就解释一下各个注解的含义

  • @ServerEndpoint:将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
  • @OnOpen:当WebSocket建立连接成功后会触发这个注解修饰的方法。
  • @OnClose:当WebSocket建立的连接断开后会触发这个注解修饰的方法。
  • @OnMessage:当客户端发送消息到服务端时,会触发这个注解修改的方法。
  • @OnError:当WebSocket建立连接时出现异常会触发这个注解修饰的方法。

2、**大家这里注意,我们使用的是

javax.websocket.Session;

这个是tomcat下的**
在这里插入图片描述

packagecn.jt.websocket;importcom.alibaba.fastjson.JSON;importcom.google.common.util.concurrent.ThreadFactoryBuilder;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importjavax.websocket.*;importjavax.websocket.server.PathParam;importjavax.websocket.server.ServerEndpoint;importjava.io.IOException;importjava.util.Map;importjava.util.concurrent.*;/**
 * @author GXM
 * @version 1.0.0
 * @Description 
 * @createTime 2023年07月06日
 */@Slf4j@ServerEndpoint("/globalWs/{token}")@ComponentpublicclassGlobalWebsocketextendsSecureWebSocket{/**
     * key: userKye
     * value: GlobalWebsocket  这里你直接存储 session 也是可以的
     */privatestaticfinalMap<String,GlobalWebsocket>CLIENTS=newConcurrentHashMap<>();/**
     * // 如果允许 一个账号 多人登录的话  就 加上  "-" + tokenTime,因为每次登录的token过期时间都是不一样的
     * clientUserInfo.getId() + "-" + clientUserInfo.getAccount() ;
     */privateString userKye;@OnOpenpublicvoidonOpen(Session session,@PathParam("token")String token){if(!isTokenValid(token,true)){sendAuthFailed(session);return;}this.session = session;this.userKye = clientUserInfo.getId()+"-"+ clientUserInfo.getAccount()+"-"+super.tokenExpiresAt;CLIENTS.put(userKye,this);
        log.info("当前在线用户:{}",CLIENTS.keySet());try{
            session.getBasicRemote().sendText("连接成功!");}catch(IOException e){
            e.printStackTrace();}}@OnMessagepublicStringonMessage(Session session,String message){// 先判断当前token 是否已经到期了if(!isTokenValid(token,false)){sendAuthFailed(session);returnnull;}try{
            session.getBasicRemote().sendText("received");}catch(IOException e){
            e.printStackTrace();}returnnull;}@OnErrorpublicvoidonError(Session session,Throwable throwable){//        log.error("ws session 发生错误,session key is {}",throwable);
        log.error("ws session 发生错误:{}", throwable.getMessage());}@OnClosepublicvoidonClose(Session session){CLIENTS.remove(userKye);
        log.info("ws 用户 userKey {} 已下线,当前在线用户:{}", userKye,CLIENTS.keySet());}/**
     * 发送消息
     *
     * @param messageVo
     */publicvoidsendMessage(MessageVo messageVo){try{this.session.getBasicRemote().sendText(JSON.toJSONString(messageVo));}catch(IOException e){
            log.error("发送消息异常", e);}}/**
     * 向user精确用户发送消息
     *
     * @param userKey   由 account + "-" + refreshToken的签发时间组成,例:"admin-1635830649000"
     * @param messageVo 消息内容
     */publicstaticvoidsendToUser(String userKey,MessageVo messageVo){GlobalWebsocket globalWebsocket =CLIENTS.get(userKey);if(null!= globalWebsocket){
            globalWebsocket.sendMessage(messageVo);return;}
        log.error("发送消息到指定用户,但是用户不存在,userKey is {},message is {}", userKey,JSON.toJSONString(messageVo));}/**
     * 全体组播消息
     *
     * @param
     */publicstaticvoidbroadcast(MessageVo messageVo){CLIENTS.values().forEach(c ->{Session curSession = c.session;if(curSession.isOpen()){try{
                                curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo));}catch(IOException e){
                                log.error("发送ws数据错误:{}", e.getMessage());}}});}}

1.5、SpringBoot 开启 WebSocket

@EnableWebSocket

在这里插入图片描述

1.6、高并发时候的问题

1、这里要说明一下在高并发下的问题,如果你同时向在线的 3 个webSocket 在线客户端发送消息,即广播所有在线用户(目前是3个),每个用户每秒10条,那就是说,你每秒要发送 30 条数据,我们调用上述的广播函数

broadcast()

,有时候会出现

java.lang.IllegalStateException: 远程 endpoint 处于 [xxxxxx] 状态,如:
The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for calle

这是因为在高并发的情况下,出现了session抢占的问题,导致session,的状态不一致,所以,这里可以去尝试加锁操作,如下

publicstaticfinalExecutorServiceWEBSOCKET_POOL_EXECUTOR=newThreadPoolExecutor(20,20,Integer.MAX_VALUE,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<>(),newThreadFactoryBuilder().setNameFormat("GlobalWebsocket-executor-"+"%d").setUncaughtExceptionHandler((thread, throwable)-> log.error("ThreadPool {} got exception", thread, throwable)).build(),newThreadPoolExecutor.AbortPolicy());/**
     * 全体组播消息
     *
     * @param
     */publicstaticvoidbroadcast(MessageVo messageVo){CLIENTS.values().forEach(c ->{Session curSession = c.session;if(curSession.isOpen()){// 建议单个session 一个线程,避免  一个session会话网络不好,会出现超时异常,当前线程会因此中断。// 导致后面的session没有进行发送操作。使用单个线程,单个session情况下避免session之间的相互影响。WEBSOCKET_POOL_EXECUTOR.execute(()->{synchronized(curSession){// 双重锁检查,外边的 isOpen 第一遍过滤,里面枷加锁之后,第二遍过滤if(curSession.isOpen()){try{
                                        curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo));}catch(IOException e){
                                        log.error("发送ws数据错误:{}", e.getMessage());}}}});}});}

其中增加了,双重锁检查,以及线程池操作,当然加上锁之后,性能是肯定会有所下降的

建议单个session 一个线程,避免 一个session会话网络不好,会出现超时异常,当前线程会因此中断

2、按照上述的代码,我这边测试12个webSocket 链接,每秒每个客户端都发送10条数据,相当于每秒发送120条数据,目前看来,速度还是不错的,但是当客户端重连后,偶尔会出现错误信息

远程主机已经关闭了一个链接

,类似于这种错误,这条错误日志是在广播代码的如下位置打印的,这是因为当准备发送消息的时候,远程客户端还是关闭了。

try{
                                        curSession.getBasicRemote().sendText(JSON.toJSONString(messageVo));}catch(IOException e){
                                        log.error("发送ws数据错误:{}", e.getMessage());}

二、第二种方式-Spring封装

2.1、引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

2.2、自己的webSocket处理service

1、WebSocketService 处理器类如下

类似于 UserService 等等,主要是抽出一部分的业务逻辑

packagecn.jt.websocket.spring;importorg.springframework.web.socket.TextMessage;importorg.springframework.web.socket.WebSocketSession;importjava.io.IOException;/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年07月19日
 */publicinterfaceWebSocketService{/**
     * 会话开始回调
     *
     * @param session 会话
     */voidhandleOpen(WebSocketSession session);/**
     * 会话结束回调
     *
     * @param session 会话
     */voidhandleClose(WebSocketSession session);/**
     * 处理消息
     *
     * @param session 会话
     * @param message 接收的消息
     */voidhandleMessage(WebSocketSession session,String message);/**
     * 发送消息
     *
     * @param session 当前会话
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */voidsendMessage(WebSocketSession session,String message)throwsIOException;/**
     * 发送消息
     *
     * @param userId  用户id
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */voidsendMessage(Integer userId,TextMessage message)throwsIOException;/**
     * 发送消息
     *
     * @param userId  用户id
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */voidsendMessage(Integer userId,String message)throwsIOException;/**
     * 发送消息
     *
     * @param session 当前会话
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */voidsendMessage(WebSocketSession session,TextMessage message)throwsIOException;/**
     * 广播
     *
     * @param message 字符串消息
     * @throws IOException 异常
     */voidbroadCast(String message)throwsIOException;/**
     * 广播
     *
     * @param message 文本消息
     * @throws IOException 异常
     */voidbroadCast(TextMessage message)throwsIOException;/**
     * 处理会话异常
     *
     * @param session 会话
     * @param error   异常
     */voidhandleError(WebSocketSession session,Throwable error);}

2、WebSocketServiceImpl 实现类如下

类似于 UserServiceImpl 等等,主要是抽出一部分的业务逻辑

packagecn.jt.websocket.spring;importcn.jt.client.entity.ClientUserInfo;importlombok.extern.slf4j.Slf4j;importorg.springframework.web.socket.TextMessage;importorg.springframework.web.socket.WebSocketSession;importjava.io.IOException;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年07月19日
 */@Slf4jpublicclassWebSocketServiceImplimplementsWebSocketService{privatefinalMap<Integer,WebSocketSession> clients =newConcurrentHashMap<>();@OverridepublicvoidhandleOpen(WebSocketSession session){// 这个时候就需要在建立 webSocket 时存储的 用户信息了Map<String,Object> attributes = session.getAttributes();ClientUserInfo clientUserInfo =(ClientUserInfo) attributes.get("clientUserInfo");
        clients.put(clientUserInfo.getId(), session);

        log.info("a new connection opened,current online count:{}", clients.size());}@OverridepublicvoidhandleClose(WebSocketSession session){// 这个时候就需要在建立 webSocket 时存储的 用户信息了Map<String,Object> attributes = session.getAttributes();ClientUserInfo clientUserInfo =(ClientUserInfo) attributes.get("clientUserInfo");
        clients.remove(clientUserInfo.getId());
        log.info("a new connection closed,current online count:{}", clients.size());}@OverridepublicvoidhandleMessage(WebSocketSession session,String message){// 只处理前端传来的文本消息,并且直接丢弃了客户端传来的消息
        log.info("received a message:{}", message);}@OverridepublicvoidsendMessage(WebSocketSession session,String message)throwsIOException{this.sendMessage(session,newTextMessage(message));}@OverridepublicvoidsendMessage(Integer userId,TextMessage message)throwsIOException{WebSocketSession webSocketSession = clients.get(userId);if(webSocketSession.isOpen()){
            webSocketSession.sendMessage(message);}}@OverridepublicvoidsendMessage(Integer userId,String message)throwsIOException{this.sendMessage(userId,newTextMessage(message));}@OverridepublicvoidsendMessage(WebSocketSession session,TextMessage message)throwsIOException{
        session.sendMessage(message);}@OverridepublicvoidbroadCast(String message)throwsIOException{
        clients.values().forEach(session ->{if(session.isOpen()){try{
                    session.sendMessage(newTextMessage(message));}catch(IOException e){thrownewRuntimeException(e);}}});}@OverridepublicvoidbroadCast(TextMessage message)throwsIOException{
        clients.values().forEach(session ->{if(session.isOpen()){try{
                    session.sendMessage(message);}catch(IOException e){thrownewRuntimeException(e);}}});}@OverridepublicvoidhandleError(WebSocketSession session,Throwable error){
        log.error("websocket error:{},session id:{}", error.getMessage(), session.getId());
        log.error("", error);}}

2.3、实现spring框架的WebSocket处理器

1、注意这里的

webSocketSession

就是 spring 包下的了,不再是

tomcat

包下的了

在这里插入图片描述

这里其实就和我们之前使用原生注解(tomcat)的那个一样了,都是几个特定的函数

我们在特定的方法下,调用我们自己的 service去单独处理,解耦合

packagecn.jt.websocket.spring;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.socket.*;/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年07月19日
 */publicclassDefaultWebSocketHandlerimplementsWebSocketHandler{@AutowiredprivateWebSocketService webSocketService;/**
     * 建立连接
     *
     * @param session Session
     */@OverridepublicvoidafterConnectionEstablished(WebSocketSession session){
        webSocketService.handleOpen(session);}/**
     * 接收消息
     *
     * @param session Session
     * @param message 消息
     */@OverridepublicvoidhandleMessage(WebSocketSession session,WebSocketMessage<?> message){if(message instanceofTextMessage){TextMessage textMessage =(TextMessage) message;
            webSocketService.handleMessage(session, textMessage.getPayload());}}/**
     * 发生错误
     *
     * @param session   Session
     * @param exception 异常
     */@OverridepublicvoidhandleTransportError(WebSocketSession session,Throwable exception){
        webSocketService.handleError(session, exception);}/**
     * 关闭连接
     *
     * @param session     Session
     * @param closeStatus 关闭状态
     */@OverridepublicvoidafterConnectionClosed(WebSocketSession session,CloseStatus closeStatus){
        webSocketService.handleClose(session);}/**
     * 是否支持发送部分消息
     *
     * @return false
     */@OverridepublicbooleansupportsPartialMessages(){returnfalse;}}

2.4、自定义拦截器

这里,我们可以设置拦截器,在做请求参数,或者权限认证的时候,不用在建立链接的函数

afterConnectionEstablished

里面去处理

可以理解为 springMvc 每次请求前的拦截器

packagecn.jt.websocket.spring;importcn.jt.client.entity.ClientUserInfo;importcn.jt.jwt.JwtUtils;importlombok.extern.slf4j.Slf4j;importorg.springframework.http.server.ServerHttpRequest;importorg.springframework.http.server.ServerHttpResponse;importorg.springframework.http.server.ServletServerHttpRequest;importorg.springframework.web.socket.WebSocketHandler;importorg.springframework.web.socket.server.HandshakeInterceptor;importjava.util.Map;/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年07月19日
 */@Slf4jpublicclassWebSocketInterceptorimplementsHandshakeInterceptor{/**
     * 建立请求之前,可以用来做权限判断
     *
     * @param request    the current request
     * @param response   the current response
     * @param wsHandler  the target WebSocket handler
     * @param attributes the attributes from the HTTP handshake to associate with the WebSocket
     *                   session; the provided attributes are copied, the original map is not used.
     * @return
     * @throws Exception
     */@OverridepublicbooleanbeforeHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Map<String,Object> attributes)throwsException{if(request instanceofServletServerHttpRequest){ServletServerHttpRequest servletServerHttpRequest =(ServletServerHttpRequest) request;// 模拟用户(通常利用JWT令牌解析用户信息)String token = servletServerHttpRequest.getServletRequest().getParameter("token");try{ClientUserInfo clientUserInfo =JwtUtils.getClientUserInfo(token);// 设置当前这个session的属性,后续我们在发送消息时,可以通过 session.getAttributes().get("clientUserInfo")可以取出 clientUserInfo参数
                attributes.put("clientUserInfo", clientUserInfo);}catch(Exception e){
                log.error("webSocket 认证失败 ", e);returnfalse;}returntrue;}returnfalse;}/**
     * 建立请求之后
     *
     * @param request   the current request
     * @param response  the current response
     * @param wsHandler the target WebSocket handler
     * @param exception an exception raised during the handshake, or {@code null} if none
     */@OverridepublicvoidafterHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Exception exception){}}

2.5、WebSocket配置

将自定义处理器、拦截器以及WebSocket操作类依次注入到IOC容器中。

  • @EnableWebSocket:开启WebSocket功能
  • addHandler:添加处理器
  • addInterceptors:添加拦截器
  • setAllowedOrigins:设置允许跨域(允许所有请求来源)
packagecn.jt.websocket.spring;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.web.socket.config.annotation.WebSocketConfigurer;importorg.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年07月19日
 */@ConfigurationpublicclassWebSocketConfigurationimplementsWebSocketConfigurer{@BeanpublicDefaultWebSocketHandlerdefaultWebSocketHandler(){returnnewDefaultWebSocketHandler();}@BeanpublicWebSocketServicewebSocket(){returnnewWebSocketServiceImpl();}@BeanpublicWebSocketInterceptorwebSocketInterceptor(){returnnewWebSocketInterceptor();}@OverridepublicvoidregisterWebSocketHandlers(WebSocketHandlerRegistry registry){// 链接方式如下 ws://127.0.0.1:9085/globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb// 如果你设置了springboot的 contentPath 那就需要在:9085端口后面 加上contentPath 的值,在拼接上  globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb
        registry.addHandler(defaultWebSocketHandler(),"/globalWs/message").addInterceptors(webSocketInterceptor()).setAllowedOrigins("*");}}

2.6、SpringBoot 开启 WebSocket

@EnableWebSocket

在这里插入图片描述

2.7、链接

1、其中

thermal-api

是我的项目名称

在这里插入图片描述

2、链接路径如下

ws://127.0.0.1:9085/thermal-api/globalWs/message?token=qwncjncwqqdnjncz.adqwascsvcgrgb.cbrtbfvb

2.8、高并发时候的问题

1、如果在广播的时候,客户端很多,发送的消息也是很多,还是会出现和之前

第一种方式-原生注解(tomcat内嵌)

相同的问题,出现类似如下报错

The remote endpoint was in state [xxxx] which is an invalid state for calle

2、错误分析可以看 踩坑笔记 Spring websocket并发发送消息异常,写的很清楚。

2.8.1、解决方案一

1、和之前一样,加锁

@OverridepublicvoidbroadCast(String message)throwsIOException{
        clients.values().forEach(session ->{if(session.isOpen()){synchronized(session){try{
                        session.sendMessage(newTextMessage(message));}catch(IOException e){thrownewRuntimeException(e);}}}});}

2.8.2、解决方案二

1、使用 spring 的,Spring 的解决方案是把原来的 WebSocketSession 封了一层,即

org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator

3、代码稍微改一下,如下

@OverridepublicvoidhandleOpen(WebSocketSession session){// 这个时候就需要在建立 webSocket 时存储的 用户信息了Map<String,Object> attributes = session.getAttributes();ClientUserInfo clientUserInfo =(ClientUserInfo) attributes.get("clientUserInfo");
        
        clients.put(clientUserInfo.getId(),newConcurrentWebSocketSessionDecorator(session,10*1000,64000));
        log.info("a new connection opened,current online count:{}", clients.size());}

第三种方式-TIO

1、请上网了解,用的比较少,不做过多说明

第四种方式-STOMP

1、请上网了解,用的比较少,不做过多说明


本文转载自: https://blog.csdn.net/qq_38263083/article/details/131811502
版权归原作者 灵泽~ 所有, 如有侵权,请联系我们删除。

“springBoot使用webSocket的几种方式以及在高并发出现的问题及解决”的评论:

还没有评论