服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端的单向消息推送。SSE 基于 HTTP 协议的,SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息
后端代码:
import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
@Component
public class SseUtil {
private static final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 创建连接
*/
public SseEmitter connect(Long userId, Consumer<Throwable> errorCallback,Runnable timeOutCallback) {
if (sseEmitterMap.containsKey(userId)) {
SseEmitter sseEmitter =sseEmitterMap.get(userId);
sseEmitterMap.remove(userId);
sseEmitter.complete();
}
try {
// 设置超时时间,0表示不过期。默认30秒
SseEmitter sseEmitter = new SseEmitter(5*60*1000L);
sseEmitter.send(SseEmitter.event().id(IdUtil.simpleUUID()).reconnectTime(1*60*1000L).data(""));
// 注册回调
sseEmitter.onCompletion(() -> {
});
sseEmitter.onError(errorCallback);
sseEmitter.onTimeout(timeOutCallback);
sseEmitterMap.put(userId, sseEmitter);
log.info("创建sse连接完成,当前用户:{}", userId);
return sseEmitter;
} catch (Exception e) {
log.info("创建sse连接异常,当前用户:{}", userId);
}
return null;
}
/**
* 给指定用户发送消息
*
*/
public boolean sendMessage(Long userId,String messageId, String message) {
if (sseEmitterMap.containsKey(userId)) {
SseEmitter sseEmitter = sseEmitterMap.get(userId);
try {
sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));
log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message);
return true;
}catch (Exception e) {
sseEmitterMap.remove(userId);
log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage());
sseEmitter.complete();
return false;
}
}else {
log.info("用户{}未上线", userId);
}
return false;
}
/**
* 断开
* @param userId
*/
public void removeUser(Long userId){
if (sseEmitterMap.containsKey(userId)) {
SseEmitter sseEmitter = sseEmitterMap.get(userId);
sseEmitterMap.remove(userId);
sseEmitter.complete();
}else {
log.info("用户{} 连接已关闭",userId);
}
}
}
细节:
- 创建SseEmitter 对象时需要返回给客户端,且不能二次包装
- 建立连接后,浏览器会处于加载中的状态,直到SseEmitter发送消息,或者连接超时和关闭,所以连接后就像进行了一次空消息的发送,避免浏览器一直处于加载中
- 在
SseEmitter
中,timeout
属性表示SseEmitter
在发送 SSE 事件到客户端时的超时时间。也就是说,当您使用SseEmitter
的send()
方法来发送 SSE 事件时,如果超过了timeout
属性指定的时间,则将抛出AsyncRequestTimeoutException
异常。客户端会进行自动重连,这个异常最好直接交给spring处理,因为这个请求是text/event-stream,全局异常处理可能会报错 - 在
SseEventBuilder
中,timeout
属性表示当前正在构建的 SSE 事件的超时时间。也就是说,当您调用SseEventBuilder
的build()
方法来构建 SSE 事件时,如果超过了timeout
属性指定的时间,则将抛出SseEventTimeoutException
异常
前端代码:
<script>
if (window.EventSource) {
// 建立连接
source = new EventSource(http://localhost:8080/test/sse?id=1);
source.onopen = function (event) {
console.log('SSE链接成功');
}
source.onmessage = function (event) {
if(event.data){
console.log('后端返回的数据:', data.value);
}
}
source.onerror = (error) => {
console.log('SSE链接失败');
};
} else {
alert("你的浏览器不支持SSE");
}
</script>
版权归原作者 #585858 所有, 如有侵权,请联系我们删除。