0


Spring/Spring Boot服务端主动推送技术【server send event】简称sse,看完不亏系列

服务端主动推送技术【server send event】

概述

SseEmitter类在Spring框架中用于实现服务器主动向客户端发送消息推送(Server-Sent Events,简称SSE)。SSE是一种允许服务器向客户端推送实时更新的技术,通常用于实现实时数据通信如股票行情、聊天室等场景。

SSE和websocket的主要区别在于SSE是单向的【只能从服务端到客户端】,而websocket是双向的

SseEmitter(ResponseBodyEmitter的子类)支持服务器发送的事件,
服务器发送的事件将按照W3C SSE规范进行格式化。
为了从控制器生成SSE流,需要返回SseEmitter。

学习官网

spring5.3.34对应的Spring Boot 版本是2.7.18
https://docs.spring.io/spring-framework/docs/5.3.34/reference/html/web.html#mvc-ann-async-sse

上代码【琢磨了一下午】

服务端代码

pom.xml【Spring boot 2.7.18 ,java 版本17 ,java 8也行】
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.ljh</groupId><artifactId>demo3</artifactId><version>0.0.1-SNAPSHOT</version><name>demo3</name><description>Demo project for Spring Boot</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><image><builder>paketobuildpacks/builder-jammy-base:latest</builder></image></configuration></plugin></plugins></build></project>

java 代码

packagecom.ljh.demo3;importorg.springframework.http.MediaType;importorg.springframework.web.bind.annotation.*;importorg.springframework.web.servlet.mvc.method.annotation.SseEmitter;importjava.io.IOException;importjava.util.Map;importjava.util.Objects;importjava.util.concurrent.ConcurrentHashMap;importjava.util.concurrent.Executors;/**
 * 服务端主动推送技术
 */@RestController@RequestMapping(value ="/sse")@CrossOriginpublicclassSeverSendEvent{privatestaticMap<String,SseEmitter> sseCache =newConcurrentHashMap<>();/**
     * 客户端发起订阅消息的方法
     * @param id 标识ID 可理解为通道ID
     * @return
     */@GetMapping(value ="/subscribe",produces ={MediaType.TEXT_EVENT_STREAM_VALUE})publicSseEmittersubscribe(@RequestParam(value ="id")String id){//创建一个具有自定义超时值的 SseEmitter。//默认情况下不设置超时值,在这种情况下,将使用 MVC Java 配置或 MVC 命名空间中配置的默认值;// 如果未设置超时值,则超时值取决于底层服务器的默认值:30秒。//SseEmitter在构造器中设置超时2秒,设置前端的重试时间为2秒,则一共需要等待4秒SseEmitter sseEmitter =newSseEmitter(2_000L);// 设置前端的重试时间:2秒reconnectTime(sseEmitter,2_000L);//放入map缓存中
        sseCache.put(id, sseEmitter);//注册相关回调//异步请求超时时调用:当前端重连接时,会触发请求超时回调
        sseEmitter.onTimeout(()->{System.out.println("触发请求超时!!!");
            sseCache.remove(id);});//注册完成时回调,以便在异步请求完成时调用。当异步请求因任何原因(包括超时和网络错误)完成时,容器线程会调用此方法。该方法可用于检测 ResponseBodyEmitter 实例是否不再可用
        sseEmitter.onCompletion(()->{System.out.println("完成!!!");//sseCache.remove(id);});//在异步请求处理过程中出现错误时调用
        sseEmitter.onError(error->{System.out.println("出现错误啦");//sseCache.remove(id);
            error.printStackTrace();});return sseEmitter;}/**
     * 设置前端的重试时间
     * @param sseEmitter
     * @param reconnectTimeMillis 单位毫秒
     */privatestaticvoidreconnectTime(SseEmitter sseEmitter,long reconnectTimeMillis){Objects.requireNonNull(sseEmitter,"sseEmitter对象为空啦");//开启另一个线程Executors.newFixedThreadPool(1).execute(()->{try{String dateTime =LocalDateTime.now().withNano(0).toString().replace("T"," ");
                sseEmitter.send(SseEmitter.event().reconnectTime(reconnectTimeMillis).data(String.format("连接成功:%s", dateTime)));}catch(IOException e){//sseEmitter.completeWithError(e);thrownewRuntimeException(e);}});}/**
     * 推送消息
     * @param id
     * @param content
     * @return
     * @throws IOException
     */@GetMapping(value ="/push")publicStringpush(@RequestParam(value ="id")String id,@RequestParam(value ="content")String content)throwsIOException{SseEmitter sseEmitter = sseCache.get(id);if(sseEmitter !=null){
            sseEmitter.send(content);}return"推送成功!";}/**
     * 服务器主动停止推送
     * @param id
     * @return
     */@GetMapping(value ="/stop")publicStringstop(@RequestParam(value ="id")String id){SseEmitter sseEmitter = sseCache.get(id);if(sseEmitter !=null){//通过向 servlet 容器执行分派来完成请求处理,Spring MVC 会在其中再次调用,并完成请求处理生命周期。//注意:此方法应由应用程序调用,以完成请求处理。它不应在容器相关事件(如发送时出错)发生后使用。
            sseEmitter.complete();
            sseCache.remove(id);}return"断开连接!";}}

客户端代码

在电脑桌面新建一个index.html, 内容如下:

<!doctypehtml><htmllang="en"><head><title>Sse测试文档</title></head><body><div>sse测试</div><divid="result"></div></body></html><script>//实际开发中id参数可以做成变量传参var source =newEventSource('http://127.0.0.1:8080/sse/subscribe?id=123');var result=document.getElementById('result');// 监听到服务端发来的消息回调
    source.onmessage=function(event){
        text = result.innerText;
        text +='\n'+ event.data;
        result.innerText = text;};//连接上服务端回调
    source.onopen=function(event){
        text = result.innerText;
        text +='\n 开启: ';
        console.log(event);
        result.innerText = text;};</script>

结果展示

开启后端服务,打开index.html 连接上后即显示连接成功
打开访问push接口,手动模拟推送数据,id为通道id,即在index.html中传递的id参数
停止发送后测试两次重连间隔时间
在这里插入图片描述

服务端控制台打印
在这里插入图片描述

如果看完有收获,欢迎点赞关注一波~!

标签: spring spring boot java

本文转载自: https://blog.csdn.net/ljh_learn_from_base/article/details/137798154
版权归原作者 ljh_learn_from_base 所有, 如有侵权,请联系我们删除。

“Spring/Spring Boot服务端主动推送技术【server send event】简称sse,看完不亏系列”的评论:

还没有评论