0


Spring boot 项目作为客户端调用 服务端websocket

文章目录

java客户端请求websocket

Spring boot 导入包

pom.xml 导入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

客户端调用方法

测试执行方法
  • connectWebSocket- 连接websocket 客户端,并携带过期时间等参数
  • HandshakeMessage- 发送消息对象
  • sendHandshake- 发送消息
  • WebSocketConfig.queue.take- 队列信息,等待数据返回,并消费,获取websocket 返回的消息数据
publicStringtest(){try{WebSocketConfig.connectWebSocket();HandshakeMessage handshakeMessage =newHandshakeMessage();
        handshakeMessage.setMessage("test");
        handshakeMessage.setClientId(UUID.randomUUID().toString());
        handshakeMessage.setType("handshake");WebSocketConfig.sendHandshake(handshakeMessage);String take =WebSocketConfig.queue.take();System.out.println("test:"+ take);WebSocketConfig.close();}catch(InterruptedException ex){System.out.println("连接异常"+ ex.getMessage());}returnnull;}
方法对应实体类
publicclassHandshakeMessage{privateString type;privateString clientId;privateString message;publicStringgetType(){return type;}publicvoidsetType(String type){this.type = type;}publicStringgetClientId(){return clientId;}publicvoidsetClientId(String clientId){this.clientId = clientId;}publicStringgetMessage(){return message;}publicvoidsetMessage(String message){this.message = message;}}
配置 yaml 资源
websocket:url: ws://localhost:8080/websocket
WebSocketConfig 配置类
注入配置websocketUrl:

使用@Value注解将websocket.url注入到类的私有成员变量websocketUrl中。确保在类初始化时就设置好静态变量。

LinkedBlockingQueue

LinkedBlockingQueue是Java并发集合框架中的一种线程安全的队列实现,它继承自BlockingQueue接口。LinkedBlockingQueue使用链表结构来存储元素,并且提供了阻塞操作,可以在队列为空或满时自动阻塞生产者或消费者线程,直到队列变为非空或非满。

  • LinkedBlockingQueue的特点- 线程安全性:LinkedBlockingQueue是线程安全的,可以在多线程环境中安全地使用。- 阻塞操作:提供了put和take等阻塞方法,当队列满时调用put会阻塞,当队列为空时调用take会阻塞。- 容量可配置:LinkedBlockingQueue可以被初始化为一个固定容量的队列,也可以是一个无界队列(默认情况下,如果未指定容量,则容量为Integer.MAX_VALUE)。
connectWebSocket 连接
  • URI- URI类可以帮助你处理和解析Web地址,并确保这些地址格式正确。
  • WebSocketClientHandler- 继承 WebSocketClient 类 ,实现一些 websocket 方法重写
  • connectBlocking方法- connectBlocking方法用于建立WebSocket连接,并且在连接建立之前会阻塞当前线程。这通常用于确保连接完全建立后再继续执行后续操作。connectBlocking方法是org.java_websocket.client.WebSocketClient的一个扩展方法,它允许开发者在连接建立之前等待一段时间。- ###### connectBlocking方法有两个参数:- timeout:指定连接建立的最大等待时间。- unit:指定时间单位(如毫秒、秒、分钟等)。
sendMessage

sendMessage方法用于向WebSocket服务器发送文本消息
send 方法

  • 检查WebSocket连接状态: - 如果WebSocket连接尚未打开(!this.isOpen()),则抛出WebsocketNotConnectedException异常。这是因为只有在连接建立后才能发送数据。
  • 检查参数有效性: - 如果传入的frames参数为null,则抛出IllegalArgumentException异常。这是为了确保传入的数据是有效的。
  • 准备发送的数据帧: - 创建一个新的ArrayList来存储即将发送的二进制帧。- 遍历frames集合中的每一个Framedata对象。- 对于每一个Framedata对象,调用draft.createBinaryFrame(f)方法将其转换为ByteBuffer,然后添加到outgoingFrames列表中。- 在遍历过程中,通过日志记录每一步操作的信息(如果启用了日志的trace级别)。
  • 发送数据帧: - 最后,调用write方法,将准备好的outgoingFrames列表作为参数传递进去,完成实际的数据发送操作。publicvoidsend(String text){if(text ==null){thrownewIllegalArgumentException("Cannot send 'null' data to a WebSocketImpl.");}else{this.send((Collection)this.draft.createFrames(text,this.role ==Role.CLIENT));}}privatevoidsend(Collection<Framedata> frames){if(!this.isOpen()){thrownewWebsocketNotConnectedException();}elseif(frames ==null){thrownewIllegalArgumentException();}else{ArrayList<ByteBuffer> outgoingFrames =newArrayList();Iterator var3 = frames.iterator();while(var3.hasNext()){Framedata f =(Framedata)var3.next();this.log.trace("send frame: {}", f); outgoingFrames.add(this.draft.createBinaryFrame(f));}this.write((List)outgoingFrames);}}
