0


WebSocket vs SSE: 实时数据推送到前端的选择与实现(详细)

Websocket和Server-Sent Events 对比推送数据给前端及各自的实现

在现代 Web 应用程序中,实时数据推送给前端变得越来越重要。无论是实时聊天、实时通知还是仪表板上的实时更新,都需要一种有效的方式来将数据推送给前端。本文将介绍两种常用的实现方法:WebSocket 和 Server-Sent Events(SSE),并提供详细的实现步骤。

二者对比

WebSocket 和 Server-Sent Events (SSE) 都是用于实现实时数据推送的技术,但它们在设计、用途和实现上有一些重要的区别。让我们详细比较这两种技术。

WebSocket:

  1. 双向通信:- WebSocket 允许双向通信,客户端和服务器都可以在任何时候向对方发送数据。- 这使得 WebSocket 非常适用于需要双向交互的应用,如在线聊天、多人协作工具等。
  2. 持久连接:- WebSocket 建立持久连接,客户端和服务器之间的连接保持打开状态。- 这减少了与建立和关闭连接相关的开销,适用于频繁的数据交换。
  3. 低延迟:- 由于持久连接,WebSocket 可以实现低延迟的实时数据传输,适用于需要快速响应的应用。
  4. 复杂性:- 实现 WebSocket 可能相对复杂,需要更多的服务器资源和额外的协议处理。
  5. 跨域通信:- WebSocket 通常需要配置服务器以允许跨域通信,因为它们使用自定义协议。
  6. 浏览器支持:- WebSocket 在现代浏览器中得到广泛支持。

Server-Sent Events (SSE):

  1. 单向通信:- SSE 是一种单向通信,只允许服务器向客户端发送数据。客户端无法向服务器发送数据。
  2. HTTP 协议:- SSE 建立在 HTTP 协议之上,使用标准 HTTP 请求和响应。- 这使得 SSE 更容易部署,因为它与现有的 HTTP 基础设施兼容。
  3. 简单性:- SSE 的实现相对简单,服务器和客户端都不需要太多复杂的逻辑。
  4. 无需专用库:- SSE 不需要额外的库或协议处理,客户端可以使用浏览器的原生 EventSource API 来接收数据。
  5. 跨域通信:- SSE 支持跨域通信,可以通过 CORS(跨域资源共享)机制进行配置。
  6. 浏览器支持:- SSE 在现代浏览器中也得到广泛支持,但与 WebSocket 相比,它的历史要长一些。

选择 WebSocket 还是 SSE:

  • WebSocket 适用于需要双向通信和低延迟的场景,例如在线游戏、实时聊天应用等。
  • SSE 适用于单向服务器到客户端的实时数据推送,例如新闻更新、实时股票报价、天气预报等,特别是当你希望使用现有的 HTTP 基础设施时。
  • 在某些情况下,你甚至可以同时使用 WebSocket 和 SSE,根据不同的需求选择合适的技术。

无论选择哪种技术,都需要考虑你的应用程序的具体需求和复杂性。WebSocket 提供了更多的灵活性和功能,而 SSE 更加简单和易于部署。最终的选择取决于你的项目目标和资源。

Websocket 实现

使用原生 WebSocket API:

  1. 简单性:- Spring Boot 提供了对原生 WebSocket API 的支持,使得创建 WebSocket 应用相对简单。- 开发人员可以直接使用 Java 标准库中的 WebSocket 相关类来处理 WebSocket 通信。
  2. 依赖:- 原生 WebSocket 不需要额外的依赖,因为 WebSocket API 已经包含在 Java 标准库中。
  3. 性能:- 原生 WebSocket API 在性能方面表现良好,适用于大多数中小型应用。
  4. 生态系统:- 使用原生 WebSocket 可以更容易地集成到现有的 Spring Boot 生态系统中,例如 Spring Security 等。
  5. 简单应用:- 当你需要创建相对简单的 WebSocket 应用时,原生 WebSocket 是一个不错的选择。

使用 Netty 创建 WebSocket:

  1. 灵活性:- Netty 是一个高度可定制的异步事件驱动框架,它可以用于创建各种网络应用,包括 WebSocket。- Netty 提供了更多的灵活性和自定义选项,适用于复杂的 WebSocket 应用。
  2. 性能:- Netty 以其高性能和低延迟而闻名,适用于需要处理大量并发连接的应用。
  3. 协议支持:- Netty 支持多种协议,不仅限于 WebSocket。这意味着你可以在同一个应用程序中处理多种网络通信需求。
  4. 集成:- 尽管 Netty 可以集成到 Spring Boot 中,但其集成可能需要更多的配置和代码。
  5. 复杂应用:- 当你需要处理复杂的 WebSocket 场景,如高并发、自定义协议、复杂的消息处理等时,使用 Netty 是更好的选择。

