0


Spring Boot 集成 WebSocket(原生注解与Spring封装)

Spring Boot 集成 WebSocket

本章节将介绍 Spring Boot 集成 WebSocket 的两种主要方式:原生注解与Spring封装。

在线WebSocket测试工具

🤖 Spring Boot 2.x 实践案例(代码仓库)

原生注解

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

配置文件

@ConfigurationpublicclassWebSocketConfiguration{/**
     *     注入ServerEndpointExporter,
     *     这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     */@BeanpublicServerEndpointExporterserverEndpointExporter(){returnnewServerEndpointExporter();}}

处理消息

@Component@Slf4j@ServerEndpoint("/websocket/{userId}")publicclassWebSocket{/**
     * 线程安全的无序的集合
     */privatestaticfinalCopyOnWriteArraySet<Session>SESSIONS=newCopyOnWriteArraySet<>();/**
     * 存储在线连接数
     */privatestaticfinalMap<String,Session>SESSION_POOL=newHashMap<>();@OnOpenpublicvoidonOpen(Session session,@PathParam(value ="userId")String userId){try{SESSIONS.add(session);SESSION_POOL.put(userId, session);
            log.info("【WebSocket消息】有新的连接,总数为:"+SESSIONS.size());}catch(Exception e){
            e.printStackTrace();}}@OnClosepublicvoidonClose(Session session){try{SESSIONS.remove(session);
            log.info("【WebSocket消息】连接断开,总数为:"+SESSIONS.size());}catch(Exception e){
            e.printStackTrace();}}@OnMessagepublicvoidonMessage(String message){
        log.info("【WebSocket消息】收到客户端消息:"+ message);}/**
     * 此为广播消息
     *
     * @param message 消息
     */publicvoidsendAllMessage(String message){
        log.info("【WebSocket消息】广播消息:"+ message);for(Session session :SESSIONS){try{if(session.isOpen()){
                    session.getAsyncRemote().sendText(message);}}catch(Exception e){
                e.printStackTrace();}}}/**
     * 此为单点消息
     *
     * @param userId  用户编号
     * @param message 消息
     */publicvoidsendOneMessage(String userId,String message){Session session =SESSION_POOL.get(userId);if(session !=null&& session.isOpen()){try{synchronized(session){
                    log.info("【WebSocket消息】单点消息:"+ message);
                    session.getAsyncRemote().sendText(message);}}catch(Exception e){
                e.printStackTrace();}}}/**
     * 此为单点消息(多人)
     *
     * @param userIds 用户编号列表
     * @param message 消息
     */publicvoidsendMoreMessage(String[] userIds,String message){for(String userId : userIds){Session session =SESSION_POOL.get(userId);if(session !=null&& session.isOpen()){try{
                    log.info("【WebSocket消息】单点消息:"+ message);
                    session.getAsyncRemote().sendText(message);}catch(Exception e){
                    e.printStackTrace();}}}}}
  • @ServerEndpoint:将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
  • @OnOpen:当WebSocket建立连接成功后会触发这个注解修饰的方法。
  • @OnClose:当WebSocket建立的连接断开后会触发这个注解修饰的方法。
  • @OnMessage:当客户端发送消息到服务端时,会触发这个注解修改的方法。
  • @OnError:当WebSocket建立连接时出现异常会触发这个注解修饰的方法。

Spring封装

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

自定义处理器

处理器作用类似于

@RequestMapping

注解,用于处理某一个路径的

WebSocket

连接,自定义处理器需要实现

WebSocketHandler

接口。

WebSocket操作类

publicinterfaceWebSocket{/**
     * 会话开始回调
     *
     * @param session 会话
     */voidhandleOpen(WebSocketSession session);/**
     * 会话结束回调
     *
     * @param session 会话
     */voidhandleClose(WebSocketSession session);/**
     * 处理消息
     *
     * @param session 会话
     * @param message 接收的消息
     */voidhandleMessage(WebSocketSession session,String message);/**
     * 发送消息
     *
     * @param session 当前会话
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */voidsendMessage(WebSocketSession session,String message)throwsIOException;/**
     * 发送消息
     *
     * @param userId  用户id
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */voidsendMessage(String userId,TextMessage message)throwsIOException;/**
     * 发送消息
     *
     * @param userId  用户id
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */voidsendMessage(String userId,String message)throwsIOException;/**
     * 发送消息
     *
     * @param session 当前会话
     * @param message 要发送的消息
     * @throws IOException 发送io异常
     */voidsendMessage(WebSocketSession session,TextMessage message)throwsIOException;/**
     * 广播
     *
     * @param message 字符串消息
     * @throws IOException 异常
     */voidbroadCast(String message)throwsIOException;/**
     * 广播
     *
     * @param message 文本消息
     * @throws IOException 异常
     */voidbroadCast(TextMessage message)throwsIOException;/**
     * 处理会话异常
     *
     * @param session 会话
     * @param error   异常
     */voidhandleError(WebSocketSession session,Throwable error);/**
     * 获得所有的 websocket 会话
     *
     * @return 所有 websocket 会话
     */Set<WebSocketSession>getSessions();/**
     * 得到当前连接数
     *
     * @return 连接数
     */intgetConnectionCount();}

WebSocket操作实现类

