0


Spring Boot 整合 SSE(Server Sent Event)

服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端的单向消息推送。SSE 基于 HTTP 协议的,SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息

后端代码:

import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

@Slf4j
@Component
public class SseUtil {

    private static final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 创建连接
     */
    public SseEmitter connect(Long userId, Consumer<Throwable> errorCallback,Runnable timeOutCallback) {
        if (sseEmitterMap.containsKey(userId)) {
            SseEmitter sseEmitter =sseEmitterMap.get(userId);
            sseEmitterMap.remove(userId);
             sseEmitter.complete();
        }
        try {
            // 设置超时时间,0表示不过期。默认30秒
            SseEmitter sseEmitter = new SseEmitter(5*60*1000L);
            sseEmitter.send(SseEmitter.event().id(IdUtil.simpleUUID()).reconnectTime(1*60*1000L).data(""));
            // 注册回调
            sseEmitter.onCompletion(() -> {
            });
            sseEmitter.onError(errorCallback);
            sseEmitter.onTimeout(timeOutCallback);
            sseEmitterMap.put(userId, sseEmitter);
            log.info("创建sse连接完成,当前用户:{}", userId);
            return sseEmitter;
        } catch (Exception e) {
            log.info("创建sse连接异常,当前用户:{}", userId);
        }
        return null;
    }

    /**
     * 给指定用户发送消息
     *
     */
    public boolean sendMessage(Long userId,String messageId, String message) {
        if (sseEmitterMap.containsKey(userId)) {
            SseEmitter sseEmitter = sseEmitterMap.get(userId);
            try {
                sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
                log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message);
                return true;
            }catch (Exception e) {
                sseEmitterMap.remove(userId);
                log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage());
                sseEmitter.complete();
                return false;
            }
        }else {
            log.info("用户{}未上线", userId);
        }
        return false;
    }

    /**
     * 断开
     * @param userId
     */
    public void removeUser(Long userId){
        if (sseEmitterMap.containsKey(userId)) {
            SseEmitter sseEmitter = sseEmitterMap.get(userId);
            sseEmitterMap.remove(userId);
            sseEmitter.complete();
        }else {
            log.info("用户{} 连接已关闭",userId);
        }

    }

}

细节:

  1. 创建SseEmitter 对象时需要返回给客户端,且不能二次包装
  2. 建立连接后,浏览器会处于加载中的状态,直到SseEmitter发送消息,或者连接超时和关闭,所以连接后就像进行了一次空消息的发送,避免浏览器一直处于加载中
  3. SseEmitter 中, timeout 属性表示 SseEmitter 在发送 SSE 事件到客户端时的超时时间。也就是说,当您使用 SseEmittersend() 方法来发送 SSE 事件时,如果超过了 timeout 属性指定的时间,则将抛出 AsyncRequestTimeoutException 异常。客户端会进行自动重连,这个异常最好直接交给spring处理,因为这个请求是text/event-stream,全局异常处理可能会报错
  4. SseEventBuilder 中, timeout 属性表示当前正在构建的 SSE 事件的超时时间。也就是说,当您调用 SseEventBuilderbuild() 方法来构建 SSE 事件时,如果超过了 timeout 属性指定的时间,则将抛出 SseEventTimeoutException 异常

前端代码:

<script>
if (window.EventSource) {
            // 建立连接
            source = new EventSource(http://localhost:8080/test/sse?id=1);

            source.onopen = function (event) {
                console.log('SSE链接成功');
            }

            source.onmessage = function (event) {
 
                if(event.data){
                   
                   console.log('后端返回的数据:', data.value);
                }
              
            }
            source.onerror = (error) => {
                console.log('SSE链接失败');
            };

        } else {
            alert("你的浏览器不支持SSE");
        }
</script>
标签: java

本文转载自: https://blog.csdn.net/hehe8881/article/details/130508600
版权归原作者 #585858 所有, 如有侵权,请联系我们删除。

“Spring Boot 整合 SSE(Server Sent Event)”的评论:

还没有评论