总结和选择:

选择原生 WebSocket 还是使用 Netty 创建 WebSocket 应取决于你的项目需求和复杂性:

  • 如果你的应用相对简单,对性能要求不是很高,可以考虑使用原生 WebSocket API,它更容易上手并且不需要额外的依赖。
  • 如果你的应用需要处理高并发、复杂的协议、自定义消息处理或需要最大程度的性能和灵活性,那么使用 Netty 创建 WebSocket 可能更合适。Netty 为你提供了更多的控制权和自定义选项。

无论你选择哪种方法,Spring Boot 都提供了良好的支持,使得在应用中集成 WebSocket 变得相对容易。因此,你可以根据具体的项目需求来选择适合你的方法。

Netty 实现 Websocket

  • 添加 maven 坐标<!-- netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>4.1.79.Final</version> </dependency>
  • 创建 NettyWebsocketServerpackagecom.todoitbo.baseSpringbootDasmart.netty.server;importcom.todoitbo.baseSpringbootDasmart.netty.handler.HeartbeatHandler;importcom.todoitbo.baseSpringbootDasmart.netty.handler.WebSocketHandler;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.handler.codec.http.HttpObjectAggregator;importio.netty.handler.codec.http.HttpServerCodec;importio.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;importio.netty.handler.stream.ChunkedWriteHandler;importio.netty.handler.timeout.IdleStateHandler;importio.netty.handler.traffic.ChannelTrafficShapingHandler;/** * @author xiaobo * @date 2023/9/5 */publicclassNettyWebsocketServer{privatefinalint port;publicNettyWebsocketServer(int port){this.port = port;}publicvoidrun()throwsException{EventLoopGroup bossGroup =newNioEventLoopGroup(1);// 创建用于接受客户端连接的 boss 线程池 EventLoopGroup workerGroup =newNioEventLoopGroup();// 创建用于处理客户端请求的 worker 线程池 try{ServerBootstrap b =newServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(newChannelInitializer<SocketChannel>(){@OverridepublicvoidinitChannel(SocketChannel ch)throwsException{ChannelTrafficShapingHandler trafficShapingHandler =newChannelTrafficShapingHandler(1,// 读取速率限制(字节/秒) 1,// 写入速率限制(字节/秒) 1,// 流量检查时间间隔(毫秒) 1// 最大允许的时间窗口(毫秒) );ChannelPipeline pipeline = ch.pipeline();// 添加心跳检测处理器,3秒内没有读写事件将触发 IdleStateEvent,下面的顺序错了也会出现问题的 pipeline.addLast(newIdleStateHandler(30,0,0)); pipeline.addLast(newHeartbeatHandler()); pipeline.addLast(newHttpServerCodec());// 处理 HTTP 请求 pipeline.addLast(newChunkedWriteHandler());// 写大数据流的处理器 pipeline.addLast(newHttpObjectAggregator(8192));// 将 HTTP 消息聚合为 FullHttpRequest 或 FullHttpResponse // pipeline.addLast(new WebSocketFrameAggregator(8192)); // 将 HTTP 消息聚合为 FullHttpRequest 或 FullHttpResponse // pipeline.addLast(new WebSocketServerCompressionHandler()); // 消息压缩 pipeline.addLast(newWebSocketHandler());// 自定义 WebSocket 处理器 pipeline.addLast(newWebSocketServerProtocolHandler("/ws",null,true,65536*10));// 处理 WebSocket 升级握手和数据帧处理 }}).option(ChannelOption.SO_BACKLOG,128)// 设置服务器接受队列大小 .childOption(ChannelOption.SO_KEEPALIVE,true);// 开启 TCP 连接的 Keep-Alive 功能 // Bind and start to accept incoming connections. System.out.println("TCP server started successfully");ChannelFuture f = b.bind(port).sync();// 绑定端口并等待绑定完成 // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); // 阻塞直到服务器关闭 }finally{// 优雅地关闭线程池 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully();}}}

这里需要注意一下,

pipeline.addLast

的顺序不一致可能会导致程序报错,运行时

  • 创建心跳 handlepackagecom.todoitbo.baseSpringbootDasmart.netty.handler;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importio.netty.handler.timeout.IdleState;importio.netty.handler.timeout.IdleStateEvent;publicclassHeartbeatHandlerextendsChannelInboundHandlerAdapter{int readTimeOut =0;@OverridepublicvoiduserEventTriggered(ChannelHandlerContext ctx,Object evt)throwsException{IdleStateEvent event =(IdleStateEvent) evt;if(event.state()==IdleState.READER_IDLE){ readTimeOut++;}if(readTimeOut >=3){System.out.println("超时超过3次,断开连接"); ctx.close();}}}
  • 创建WebSocketHandlerpackagecom.todoitbo.baseSpringbootDasmart.netty.handler;importcn.hutool.core.collection.CollectionUtil;importcom.todoitbo.baseSpringbootDasmart.netty.NamedChannelGroup;importio.netty.buffer.ByteBuf;importio.netty.buffer.Unpooled;importio.netty.channel.*;importio.netty.handler.codec.http.*;importio.netty.handler.codec.http.websocketx.*;importio.netty.util.AttributeKey;importio.netty.util.CharsetUtil;importlombok.extern.slf4j.Slf4j;importjava.util.HashMap;importjava.util.List;importjava.util.Map;/** * @author xiaobo */@Slf4jpublicclassWebSocketHandlerextendsSimpleChannelInboundHandler<Object>{privateWebSocketServerHandshaker handshaker;publicstaticfinalAttributeKey<String>USER_ID_KEY=AttributeKey.valueOf("userId");publicstaticfinalAttributeKey<String>GROUP_ID_KEY=AttributeKey.valueOf("groupId");privatestaticfinalMap<Channel,String>WORK_CHANNEL_MAP=newHashMap<Channel,String>();@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{ log.info("与客户端建立连接,通道开启!");// 添加到channelGroup通道组(广播) // 之后可以根据ip来进行分组 NamedChannelGroup.getChannelGroup("default").add(ctx.channel());}@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{ log.info("与客户端断开连接,通道关闭!");// 从channelGroup通道组(广播)中删除 // 之后可以根据ip来进行分组 Channel channel = ctx.channel();NamedChannelGroup.getChannelGroup("default").remove(channel);WORK_CHANNEL_MAP.remove(channel);}publicbooleanuserAuthentication(ChannelHandlerContext ctx,FullHttpRequest req){// 提取URI参数 QueryStringDecoder queryStringDecoder =newQueryStringDecoder(req.uri());Map<String,List<String>> parameters = queryStringDecoder.parameters();// 根据参数进行处理 List<String> userId = parameters.get("userId");List<String> groupId = parameters.get("groupId");if(CollectionUtil.isNotEmpty(userId)&&CollectionUtil.isNotEmpty(groupId)){ ctx.channel().attr(USER_ID_KEY).set(userId.get(0)); ctx.channel().attr(GROUP_ID_KEY).set(groupId.get(0));returntrue;}else{returnfalse;}}privatevoidhandleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){// 检查是否升级到WebSocket if(!req.decoderResult().isSuccess()||(!"websocket".equals(req.headers().get("Upgrade")))){// 如果不是WebSocket协议的握手请求,返回400 Bad Request响应 sendHttpResponse(ctx, req,newDefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));return;}// 构建握手响应 WebSocketServerHandshakerFactory wsFactory =newWebSocketServerHandshakerFactory(getWebSocketLocation(req),null,true); handshaker = wsFactory.newHandshaker(req);if(handshaker ==null){// 如果不支持WebSocket版本,返回HTTP 426 Upgrade Required响应 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());}else{ handshaker.handshake(ctx.channel(), req);// 进行WebSocket握手 // 在认证成功后,设置用户ID到Channel属性中 boolean authentication =userAuthentication(ctx,req);// 这里需要实现用户认证逻辑 if(!authentication){// 用户认证失败,可能需要关闭连接或发送认证失败消息 // 1. 关闭连接: ctx.close();// 2. 发送认证失败消息给客户端: String failureMessage ="认证失败,请提供有效的身份验证信息。"; ctx.writeAndFlush(failureMessage);return;}// 其他逻辑... WORK_CHANNEL_MAP.put(ctx.channel(), ctx.channel().attr(GROUP_ID_KEY).get());}}privatevoidsendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,FullHttpResponse res){// 发送HTTP响应 if(res.status().code()!=200){ByteBuf buf =Unpooled.copiedBuffer(res.status().toString(),CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release();HttpUtil.setContentLength(res, res.content().readableBytes());}ChannelFuture future = ctx.channel().writeAndFlush(res);if(!HttpUtil.isKeepAlive(req)|| res.status().code()!=200){ future.addListener(ChannelFutureListener.CLOSE);}}privateStringgetWebSocketLocation(FullHttpRequest req){return"ws://"+ req.headers().get(HttpHeaderNames.HOST)+ req.uri();}privatevoidhandleWebSocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame){// 处理WebSocket消息,可以根据实际需求进行处理 if(frame instanceofTextWebSocketFrame){// 处理文本消息 String text =((TextWebSocketFrame) frame).text();System.out.println("Received message: "+ text);// 可以在这里处理WebSocket消息并发送响应 // ... }elseif(frame instanceofBinaryWebSocketFrame){// 处理二进制WebSocket消息 // ... System.out.println("123");}elseif(frame instanceofCloseWebSocketFrame){// 处理WebSocket关闭请求 handshaker.close(ctx.channel(),(CloseWebSocketFrame) frame.retain());}elseif(frame instanceofPingWebSocketFrame){// 处理WebSocket Ping消息 System.out.println("cs"); ctx.channel().write(newPongWebSocketFrame(frame.content().retain()));}}@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,Object msg)throwsException{if(msg instanceofFullHttpRequest){// 处理HTTP握手请求 handleHttpRequest(ctx,(FullHttpRequest) msg);}elseif(msg instanceofWebSocketFrame){// 处理WebSocket消息 handleWebSocketFrame(ctx,(WebSocketFrame) msg);}}@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx,Throwable cause){// 发生异常时的处理 log.error(cause.getMessage()); ctx.close();}}
  • 创建NamedChannelGrouppackagecom.todoitbo.baseSpringbootDasmart.netty;importio.netty.channel.group.ChannelGroup;importio.netty.channel.group.DefaultChannelGroup;importio.netty.util.concurrent.GlobalEventExecutor;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;publicclassNamedChannelGroup{privateString groupName;publicstaticMap<String,ChannelGroup> channelGroupMap =newConcurrentHashMap<>();static{ channelGroupMap.put("default",newDefaultChannelGroup(GlobalEventExecutor.INSTANCE));}publicstaticvoidsetGroupName(String groupName){ channelGroupMap.put(groupName,newDefaultChannelGroup(GlobalEventExecutor.INSTANCE));}publicstaticChannelGroupgetChannelGroup(String groupName){return channelGroupMap.get(groupName);}}

Server-Sent Events (SSE)实现

创建DataManager

packagecom.todoitbo.baseSpringbootDasmart.sse;importorg.springframework.http.MediaType;importorg.springframework.stereotype.Component;importorg.springframework.web.servlet.mvc.method.annotation.SseEmitter;importjava.io.IOException;importjava.util.ArrayList;importjava.util.HashMap;importjava.util.List;importjava.util.Map;/**
 * 数据管理器用于管理Server-Sent Events (SSE) 的订阅和数据推送。
 * @author xiaobo
 */@ComponentpublicclassDataManager{privatefinalMap<String,List<SseEmitter>> dataEmitters =newHashMap<>();/**
     * 订阅特定数据类型的SSE连接。
     *
     * @param dataType 要订阅的数据类型
     * @param emitter  SSE连接
     */publicvoidsubscribe(String dataType,SseEmitter emitter){
        dataEmitters.computeIfAbsent(dataType, k ->newArrayList<>()).add(emitter);
        emitter.onCompletion(()->removeEmitter(dataType, emitter));
        emitter.onTimeout(()->removeEmitter(dataType, emitter));}/**
     * 推送特定数据类型的数据给所有已订阅的连接。
     *
     * @param dataType 要推送的数据类型
     * @param data     要推送的数据
     */publicvoidpushData(String dataType,String data){List<SseEmitter> emitters = dataEmitters.getOrDefault(dataType,newArrayList<>());
        emitters.forEach(emitter ->{try{
                emitter.send(SseEmitter.event().data(data,MediaType.TEXT_PLAIN));}catch(IOException e){removeEmitter(dataType, emitter);}});}privatevoidremoveEmitter(String dataType,SseEmitter emitter){List<SseEmitter> emitters = dataEmitters.get(dataType);if(emitters !=null){
            emitters.remove(emitter);}}}

接口实现

packagecom.todoitbo.baseSpringbootDasmart.controller;importcom.todoitbo.baseSpringbootDasmart.sse.DataManager;importorg.springframework.http.MediaType;importorg.springframework.http.ResponseEntity;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importorg.springframework.web.servlet.mvc.method.annotation.SseEmitter;importjavax.annotation.Resource;/**  
 * @author xiaobo  
 */@RestController@RequestMapping("/environment")publicclassEnvironmentController{@ResourceprivateDataManager dataManager;@GetMapping(value ="/subscribe", produces =MediaType.TEXT_EVENT_STREAM_VALUE)publicSseEmittersubscribe(){SseEmitter emitter =newSseEmitter();  
        dataManager.subscribe("environment", emitter);return emitter;}// 示例:推送环境监测数据给前端  @GetMapping("/push/{testText}")publicResponseEntity<String>pushEnvironmentData(@PathVariableString testText){  
        dataManager.pushData("environment", testText);returnResponseEntity.ok("Data pushed successfully.");}}

实现说明

每个不同类型的数据推送都需要一个对应的SSE订阅端点(subscribe)。每个数据类型都有一个对应的订阅端点,用于前端建立SSE连接,并在后端接收和处理特定类型的数据推送。

在你的后端应用中,对于每种数据类型,你需要创建一个对应的Controller或处理器来处理该数据类型的SSE订阅。每个Controller会有自己的SSE订阅端点,前端可以连接到不同的端点以接收相应类型的数据。

这种方式允许你将不同类型的数据推送逻辑分离,使代码更具可维护性和可扩展性。当有新的数据可用时,只需调用相应类型的数据推送方法,而不必修改通用的SSE管理逻辑。

前端实现

<!DOCTYPEhtml><html><head><title>SSE Data Receiver</title></head><body><h1>Real-time Data Display</h1><divid="data-container"></div><script>const dataContainer = document.getElementById('data-container');// 创建一个 EventSource 对象,指定 SSE 服务器端点的 URLconst eventSource =newEventSource('http://127.0.0.1:13024/environment/subscribe');// 根据你的控制器端点来设置URL// 添加事件处理程序,监听服务器端发送的事件
        eventSource.onmessage=(event)=>{const data = event.data;// 在这里处理从服务器接收到的数据// 可以将数据显示在页面上或进行其他操作const newDataElement = document.createElement('p');
            newDataElement.textContent = data;
            dataContainer.appendChild(newDataElement);};

        eventSource.onerror=(error)=>{// 处理连接错误
            console.error('Error occurred:', error);};</script></body></html>

弊端以及解决方案

如果你没什么处理的话,在它首次调用subscribe时候可能会出现连接超时的问题,因为这个是一个长连接,出现这种问题是因为,此时并没有数据产生,至此,除非你刷新页面,否则即使有数据产生前端也不会受到了

你希望前端在第一次订阅SSE连接后,即使后端没有数据产生,之后也能接收到数据。这可以通过以下方式来实现:

  1. 保持持久连接: 确保前端建立的SSE连接是持久性连接,不会在第一次连接成功后关闭。这可以让连接一直保持打开状态,即使后端没有即时数据产生。你可以在前端代码中使用以下方式来确保连接持久:const eventSource =newEventSource('/environment/subscribe');默认情况下,EventSource对象会自动重连,以保持连接的持久性。
  2. 定期发送心跳数据: 在后端定期发送一些心跳数据,以确保连接保持活跃。这可以防止连接超时关闭。你可以在后端定期发送一个包含无用信息的SSE事件,例如:@Scheduled(fixedRate =30000)// 每30秒发送一次心跳数据publicvoidsendHeartbeat(){ dataManager.pushData("heartbeat","Heartbeat data");}前端可以忽略这些心跳事件,但它们会保持连接处于活跃状态。
  3. 前端自动重连: 在前端代码中添加自动重连逻辑,以处理连接断开的情况。这样,如果连接由于某种原因断开,前端会自动尝试重新建立连接。示例:const eventSource =newEventSource('/environment/subscribe');eventSource.onerror=(error)=>{// 处理连接错误 console.error('Error occurred:', error);// 重新建立连接 eventSource.close();setTimeout(()=>{// 重新建立连接 eventSource =newEventSource('/environment/subscribe');},1000);// 1秒后重试};

通过结合上述方法,你可以确保前端能够建立并保持持久SSE连接,即使后端没有即时数据产生。这样,一旦后端有数据产生,前端也可以接收到数据而无需重新订阅。

标签: websocket springboot

本文转载自: https://blog.csdn.net/Mrxiao_bo/article/details/132779099
版权归原作者 一只牛博 所有, 如有侵权,请联系我们删除。

“WebSocket vs SSE: 实时数据推送到前端的选择与实现(详细)”的评论:

还没有评论