一、WebSocket通信过程
客户端构建一个websocket实例,并且为它绑定一个需要连接到的服务器地址,当客户端连接服务端的候,会向服务端发送一个http get报文,告诉服务端需要将通信协议切换到websocket,服务端收到http请求后将通信协议切换到websocket,同时发给客户端一个响应报文,返回的状态码为101,表示同意客户端协议转请求,并转换为websocket协议。以上过程都是利用http通信完成的,称之为websocket协议握手(websocket Protocol handshake),经过握手之后,客户端和服务端就建立了websocket连接,以后的通信走的都是websocket协议了。
二、服务端实现
1.pom文件添加依赖
<!--webSocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.启用Springboot对WebSocket的支持
package com.lby.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author Liby
* @date 2022-04-25 16:18
* @description:
* @version:
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3.核心配置:WebSocketServer
因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller
@ ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
新建一个ConcurrentHashMap webSocketMap 用于接收当前userId的WebSocket,方便传递之间对userId进行推送消息。
下面是具体业务代码:
package org.example.server;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/*
启动项目后可通过 http://websocket.jsonin.com/ 进行测试
输入网址ws://localhost:8080/websocket/1211,检测是否能进行连接
*/
@ServerEndpoint(value = "/websocket/{userId}")
@Component
public class WebSocket {
private final static Logger logger = LogManager.getLogger(WebSocket.class);
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象
*/
private static ConcurrentHashMap<String, WebSocket> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
private String userId;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
//加入map
webSocketMap.put(userId, this);
addOnlineCount(); //在线数加1
logger.info("用户{}连接成功,当前在线人数为{}", userId, getOnlineCount());
try {
sendMessage(String.valueOf(this.session.getQueryString()));
} catch (IOException e) {
logger.error("IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
//从map中删除
webSocketMap.remove(userId);
subOnlineCount(); //在线数减1
logger.info("用户{}关闭连接!当前在线人数为{}", userId, getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
logger.info("来自客户端用户:{} 消息:{}",userId, message);
//群发消息
for (String item : webSocketMap.keySet()) {
try {
webSocketMap.get(item).sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 发生错误时调用
*
*/
@OnError
public void onError(Session session, Throwable error) {
logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 向客户端发送消息
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
//this.session.getAsyncRemote().sendText(message);
}
/**
* 通过userId向客户端发送消息
*/
public void sendMessageByUserId(String userId, String message) throws IOException {
logger.info("服务端发送消息到{},消息:{}",userId,message);
if(StrUtil.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
webSocketMap.get(userId).sendMessage("hello");
}else{
logger.error("用户{}不在线",userId);
}
}
/**
* 群发自定义消息
*/
public void sendInfo(String message) throws IOException {
for (String item : webSocketMap.keySet()) {
try {
webSocketMap.get(item).sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocket.onlineCount--;
}
}
三、客户端实现
1.pom文件添加依赖
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
2.客户端实现代码
package org.example;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author kele
* @date 2024/2/19
**/
@Slf4j
public class MyWebSocketClient extends WebSocketClient {
public MyWebSocketClient(URI serverUri) {
super(serverUri);
}
@SneakyThrows
@Override
public void onOpen(ServerHandshake data) {
try {
log.info("WebSocket连接已打开。");
}catch (Exception e){
log.error("onOpen error :{}",e.getMessage());
}
}
@SneakyThrows
@Override
public void onMessage(String message) {
try {
if (message != null && !message.isEmpty()) {
log.info("收到消息: {}",message);
}
}catch (Exception e){
log.error("onMessage error : {}",message);
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.info("WebSocket连接已关闭。");
}
@Override
public void onError(Exception ex) {
log.info("WebSocket连接发生错误:{}", ex.getMessage());
}
/**
* 连接定时检查
*/
public void startReconnectTask(long delay, TimeUnit unit) {
System.out.println("WebSocket 心跳检查");
log.info("WebSocket 心跳检查");
// 以下为定时器,建议使用自定义线程池,或交给框架处理(spring)
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(() -> {
// 检查逻辑:判断当前连接是否连通
if (!this.isOpen()) {
System.out.println("WebSocket 开始重连......");
log.info("WebSocket 开始重连......");
// 重置连接
this.reconnect();
// 以下为错误示范
//this.close();
//this.connect();
}
}, 0, delay, unit);
}
}
3.开启客户端连接服务,并开启定时检查websocket连接
package org.example;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.URI;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class Init implements Runnable {
public static MyWebSocketClient myWebSocketClient;
@PostConstruct
public void run () {
try {
//启动连接
log.info("连接websocket服务端");
log.info("项目启动");
// 服务地址
URI uri = new URI("ws://127.0.0.1:8077/websocket/123");
log.info("服务地址 -{}", uri);
// 创建客户端
myWebSocketClient = new MyWebSocketClient(uri);
// 建立连接
myWebSocketClient.connect();
// 开启 定时检查
myWebSocketClient.startReconnectTask(5, TimeUnit.SECONDS);
}catch (Exception e){
e.printStackTrace();
}
}
}
四、可能遇到的问题及解决方案
1.websocket客户端重连机制
注意引入Java-websocket的版本要高于1.5.0才有reconnect()方法
2.在使用websocket连接传输Base64编码时出现错误
原因:Websocket中maxMessageSize默认是8K,接收的图片大于8K导致接收失败,解决方法:适当加大maxMessageSize。
- 注意不能设置太大,避免内存溢出。参考博客1,博客2,博客3
@OnMessage(maxMessageSize = 10240000)
public void onMessage(byte[] message) throws IOException {
log.info("收到图片了");
String str = Base64.encodeBase64String(message); // 图片base64
log.info(str.substring(0, 100));
}
版权归原作者 uni可乐 所有, 如有侵权,请联系我们删除。