0


SpringBoot使用SSE进行实时通知前端

SpringBoot使用SSE进行实时通知前端

说明

项目有个需求是要实时通知前端,告诉前端这个任务加载好了。然后想了2个方案,一种是用websocket进行长连接,一种是使用SSE(Sever Send Event),是HTTP协议中的一种,Content-Type为text/event-stream,能够保持长连接。
websocket是前端既能向后端发送消息,后端也能向前端发送消息。
SSE是只能后端向前端发送消息。
因为只需要后端通知,所以我这里选择了使用SSE实现。
这里先做个笔记,怕以后忘记怎么使用。

maven依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.3</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.project</groupId><artifactId>test</artifactId><version>0.0.1-SNAPSHOT</version><name>test</name><description>test</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--web依赖,内嵌入tomcat,SSE依赖于该jar包,只要有该依赖就能使用SSE--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--lombok依赖,用来对象省略写set、get方法--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

SSE工具类代码

packagecom.etone.project.utils;importlombok.extern.slf4j.Slf4j;importorg.springframework.http.MediaType;importorg.springframework.web.servlet.mvc.method.annotation.SseEmitter;importjava.io.IOException;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;importjava.util.Set;importjava.util.concurrent.ConcurrentHashMap;importjava.util.concurrent.atomic.AtomicInteger;importjava.util.function.Consumer;@Slf4jpublicclassSseEmitterServer{/**
     * 当前连接数
     */privatestaticAtomicInteger count =newAtomicInteger(0);privatestaticMap<String,SseEmitter> sseEmitterMap =newConcurrentHashMap<>();publicstaticSseEmitterconnect(String userId){//设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常SseEmitter sseemitter =newSseEmitter(0L);//注册回调
        sseemitter.onCompletion(completionCallBack(userId));//这个onError在springbooot低版本没有这个方法,公司springboot1.4.2版本,没有这个方法,可以进行注释。
        sseemitter.onError(errorCallBack(userId));
        sseemitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId,sseemitter);//数量+1
        count.getAndIncrement();
        log.info("create new sse connect ,current user:{}",userId);return sseemitter;}/**
     * 给指定用户发消息
     */publicstaticvoidsendMessage(String userId,String message){if(sseEmitterMap.containsKey(userId)){try{
                sseEmitterMap.get(userId).send(message);}catch(IOException e){
                log.error("user id:{}, send message error:{}",userId,e.getMessage());
                e.printStackTrace();}}}/**
     * 想多人发送消息,组播
     */publicstaticvoidgroupSendMessage(String groupId,String message){if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){
            sseEmitterMap.forEach((k,v)->{try{if(k.startsWith(groupId)){
                        v.send(message,MediaType.APPLICATION_JSON);}}catch(IOException e){
                    log.error("user id:{}, send message error:{}",groupId,message);removeUser(k);}});}}publicstaticvoidbatchSendMessage(String message){
        sseEmitterMap.forEach((k,v)->{try{
                v.send(message,MediaType.APPLICATION_JSON);}catch(IOException e){
                log.error("user id:{}, send message error:{}",k,e.getMessage());removeUser(k);}});}/**
     * 群发消息
     */publicstaticvoidbatchSendMessage(String message,Set<String> userIds){
        userIds.forEach(userid->sendMessage(userid,message));}//移除用户publicstaticvoidremoveUser(String userid){
        sseEmitterMap.remove(userid);//数量-1
        count.getAndDecrement();
        log.info("remove user id:{}",userid);}publicstaticList<String>getIds(){returnnewArrayList<>(sseEmitterMap.keySet());}publicstaticintgetUserCount(){return count.intValue();}privatestaticRunnablecompletionCallBack(String userId){return()->{
            log.info("结束连接,{}",userId);removeUser(userId);};}privatestaticRunnabletimeoutCallBack(String userId){return()->{
            log.info("连接超时,{}",userId);removeUser(userId);};}privatestaticConsumer<Throwable>errorCallBack(String userId){return throwable ->{
            log.error("连接异常,{}",userId);removeUser(userId);};}}

Controller测试代码

