🧑 博主简介:历代文学网(PC端可以访问:历代文学,移动端可微信小程序搜索“历代文学”)总架构师,
15年工作经验,精通
Java编程,
高并发设计,
Springboot和微服务,熟悉
Linux,**
ESXI虚拟化以及
云原生Docker和K8s**,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。


SseEmitter是ResponseBodyEmitter的一种特殊形式,用于发送Server-Sent Events。它有助于异步请求处理,其中将一个或多个对象写入响应,并且每个对象都使用兼容的 HttpMessageConverter写入。
*1. SseEmitter类*
SseEmitter可以将事件从服务器传递到客户端。服务器发送的事件是从服务器到客户端的消息。它们具有“ *Content **-Type* ”标头
text/event-stream
。
这些事件非常简单,只有四个字段。
场地描述ID事件的 ID事件事件类型数据事件数据重试事件流的重新连接时间
2.如何使用SseEmitter?

要从请求处理方法发送事件,您需要创建一个实例
SseEmitter
并从请求处理方法返回它。然后使用该
emitter.send()
方法将各个元素发送到客户端。
@RequestMapping(value="/resource-uri", method=RequestMethod.GET)
public SseEmitter handle()
{
SseEmitter emitter = new SseEmitter();
// Pass the emitter to a new thead for async processing...
return emitter;
}
// in new thread
emitter.send(dataset1);
// and again
emitter.send(dataset2);
// and done
emitter.complete();
如果要向事件添加更多信息,请使用 SseEventBuilder 。它
event()
的工厂方法
SseEmitter
创建一个实例。使用它来填写 id 和 event 字段。
SseEventBuilder eventBuilder = SseEmitter.event();
emitter.send(
eventBuilder
.data(dataSet)
.name("dataSet-created")
.id(String.valueOf(dataSet.hashCode()))
);
3. Springboot 实现 SSE 服务端推送事件
**
Sever Send Event
**,是
HTTP
协议中的一种,
Content-Type
为
text/event-stream
,能够保持长连接。
SSE只能发送文本消息(但也可通过**
base64
**等方法进行简单加密)。
3.1 服务端Spring Boot
@CrossOrigin
@RestController
public class SSEController {
private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
Executors.newScheduledThreadPool(10).scheduleWithFixedDelay(() -> {
System.out.println("sseEmitterMap#" + sseEmitterMap);
sseEmitterMap.forEach((s, sseEmitter) -> {
try {
sseEmitter.send(SseEmitter.event()
.id(UUID.randomUUID().toString())
.data(MyEvent.builder().code(222).msg(s + "#" +
LocalDateTime.now() + "#" + Thread.currentThread().getName() + "#"
+ Thread.currentThread().getState()).build())
.reconnectTime(3000L)
.comment("this is comment")
);
if (LocalDateTime.now().getSecond() % 2 == 0) {
sseEmitter.send(
SseEmitter.event()
.id("1")
.name("customEventName")
.data("customData")
);
}
} catch (IOException e) {
System.out.println("Error#" + e);
// sseEmitter.completeWithError(e);
}
});
}, 3, 3, TimeUnit.SECONDS);
}
@GetMapping("/test/sse")
public SseEmitter sseEmitter(@RequestParam("uid") String uid) throws IOException {
SseEmitter sseEmitter = new SseEmitter(-1L);
// SseEmitter sseEmitter = new SseEmitter(5L);
sseEmitter.send(SseEmitter.event().id("1").name("Connected").data(LocalDateTime.now()).reconnectTime(3000));
sseEmitterMap.put(uid, sseEmitter);
sseEmitter.onCompletion(() -> {
System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on completion");
sseEmitterMap.remove(uid);
});
sseEmitter.onTimeout(() -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on timeout#" + sseEmitter.getTimeout()));
sseEmitter.onError(throwable -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on error#" + throwable.toString()));
return sseEmitter;
}
}
3.2 客户端 JavaScript
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<script>
window.onload = function () {
let connectBtn = document.getElementById("connectSSE");
let disconnectBtn = document.getElementById("disconnectSSE");
let userIdElement = document.getElementById("userId");
let userIdInfoElement = document.getElementById("userIdInfo");
let sse;
connectBtn.onclick = function () {
if (!userIdElement.value) {
userIdInfoElement.innerText = "userId is empty";
console.log("userId is empty")
return;
}
userIdInfoElement.innerText = userIdElement.value;
const eventSource = new EventSource('http://localhost:18080/test/sse?uid=' + userIdElement.value);
eventSource.onopen = (event) => {
console.log("onopen", event.readyState, event.target);
sse = event.target;
let element = document.getElementById("onOpenInfo");
element.innerText = JSON.stringify(event.target);
};
eventSource.onmessage = (event) => {
let element = document.getElementById("onMessageInfo");
element.innerText = event.data;
};
eventSource.onerror = (event) => {
console.log("onerror", event);
if (event.readyState === EventSource.CLOSED) {
console.log('connection is closed');
} else {
console.log("Error occured", event);
}
event.target.close();
let element = document.getElementById("onErrorInfo");
element.innerText = JSON.stringify(event);
};
eventSource.addEventListener("customEventName", (event) => {
console.log("Message id is " + event.lastEventId);
});
};
disconnectBtn.onclick = function () {
if (sse) {
sse.close();
}
};
};
</script>
</head>
<body>
<div>
<input id="userId" type="text">
<button id="connectSSE">Connect</button>
<button id="disconnectSSE">Disconnect</button>
</div>
<div>
userId: <span id="userIdInfo"></span>
</div>
<div>
onOpen: <span id="onOpenInfo"></span>
</div>
<div>
onMessage: <span id="onMessageInfo"></span>
</div>
<div>
onError: <span id="onErrorInfo"></span>
</div>
</body>
</html>
SpringMVC
封装的
SSE
实现,
Controller
中直接返回
SseEmitter
,不调用
complete()
方法,即可保持长链接。
3.3 超时时间
SseEmitter()
:无参构造,默认超时时间依赖于
Web
容器,容器为
Tomcat
则超时时间为
30
秒。
SseEmitter(Long timeout)
:有参构造,设置超时时间。传入
-1L
表示没有超时时间。
无参构造可通过配置
mvc
属性来设置超时时间,单位毫秒:
spring:
mvc:
async:
request-timeout: 15000
注意:客户端关闭了连接,不管是调用了
event.target.close()
还是关闭了网页,服务端不会触发任何回调。直到服务端调用
send
后才会触发
onError
和
onCompletion
回调。
服务端触发了
onCompletion
回调后,连接就自动断开了。
3.4 重试机制
浏览器会保持连接一直打开。服务端可以通过调用
complete
和
completeWithError
方法关闭连接,这两个事件会触发客户端的
error
回调。
当服务端关闭连接或网络错误时,如果客户端不调用
event.target.close()
关闭连接的话,浏览器会发起重新连接。
浏览器默认会等待
3
秒再尝试重新建立连接,并且浏览器会保持重试知道获得
HTTP
请求返回的
200
状态码。
服务端可以通过发送
retry
标志位更改默认
3
秒的等待时间。服务端可以设置标志位为
0
,表示连接关闭则立即发起重试,没有等待时间。
4. 使用 SseEmitter 的异步 REST 控制器
在给定的控制器方法中,我们正在访问数据集(使用您自己的域数据类型)。
- 有一种数据服务可以从数据库或任何其他来源返回数据集。
- 然后处理每个数据集(例如从另一个源检索相关信息),这需要时间。这是通过调用方法使用人为延迟来模拟的
thread.sleep()。 - 然后使用方法将每个数据集添加到
SseEmitter对象emitter.send()。 - 最后
emitter.complete()调用来标记请求处理已完成,以便负责发送响应的线程可以完成请求并释放时间来处理下一个响应。 - 如果在处理请求时遇到任何错误,请通过 完成该过程
emitter.completeWithError()。异常将通过 Spring MVC 的正常异常处理,然后完成响应。
@RestController
public class DataSetController {
private final DataSetService dataSetService;
public DataSetController(DataSetService dataSetService) {
this.dataSetService = dataSetService;
}
@GetMapping("/emit-data-sets")
public SseEmitter emitDataSets() {
SseEmitter emitter = new SseEmitter();
ExecutorService executor = Executors.newSingleThreadExecutor();
//Creating a new thread for async processing
executor.execute(() -> {
List<DataSet> dataSets = dataSetService.findAll();
try {
for (DataSet dataSet : dataSets) {
randomDelay();
emitter.send(dataSet);
}
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
});
executor.shutdown();
return emitter;
}
private void randomDelay() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
5. 异步配置
为了覆盖默认的异步行为(例如线程池和超时),我们可以实现WebMvcConfigurer接口并覆盖其*configureAsyncSupport()*方法。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
@EnableAsync
public class AsyncConfig implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(mvcTaskExecutor());
configurer.setDefaultTimeout(30_000);
}
@Bean
public ThreadPoolTaskExecutor mvcTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setThreadNamePrefix("mvc-task-");
return threadPoolTaskExecutor;
}
}
6. 测试
6.1. JUnit 测试
为了测试上述控制器方法,我使用了springboot发行版附带的mockito。
@WebMvcTest(DataSetController.class)
public class DataSetControllerTest {
@Autowired
private MockMvc mockMvc;
@MockBean
private DataSetService dataSetService;
@Test
public void testFetchData() throws Exception {
Mockito.when(dataSetService.findAll())
.thenReturn(Arrays.asList(new DataSet(BigInteger.valueOf(1), "data")));
MvcResult mvcResult = mockMvc.perform(get("/emit-data-sets"))
.andExpect(request().asyncStarted())
.andDo(MockMvcResultHandlers.log())
.andReturn();
mockMvc.perform(asyncDispatch(mvcResult))
.andDo(MockMvcResultHandlers.log())
.andExpect(status().isOk())
.andExpect(content().json("{\"id\":1,\"name\":\"data\"}"));
}
}
Test 输出:
MockHttpServletResponse:
Status = 200
Error message = null
Headers = {Content-Type=}
Content type = text/event-stream;charset=UTF-8
Body = data:{"id":1,"name":"data"}
Forwarded URL = null
Redirected URL = null
Cookies = []
6.2 浏览器测试
要在浏览器中测试,请使用类SpringAsyncExampleApplication启动应用程序并在浏览器中点击 URL:
http://localhost:8080/emit-data-sets
检查以事件形式返回的服务器响应以及事件之间的延迟。

请注意,
Content-Type
标题的值为,
text/event-stream
表示我们获得了一个事件流。该流可以保持打开状态,它将接收事件通知。每个写入的对象都转换为
JSON
带有
HttpMessageConverter
。每个对象都作为事件数据写入数据标签中。
如果您在 Spring boot 3 中使用 SseEmitter 执行此异步休息控制器示例时遇到任何错误,请在评论区留言!
版权归原作者 月下独码 所有, 如有侵权,请联系我们删除。