Spring Boot 集成 WebSocket
本章节将介绍 Spring Boot 集成 WebSocket 的两种主要方式:原生注解与Spring封装。
在线WebSocket测试工具
🤖 Spring Boot 2.x 实践案例(代码仓库)
原生注解
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
配置文件
@ConfigurationpublicclassWebSocketConfiguration{/**
* 注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*/@BeanpublicServerEndpointExporterserverEndpointExporter(){returnnewServerEndpointExporter();}}
处理消息
@Component@Slf4j@ServerEndpoint("/websocket/{userId}")publicclassWebSocket{/**
* 线程安全的无序的集合
*/privatestaticfinalCopyOnWriteArraySet<Session>SESSIONS=newCopyOnWriteArraySet<>();/**
* 存储在线连接数
*/privatestaticfinalMap<String,Session>SESSION_POOL=newHashMap<>();@OnOpenpublicvoidonOpen(Session session,@PathParam(value ="userId")String userId){try{SESSIONS.add(session);SESSION_POOL.put(userId, session);
log.info("【WebSocket消息】有新的连接,总数为:"+SESSIONS.size());}catch(Exception e){
e.printStackTrace();}}@OnClosepublicvoidonClose(Session session){try{SESSIONS.remove(session);
log.info("【WebSocket消息】连接断开,总数为:"+SESSIONS.size());}catch(Exception e){
e.printStackTrace();}}@OnMessagepublicvoidonMessage(String message){
log.info("【WebSocket消息】收到客户端消息:"+ message);}/**
* 此为广播消息
*
* @param message 消息
*/publicvoidsendAllMessage(String message){
log.info("【WebSocket消息】广播消息:"+ message);for(Session session :SESSIONS){try{if(session.isOpen()){
session.getAsyncRemote().sendText(message);}}catch(Exception e){
e.printStackTrace();}}}/**
* 此为单点消息
*
* @param userId 用户编号
* @param message 消息
*/publicvoidsendOneMessage(String userId,String message){Session session =SESSION_POOL.get(userId);if(session !=null&& session.isOpen()){try{synchronized(session){
log.info("【WebSocket消息】单点消息:"+ message);
session.getAsyncRemote().sendText(message);}}catch(Exception e){
e.printStackTrace();}}}/**
* 此为单点消息(多人)
*
* @param userIds 用户编号列表
* @param message 消息
*/publicvoidsendMoreMessage(String[] userIds,String message){for(String userId : userIds){Session session =SESSION_POOL.get(userId);if(session !=null&& session.isOpen()){try{
log.info("【WebSocket消息】单点消息:"+ message);
session.getAsyncRemote().sendText(message);}catch(Exception e){
e.printStackTrace();}}}}}
- @ServerEndpoint:将目前的类定义成一个websocket服务器端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
- @OnOpen:当WebSocket建立连接成功后会触发这个注解修饰的方法。
- @OnClose:当WebSocket建立的连接断开后会触发这个注解修饰的方法。
- @OnMessage:当客户端发送消息到服务端时,会触发这个注解修改的方法。
- @OnError:当WebSocket建立连接时出现异常会触发这个注解修饰的方法。
Spring封装
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
自定义处理器
处理器作用类似于
@RequestMapping
注解,用于处理某一个路径的
WebSocket
连接,自定义处理器需要实现
WebSocketHandler
接口。
WebSocket操作类
publicinterfaceWebSocket{/**
* 会话开始回调
*
* @param session 会话
*/voidhandleOpen(WebSocketSession session);/**
* 会话结束回调
*
* @param session 会话
*/voidhandleClose(WebSocketSession session);/**
* 处理消息
*
* @param session 会话
* @param message 接收的消息
*/voidhandleMessage(WebSocketSession session,String message);/**
* 发送消息
*
* @param session 当前会话
* @param message 要发送的消息
* @throws IOException 发送io异常
*/voidsendMessage(WebSocketSession session,String message)throwsIOException;/**
* 发送消息
*
* @param userId 用户id
* @param message 要发送的消息
* @throws IOException 发送io异常
*/voidsendMessage(String userId,TextMessage message)throwsIOException;/**
* 发送消息
*
* @param userId 用户id
* @param message 要发送的消息
* @throws IOException 发送io异常
*/voidsendMessage(String userId,String message)throwsIOException;/**
* 发送消息
*
* @param session 当前会话
* @param message 要发送的消息
* @throws IOException 发送io异常
*/voidsendMessage(WebSocketSession session,TextMessage message)throwsIOException;/**
* 广播
*
* @param message 字符串消息
* @throws IOException 异常
*/voidbroadCast(String message)throwsIOException;/**
* 广播
*
* @param message 文本消息
* @throws IOException 异常
*/voidbroadCast(TextMessage message)throwsIOException;/**
* 处理会话异常
*
* @param session 会话
* @param error 异常
*/voidhandleError(WebSocketSession session,Throwable error);/**
* 获得所有的 websocket 会话
*
* @return 所有 websocket 会话
*/Set<WebSocketSession>getSessions();/**
* 得到当前连接数
*
* @return 连接数
*/intgetConnectionCount();}
WebSocket操作实现类
@Slf4jpublicclassWebSocketImplimplementsWebSocket{/**
* 在线连接数(线程安全)
*/privatefinalAtomicInteger connectionCount =newAtomicInteger(0);/**
* 线程安全的无序集合(存储会话)
*/privatefinalCopyOnWriteArraySet<WebSocketSession> sessions =newCopyOnWriteArraySet<>();@OverridepublicvoidhandleOpen(WebSocketSession session){
sessions.add(session);int count = connectionCount.incrementAndGet();
log.info("a new connection opened,current online count:{}", count);}@OverridepublicvoidhandleClose(WebSocketSession session){
sessions.remove(session);int count = connectionCount.decrementAndGet();
log.info("a new connection closed,current online count:{}", count);}@OverridepublicvoidhandleMessage(WebSocketSession session,String message){// 只处理前端传来的文本消息,并且直接丢弃了客户端传来的消息
log.info("received a message:{}", message);}@OverridepublicvoidsendMessage(WebSocketSession session,String message)throwsIOException{this.sendMessage(session,newTextMessage(message));}@OverridepublicvoidsendMessage(String userId,TextMessage message)throwsIOException{Optional<WebSocketSession> userSession = sessions.stream().filter(session ->{if(!session.isOpen()){returnfalse;}Map<String,Object> attributes = session.getAttributes();if(!attributes.containsKey("uid"){returnfalse;}String uid =(String) attributes.get("uid");return uid.equals(userId);}).findFirst();if(userSession.isPresent()){
userSession.get().sendMessage(message);}}@OverridepublicvoidsendMessage(String userId,String message)throwsIOException{this.sendMessage(userId,newTextMessage(message));}@OverridepublicvoidsendMessage(WebSocketSession session,TextMessage message)throwsIOException{
session.sendMessage(message);}@OverridepublicvoidbroadCast(String message)throwsIOException{for(WebSocketSession session : sessions){if(!session.isOpen()){continue;}this.sendMessage(session, message);}}@OverridepublicvoidbroadCast(TextMessage message)throwsIOException{for(WebSocketSession session : sessions){if(!session.isOpen()){continue;}
session.sendMessage(message);}}@OverridepublicvoidhandleError(WebSocketSession session,Throwable error){
log.error("websocket error:{},session id:{}", error.getMessage(), session.getId());
log.error("", error);}@OverridepublicSet<WebSocketSession>getSessions(){return sessions;}@OverridepublicintgetConnectionCount(){return connectionCount.get();}}
自定义WebSocket处理器
publicclassDefaultWebSocketHandlerimplementsWebSocketHandler{@AutowiredprivateWebSocket webSocket;/**
* 建立连接
*
* @param session Session
*/@OverridepublicvoidafterConnectionEstablished(@NonNullWebSocketSession session){
webSocket.handleOpen(session);}/**
* 接收消息
*
* @param session Session
* @param message 消息
*/@OverridepublicvoidhandleMessage(@NonNullWebSocketSession session,@NonNullWebSocketMessage<?> message){if(message instanceofTextMessage){TextMessage textMessage =(TextMessage) message;
webSocket.handleMessage(session, textMessage.getPayload());}}/**
* 发生错误
*
* @param session Session
* @param exception 异常
*/@OverridepublicvoidhandleTransportError(WebSocketSession session,Throwable exception){
webSocket.handleError(session, exception);}/**
* 关闭连接
*
* @param session Session
* @param closeStatus 关闭状态
*/@OverridepublicvoidafterConnectionClosed(@NonNullWebSocketSession session,@NonNullCloseStatus closeStatus){
webSocket.handleClose(session);}/**
* 是否支持发送部分消息
*
* @return false
*/@OverridepublicbooleansupportsPartialMessages(){returnfalse;}}
自定义拦截器
自定义处理器需要实现
HandshakeInterceptor
接口
publicclassWebSocketInterceptorimplementsHandshakeInterceptor{@OverridepublicbooleanbeforeHandshake(@NonNullServerHttpRequest request,@NonNullServerHttpResponse response,@NonNullWebSocketHandler wsHandler,@NonNullMap<String,Object> attributes)throwsException{if(request instanceofServletServerHttpRequest){ServletServerHttpRequest servletServerHttpRequest =(ServletServerHttpRequest) request;// 模拟用户(通常利用JWT令牌解析用户信息)String userId = servletServerHttpRequest.getServletRequest().getParameter("uid");// TODO 判断用户是否存在
attributes.put("uid", userId);returntrue;}returnfalse;}@OverridepublicvoidafterHandshake(@NonNullServerHttpRequest request,@NonNullServerHttpResponse response,@NonNullWebSocketHandler wsHandler,Exception exception){}}
WebSocket 无法使用 header 传递参数,因此这里使用 url params 携带参数。
WebSocket配置项
将自定义处理器、拦截器以及WebSocket操作类依次注入到IOC容器中。
@Configuration@EnableWebSocketpublicclassWebSocketConfigurationimplementsWebSocketConfigurer{@BeanpublicDefaultWebSocketHandlerdefaultWebSocketHandler(){returnnewDefaultWebSocketHandler();}@BeanpublicWebSocketwebSocket(){returnnewWebSocketImpl();}@BeanpublicWebSocketInterceptorwebSocketInterceptor(){returnnewWebSocketInterceptor();}@OverridepublicvoidregisterWebSocketHandlers(@NonNullWebSocketHandlerRegistry registry){
registry.addHandler(defaultWebSocketHandler(),"ws/message").addInterceptors(webSocketInterceptor()).setAllowedOrigins("*");}}
- @EnableWebSocket:开启WebSocket功能
- addHandler:添加处理器
- addInterceptors:添加拦截器
- setAllowedOrigins:设置允许跨域(允许所有请求来源)
WebSocket测试
原生注解
Spring封装
版权归原作者 人人都在发奋 所有, 如有侵权,请联系我们删除。