packagecom.project.test.controller;importcom.hjl.test.util.SseEmitterServer;importorg.springframework.web.bind.annotation.*;importorg.springframework.web.servlet.mvc.method.annotation.SseEmitter;importjava.util.HashMap;importjava.util.Map;@RestController@RequestMapping(value ="/test")publicclassTestController{//sse连接接口@GetMapping(value ="/sse/connect/{id}")publicSseEmitterconnect(@PathVariableString id){returnSseEmitterServer.connect(id);}//sse向指定用户发送消息接口@GetMapping(value ="/sse/send/{id}")publicMap<String,Object>send(@PathVariableString id,@RequestParam(value ="message", required =false)String message){Map<String,Object> returnMap =newHashMap<>();//向指定用户发送信息SseEmitterServer.sendMessage(id,message);
        returnMap.put("message","向id为"+id+"的用户发送:"+message+"成功!");
        returnMap.put("status","200");
        returnMap.put("result",null);return returnMap;}//sse向所有已连接用户发送消息接口@GetMapping(value ="/sse/batchSend")publicMap<String,Object>batchSend(@RequestParam(value ="message", required =false)String message){Map<String,Object> returnMap =newHashMap<>();//向指定用户发送信息SseEmitterServer.batchSendMessage(message);
        returnMap.put("message",message+"消息发送成功!");
        returnMap.put("status","200");
        returnMap.put("result",null);return returnMap;}//sse关闭接口@GetMapping(value ="/sse/close/{id}")publicMap<String,Object>close(@PathVariableString id){Map<String,Object> returnMap =newHashMap<>();//移除idSseEmitterServer.removeUser(id);System.out.println("当前连接用户id:"+SseEmitterServer.getIds());
        returnMap.put("message","连接关闭成功!");
        returnMap.put("status","200");
        returnMap.put("result",null);return returnMap;}}

测试结果如下:

这里测试SSE连接,就像正常接口那样请求就行。
本地调用接口/sse/connect/1如下:
这里我连接2个用户,用来模拟向指定用户id发送信息和批量向已连接的用户发送消。
在这里插入图片描述
在这里插入图片描述
后端服务打印如下:
在这里插入图片描述

本地调用接口/sse/send/1如下:
在这里插入图片描述
用户1的结果如下,发现它收到了消息:
在这里插入图片描述
用户2没有收到结果,如下:
在这里插入图片描述

本地调用接口/sse/batchSend如下:
批量向所有已经连接的用户发送消息。
在这里插入图片描述
用户1结果如下,发现接收到了消息:
在这里插入图片描述
用户2结果如下,发现也接收到了消息:
在这里插入图片描述
测试结果都符合预期。
点击postman的close按钮,关闭连接:
在这里插入图片描述
在这里插入图片描述
发现前端连接虽然关闭了,但是后端实际还在连接中,根本没有移除用户的提示:
在这里插入图片描述
所以这里还需要自己手动写关闭接口测试。
本地调用接口/sse/close/1如下:
在这里插入图片描述
可以看到把用户id为1的给移除了,只剩用户2还在连接中。
在这里插入图片描述
这里所有测试完成,结果符合预期。

注意

将超时时间由原来的0改为30毫秒(L在整数后面表示Long占8字节),会报错。

在这里插入图片描述
测试结果如下:
在这里插入图片描述
在这里插入图片描述
这里直接出现了一个异常:org.springframework.web.context.request.async.AsyncRequestTimeoutException
甚至连接都断开了。

将springboot降为低版本如1.4.2.RELEASE。

使用postman进行测试的时候,发现它不是一直在请求中:如下:
将Springboot降为1.4.2.RELEASE
在这里插入图片描述

springboot的1.4.2.RELEASE版本没有onError方法,需要注释掉。在这里插入图片描述
postman测试如下:
低版本测试的时候发现它有一个这个连接可以直接看到,而使用springboot版本2.x版本就发现它一直处于发送请求的状态,什么时候后端向前端发送了消息,它就显示这个。
springboot的1.4.2.RELEASE版本结果:
在这里插入图片描述
springboot的2.7.3版本结果:
在这里插入图片描述

这里先将这种情况先记录下来先,等后面有时间再研究。怎么高版本就不能向低版本那样返回这个连接信息呢?所以SpringBoot高版本使用SSE连接的时候一直处于Sending request这种情况,这种情况是正常的吗?有没有大佬告知下,谢谢。

标签: spring boot 前端 java

本文转载自: https://blog.csdn.net/weixin_48040732/article/details/131000339
版权归原作者 谁不想飞舞青春 所有, 如有侵权,请联系我们删除。

“SpringBoot使用SSE进行实时通知前端”的评论:

还没有评论