更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍
Spring WebFlux之Reactor事件感知 API
在 Spring Boot 3 中,响应式编程通过 Reactor 库得到了广泛应用,提供了强大的流式数据处理能力。为了增强对流式数据流的调试和处理能力,Reactor 提供了一组非常重要的事件感知(side-effect)API,也就是我们常听到的
doOnXxx
系列方法。
这篇博客将详细介绍
doOnXxx
系列 API 的功能和用法,帮助大家更好地理解它们在响应式流中的作用,并展示其在实际开发中的一些应用场景。
1. 什么是
doOnXxx
系列 API?
doOnXxx
系列方法是 Reactor 提供的一组用于在流操作过程中执行副作用的 API。它们不会改变流的内容或数据流本身,而是允许我们在特定的生命周期事件发生时进行操作(如日志记录、调试、监控等)。
这些 API 名称中的
Xxx
代表不同的事件类型,比如:
doOnNext()
: 当下一个元素被发出时执行操作。doOnError()
: 当流中出现错误时执行操作。doOnComplete()
: 当流完成时执行操作。doOnSubscribe()
: 当订阅发生时执行操作。
这些方法非常适合用于监控、调试或者记录流的行为。
2.
doOnXxx
API 的常用方法
下面我们依次介绍常见的
doOnXxx
API,并通过简单的示例进行演示。
2.1
doOnNext()
doOnNext()
方法允许你在每个元素被发布时执行操作,通常用于对每个数据元素进行日志记录、调试或者进行某种副作用操作。
示例:
Flux<String> flux =Flux.just("Spring","Boot","3","Reactor").doOnNext(value ->System.out.println("Processing value: "+ value)).map(String::toUpperCase);
flux.subscribe(System.out::println);
输出:
在这个例子中,
doOnNext()
被用于每个元素发出时打印日志。这对于调试非常有用,可以清楚看到每个数据元素何时被处理。
2.2
doOnError()
doOnError()
方法允许你在流中出现异常时执行操作,通常用于记录异常信息、执行错误处理逻辑等。
示例:
Flux<Integer> fluxWithError =Flux.just(1,2,0).map(i ->10/ i)// 这里会抛出 ArithmeticException: / by zero.doOnError(e ->System.err.println("Error occurred: "+ e.getMessage()));
fluxWithError.subscribe(System.out::println,
error ->System.err.println("Subscriber received error: "+ error));
输出:
在这个例子中,Flux 被用来创建一个数据流,并且在这个数据流中执行了一些操作,包括可能抛出异常的操作。下面是对消费者和生产者异常捕获的区别:
生产者异常捕获:
- 在生产者端,可以使用 doOnError 方法来捕获并处理异常,这个方法会在数据流中发生错误时被调用。
- doOnError 可以用于记录日志或执行一些清理操作,它不会改变数据流的行为,但数据流会被终止。
消费者异常捕获:
- 在消费者端,可以通过 subscribe 方法的第二个参数(错误处理回调)来捕获并处理异常。
- 这个错误处理回调会在数据流中发生错误时被调用,可以用于记录日志或执行其他错误处理逻辑。
2.3
doOnComplete()
doOnComplete()
方法在流完成时(即没有更多元素发出)执行操作。你可以利用它在流结束时执行一些收尾工作,比如关闭资源、统计处理结果等。
示例:
Flux<String> flux =Flux.just("Spring","Boot","3","Reactor").doOnComplete(()->System.out.println("Stream completed"));
flux.subscribe(System.out::println);
输出:
这里,
doOnComplete()
用于在数据流结束时打印一条日志,通知处理完成。
2.4
doOnSubscribe()
doOnSubscribe()
允许你在流被订阅时执行操作。它通常用于监控订阅事件,适合用于统计订阅数或进行相关的初始化操作。
示例:
Flux<String> flux =Flux.just("A","B","C").doOnSubscribe(subscription ->System.out.println("Subscription started"));
flux.subscribe(System.out::println);
输出:
在这个例子中,当流被订阅时,
doOnSubscribe()
被调用,打印订阅开始的日志。
2.5
doOnCancel()
doOnCancel()
方法在取消订阅时执行操作。取消订阅通常是在消费者不再需要流数据时发生的(例如手动取消订阅或者发生超时等情况),可以用于处理一些资源释放的操作。
示例:
Flux<String> flux =Flux.just("A","B","C").doOnCancel(()->System.out.println("Subscription canceled")).take(2);// 只取前两个元素,第三个元素将被跳过(取消)
flux.subscribe(System.out::println);
输出:
这里
doOnCancel()
在流被取消时执行了取消订阅的操作。
2.6
doFinally()
doFinally()
是一个非常有用的方法,它在流结束时始终会被调用(无论是正常完成、错误还是取消订阅)。它类似于
try-finally
语句中的
finally
,适合做一些无论流如何结束都需要执行的操作,如清理资源等。
示例:
Flux<String> flux =Flux.just("A","B","C").doFinally(signalType ->System.out.println("Stream ended with signal: "+ signalType));
flux.subscribe(System.out::println);
输出:
doFinally()
可以捕捉到不同类型的信号,包括
onComplete
,
onError
和
onCancel
。
2.7
doOnTerminate()
doOnTerminate()
在流完成或出错时执行操作。它是
doOnComplete()
和
doOnError()
的组合,但不区分流是正常完成还是出现错误,只要流结束了,它就会被调用。
示例:
Flux<String> flux =Flux.just("A","B","C").doOnTerminate(()->System.out.println("Stream terminated"));
flux.subscribe(System.out::println);
输出:
它在流结束时总会执行,不管是否出现错误。
2.8
doOnEach()
doOnEach()
是一个非常通用的事件感知 API,它允许对流中的每一个信号(包括 onNext、onError、onComplete 和 onSubscribe)进行统一处理。这个方法会接收一个
Signal
对象,表示当前发生的事件类型,从而可以处理不同的信号类型。
示例:
Flux<String> flux =Flux.just("Spring","Boot","3","Reactor").doOnEach(signal ->{if(signal.isOnNext()){System.out.println("Element received: "+ signal.get());}elseif(signal.isOnError()){System.err.println("Error occurred: "+ signal.getThrowable().getMessage());}elseif(signal.isOnComplete()){System.out.println("Stream completed");}});
flux.subscribe(System.out::println);
输出:
2.9
doOnDiscard()
doOnDiscard()
方法用于处理被 丢弃的元素。当某些元素由于某种原因(例如
filter()
操作或上游取消)没有被使用时,可以通过
doOnDiscard()
来感知这些元素的丢弃,并执行相关的操作(如清理资源、记录日志等)。
可能使用
doOnDiscard
钩子的例子包括以下情况:
filter
: 不符合过滤器的项被视为 “丢弃”。skip
:跳过的项将被丢弃。buffer(maxSize, skip)
与maxSize < skip
:“丢弃的缓冲区” — 缓冲区之间的元素被丢弃。
示例:
Flux<String> flux =Flux.just("AA","BB","C","D","E").filter(s -> s.length()>1).doOnDiscard(String.class, discardedValue ->System.out.println("Discarded: "+ discardedValue));
flux.subscribe(System.out::println);
输出:
2.10
doOnRequest()
doOnRequest()
是一个用于处理 背压请求(request signals) 的 API,它允许你在下游请求元素时执行操作。响应式流中上游发送元素的数量通常由下游通过请求背压机制控制,因此
doOnRequest()
可以帮助我们监控下游对元素的需求。
示例:
Flux<Integer> flux =Flux.range(1,5).doOnRequest(request ->System.out.println("Request for: "+ request +" elements"));
flux.subscribe(newSubscriber<Integer>(){@OverridepublicvoidonSubscribe(Subscription s){
s.request(3);}@OverridepublicvoidonNext(Integer integer){System.out.println("Received: "+ integer);}@OverridepublicvoidonError(Throwable t){}@OverridepublicvoidonComplete(){}});// 请求 3 个元素
输出:
3.
doOnXxx
的应用场景
- 日志记录与调试:在流的不同阶段插入
doOnXxx
,帮助我们记录每个阶段的状态变化或异常情况,从而更好地调试响应式流。 - 监控和统计:我们可以使用
doOnSubscribe()
和doOnComplete()
结合监控系统来统计订阅的数量、完成的流数量,分析流的性能。 - 资源管理:使用
doFinally()
进行资源释放和清理,确保无论流如何结束都能进行相应的收尾工作。 - 错误处理:使用
doOnError()
可以在发生错误时记录日志、发送通知或者做出其他相应的处理。
4. 总结
Reactor 的
doOnXxx
系列 API 是在响应式流中进行事件感知和副作用处理的强大工具。它们的主要作用是让开发者能够在不干扰流式数据处理的情况下,插入额外的操作,如调试、监控、资源清理等。通过合理使用
doOnNext()
、
doOnError()
、
doFinally()
等方法,我们可以更好地理解和控制响应式流的执行过程,从而构建更加健壮和高效的应用程序。
希望这篇文章能帮助你更好地掌握
doOnXxx
系列方法。如果你有任何问题或建议,欢迎讨论!
版权归原作者 CoderJia_ 所有, 如有侵权,请联系我们删除。