0


带有 SseEmitter 的 Spring Boot 异步 REST 控制器

🧑 博主简介:历代文学网(PC端可以访问:历代文学,移动端可微信小程序搜索“历代文学”)总架构师,

15年

工作经验,精通

Java编程

高并发设计

Springboot和微服务

,熟悉

Linux

,**

ESXI虚拟化

以及

云原生Docker和K8s

**,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。

在这里插入图片描述

SseEmitterResponseBodyEmitter的一种特殊形式,用于发送Server-Sent Events。它有助于异步请求处理,其中将一个或多个对象写入响应,并且每个对象都使用兼容的 HttpMessageConverter写入。

*1. SseEmitter*

SseEmitter可以将事件从服务器传递到客户端。服务器发送的事件是从服务器到客户端的消息。它们具有“ *Content **-Type* ”标头

text/event-stream

这些事件非常简单,只有四个字段。
场地描述ID事件的 ID事件事件类型数据事件数据重试事件流的重新连接时间

2.如何使用SseEmitter

要从请求处理方法发送事件,您需要创建一个实例

SseEmitter

并从请求处理方法返回它。然后使用该

emitter.send()

方法将各个元素发送到客户端。

@RequestMapping(value="/resource-uri", method=RequestMethod.GET)
public SseEmitter handle()
{
     SseEmitter emitter = new SseEmitter();

     // Pass the emitter to a new thead for async processing...
     return emitter;
}

// in new thread
 emitter.send(dataset1);

 // and again
 emitter.send(dataset2);

 // and done
 emitter.complete();

如果要向事件添加更多信息,请使用 SseEventBuilder

event()

的工厂方法

SseEmitter

创建一个实例。使用它来填写 id 和 event 字段。

SseEventBuilder eventBuilder = SseEmitter.event();

emitter.send(
                  eventBuilder
                  .data(dataSet)
                  .name("dataSet-created")
                  .id(String.valueOf(dataSet.hashCode()))
            );

3. Springboot 实现 SSE 服务端推送事件

**

Sever Send Event

**,是

HTTP

协议中的一种,

Content-Type

text/event-stream

,能够保持长连接。

SSE只能发送文本消息(但也可通过**

base64

**等方法进行简单加密)。

3.1 服务端Spring Boot

@CrossOrigin
@RestController
public class SSEController {

    private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    @PostConstruct
    private void init() {
        Executors.newScheduledThreadPool(10).scheduleWithFixedDelay(() -> {
            System.out.println("sseEmitterMap#" + sseEmitterMap);
            sseEmitterMap.forEach((s, sseEmitter) -> {
                try {
                    sseEmitter.send(SseEmitter.event()
                        .id(UUID.randomUUID().toString())
                        .data(MyEvent.builder().code(222).msg(s + "#" +
                            LocalDateTime.now() + "#" + Thread.currentThread().getName() + "#"
                            + Thread.currentThread().getState()).build())
                        .reconnectTime(3000L)
                        .comment("this is comment")
                    );
                    if (LocalDateTime.now().getSecond() % 2 == 0) {
                        sseEmitter.send(
                            SseEmitter.event()
                                .id("1")
                                .name("customEventName")
                                .data("customData")
                        );
                    }
                } catch (IOException e) {
                    System.out.println("Error#" + e);
//                    sseEmitter.completeWithError(e);
                }
            });
        }, 3, 3, TimeUnit.SECONDS);
    }