close

关闭连接

packagecom.dog.websocket;importcom.alibaba.fastjson2.JSONObject;importorg.java_websocket.client.WebSocketClient;importorg.java_websocket.handshake.ServerHandshake;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjava.net.URI;importjava.net.URISyntaxException;importjava.util.ArrayDeque;importjava.util.Queue;importjava.util.concurrent.LinkedBlockingQueue;importjava.util.concurrent.TimeUnit;@ComponentpublicclassWebSocketConfig{privatestaticString websocketUrl;publicstaticLinkedBlockingQueue<String> queue =newLinkedBlockingQueue<>(1000);@Value("${websocket.url}")publicvoidsetWebsocketUrl(String websocketUrl){WebSocketConfig.websocketUrl = websocketUrl;}privatestaticWebSocketClient client;publicstaticvoidconnectWebSocket(){try{URI uri =newURI(websocketUrl);
            client =newWebSocketClientHandler(uri);
            client.connectBlocking(4000,TimeUnit.MINUTES);}catch(URISyntaxException|InterruptedException ex){
            ex.printStackTrace();thrownewRuntimeException("websocket 连接异常");}}/**
     * 直接发送信息
     * @param sendMessage
     */publicstaticvoidsendMessage(String sendMessage){
        client.send(sendMessage);}publicstaticvoidsendHandshake(HandshakeMessage handshakeMessage){String sendMessage =JSONObject.toJSONString(handshakeMessage);System.out.println(sendMessage);
        client.send(sendMessage);}publicvoidsendByteMessage(byte[] bytes){
        client.send(bytes);}/**
     * 连接关闭
     */publicstaticvoidclose(){if(client !=null&& client.isOpen()){
            client.close();}}}
WebSocketClientHandler 配置类

继承WebSocketClient 并重写了几个关键的方法来处理WebSocket连接的不同生命周期事件

onOpen 方法
@OverridepublicvoidonOpen(ServerHandshake serverHandshake){System.out.println("连接websocket 状态:"+ serverHandshake.getHttpStatus());}

当WebSocket连接成功建立时,这个方法会被调用。它打印出连接的状态码。