@Slf4jpublicclassWebSocketImplimplementsWebSocket{/**
     * 在线连接数(线程安全)
     */privatefinalAtomicInteger connectionCount =newAtomicInteger(0);/**
     * 线程安全的无序集合(存储会话)
     */privatefinalCopyOnWriteArraySet<WebSocketSession> sessions =newCopyOnWriteArraySet<>();@OverridepublicvoidhandleOpen(WebSocketSession session){
        sessions.add(session);int count = connectionCount.incrementAndGet();
        log.info("a new connection opened,current online count:{}", count);}@OverridepublicvoidhandleClose(WebSocketSession session){
        sessions.remove(session);int count = connectionCount.decrementAndGet();
        log.info("a new connection closed,current online count:{}", count);}@OverridepublicvoidhandleMessage(WebSocketSession session,String message){// 只处理前端传来的文本消息,并且直接丢弃了客户端传来的消息
        log.info("received a message:{}", message);}@OverridepublicvoidsendMessage(WebSocketSession session,String message)throwsIOException{this.sendMessage(session,newTextMessage(message));}@OverridepublicvoidsendMessage(String userId,TextMessage message)throwsIOException{Optional<WebSocketSession> userSession = sessions.stream().filter(session ->{if(!session.isOpen()){returnfalse;}Map<String,Object> attributes = session.getAttributes();if(!attributes.containsKey("uid"){returnfalse;}String uid =(String) attributes.get("uid");return uid.equals(userId);}).findFirst();if(userSession.isPresent()){
            userSession.get().sendMessage(message);}}@OverridepublicvoidsendMessage(String userId,String message)throwsIOException{this.sendMessage(userId,newTextMessage(message));}@OverridepublicvoidsendMessage(WebSocketSession session,TextMessage message)throwsIOException{
        session.sendMessage(message);}@OverridepublicvoidbroadCast(String message)throwsIOException{for(WebSocketSession session : sessions){if(!session.isOpen()){continue;}this.sendMessage(session, message);}}@OverridepublicvoidbroadCast(TextMessage message)throwsIOException{for(WebSocketSession session : sessions){if(!session.isOpen()){continue;}
            session.sendMessage(message);}}@OverridepublicvoidhandleError(WebSocketSession session,Throwable error){
        log.error("websocket error:{},session id:{}", error.getMessage(), session.getId());
        log.error("", error);}@OverridepublicSet<WebSocketSession>getSessions(){return sessions;}@OverridepublicintgetConnectionCount(){return connectionCount.get();}}

自定义WebSocket处理器

publicclassDefaultWebSocketHandlerimplementsWebSocketHandler{@AutowiredprivateWebSocket webSocket;/**
     * 建立连接
     *
     * @param session Session
     */@OverridepublicvoidafterConnectionEstablished(@NonNullWebSocketSession session){
        webSocket.handleOpen(session);}/**
     * 接收消息
     *
     * @param session Session
     * @param message 消息
     */@OverridepublicvoidhandleMessage(@NonNullWebSocketSession session,@NonNullWebSocketMessage<?> message){if(message instanceofTextMessage){TextMessage textMessage =(TextMessage) message;
            webSocket.handleMessage(session, textMessage.getPayload());}}/**
     * 发生错误
     *
     * @param session   Session
     * @param exception 异常
     */@OverridepublicvoidhandleTransportError(WebSocketSession session,Throwable exception){
        webSocket.handleError(session, exception);}/**
     * 关闭连接
     *
     * @param session     Session
     * @param closeStatus 关闭状态
     */@OverridepublicvoidafterConnectionClosed(@NonNullWebSocketSession session,@NonNullCloseStatus closeStatus){
        webSocket.handleClose(session);}/**
     * 是否支持发送部分消息
     *
     * @return false
     */@OverridepublicbooleansupportsPartialMessages(){returnfalse;}}

自定义拦截器

自定义处理器需要实现

HandshakeInterceptor

接口

publicclassWebSocketInterceptorimplementsHandshakeInterceptor{@OverridepublicbooleanbeforeHandshake(@NonNullServerHttpRequest request,@NonNullServerHttpResponse response,@NonNullWebSocketHandler wsHandler,@NonNullMap<String,Object> attributes)throwsException{if(request instanceofServletServerHttpRequest){ServletServerHttpRequest servletServerHttpRequest =(ServletServerHttpRequest) request;// 模拟用户(通常利用JWT令牌解析用户信息)String userId = servletServerHttpRequest.getServletRequest().getParameter("uid");// TODO 判断用户是否存在
            attributes.put("uid", userId);returntrue;}returnfalse;}@OverridepublicvoidafterHandshake(@NonNullServerHttpRequest request,@NonNullServerHttpResponse response,@NonNullWebSocketHandler wsHandler,Exception exception){}}

WebSocket 无法使用 header 传递参数,因此这里使用 url params 携带参数。

WebSocket配置项

将自定义处理器、拦截器以及WebSocket操作类依次注入到IOC容器中。

@Configuration@EnableWebSocketpublicclassWebSocketConfigurationimplementsWebSocketConfigurer{@BeanpublicDefaultWebSocketHandlerdefaultWebSocketHandler(){returnnewDefaultWebSocketHandler();}@BeanpublicWebSocketwebSocket(){returnnewWebSocketImpl();}@BeanpublicWebSocketInterceptorwebSocketInterceptor(){returnnewWebSocketInterceptor();}@OverridepublicvoidregisterWebSocketHandlers(@NonNullWebSocketHandlerRegistry registry){
        registry.addHandler(defaultWebSocketHandler(),"ws/message").addInterceptors(webSocketInterceptor()).setAllowedOrigins("*");}}
  • @EnableWebSocket:开启WebSocket功能
  • addHandler:添加处理器
  • addInterceptors:添加拦截器
  • setAllowedOrigins:设置允许跨域(允许所有请求来源)

WebSocket测试

原生注解

原生注解-客户端
原生注解-服务端

Spring封装

Spring封装-客户端
Spring封装-服务端


本文转载自: https://blog.csdn.net/qq991658923/article/details/127022522
版权归原作者 人人都在发奋 所有, 如有侵权,请联系我们删除。

“Spring Boot 集成 WebSocket(原生注解与Spring封装)”的评论:

还没有评论