    @GetMapping("/test/sse")
    public SseEmitter sseEmitter(@RequestParam("uid") String uid) throws IOException {
        SseEmitter sseEmitter = new SseEmitter(-1L);
//        SseEmitter sseEmitter = new SseEmitter(5L);
        sseEmitter.send(SseEmitter.event().id("1").name("Connected").data(LocalDateTime.now()).reconnectTime(3000));
        sseEmitterMap.put(uid, sseEmitter);

        sseEmitter.onCompletion(() -> {
            System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on completion");
            sseEmitterMap.remove(uid);
        });
        sseEmitter.onTimeout(() -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on timeout#" + sseEmitter.getTimeout()));
        sseEmitter.onError(throwable -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on error#" + throwable.toString()));
        return sseEmitter;
    }
}

3.2 客户端 JavaScript

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script>
        window.onload = function () {
            let connectBtn = document.getElementById("connectSSE");
            let disconnectBtn = document.getElementById("disconnectSSE");
            let userIdElement = document.getElementById("userId");
            let userIdInfoElement = document.getElementById("userIdInfo");
            let sse;
            connectBtn.onclick = function () {
                if (!userIdElement.value) {
                    userIdInfoElement.innerText = "userId is empty";
                    console.log("userId is empty")
                    return;
                }
                userIdInfoElement.innerText = userIdElement.value;
                const eventSource = new EventSource('http://localhost:18080/test/sse?uid=' + userIdElement.value);
                eventSource.onopen = (event) => {
                    console.log("onopen", event.readyState, event.target);
                    sse = event.target;
                    let element = document.getElementById("onOpenInfo");
                    element.innerText = JSON.stringify(event.target);
                };
                eventSource.onmessage = (event) => {
                    let element = document.getElementById("onMessageInfo");
                    element.innerText = event.data;
                };
                eventSource.onerror = (event) => {
                    console.log("onerror", event);
                    if (event.readyState === EventSource.CLOSED) {
                        console.log('connection is closed');
                    } else {
                        console.log("Error occured", event);
                    }
                    event.target.close();
                    let element = document.getElementById("onErrorInfo");
                    element.innerText = JSON.stringify(event);
                };
                eventSource.addEventListener("customEventName", (event) => {
                    console.log("Message id is " + event.lastEventId);
                });
            };

            disconnectBtn.onclick = function () {
                if (sse) {
                    sse.close();
                }
            };

        };
    </script>
</head>
<body>

<div>
    <input id="userId" type="text">
    <button id="connectSSE">Connect</button>
    <button id="disconnectSSE">Disconnect</button>
</div>

<div>
    userId: <span id="userIdInfo"></span>
</div>

<div>
    onOpen: <span id="onOpenInfo"></span>
</div>

<div>
    onMessage: <span id="onMessageInfo"></span>
</div>

<div>
    onError: <span id="onErrorInfo"></span>
</div>

</body>
</html>
SpringMVC

封装的

SSE

实现,

Controller

中直接返回

SseEmitter

,不调用

complete()

方法,即可保持长链接。

3.3 超时时间

SseEmitter()

:无参构造,默认超时时间依赖于

Web

容器,容器为

Tomcat

则超时时间为

30

秒。

SseEmitter(Long timeout)

:有参构造,设置超时时间。传入

-1L

表示没有超时时间。

无参构造可通过配置

mvc

属性来设置超时时间,单位毫秒:

spring:
  mvc:
    async:
      request-timeout: 15000

注意:客户端关闭了连接,不管是调用了

event.target.close()

还是关闭了网页,服务端不会触发任何回调。直到服务端调用

send

后才会触发

onError

onCompletion

回调。

服务端触发了

onCompletion

回调后,连接就自动断开了。

3.4 重试机制

浏览器会保持连接一直打开。服务端可以通过调用

complete

completeWithError

方法关闭连接,这两个事件会触发客户端的

error

回调。

当服务端关闭连接或网络错误时,如果客户端不调用

event.target.close()

关闭连接的话,浏览器会发起重新连接。

浏览器默认会等待

3

秒再尝试重新建立连接,并且浏览器会保持重试知道获得

HTTP

请求返回的

200

状态码。

服务端可以通过发送

retry

标志位更改默认

3

秒的等待时间。服务端可以设置标志位为

0

,表示连接关闭则立即发起重试,没有等待时间。

4. 使用 SseEmitter 的异步 REST 控制器

在给定的控制器方法中,我们正在访问数据集(使用您自己的域数据类型)。

  • 有一种数据服务可以从数据库或任何其他来源返回数据集。
  • 然后处理每个数据集(例如从另一个源检索相关信息),这需要时间。这是通过调用方法使用人为延迟来模拟的thread.sleep()
  • 然后使用方法将每个数据集添加到SseEmitter对象emitter.send()
  • 最后emitter.complete()调用来标记请求处理已完成,以便负责发送响应的线程可以完成请求并释放时间来处理下一个响应。
  • 如果在处理请求时遇到任何错误,请通过 完成该过程emitter.completeWithError()。异常将通过 Spring MVC 的正常异常处理,然后完成响应。
@RestController
public class DataSetController {

    private final DataSetService dataSetService;

    public DataSetController(DataSetService dataSetService) {
        this.dataSetService = dataSetService;
    }

    @GetMapping("/emit-data-sets")
    public SseEmitter emitDataSets() {

        SseEmitter emitter = new SseEmitter();

        ExecutorService executor = Executors.newSingleThreadExecutor();

        //Creating a new thread for async processing
        executor.execute(() -> {
            List<DataSet> dataSets = dataSetService.findAll();
            try {
                for (DataSet dataSet : dataSets) {
                    randomDelay();
                    emitter.send(dataSet);
                }
                emitter.complete();
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });
        executor.shutdown();

        return emitter;
    }

    private void randomDelay() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

5. 异步配置

为了覆盖默认的异步行为(例如线程池和超时),我们可以实现WebMvcConfigurer接口并覆盖其*configureAsyncSupport()*方法。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
@EnableAsync
public class AsyncConfig implements WebMvcConfigurer {

  @Override
  public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
    
    configurer.setTaskExecutor(mvcTaskExecutor());
    configurer.setDefaultTimeout(30_000);
  }

  @Bean
  public ThreadPoolTaskExecutor mvcTaskExecutor() {
    
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(10);
    threadPoolTaskExecutor.setThreadNamePrefix("mvc-task-");
    return threadPoolTaskExecutor;
  }
}

6. 测试

6.1. JUnit 测试

为了测试上述控制器方法,我使用了springboot发行版附带的mockito

@WebMvcTest(DataSetController.class)
public class DataSetControllerTest {

    @Autowired
    private MockMvc mockMvc;

    @MockBean
    private DataSetService dataSetService;

    @Test
    public void testFetchData() throws Exception {

        Mockito.when(dataSetService.findAll())
            .thenReturn(Arrays.asList(new DataSet(BigInteger.valueOf(1), "data")));

        MvcResult mvcResult = mockMvc.perform(get("/emit-data-sets"))
            .andExpect(request().asyncStarted())
            .andDo(MockMvcResultHandlers.log())
            .andReturn();

        mockMvc.perform(asyncDispatch(mvcResult))
            .andDo(MockMvcResultHandlers.log())
            .andExpect(status().isOk())
            .andExpect(content().json("{\"id\":1,\"name\":\"data\"}"));
    }
}

Test 输出:

MockHttpServletResponse:
           Status = 200
    Error message = null
          Headers = {Content-Type=}
     Content type = text/event-stream;charset=UTF-8
             Body = data:{"id":1,"name":"data"}

    Forwarded URL = null
   Redirected URL = null
          Cookies = []

6.2 浏览器测试

要在浏览器中测试,请使用类SpringAsyncExampleApplication启动应用程序并在浏览器中点击 URL:

http://localhost:8080/emit-data-sets

检查以事件形式返回的服务器响应以及事件之间的延迟。

请注意,

Content-Type

标题的值为,

text/event-stream

表示我们获得了一个事件流。该流可以保持打开状态,它将接收事件通知。每个写入的对象都转换为

JSON

带有

HttpMessageConverter

。每个对象都作为事件数据写入数据标签中。

如果您在 Spring boot 3 中使用 SseEmitter 执行此异步休息控制器示例时遇到任何错误,请在评论区留言!

标签: spring boot 后端 java

本文转载自: https://blog.csdn.net/lilinhai548/article/details/142095127
版权归原作者 月下独码 所有, 如有侵权,请联系我们删除。

“带有 SseEmitter 的 Spring Boot 异步 REST 控制器”的评论:

还没有评论