onMessage 方法
@OverridepublicvoidonMessage(String s){System.out.println("message: "+ s);try{// 尝试在一定时间内将消息放入队列if(!queue.offer(s,10,TimeUnit.SECONDS)){System.err.println("无法在规定时间内将消息放入队列");}}catch(InterruptedException e){Thread.currentThread().interrupt();System.err.println("向队列中添加消息时被中断");}}

当从WebSocket服务器接收到消息时,这个方法会被调用。它首先打印接收到的消息,然后尝试将消息放入WebSocketConfig.queue队列中。如果在向队列中添加消息时发生中断异常,则恢复中断状态并打印错误信息。

onClose 方法
@OverridepublicvoidonClose(int i,String s,boolean b){System.out.println("WebSocket连接已关闭: "+ s);}

当WebSocket连接关闭时,这个方法会被调用。它打印出关闭连接的原因。

onError 方法
@OverridepublicvoidonError(Exception ex){
    ex.printStackTrace();System.err.println("WebSocket发生错误: "+ ex.getMessage());}

当WebSocket连接发生错误时,这个方法会被调用。它打印出错误信息及其堆栈跟踪。

importorg.java_websocket.client.WebSocketClient;importorg.java_websocket.handshake.ServerHandshake;importorg.springframework.stereotype.Component;importjava.net.URI;importjava.util.concurrent.TimeUnit;importstaticcom.dog.websocket.WebSocketConfig.queue;publicclassWebSocketClientHandlerextendsWebSocketClient{publicWebSocketClientHandler(URI serverUri){super(serverUri);}@OverridepublicvoidonOpen(ServerHandshake serverHandshake){System.out.println("连接websocket 状态:"+ serverHandshake.getHttpStatus());}@OverridepublicvoidonMessage(String s){System.out.println("message: "+ s);try{// 尝试在一定时间内将消息放入队列if(!queue.offer(s,10,TimeUnit.SECONDS)){System.err.println("无法在规定时间内将消息放入队列");}}catch(InterruptedException e){Thread.currentThread().interrupt();System.err.println("向队列中添加消息时被中断");}}@OverridepublicvoidonClose(int i,String s,boolean b){System.out.println("WebSocket连接已关闭: "+ s);}@OverridepublicvoidonError(Exception ex){
        ex.printStackTrace();System.err.println("WebSocket发生错误: "+ ex.getMessage());}}

上面只是根据所需要自行调整

java服务端websocket

在上一篇博客已做详细简绍,不做补充
spring boot 项目 跟 JavaScript 简单 websocket 使用

WebSocketConfig 配置类
packagecom.ruoyi.common.utils.socket;importcom.ruoyi.common.utils.socket.handler.WebSocketHandler;importorg.springframework.context.annotation.Configuration;importorg.springframework.web.socket.config.annotation.EnableWebSocket;importorg.springframework.web.socket.config.annotation.WebSocketConfigurer;importorg.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration@EnableWebSocketpublicclassWebSocketConfigimplementsWebSocketConfigurer{privatefinalWebSocketHandler webSocketHandler;publicWebSocketConfig(WebSocketHandler webSocketHandler){this.webSocketHandler = webSocketHandler;}@OverridepublicvoidregisterWebSocketHandlers(WebSocketHandlerRegistry registry){
        registry.addHandler(webSocketHandler,"/websocket").setAllowedOrigins("*");}}
WebSocketHandler 监听类
packagecom.ruoyi.common.utils.socket.handler;importcom.alibaba.fastjson2.JSON;importcom.alibaba.fastjson2.JSONObject;importcom.fasterxml.jackson.databind.ObjectMapper;importcom.ruoyi.common.utils.socket.HandshakeMessage;importorg.springframework.stereotype.Component;importorg.springframework.web.socket.CloseStatus;importorg.springframework.web.socket.TextMessage;importorg.springframework.web.socket.WebSocketSession;importorg.springframework.web.socket.handler.TextWebSocketHandler;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;@ComponentpublicclassWebSocketHandlerextendsTextWebSocketHandler{privatestaticfinalMap<String,WebSocketSession> clientSessions =newConcurrentHashMap<>();privatestaticfinalObjectMapper objectMapper =newObjectMapper();@OverridepublicvoidafterConnectionEstablished(WebSocketSession session)throwsException{super.afterConnectionEstablished(session);String sessionId = session.getId();System.out.println("WebSocket connection established with session ID: "+ sessionId);}@OverrideprotectedvoidhandleTextMessage(WebSocketSession session,TextMessage message)throwsException{String payload = message.getPayload();HandshakeMessage handshakeMessage = objectMapper.readValue(payload,HandshakeMessage.class);if("handshake".equals(handshakeMessage.getType())){String clientId = handshakeMessage.getClientId();String sessionId = session.getId();// 存储clientId与sessionId的映射关系
            clientSessions.put(clientId, session);
            handshakeMessage.setMessage("success");// 可以选择回复客户端确认握手成功的消息
            session.sendMessage(newTextMessage(JSON.toJSONString(handshakeMessage)));}}@OverridepublicvoidafterConnectionClosed(WebSocketSession session,CloseStatus status)throwsException{super.afterConnectionClosed(session, status);String sessionId = session.getId();System.out.println("WebSocket connection closed with session ID: "+ sessionId);// 移除会话
        clientSessions.values().removeIf(s -> s.getId().equals(sessionId));}publicvoidsendMessageToClient(String clientId,String message){WebSocketSession session = clientSessions.get(clientId);if(session !=null&& session.isOpen()){try{
                session.sendMessage(newTextMessage(message));}catch(Exception e){
                e.printStackTrace();}}}}

本文转载自: https://blog.csdn.net/ling_zhi_xin/article/details/141980618
版权归原作者 心之语歌 所有, 如有侵权,请联系我们删除。

“Spring boot 项目作为客户端调用 服务端websocket”的评论:

还没有评论