前言
WebScoket是Web应用程序的传输协议,它提供了双向的、按序到达的数据流。
他是一个HTML5协议,WebSocket的连接是持久的,他通过在客户端和服务器之间保持双工连接,服务器的更新可以被及时推送给客户端,而不需要客户端以一定时间间隔去轮询
- 建立在TCP协议之上,服务端的实现比较容易。
- 与HTTP协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用HTTP协议,因此握手时不容易屏蔽,能通过各种HTTP代理服务器。
- 数据格式比较轻量,性能开销小,通信高效。
- 可以发送文本,也可以发送二进制数据。
- 没有同源限制,客户端可以与任意服务器通信。
- 协议标识符是ws(如果加密,则为wss),服务器网址就是URL。
前端
<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>实时监控</title><style>.item{display: flex;border-bottom: 1px solid #000000;justify-content: space-between;width: 30%;line-height: 50px;height: 50px;}.item span:nth-child(2){margin-right: 10px;margin-top: 15px;width: 20px;height: 20px;border-radius: 50%;background: #55ff00;}.nowI{background: #ff0000 !important;}</style></head><body><divid="app"><divv-for="item in list"class="item"><span>{{item.id}}.{{item.name}}</span><span:class='item.state==-1?"nowI":""'></span></div></div><scriptsrc="https://cdn.jsdelivr.net/npm/[email protected]/dist/vue.min.js"></script><scripttype="text/javascript">var vm =newVue({el:"#app",data:{list:[{id:1,name:'张三',state:1},{id:2,name:'李四',state:1,},{id:3,name:'王五',state:1},{id:4,name:'韩梅梅',state:1},{id:5,name:'李磊',state:1},]}})var webSocket =null;7if('WebSocket'in window){var uid =getUUID()//创建WebSocket对象
webSocket =newWebSocket("ws://localhost:8080/websocket/"+ uid);//连接成功
webSocket.onopen=function(){
console.log("已连接");
webSocket.send("消息发送测试")
document.write(uid)}//接收到消息
webSocket.onmessage=function(msg){//处理消息var serverMsg = msg.data;var t_id =parseInt(serverMsg)//服务端发过来的消息,ID,string需转化为int类型才能比较for(var i =0; i < vm.list.length; i++){var item = vm.list[i];if(item.id === t_id){
item.state =-1;
vm.list.splice(i,1, item)break;}}};//关闭事件
webSocket.onclose=function(){
console.log("websocket已关闭");};//发生了错误事件
webSocket.onerror=function(){
console.log("websocket发生了错误");}}else{alert("很遗憾,您的浏览器不支持WebSocket!")}functiongetUUID(){//获取唯一的UUIDreturn'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g,function(c){var r = Math.random()*16|0,
v = c ==='x'? r :(r &0x3|0x8);return v.toString(16);});}</script></body></html>
后端
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
1、配置application.yml
server:port:8080mySocket:myPwd: jae_123
2、WebSocketConfig配置类
注入ServerEndpointExporter
@ConfigurationpublicclassWebsocketConfig{/**
* 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
* @author Fang Ruichuan
* @date 2022/9/24 9:19
* @return ServerEndpointExporter
*/@BeanpublicServerEndpointExporterserverEndpointExporter(){returnnewServerEndpointExporter();}}
3、WebsocketServer类
用来进行服务端和客户端之间的交互
@Slf4j@Service@ServerEndpoint("/websocket/{uid}")publicclassWebSocketServer{privatestaticfinallong sessionTimeout =60000;// 用来存放每个客户端对应的WebSocketServer对象privatestaticMap<String,WebSocketServer> webSocketMap =newConcurrentHashMap<>();// 与某个客户端的连接会话,需要通过它来给客户端发送数据privateSession session;// 接收idprivateString uid;// 连接建立成功调用的方法@OnOpenpublicvoidonOpen(Session session,@PathParam("uid")String uid){
session.setMaxIdleTimeout(sessionTimeout);this.session = session;this.uid = uid;if(webSocketMap.containsKey(uid)){
webSocketMap.remove(uid);}
webSocketMap.put(uid,this);
log.info("websocket连接成功编号uid: "+ uid +",当前在线数: "+getOnlineClients());try{sendMessage(WebSocketMessageEnum.CONNECT_SUCCESS.getJsonValue().toJSONString());}catch(IOException e){
log.error("websocket发送连接成功错误编号uid: "+ uid +",网络异常!!!");}}// 连接关闭调用的方法@OnClosepublicvoidonClose(){try{if(webSocketMap.containsKey(uid)){
webSocketMap.remove(uid);}
log.info("websocket退出编号uid: "+ uid +",当前在线数为: "+getOnlineClients());}catch(Exception e){
log.error("websocket编号uid连接关闭错误: "+ uid +",原因: "+ e.getMessage());}}/**
* 收到客户端消息后调用的方法
* @author Fang Ruichuan
* @date 2022/9/24 9:44
* @param message 客户端发送过来的消息
* @param session
*/@OnMessagepublicvoidonMessage(String message,Session session){
log.info("websocket收到客户端编号uid消息: "+ uid +", 报文: "+ message);}/**
* 发生错误时调用
* @author Fang Ruichuan
* @date 2022/9/24 9:46
* @param session
* @param error
*/@OnErrorpublicvoidonError(Session session,Throwable error){
log.error("websocket编号uid错误: "+this.uid +"原因: "+ error.getMessage());
error.printStackTrace();}/**
* 单机使用,外部接口通过指定的客户id向该客户推送消息
* @author Fang Ruichuan
* @date 2022/9/24 9:49
* @param key
* @param message
* @return boolean
*/publicstaticbooleansendMessageByWayBillId(@NotNullString key,String message){WebSocketServer webSocketServer = webSocketMap.get(key);if(Objects.nonNull(webSocketServer)){try{
webSocketServer.sendMessage(message);
log.info("websocket发送消息编号uid为: "+ key +"发送消息: "+ message);returntrue;}catch(Exception e){
log.error("websocket发送消息失败编号uid为: "+ key +"消息: "+ message);returnfalse;}}else{
log.error("websocket未连接编号uid号为: "+ key +"消息: "+ message);returnfalse;}}// 群发自定义消息publicstaticvoidsendInfo(String message){
webSocketMap.forEach((k, v)->{WebSocketServer webSocketServer = webSocketMap.get(k);try{
webSocketServer.sendMessage(message);
log.info("websocket群发消息编号uid为: "+ k +",消息: "+ message);}catch(IOException e){
log.error("群发自定义消息失败: "+ k +",message: "+ message);}});}/**
* 服务端群发消息-心跳包
* @author Fang Ruichuan
* @date 2022/9/24 10:54
* @param message
* @return int
*/publicstaticsynchronizedintsendPing(String message){if(webSocketMap.size()<=0){return0;}StringBuffer uids =newStringBuffer();AtomicInteger count =newAtomicInteger();
webSocketMap.forEach((uid, server)->{
count.getAndIncrement();if(webSocketMap.containsKey(uid)){WebSocketServer webSocketServer = webSocketMap.get(uid);try{
webSocketServer.sendMessage(message);if(count.equals(webSocketMap.size()-1)){
uids.append("uid");return;// 跳出本次循环}
uids.append(uid).append(",");}catch(IOException e){
webSocketMap.remove(uid);
log.info("客户端心跳检测异常移除: "+ uid +",心跳发送失败,已移除!");}}else{
log.info("客户端心跳检测异常不存在: "+ uid +",不存在!");}});
log.info("客户端心跳检测结果: "+ uids +"连接正在运行");return webSocketMap.size();}// 实现服务器主动推送publicvoidsendMessage(String message)throwsIOException{this.session.getBasicRemote().sendText(message);}// 获取客户端在线数publicstaticsynchronizedintgetOnlineClients(){if(Objects.isNull(webSocketMap)){return0;}else{return webSocketMap.size();}}/**
* 连接是否存在
* @author Fang Ruichuan
* @date 2022/9/24 10:48
* @param uid
* @return boolean
*/publicstaticbooleanisConnected(String uid){if(Objects.nonNull(webSocketMap)&& webSocketMap.containsKey(uid)){returntrue;}else{returnfalse;}}}
4、控制器,用于进行接口测试
提示:实际开发有个密码校验
@RestControllerpublicclassWebSocketControllerimplementsInitializingBean{@Value("${mySocket.myPwd}")privateString myPwd;publicstaticString MY_PWD;@OverridepublicvoidafterPropertiesSet(){
MY_PWD = myPwd;}/**
* webSocket链接是否成功
* @author Fang Ruichuan
* @date 2022/9/24 15:18
* @param webSocketId
* @return ResponseWrapper<Boolean>
*/@GetMapping("/webSocketIsConnect/{webSocketId}")publicResponseWrapper<Boolean>webSocketIsConnect(@PathVariable("webSocketId")String webSocketId){returnResponseWrapper.success(WebSocketServer.isConnected(webSocketId));}/**
* webSocket发送客户端消息
* @author Fang Ruichuan
* @date 2022/9/24 15:18
* @param webSocketId
* @param message
* @return ResponseWrapper<Boolean>
*/@GetMapping("/sendMessageByWayBillId")publicResponseWrapper<Boolean>sendMessageByWayBillId(String webSocketId,String message,String pwd){boolean flag =false;if(MY_PWD.equals(pwd)){
flag =WebSocketServer.sendMessageByWayBillId(webSocketId, message);}returnResponseWrapper.success(flag);}/**
* 群发消息
* @author Fang Ruichuan
* @date 2022/9/24 16:18
* @param message
* @param pwd
*/@GetMapping("/broadSendInfo")publicvoidsendInfo(String message,String pwd){if(MY_PWD.equals(pwd)){WebSocketServer.sendInfo(message);}}}
心跳机制
心跳机制其实只要看词就能大概了解,就是类似一个轮询的机制,必要时向对方轮询情况的一种操作。
Websocket是前后端交互的长连接,前后端也都可能因为一些情况导致连接失效并且相互之间没有了反应。因此为了保证连接的可持续性和稳定性,WebSocket心跳机制就应运而生。
后端定时任务
@Component@Slf4j@EnableSchedulingpublicclassWebSocketTask{/**
* 每1秒进行一次websocket心跳检测
* @author Fang Ruichuan
* @date 2022/9/24 11:31
*/@Scheduled(cron ="0/4 * * * * ?")publicvoidclearOrders(){int num =0;try{JSONObject jsonObject =WebSocketMessageEnum.HEART_CHECK.getJsonValue();
num =WebSocketServer.sendPing(jsonObject.toJSONString());}finally{
log.info("websocket心跳检测结果,共【"+ num +"】个连接");}}}
测试
我们打开三个客户端
后端控制台运行结果
版权归原作者 心潮的滴滴 所有, 如有侵权,请联系我们删除。