响应式流和reactor框架进阶
响应式流创建、转换、处理
本文档主要介绍在响应式编程中如何从流中获取数据并处理。
前提条件
假设您已经能掌握Java基础、Maven使用、Lamda表达式、响应式编程等基础。
如何获取流中数据
🌏 说明
1、不要试图从流中获取数据出来,而是先思考需要对流中元素做什么,响应式代码需要使用响应式> 方法(如subscribe())来订阅数据流并触发异步处理。
2、需要对流中的数据进行操作时,都应该使用对应操作符来处理,根据Mono/Flux等提供的操作符> API进行组合操作。
3、如下图reactor官方marble diagrams图示意,我们需要做的就是编写operator部分,而> operator即为Mono/Flux等提供的各类操作符如.map()、.flatMap()等方法。
4、关于操作符的API如果不明白含义时可以看marble diagrams示意图,鼠标放在操作符上即可。
5、响应式里面可以操作非响应式的方法,但非响应式方法内无法返回响应式结果。


流数据产生的时机
官方释义:
在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。
当真正“订阅(subscribe)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。
🌏 说明
1后端代码直接声明响应式流时需要显示声明.subscribe()才能订阅到数据。
2前端接口调用则无需显示声明.subscribe()(前端本身即为订阅者)。
注意事项
‼️ 警告
在响应式编程中,在任何时候执行业务代码时都不要使用block()、blockFirst()、blockXX()方法,
为了避免使用block(),我们应该尽可能地使用响应式操作符(如map、flatMap、filter等)对数> 据流进行转换和处理。
使用block()方法可能引发以下问题:
1、阻塞线程:调用block()方法会阻塞当前线程,导致无法处理其他并发请求。这会降低系统的吞吐> 量和响应性能。
2、死锁风险:如果在处理响应式流时使用了block()方法,而其中某些操作也依赖于同一个线程的结> 果,则可能导致死锁。
3、内存资源浪费:阻塞调用将持续占用线程,而每个线程都需要额外的内存资源。如果应用程序中 同时有大量的阻塞操作,可能导致线程池耗尽和内存资源浪费。
示例代码
创建 Flux 或 Mono 并订阅它的简单方法
创建Mono流并订阅
Mono<String> noData =Mono.empty();Mono<String> data =Mono.just("foo");//1.
noData.subscribe();//2.
data.subscribe();//3.
代码说明:
1、将String对象转成Mono流
2、订阅Mono流,empty()什么也不会返回,控制台不会打印任何数据。
3、订阅Mono流,打印foo。
Mono更多说明:
●发出一个 T,我已经有了:just
○基于一个 Optional:Mono#justOrEmpty(Optional)
○基于一个可能为 null 的 T:Mono#justOrEmpty(T)
●发出一个 T,且还是由 just 方法返回
○但是“懒”创建的:使用 Mono#fromSupplier 或用 Mono#defer 包装 just
Flux<String> seq1 =Flux.just("foo","bar","foobar");List<String> iterable =Arrays.asList("foo","bar","foobar");//Flux<String> seq2 =Flux.fromIterable(iterable);//1.
seq1.subscribe();//2.
seq2.subscribe();
代码说明:
1、将List对象转成Flux流
2、订阅Flux流,使Flux流开始产生数据,并依次打印foo、bar、foobar。
Flux更多说明:
●发出许多 T,这些元素我可以明确列举出来:Flux#just(T…)
●基于迭代数据结构:
○一个数组:Flux#fromArray
○一个集合或 Iterable类型数据:Flux#fromIterable
○一个Stream类型:Flux#fromStream(Supplier)
○一个连续的数字区间:Flux#range
对序列进行转换
1-1的转换
●1对1地转化(比如字符串转化为它的长度):map()
○类型转换:cast()
○获取流中每个元素的序号:Flux#index
Flux<String> source =Flux.fromIterable(Arrays.asList("blue","green","orange","purple")).map(String::toUpperCase);
source.subscribe(d ->System.out.println("Subscriber: "+d));
1-n的转换
●丢弃流中一些数据empty
Flux.fromIterable(Arrays.asList("blue","green","orange","purple","gray")).flatMap(item ->Mono.justOrEmpty(item.toUpperCase())).flatMap(item ->{if(item.startsWith("g")){returnMono.empty();}returnMono.just(item);}).subscribe();
●对每一个元素执行一个异步操作flatMap
Flux.fromIterable(Arrays.asList("blue","green","orange","purple")).flatMap(item ->Mono.justOrEmpty(item.toUpperCase())).subscribe();
●基于Mono结果流返回多个元素的序列flatMapMany
Mono.just("foo").flatMapMany(foo ->Flux.fromIterable(Arrays.asList("blue","green","orange","purple")).flatMap(item ->Mono.justOrEmpty(item.toUpperCase())).map(upperItem ->newStringBuilder(upperItem).append(" ").append("with").append(" ").append(foo).toString())).subscribe();
●自定义转化方法和/或状态:handle
💡 注意
响应式方法里面不能返回null,如果上一操作符返回的数据内包含null值,响应流会抛出异常,如果一定要处理null值,可以使用handle方法,Mono/Flux均含有handle方法.
1、映射到字母。
2、如果返回的是 null …
3、就不会调用 sink.next 从而过滤掉
publicStringalphabet(int letterNumber){if(letterNumber <1|| letterNumber >26){returnnull;}int letterIndexAscii ='A'+ letterNumber -1;return""+(char) letterIndexAscii;}Flux<String> alphabet =Flux.just(-1,30,13,9,20).handle((i, sink)->{String letter =alphabet(i);//1.if(letter !=null)//2.
sink.next(letter);//3.});
alphabet.subscribe(System.out::println);
创建Flux流并订阅
仅对数据做打印或赋值
不对序列造成改变的情况下,得到通知或执行一些操作
●发出元素:doOnNext
Map<String,Object> map =newHashMap<>();
map.put("demo","value");Mono.just(map).doOnNext(time -> map.put("time",LocalDateTime.now())).subscribe(time ->System.out.println("map:"+ map));
●因错误终止:doOnError
Flux.<String>error(newIllegalArgumentException()).doOnError(System.out::println).subscribe();
●取消:doOnCancel
●订阅时:doOnSubscribe
●请求时:doOnRequest
●序列完成:Flux#doOnComplete,Mono#doOnSuccess
●所有类型的信号(Signal):Flux#doOnEach
●所有结束的情况(完成complete、错误error、取消cancel):doFinally
●记录日志:log
将Flux转成Mono流
●只取第一个元素放到 Mono 中返回:Flux#next()
●最多只取 1 个元素:
○给定序号:Flux#elementAt
○最后一个:.takeLast(1)
■如果为序列空则发出错误信号:Flux#last()
■如果序列为空则返回默认值:Flux#last(T)
●我只想要一个元素(如果多于一个就返回错误)…
○如果序列为空,发出错误信号:Flux#single()
○如果序列为空,发出一个缺省值:Flux#single(T)
○如果序列为空就返回一个空序列:Flux#singleOrEmpty
响应式流中空序列处理
defaultIfEmpty:空流时想要一个默认值来代替
myMethod.emptySequenceForKey("a")// 这个方法返回一个空的 Mono<String>.defaultIfEmpty("")// 将空序列转换为包含字符串 "" 的序列.zipWhen(aString -> myMethod.process("b"))// 当 "" 发出时被调用.subscribe();
switchIfEmpty:空流时响应一个默认流来代替
userService.getFavorites(userId).flatMap(favoriteService::getDetails).switchIfEmpty(suggestionService.getSuggestions()).subscribe();
响应式流中异常处理
创建一个错误序列
创建Flux.error()流
//Flux.error(Throwable):创建一个发出指定错误的 Flux。Flux<Object> fluxWithError =Flux.error(newRuntimeException("Something went wrong"));
创建Mono.error()流
//Mono.error(Throwable):创建一个发出指定错误的 Mono。Mono<Object> monoWithError =Mono.error(newRuntimeException("Something went wrong"));
如果元素超时未发出
模拟一个Flux流中的信号延迟5s才发送,但监听整个流超过3s时会抛出超时异常
TimeoutException
。
Flux<Object> fluxWithTimeout =Flux.just("value").delayElements(Duration.ofSeconds(5)).timeout(Duration.ofSeconds(3));//如果元素在指定时间内未发出,则会超时。
try/catch的表达方式
抛出异常
//Flux.error(Throwable):创建一个发出指定错误的 Flux。Flux<Object> fluxWithError =Flux.error(newRuntimeException("Something went wrong"));
捕获异常并返回默认值
//onErrorReturn(T):捕获异常并返回一个默认值。Mono<Object> monoWithDefault =Mono.error(newRuntimeException("Something went wrong")).onErrorReturn("Default value");
获异常并返回另一个Flux或Mono
//onErrorResume(Function):捕获异常并返回另一个 Flux 或 Mono。Flux<Object> fluxWithFallback =Flux.error(newRuntimeException("Something went wrong")).onErrorResume(e ->Flux.just("Fallback value"));
包装异常后再抛出:
//onErrorMap(Function):捕获异常并对异常进行包装后再抛出。Flux<Object> fluxWithErrorMapping =Flux.just("value").flatMap(value ->{try{//某些操作可能引发异常returnMono.just(value.toUpperCase());}catch(Exception e){returnMono.error(newRuntimeException("Error occurred", e));}});
finally代码块
//doFinally(Consumer):在序列完成时执行 finally 代码块,可以根据 SignalType 进行不同的处理。Mono<Object> monoWithFinally =Mono.just("value").doFinally(signalType ->{if(signalType ==SignalType.ON_COMPLETE){System.out.println("Finally block: Completed");}elseif(signalType ==SignalType.ON_ERROR){System.out.println("Finally block: Error occurred");}});
处理错误
返回一个默认的值:onErrorReturn
//使用 onErrorReturn 操作符来捕获异常并返回一个默认值。Mono<Object> monoWithDefault =Mono.error(newRuntimeException("Something went wrong")).onErrorReturn("Default value");
返回另一个Publisher:onErrorResume
//使用 onErrorResume 操作符来捕获异常并返回另一个 Flux 或 Mono。Flux<Object> fluxWithFallback =Flux.error(newRuntimeException("Something went wrong")).onErrorResume(e ->Flux.just("Fallback value"));
重试:retry
//使用 retry 操作符来在遇到错误时重试。Flux<Object> fluxWithRetry =Flux.just("value").concatWith(Flux.error(newRuntimeException("Something went wrong"))).retry(3);// 重试三次
伴随触发:retryWhen
//使用 retryWhen 操作符来在遇到错误时根据自定义的重试策略重试。importreactor.util.retry.Retry;importjava.time.Duration;Flux<Object> fluxWithRetryWhen =Flux.just("value").concatWith(Flux.error(newRuntimeException("Something went wrong"))).retryWhen(Retry.backoff(3,Duration.ofSeconds(1)));// 指数退避重试,最多重试三次,间隔一秒
处理背压错误
🌏****说明
处理回压错误是确保上游和下游之间的流量控制,以避免潜在的资源耗尽或系统不稳定。Reactor 提供了几种处理回压错误的策略,包括抛出异常、丢弃元素、保留最新元素、以及缓存元素等。以下是示例:
抛出 IllegalStateException:Flux#onBackpressureError
//onBackpressureError():抛出 IllegalStateException。Flux.range(1,1000).onBackpressureError().subscribe(System.out::println);
丢弃策略:Flux#onBackpressureDrop
//onBackpressureDrop():丢弃元素。Flux.range(1,1000).onBackpressureDrop().subscribe(System.out::println);
不丢弃最后一个元素:Flux#onBackpressureLatest
//onBackpressureLatest():保留最新元素。Flux.range(1,1000).onBackpressureLatest().subscribe(System.out::println);
缓存策略(有限或无限):Flux#onBackpressureBuffer
//onBackpressureBuffer():缓存元素。Flux.range(1,1000).onBackpressureBuffer().subscribe(System.out::println);
有限缓存空间并应用给定策略:Flux#onBackpressureBuffer 带有策略 BufferOverflowStrategy
//onBackpressureBuffer(int capacity, BufferOverflowStrategy strategy):指定缓存空间大小和溢出策略。importreactor.core.publisher.BufferOverflowStrategy;Flux.range(1,1000).onBackpressureBuffer(10,BufferOverflowStrategy.DROP_OLDEST).subscribe(System.out::println);
常见响应式操作符
基本概念
Publisher
:发布者(数据流),发布者负责向订阅者发送数据项,并管理数据流的发布。
Subscriber
:订阅者,通过订阅者,可以实现响应式编程中的数据流控制和处理逻辑。
Mono
: 包含0-1个数据的发布者,实现了
Publisher
。
Flux
: 包含0-n个数据的发布者,实现了
Publisher
。
Operator
: 操作符,表示对数据流中的数据的操作描述。用于改变发布者的行为。
🌏****说明
当发布者被订阅时,发布者才开始生产消息:编写代码实际上是使用操作符来一个描述数据处理逻辑,当发布者被订阅时才会执行这些处理逻辑。
常用操作符
**
map
:转换上游数据**
map
操作符用于对流中的每个元素进行转换,并返回一个新的流。它可以将一个类型的流转换为另一个类型的流,或者对原始流中的元素进行修改。
功能描述
- 对流中的每个元素应用指定的转换函数。
- 返回一个包含转换后的元素的新流。
- 保持了原来流的顺序,但元素的类型可能不同。
使用场景
- 数据的转换和映射:通过定义转换函数,将流中的元素从一种类型转换为另一种类型。
- 数据的处理和加工:对流中的元素进行处理,例如提取特定字段、计算属性等。
- 数据的规范化和标准化:对流中的元素进行规范化、标准化或格式化操作。
返回值
map
操作符返回一个新的流,其中包含经过转换函数转换后的元素。
使用示例
假设有一个数据流
source
,包含整数数据,我们使用
map
方法对数据流中的每个整数进行平方操作
importreactor.core.publisher.Flux;publicclassMapOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> source =Flux.just(1,2,3,4);Flux<Integer> squaredStream = source.map(num -> num * num);
squaredStream.subscribe(
squaredNum ->System.out.println("Squared number: "+ squaredNum));}}
**
mapNotNull
:转换上游数据,并忽略
null
值**
mapNotNull
操作符是用于转换上游数据并忽略其中的
null
值的操作。它类似于
map
操作符,但会自动过滤掉转换后的结果为
null
的元素。
功能描述
- 对流中的每个元素应用指定的转换函数。
- 自动过滤后转换后的结果为
null的元素。 - 返回一个包含非
null转换结果的新流。
使用场景
- 数据的转换和映射:通过定义转换函数,将流中的元素从一种类型转换为另一种类型,同时过滤掉转换结果为
null的元素。
返回值
mapNotNull
方法返回一个新的流,其中包含经过转换函数转换后的非
null
元素。
使用示例
假设有一个数据流
source
,包含字符串数据,我们使用
mapNotNull
操作符对数据流中的每个字符串进行转换操作,并过滤掉映射结果为null的元素。
importreactor.core.publisher.Flux;publicclassMapNotNullOperatorExample{publicstaticvoidmain(String[] args){Flux<String> source =Flux.just("apple","","banana",null,"cherry");Flux<String> mappedStream = source.mapNotNull(str ->{if(str !=null&&!str.isEmpty()){return str.toUpperCase();}else{returnnull;}});
mappedStream.subscribe(
mappedStr ->System.out.println("Mapped string: "+ mappedStr));}}
**
flatMap
:转换上游数据,但是结果是一个数据流,并将这个数据流平铺**
flatMap
操作符用于将上游数据进行转换,并将结果作为一个新的数据流展开(平铺)。它可以将一个元素转换为多个元素的流,并将这些流合并成一个单一的流。
功能描述
- 对流中的每个元素应用指定的转换函数。
- 转换函数返回一个流作为结果。
- 将所有转换后的流合并成一个单一的流。
使用场景
- 数据的转换和拆解:通过定义转换函数,将一个元素转换为多个元素的流,并将这些流合并成单一的流。
- 扁平化数据结构:将嵌套的数据结构展开为扁平的数据流。
返回值
flatMap
操作符返回一个新的流,其中包含转换后的展开(平铺)元素。
使用示例
假设有一个数据流
source
,包含字符串数据,我们使用
flatMap
操作符将数据流中的每个字符串拆分为单个字符,并将这些字符作为新的数据流发送给下游。
importreactor.core.publisher.Flux;publicclassFlatMapOperatorExample{publicstaticvoidmain(String[] args){Flux<String> source =Flux.just("hello","world","reactive");Flux<Character> flatMappedStream = source.flatMap(str ->Flux.fromArray(str.split("")));
flatMappedStream.subscribe(
ch ->System.out.println("Character: "+ ch));}}
**
flatMapMany
:转换
Mono
中的元素为
Flux
(1个转多个)**
flatMapMany
操作符用于将
Mono
中的元素转换为一个包含多个元素的
Flux
。它是对
flatMap
方法的特化,适用于将单个元素转换为多个元素的场景。
功能描述
- 对
Mono中的元素应用指定的转换函数。 - 转换函数返回一个
Flux作为结果。 - 将转换后的Flux中的所有元素合并成一个单一的
Flux。
使用场景
- 单个元素转多个元素:当需要将
Mono中的单个元素转换为多个元素时,可以使用flatMapMany操作符。 - 扩展和拆解数据:将一个包含单个元素的
Mono扩展为包含多个元素的Flux,并进行进一步处理。
返回值
flatMapMany
操作符返回一个新的
Flux
,其中包含转换后的多个元素。
使用示例
假设有一个
Mono
对象
sourceMono
,包含一个字符串数据,我们使用
flatMapMany
操作符将
Mono
中的字符串拆分为单个字符,并作为一个新的
Flux
数据流发送给下游。
importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;publicclassFlatMapManyOperatorExample{publicstaticvoidmain(String[] args){Mono<String> sourceMono =Mono.just("hello");Flux<Character> flatMappedStream = sourceMono.flatMapMany(str ->Flux.fromArray(str.split("")));
flatMappedStream.subscribe(
ch ->System.out.println("Character: "+ ch));}}
filter:过滤元素
filter
操作符用于根据给定的条件过滤流中的元素。它允许只保留符合条件的元素,而过滤掉不符合条件的元素。
功能描述
- 根据给定的条件过滤流中的元素。
- 只保留符合条件的元素,过滤掉不符合条件的元素。
使用场景
- 数据筛选:当需要从数据流中筛选出符合特定条件的元素时,可以使用
filter操作符。 - 数据过滤:用于过滤掉不需要的数据,只保留满足条件的数据。
返回值
filter操作符返回一个包含符合条件的元素的新的Flux流。
使用示例
假设有一个数据流
source
,包含整数数据,我们使用
filter
操作符过滤出数据流中大于
5
的元素。
importreactor.core.publisher.Flux;publicclassFilterOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> source =Flux.just(3,7,2,9,5);Flux<Integer> filteredStream = source.filter(num -> num >5);
filteredStream.subscribe(
filteredNum ->System.out.println("Filtered number: "+ filteredNum));}}
filterWhen
:异步过滤**
filterWhen
操作符是用于异步过滤数据流中元素的操作符,在进行过滤时可以异步地根据给定条件来判断是否保留元素。
功能描述
- 异步过滤操作:
filterWhen操作符允许使用异步的条件来过滤数据流中的元素。 - 条件判断:每个元素都可以通过异步条件判断来确定是否应该保留在数据流中。
- 保留规则:当条件为
true时,保留元素;当条件为false时,丢弃元素。
使用场景
- 异步条件过滤:当需要根据异步条件来过滤数据流中的元素时,可以使用
filterWhen操作符。 - 延迟判断:适用于需要等待异步操作完成后才能确定是否保留元素的场景。
返回值
filterWhen操作符返回与原始数据流中符合条件的元素类型相同。
使用示例
假设有一个数据流
source
,包含整数数据,我们使用
filterWhen
操作符根据异步条件来过滤出数据流中大于
5
的元素。
importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;publicclassFilterWhenOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> source =Flux.just(3,7,2,9,5);Flux<Integer> filteredStream = source.filterWhen(num ->Mono.fromCallable(()-> num >5));
filteredStream.subscribe(
filteredNum ->System.out.println("Filtered number: "+ filteredNum));}}
**
concat
:将多个流连接在一起组成一个流(按顺序订阅)**
concat
操作符用于将多个流按照顺序连接在一起,形成一个新的流。它会先订阅并处理第一个流的元素,然后再处理下一个流的元素,以此类推,保持了流的顺序。
功能描述
- 将多个流按照顺序连接在一起。
- 按照连接的顺序依次订阅和处理每个流的元素。
使用场景
- 合并多个流:当需要将多个流合并成一个单一的流,并保持原始流的顺序时,可以使用
concat操作符。 - 依赖前后顺序:当需要确保流的订阅和处理按照特定的顺序进行时,可以使用
concat操作符。
返回值
concat
操作符返回一个新的流,该流是多个流按照顺序连接后的结果。
使用示例
创建两个整数数据流,使用concat操作符按顺序连接它们,并通过订阅处理输出连接后的全部元素的过程。
importreactor.core.publisher.Flux;publicclassConcatOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> flux1 =Flux.just(1,2,3);Flux<Integer> flux2 =Flux.just(4,5,6);Flux.concat(flux1, flux2).subscribe(System.out::println);}}// 输出结果为:1, 2, 3, 4, 5, 6
**
concatWith
:组合流。(原始流追加另一个流后的结果。)**
concatWith
操作符用于连接两个流,将它们合并为一个新的流。它允许按照顺序将一个流的元素追加到另一个流的末尾。
功能描述
- 连接两个流,将它们合并为一个新的流。
- 将一个流的元素追加到另一个流的末尾。
使用场景
- 当需要将两个流按照顺序合并为一个流时,可以使用
concatWith操作符。 - 适用于需要按照特定的顺序连接多个流的情况。
返回值
concatWith
操作符返回一个新的流,包含合并后的元素,该流是原始流追加另一个流后的结果。
使用示例
通过
concatWith
操作符将
flux2
追加到
flux1
的末尾,形成一个新的
Flux
。最后通过订阅这个新的
Flux
,输出的结果为按顺序连接后的所有元素:
1, 2, 3, 4, 5, 6。
importreactor.core.publisher.Flux;publicclassConcatWithOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> flux1 =Flux.just(1,2,3);Flux<Integer> flux2 =Flux.just(4,5,6);
flux1.concatWith(flux2).subscribe(System.out::println);}}// 输出结果为:1, 2, 3, 4, 5, 6
**
merge
:将多个流合并在一起,同时订阅流**
merge
操作符用于将多个流合并在一起,同时订阅这些流。它会同时订阅所有的流,并按照元素的到达顺序进行处理。
功能描述
- 将多个流合并在一起。
- 同时订阅所有的流。
- 按照元素的到达顺序依次处理这些流的元素。
使用场景
- 合并多个流:当需要将多个流合并成一个单一的流时,可以使用
merge操作符。 - 并发处理多个流:当需要并发地处理多个流中的元素时,可以使用
merge操作符。
返回值
merge
操作符返回一个新的流,其中包含合并后的多个原始流中的所有元素。
使用示例
通过
merge
操作符将这两个
Flux
合并为一个新的
Flux
,并通过订阅这个新的
Flux
,同时处理
flux1
和
flux2
的元素并输出。由于
merge
是并行合并多个
Flux
,所以输出的结果可能是乱序的。
importreactor.core.publisher.Flux;publicclassMergeOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> flux1 =Flux.just(1,2,3);Flux<Integer> flux2 =Flux.just(4,5,6);Flux.merge(flux1, flux2).subscribe(System.out::println);}}
**
zip
:压缩多个流中的元素**
zip
操作符用将多个流中的元素进行压缩,即按照索引位置一对一地组合这些流的元素。它会从每个流中取出相同索引位置的元素,并将它们合并成一个新的元素。
功能描述
- 将多个流中的元素按照索引位置进行压缩。
- 从每个流中取出相同索引位置的元素,并将他们合并成一个新的元素。
- 新的元素按照原始流的顺序排列。
使用场景
- 元素一对一组合:当需要将多个流中的元素按照索引位置进行一对一的组合时,可以使用
zip操作符。 - 数据聚合:当需要将多个相关流的元素聚合到一个新的数据结构中时,可以使用
zip操作符。
返回值
zip
操作符返回一个新的流,其中包含原始流中按照索引位置压缩后的元素。
使用示例
通过
zip
操作符将
flux1
和
flux2
中对应位置的元素进行乘法操作,生成一个新的
Flux
,并通过订阅这个新的
Flux
,输出的结果为对应位置元素相乘的结果:
10, 40, 90。
importreactor.core.publisher.Flux;publicclassZipOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> flux1 =Flux.just(1,2,3);Flux<Integer> flux2 =Flux.just(10,20,30);Flux.zip(flux1, flux2,(num1, num2)-> num1 * num2).subscribe(System.out::println);}}
**
then
:上游流完成后执行其他的操作.**
then
操作符用于在上游流完成后执行其他的操作。它允许在流完成时触发一些额外的逻辑,而不是处理流中的元素。
功能描述
- 在上游流完成后执行其他的操作。
- 不关心上游流中的具体元素,只关注流的完成事件。
- 返回一个新的
Publisher,在上游流完成后触发指定的操作。
使用场景
- 执行清理操作:当需要在上游流完成后执行一些清理逻辑,如关闭资源或释放锁等,可以使用
then操作符。 - 触发异步操作:当需要在上游流完成后触发一些异步操作,如发送通知或触发其他的流,可以使用
then操作符。
返回值
then
操作符返回一个新的
Publisher
,它会在上游流完成后触发指定的操作。
使用示例
创建了一个包含字符串"
Hello
"的
Mono
,当流成功完成时会打印一条消息。然后使用
then
操作符,在流完成后添加了一个新的
Mono
,包含字符串"
World
"。在
then
操作符执行后,会打印一条消息表示执行了
then
操作,并输出"
World
"。最终通过订阅执行整个流程。
importreactor.core.publisher.Mono;publicclassThenOperatorExample{publicstaticvoidmain(String[] args){Mono.just("Hello").doOnSuccess(value ->System.out.println("Source stream completed with value: "+ value)).then(Mono.just("World")).doOnSuccess(value ->System.out.println("Then operator executed with value: "+ value)).subscribe();}}
doOnNext:流中产生数据时执行.
doOnNext
操作符用于在流中产生数据时执行一些额外的操作。它允许我们观察到流中每个元素的生成,并在每个元素到达时执行指定的逻辑。
功能描述
- 在流中产生数据时执行额外的操作。
- 对每个元素进行观察和处理,而不会改变流的内容。
- 不影响流的传递,只是在每个数据项上附加附加操作。
使用场景
- 调试和日志记录:当需要跟踪流中每个元素的值,或者在特定条件下记录日志时,可以使用
doOnNext操作符。 - 副作用触发:当需要在流中产生数据时触发其他副作用操作,如发送通知、更新状态等,可以使用
doOnNext操作符。
返回方法
doOnNext
操作符返回原始的数据流,不会改变流中的数据元素。
使用示例
通过
doOnNext
操作符,在每次处理元素时输出一条日志记录。然后通过
map
操作符对每个元素进行乘以10的操作。最终通过订阅输出经过处理后的结果。
importreactor.core.publisher.Flux;publicclassDoOnNextOperatorExample{publicstaticvoidmain(String[] args){Flux.just(1,2,3).doOnNext(num ->System.out.println("Processing element: "+ num)).map(num -> num *10).subscribe(System.out::println);}}
**
doOnError
:发送错误时执行.**
doOnError
操作符用于在流发送 错误时执行一些额外的操作。它允许观察到流中发生的错误,并在错误发生时执行指定的逻辑。
功能描述
- 当流遇到错误时,执行指定的逻辑。
- 对每个错误进行观察和处理,而不会改变流的内容。
- 可以用于调试、记录日志或执行其他副作用操作。
使用场景
- 错误处理和记录:当需要捕获和处理流中发生的错误,并记录相关信息时,可以使用
doOnError操作符。 - 调试和故障排除:当需要观察和诊断流中发生的错误,以便进行调试和故障排查时,可以使用
doOnError操作符。
返回值
doOnError
操作符返回与原始流相同类型的新流,其中包含了原始流中的所有元素(包括错误)。
使用示例
通过
map
操作符对每个元素进行除法操作,当遇到除以
0
的情况时会产生错误。通过
doOnError
操作符,在遇到错误时输出错误信息。然后使用
onErrorResume
操作符来处理错误情况,将错误替换为
-1
。最终通过订阅输出处理后的结果。
importreactor.core.publisher.Flux;publicclassErrorHandlingExample{publicstaticvoidmain(String[] args){Flux.just(1,2,0,4).map(num ->10/ num).doOnError(error ->System.err.println("Error occurred: "+ error.getMessage())).onErrorResume(e ->Flux.just(-1)).subscribe(System.out::println);}}
**
doOnCancel
:流被取消时执行**
doOnCancel
操作符用于在流被取消时执行一些额外的操作。它允许观察到流被取消的事件,并在取消发生时执行指定的逻辑。
功能描述
- 当流被取消时,执行指定的逻辑。
- 可以用于资源释放、清理操作或记录相关信息。
- 对取消事件进行观察和处理,而不会改变流的内容。
- 不影响流的传递,只是在取消发生时附加额外的操作。
使用场景
- 资源释放和清理:当需要再流被取消时执行一些资源释放或者清理的操作,如关闭数据库连接、停止定时任务等,可以使用
doOnCancel操作符。 - 日志记录和统计:当需要观察和记录流被取消的次数,时间等信息时,可以使用
doOnCancel操作符。
返回值
doOnCancel
操作符返回与原始流相同类型的新流,其中包含了原始流中的所有元素(如果有)。
使用示例
importreactor.core.publisher.Mono;importjava.time.Duration;publicclassDoOnCancelOperatorExample{publicstaticvoidmain(String[] args)throwsInterruptedException{Mono<String> requestMono =Mono.delay(Duration.ofSeconds(5)).doOnCancel(()->System.out.println("Client disconnected")).map(i ->"Response");// 模拟客户端发送请求System.out.println("Sending HTTP request");
requestMono.subscribe(response ->{System.out.println("Received response: "+ response);});// 模拟客户端在未收到响应之前断开连接Thread.sleep(2000);System.out.println("Client disconnected");}}
**
doOnComplete
:用于在流完成时执行指定的操作。**
doOnComplete
操作符用于在流完成时执行指定的操作。它允许在流正常终止时执行一些附加的逻辑。
功能描述
doOnComplete操作符订阅一个流,并在该流正常终止时执行给定的操作。- 通常用于流完成后执行一些清理操作、记录日志或者发通知等。
使用场景
- 当需要在流完成时执行一些额外的操作时,可以使用
doOnComplete操作符。 - 适用于需要在流结束后进行附加处理的情况。
返回值
doOnComplete
操作符返回一个新的流,该流与原始流相同。
使用示例
创建了一个 Flux 数据流 source,包含了整数 1 到 5。然后使用 map 操作符对每个整数进行乘以 2 的操作。接下来,使用 doOnComplete() 操作符在流完成时打印一条消息。最后,订阅处理过的数据流,并打印输出每个经过处理的数值。
importreactor.core.publisher.Flux;publicclassDoOnCompleteOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> source =Flux.range(1,5);Flux<Integer> processedStream = source
.map(num -> num *2).doOnComplete(()->System.out.println("Stream completed"));
processedStream.subscribe(
processedNum ->System.out.println("Processed number: "+ processedNum));}}
**
onErrorContinue
:流发生错误时,继续处理数据而不是终止整个流.**
onErrorContinue
操作符的作用是在流发生错误时,继续处理数据而不是立即终止整个流。它允许在遇到错误时进行特定的处理,并继续处理后续的数据。
功能描述
- 在流发生错误时,继续处理数据而不是终止整个流。
- 允许我们对错误进行特定的处理,并尝试继续处理后续的数据项。
- 在错误处理期间可以选择忽略、替换或转换错误的数据项。
使用场景
- 容错和错误处理:当需要对流中的错误进行特定的处理,并继续处理后续的数据项时,可以使用
onErrorContinue操作符。 - 部分失败处理:当流中的某些数据项可能会导致错误,但希望继续处理其他数据项时,可以使用
onErrorContinue操作符。
返回值
onErrorContinue
操作符返回与原始流相同类型的新流,其中包含了原始流中的所有元素,包括经过特定处理的错误数据项。
使用示例
创建了一个
Flux
数据流
source
,使用
map
操作符对每个整数进行除法操作,当遇到除以
0
的情况时会抛出异常。接着使用
onErrorContinue
方法来处理遇到的异常,输出错误信息并继续使用默认值。最后订阅处理过的数据流,分别输出每个经过处理的数值和最终的错误信息。
importreactor.core.publisher.Flux;publicclassOnErrorContinueOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> source =Flux.just(1,2,0,4,5);Flux<Integer> processedStream = source
.map(num ->10/ num).onErrorContinue((error, value)->{System.out.println("Error occurred: "+ error.getMessage()+". Using default value instead.");});
processedStream.subscribe(
processedNum ->System.out.println("Processed number: "+ processedNum),
error ->System.out.println("Final error: "+ error.getMessage()));}}
**
onErrorResume
用于处理流中发生的错误,并返回一个备用的流来继续处理。**
功能描述
onErrorResume操作符用于捕获流中的错误,并根据需要返回一个备用的流来替代原始的错误流。当源流发生错误时,onErrorResume可以将控制流转到备用流上,从而避免流终止并提供容错机制。
使用场景
- 当我们希望在流中出现错误时进行容错处理,例如返回默认值、重试请求、切换到备用数据源等情况时,可以使用
onErrorResume操作符。
返回值
onErrorResume操作符返回一个新的流,该流可能是原始流的修改版本或者备用流。
使用示例
创建了一个
Flux
数据流
source
,使用
map
操作符对每个整数进行除法操作,当遇到除以 0 的情况时会抛出异常。接着使用
onErrorResume
操作符来处理遇到的异常,输出错误信息并切换到备用的数据流 。最后订阅处理过的数据流,分别输出每个经过处理的数值和最终的错误信息。
importreactor.core.publisher.Flux;publicclassOnErrorResumeOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> source =Flux.just(1,2,0,4,5);Flux<Integer> processedStream = source
.map(num ->10/ num).onErrorResume(error ->{System.out.println("Error occurred: "+ error.getMessage()+". Switching to default stream.");returnFlux.just(-1,-2,-3);});
processedStream.subscribe(
processedNum ->System.out.println("Processed number: "+ processedNum),
error ->System.out.println("Final error: "+ error.getMessage()));}}
**
defaultIfEmpty
:当流为空时,使用默认值.**
defaultIfEmpty
操作符的作用是在流为空时使用默认值。它允许定义一个默认值,在流为空的情况下返回该默认值。
功能描述
- 当流为空时,使用指定的默认值替代。
- 允许在流为空时返回一个默认值,以避免特殊处理空流的情况。
- 可以用于设置默认结果、避免空指针异常等场景。
使用场景
- 默认结果设置:当流可能为空,但需要返回一个默认结果时,可以使用
defaultIfEmpty操作符。 - 空值处理:当需要处理可能为空的流,并避免出现空指针异常时,可以使用
defaultIfEmpty操作符。
返回值
defaultIfEmpty
操作符返回与原始流相同类型的新流,其中包含了原始流中的所有元素,或者是指定的默认值。
使用示例
创建了一个空的
Flux
数据流
source
。然后使用
defaultIfEmpty
操作符来处理空数据流的情况,返回一个包含默认值
-1
的新数据流。最后订阅处理过的数据流,如果原始数据流为空,则输出默认值
-1
,否则不会有任何输出。
importreactor.core.publisher.Flux;publicclassDefaultIfEmptyOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> source =Flux.empty();Flux<Integer> processedStream = source
.defaultIfEmpty(-1);
processedStream.subscribe(
processedNum ->System.out.println("Processed number: "+ processedNum),
error ->System.out.println("Final error: "+ error.getMessage()));}}
**
switchIfEmpty
:当流为空时,切换为另外一个流.**
switchIfEmpty
操作符的作用是在流为空时切换为另一个流。它允许定义一个备选流,在原始流为空的情况下使用备选流作为替代。
功能描述
- 当流为空时,切换到备选流。
- 允许在流为空的情况下,使用备选流替代原始流的处理逻辑。
- 可以用于设置备选数据源、提供默认值等场景。
使用场景
- 备选数据源:当需要再原始流为空时切换到备选数据源时,可以使用
switchIfEmpty操作符。 - 提供默认值:当需要为空流提供默认值或备选结果时,可以使用
switchIfEmpty操作符。
返回值
switchIfEmpty
操作符返回一个新的流,根据原始流是否为空进行切换后的结果。
使用示例
创建了一个空的
Flux
数据流
source
,以及一个备用的
Flux
数据流
backupStream
,使用
switchIfEmpty
操作符来处理空数据流的情况,切换到备用数据流
backupStream
。最后订阅处理过的数据流,如果原始数据流为空,则输出备用数据流中的数据;如果原始数据流不为空,则会输出原始数据流中的数据。
importreactor.core.publisher.Flux;publicclassSwitchIfEmptyOperatorExample{publicstaticvoidmain(String[] args){Flux<Integer> source =Flux.empty();Flux<Integer> backupStream =Flux.just(-1,-2,-3);Flux<Integer> processedStream = source
.switchIfEmpty(backupStream);
processedStream.subscribe(
processedNum ->System.out.println("Processed number: "+ processedNum),
error ->System.out.println("Final error: "+ error.getMessage()));}}
**
as
:将流作为参数,转为另外一个结果**
as
操作符用于将流作为参数,转换为另外一个结果。它允许对流进行转换操作,并返回一个新的结果对象。
功能描述
- 将流作为参数,转换为另外一个结果。
- 允许对流进行转换操作,并返回一个新的结果对象。
使用场景
- 当需要对流进行转换操作,并将结果用于其他用途时,可以使用
as操作符。 - 适用于需要将流转换为其他类型或结果的情况。
返回值
as
操作符返回一个新的结果对象,根据具体使用情况而定。
使用示例
假设有一个从数据库中获取用户信息的流,我们可以使用
as
操作符将流中的数据转换为
User
对象的示例。
importreactor.core.publisher.Flux;publicclassUserProcessor{publicstaticvoidmain(String[] args){Flux<Integer> userIds =Flux.just(1,2,3);Flux<User> users = userIds
.flatMap(userId ->getUserInfoFromDatabase(userId)).map(userInfo ->mapToUserObject(userInfo)).as(userType -> userType);
users.subscribe(user ->System.out.println(user.toString()));}}
**
collectList
:收集元素转换为list集合**
功能描述
- 收集流中的所有元素,并将它们组成一个
List集合。
使用场景
- 当需要将流中的所有元素收集到一个
List集合中时,可以使用collectList操作符。 - 适用于需要对流进行聚合操作的场景。
返回值
collectList
操作符返回一个
Mono<List<T>>
对象,该对象表示一个包含所有收集到的元素的
List
集合。
使用示例
collectList
操作符将源流中的所有整数元素收集到一个列表中。在订阅后,将打印出收集到的列表内容。
importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importjava.util.List;publicclassCollectListExample{publicstaticvoidmain(String[] args){Flux<Integer> numbers =Flux.just(1,2,3,4,5);Mono<List<Integer>> listMono = numbers.collectList();
listMono.subscribe(list ->System.out.println("Collected List: "+ list));}}
**
cast
:用于将流中的元素类型转换为指定的类型。(父类转子类)**
cast
操作符用于将流中的元素类型转换为指定的类型。它允许我们对流中的元素进行类型转换操作。
功能描述
cast操作符将流中的元素强制转换为指定的类型。- 它可以用于处理具有父子类关系的元素类型之间的转换。即将父类的元素转换为子类的元素。
使用场景
- 当需要将流中的元素类型转换为目标类型时,可以使用
cast操作符。 - 适用于处理具有父子类关系的元素类型之间的转换。
返回值
cast
操作符返回一个新的流,其中的元素类型已经被转换为指定的类型。
使用示例
cast
操作符将包含不同类型数字的
Flux<Number>
流转换为
Flux<Integer>
流。订阅后,将打印出转换后的整数元素。
importreactor.core.publisher.Flux;publicclassCastExample{publicstaticvoidmain(String[] args){Flux<Number> numbers =Flux.just(1,2.5,3,4.7,5);Flux<Integer> integerFlux = numbers.cast(Integer.class);
integerFlux.subscribe(number ->System.out.println("Number as Integer: "+ number));}}
响应式流调试
在命令式世界,调试通常都是非常直观的:直接看 stack trace 就可以找到问题出现的位置, 以及:是否问题责任全部出在你自己的代码?问题是不是发生在某些库代码?如果是, 那你的哪部分代码调用了库,是不是传参不合适导致的问题?
开启流的日志记录
在响应式流上添加
log()
操作符。
将其加到操作链上之后,它会读(只读,peek)流中的事件(包括 onNext、onError、 onComplete, 以及 订阅(subscribe)、 取消)(cancel)、和 请求(request))。
🌍 说明
log操作符通过 SLF4J 使用类似 Log4J 和 Logback 这样的公共的日志工具, 如果 SLF4J 不存在的话,则直接将日志输出到控制台。
控制台使用 System.err 记录 WARN 和 ERROR 级别的日志,使用 System.out 记录其他级别的日志。
编程方式记录
1、
doOnNext
:通常用于在数据流中的每个元素被处理之前执行一些操作,例如日志记录或调试信息的打印。
Observable.just(1,2,3).doOnNext(data -> log.info("Processing data:,{}",data)).subscribe();
2、
doOnError
:通常用于在发生错误时执行一些清理操作或记录错误信息。
Observable.error(newRuntimeException("Something went wrong!")).doOnError(error -> log.error("Error occurred:,{}",error.getMessage()).subscribe();
3、
doOnEach
:可以用于在任何事件发生时执行一些通用的操作。
Observable.just(1,2,3).doOnEach(notification ->{if(notification.isOnNext()){
log.info("Received data:,{}",notification.getValue());}elseif(notification.isOnError()){
log.info("Error occurred:,{}",notification.getError().getMessage());}elseif(notification.isOnComplete()){
log.info("Stream completed successfully");}}).subscribe();
响应式流统计、聚合
统计
count:统计元素个数
- 用于计算一个
Flux中包含的元素数量。
publicvoidcountExample(){Flux<Integer> num =Flux.just(1,2,3,4);
num.count().subscribe(count ->System.out.println("count:"+ count));}// 输出结果为:count:4
reduce:累积操作
- 用于将流中的元素按照给定的逻辑进行累积计算,最终返回一个单一的结果。
//输出流中最大值publicvoidreduceExample(){Flux<Integer> num =Flux.just(5,6,8,4,62,3);
num.reduce((max,next)->Math.max(max,next)).subscribe(max->System.out.println("最大值:"+max));}// 输出结果为:最大值:62//对流中元素到进行累乘publicvoidreduceExample2(){Flux<Integer> range =Flux.range(1,10);
range.reduce((now,next)->now*next).subscribe(mul->System.out.println("1-10的累乘积为:"+mul));}//输出结果为:3628800
Flux 转化为集合
collect:转化为自定义集合
- 用于将
Flux转换为自定义集合。例如List、Set、Map等。
//使用collect将流转成ListpublicvoidcollectToListExample(){Flux<String> flux =Flux.just("abc","张三","apple","123","_");
flux.collect(Collectors.toList()).subscribe(list->System.out.println("Flux流转List:"+list));}//输出结果为:Flux流转List:[abc, 张三, apple, 123, _]//使用collect将流转成MappublicvoidcollectToMapExample(){Flux<String> flux =Flux.just("abc","张三","apple","1234","_");
flux.collect(Collectors.toMap(k->k.length(),v->v))//使用元素长度为key,值为value.subscribe(map->System.out.println("Flux流转Map:"+map));}//输出结果为:Flux流转Map:{1=_, 2=张三, 3=abc, 4=1234, 5=apple}
collectList:转化为一个List集合。
collectList会遍历流中的每个元素,并将它们转为List集合。在收集元素到List时,会保留它们在流中的顺序。
//流转化为 ListpublicvoidcollectListExample(){Flux<String> flux =Flux.just("f","a","b","c","d","e");
flux.collectList().subscribe(list->System.out.println("Flux转换为List集合:"+list));}//输出结果为:Flux转换为List集合:[f, a, b, c, d, e]
collectSortList:转化为一个List集合并排序。
//流转化为 List(顺序)publicvoidcollectSortListExample(){Flux<String> flux =Flux.just("f","a","b","c","d","e");
flux.collectSortedList().subscribe(list->System.out.println("Flux转换为排序List集合(顺序):"+list));}//输出结果为:Flux转换为排序List集合(顺序):[a, b, c, d, e, f]//流转化为 List(逆序)publicvoidcollectSortListExample2(){Flux<String> flux =Flux.just("f","a","b","c","d","e");
flux.collectSortedList(Comparator.reverseOrder()).subscribe(list->System.out.println("Flux转换为排序List集合(逆序):"+list));}//输出结果为:Flux转换为排序List集合(顺序):[f, e, d, c, b, a]
collectMap:转换为一个Map集合
- 可以通过提供两个
Function对象来指定如何从流中的元素中提取键和值。第一个Function用于提取键,第二个Function用于提取值。如果不提供第二个Function,则默认值为流中的元素本身。
//流转化为 MappublicvoidcollectMapExample(){Flux<String> fruit =Flux.just("banana","perry","apple");
fruit.collectMap(k->k)//map的key就元素本身.subscribe(map->System.out.println("Flux转换为Map集合:"+map));}//输出结果为:Flux转换为Map集合:{banana=banana, apple=apple, perry=perry}
collectMultimap:转换为一个Map集合并分组
collectMultimap会遍历流中的每个元素,并将它们按照指定的键进行分组,将每个键对应的元素收集到一个集合中。collectMultimap是用于将流中的元素收集到一个Map中,一个key有多个value。- 可以通过提供两个 Function 对象来指定如何从流中的元素中提取键和值。第一个
Function用于提取键,第二个Function用于提取值。如果不提供第二个Function,则默认值为流中的元素本身。
//流转化为 MappublicvoidcollectMultiMapExample(){Flux<String> fruit =Flux.just("123","12","1342","123456","321","35");
fruit.collectMultimap(k->k.length())//以元素长度为key.subscribe(map->System.out.println("Flux转换为Map集合:"+map));}//输出结果为:Flux转换为Map集合:{2=[12, 35], 3=[123, 321], 4=[1342], 6=[123456]}
聚合
merge:合并流
- 用于将多个流合并在一起,同时订阅这些流。并按照元素的到达顺序依次处理这些流的元素,先到先合并。
publicvoidmergeExample(){Flux<Integer> flux1 =Flux.just(1,2,5);Flux<Integer> flux2 =Flux.just(5,6,3);Flux.merge(flux1,flux2).subscribe(System.out::println);}// 输出结果:输出新流 1 2 5 5 6 3
concat:按序连接
- 用于将多个流合并在一起,形成一个新的流。它会先订阅并处理第一个流的元素,然后再处理下一个流的元素,以此类推,保持了流的顺序。
publicvoidconcatExample(){Flux<Integer> flux1 =Flux.just(1,2,5);Flux<Integer> flux2 =Flux.just(5,6,3);Flux.concat(flux2,flux1).subscribe(System.out::println);}// 输出结果:输出新流 5 6 3 1 2 5
concatWith:连接两个流
- 用于将当前的
Flux流与指定的Flux流连接起来,形成一个新的Flux流。 - 连接后的新
Flux流中,当前Flux流的元素将排在前面,指定的Flux流的元素将排在后面,保持它们的顺序不变。
publicvoidconcatWithExample(){Flux<Integer> flux1 =Flux.just(1,2,5);Flux<Integer> flux2 =Flux.just(5,6,3);
flux2.concatWith(flux1).subscribe(System.out::println);}//将flux连到flux2后面//输出结果: 5 6 3 1 2 5
zip:照索引位置一对一地组合这些流的元素
zip会从每个输入的Flux流中取出相同位置的元素,并将它们按照指定的组合函数进行组合,生成一个新的元素。- 可以组合任意数量的
Flux流,但要求它们的长度必须相同,否则在长度最短的流结束后,zip操作将停止组合。
// 把两个流中元素组合publicvoidzipExample(){Flux<Integer> flux1 =Flux.just(1,2,5,5);Flux<Integer> flux2 =Flux.just(5,6,3);Flux.zip(flux1,flux2,(x,y)->x+y).subscribe(System.out::println);}//流的数量必须相等,否则短的流结束后,zip停止操作//输出结果6 8 8// 把三个流中元素组成publicvoidzipExample2(){Flux<Integer> flux1 =Flux.just(1,2,5);Flux<Integer> flux2 =Flux.just(5,6,3);Flux<Integer> flux3 =Flux.just(5,6,4);Flux.zip(flux1,flux2,flux3).map(tuple->tuple.getT1()+tuple.getT2()+tuple.getT3()).subscribe(System.out::println);}// 输出结果:11 14 12
zipWith:将两个数据流中的元素一一配对,并将它们组合成新的元素
publicvoidzipWithExample(){Flux<Integer> flux1 =Flux.just(1,2,5);Flux<Integer> flux2 =Flux.just(5,6,3);
flux1.zipWith(flux2,(x,y)->x+y).subscribe(System.out::println);}// 输出结果:6 8 8
combineLatest:合并新元素
combineLatest会持续跟踪多个Flux流中最新的元素,并且在任何一个Flux流产生新元素时,会取所有Flux流中最新的元素进行组合。
publicvoidcombineLatestExample(){Flux<Integer> flux1 =Flux.just(1,2,3);Flux<Integer> flux2 =Flux.just(4,5,6);Flux.combineLatest(flux1, flux2,(a, b)-> a + b).subscribe(System.out::println);}//flux1中最新的元素3去和flux2里的元素组合// 输出结果 7 8 9
响应式定时操作
添加时间操作
elapsed:添加间隔时间
- 用于为每个元素添加自订阅开始以来到元素发出的时间间隔信息。
publicvoidelapsedExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5).delayElements(Duration.ofSeconds(1));// 延迟每个元素发出 1 秒Flux<Tuple2<Long,Integer>> elapsedFlux = flux.elapsed();
elapsedFlux.subscribe(tuple ->{Long interval = tuple.getT1();// 获取时间间隔Integer value = tuple.getT2();// 获取原始值System.out.println("Interval: "+ interval +", Value: "+ value);});// 为了让程序持续运行,让主线程休眠一段时间try{Thread.sleep(10000);// 让程序运行 10 秒}catch(InterruptedException e){
e.printStackTrace();}}
timestamp:添加时间戳
- 用于为每个元素添加时间戳信息。
publicvoidtimesTampExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5);Flux<Tuple2<Long,Integer>> timestampFlux = flux.timestamp();// 使用timestamp操作符添加时间信息
timestampFlux.subscribe(tuple ->{Long timestamp = tuple.getT1();// 获取时间戳Integer value = tuple.getT2();// 获取原始值System.out.println("Timestamp: "+ timestamp +", Value: "+ value);});}
超时操作
timeout
- 用于设置一个超时时间,如果在指定时间内没有收到新的数据项或者完成信号,就会触发超时错误。
- 这个操作符对于处理需要及时响应的场景非常有用,比如网络请求超时、等待用户输入超时等。
假设我们有一个需求:从一个数据源获取数据,并在一定时间内如果没有新的数据到达,则认为超时。我们可以使用
timeout
操作符来实现这个功能。
publicvoidtimeoutExample(){// 模拟数据源,每隔一段时间发送一个数据项Flux<Integer> dataSource =Flux.just(1,2,3,4).delayElements(Duration.ofSeconds(2));// 每隔 2 秒发送一个数据项// 在数据源上应用 timeout 操作符,设置超时时间为 1 秒Flux<Integer> timeoutFlux = dataSource.timeout(Duration.ofSeconds(1));// 订阅数据流,并处理超时事件
timeoutFlux.subscribe(// 处理正常数据项
item ->System.out.println("Received item: "+ item),// 处理超时事件
error ->System.out.println("Timeout error occurred: "+ error));// 为了让程序持续运行,让主线程休眠一段时间try{Thread.sleep(10000);// 让程序运行 10 秒}catch(InterruptedException e){
e.printStackTrace();}}
定时操作
interval :定时任务或者周期性操作
- 用于创建一个周期性地发射递增的
Long类型数据序列的Flux。 - 在指定的时间间隔内生成一个递增的序列,从 0 开始,每次递增 1,并且发送给订阅者。
publicvoidintervalExample(){// 创建一个每隔一秒发射一个递增的 Long 类型数据序列的 FluxFlux<Long> intervalFlux =Flux.interval(Duration.ofSeconds(1));// 订阅数据流,并处理数据项
intervalFlux.subscribe(// 处理数据项
item ->System.out.println("Received item: "+ item));// 为了让程序持续运行,让主线程休眠一段时间try{Thread.sleep(10000);// 让程序运行 10 秒}catch(InterruptedException e){
e.printStackTrace();}}
延迟操作
delay:整个流的延迟发射
- 用于创建一个延迟发射单个值(或者在一段时间后发射错误或完成信号)的操作符。
- 指定的延迟时间后,发射一个值或者完成信号。
- 这个操作符通常用于创建一个在未来某个时间点触发的事件,比如在执行某个异步操作后的一段时间内。
publicvoiddelayExample(){// 创建一个延迟1秒后发射值的MonoMono<String> delayedMono =Mono.delay(Duration.ofSeconds(1)).map(ignore ->"延迟之后");// 订阅Mono
delayedMono.subscribe(
result ->System.out.println("Received value: "+ result),
error ->System.err.println("Error occurred: "+ error),()->System.out.println("Completed"));// 阻塞主线程,以便观察输出try{Thread.sleep(2000);}catch(InterruptedException e){
e.printStackTrace();}}
Mono#delayElement :单个元素的延迟
- 用于在
Mono发射的单个元素上添加延迟。 - 当
Mono发射一个元素时,延迟一段时间后再将该元素发射出去。
publicvoidmonoDelayElementExample(){Mono.just("Hello").delayElement(Duration.ofSeconds(3))// 在发射元素 "Hello" 后延迟 3 秒再发射.doOnNext(System.out::println).subscribe();}
Flux#delayElements :多个元素的延迟
- 用于在 Flux 发射的每个元素上添加延迟。
- 当 Flux 发射一个元素时,延迟一段时间后再将该元素发射出去。
delaySubscription:延迟订阅
- 用于在调用
subscribe方法时,延迟一段时间后再开始实际的订阅操作。 - 比如在订阅前执行一些预处理操作,或者在特定条件下延迟订阅等。
publicvoiddelaySubscriptionExample(){Flux.range(1,5).delaySubscription(Duration.ofSeconds(2))// 延迟 2 秒后开始订阅.subscribe(System.out::println);// 主线程休眠 3 秒以确保延迟订阅生效try{Thread.sleep(3000);}catch(InterruptedException e){
e.printStackTrace();}}
注意事项
delaySubscription方法只会影响订阅操作的时机,不会影响数据流中元素的发射时间。- 如果在调用
subscribe方法前已经开始了数据流的发射,则延迟订阅可能会错过一部分数据。
响应式数据分组
分组
grouping
分组能够根据
key
将源
Flux<T>
拆分为多个批次。对应的操作符是
groupBy
。
每一组用
GroupedFlux<T>
类型表示,使用它的
key()
方法可以得到该组的
key
。
publicclassGroupingDemo{publicstaticvoidmain(String[] args){Flux.just(1,2,3,4,5,6,7,8,9,10).groupBy(num ->(num %2)==1?"odd":"even").concatMap(g -> g.defaultIfEmpty(-1).map(String::valueOf).startWith(g.key())).subscribe(System.out::println);}}
窗口
windowing
window
操作是 根据个数、时间等条件,或能够定义边界的发布者(
boundary-defining Publisher
), 把源
Flux<T>
拆分为
windows
。对应的操作符有
window
、
windowTimeout
、
windowUntil
、
windowWhile
,以及
windowWhen
。
以个数为界:
window(int)
会出现重叠或丢弃的情况:
window(int,int)
StepVerifier.create(Flux.range(1,10).window(5,3)//overlapping windows.concatMap(g -> g.defaultIfEmpty(-1)))//将 windows 显示为 -1.expectNext(1,2,3,4,5).expectNext(4,5,6,7,8).expectNext(7,8,9,10).expectNext(10).verifyComplete();
🌏****说明
如果将两个参数的配置反过来(
maxSize<
skip),序列中的一些元素就会被丢弃掉, 而不属于任何
window。
以时间为界:
window(Duration)
会出现重丢弃或丢弃的情况:
window(Duration,Duration)
StepVerifier.create(Flux.just(1,3,5,2,4,6,11,12,13).windowWhile(i -> i %2==0).concatMap(g -> g.defaultIfEmpty(-1))).expectNext(-1,-1,-1)//分别被奇数 1 3 5 触发.expectNext(2,4,6)// 被 11 触发.expectNext(12)// 被 13 触发.expectNext(-1)// 空的 completion window,如果 onComplete 前的元素能够匹配上的话就没有这个了.verifyComplete();
缓存
buffering
缓存操作之后会发出
buffer
(类型为
Collection<T>
, 默认是 List)。缓存的操作符与窗口的操作符是对应的:
buffer
、
bufferTimeout
、
bufferUntil
、
bufferWhile
, 以及
bufferWhen
。
缓存操作也会有丢弃元素或内容重叠的情况
StepVerifier.create(Flux.range(1,10).buffer(5,3)// 缓存重叠).expectNext(Arrays.asList(1,2,3,4,5)).expectNext(Arrays.asList(4,5,6,7,8)).expectNext(Arrays.asList(7,8,9,10)).expectNext(Collections.singletonList(10)).verifyComplete();
bufferUntil
和
bufferWhile
不会发出空的
buffer
StepVerifier.create(Flux.just(1,3,5,2,4,6,11,12,13).bufferWhile(i -> i %2==0)).expectNext(Arrays.asList(2,4,6))// 被 11 触发.expectNext(Collections.singletonList(12))// 被 13 触发.verifyComplete();
bufferTimeout
publicclassBufferingDemo{publicstaticvoidmain(String[] args)throwsInterruptedException{// 创建一个每隔一秒发射一个元素的 FluxFlux<Integer> sourceFlux =Flux.range(1,10).delayElements(Duration.ofSeconds(1));// 使用 bufferTimeout 操作符,在每个 3 秒的时间间隔内收集元素Flux<List<Integer>> bufferedFlux = sourceFlux.bufferTimeout(3,Duration.ofSeconds(3));// 订阅并输出收集到的缓冲区
bufferedFlux.subscribe(System.out::println);}}
响应式重试动作
retry:重试
对出现错误的序列进行重试,对于上游
Flux
是基于重订阅(re-subscribing)的方式。但实际上已经是一个不同的序列了, 发出错误信号的序列仍然是终止了的。
Flux.interval(Duration.ofMillis(250)).map(input ->{if(input <3)return"tick "+ input;thrownewRuntimeException("boom");}).elapsed().retry(1).subscribe(System.out::println,System.err::println);Thread.sleep(2100);
retryWhen:条件重试
条件重试是一个包含
Flux<Throwable>
作为
retryWhen
的唯一参数被传递给一个
Function
,由开发者自行处理
Flux<Throwable>
流,并声明一段处理函数传递给
Function
并返回一个新的
Publisher<?>
, 从而实现对重试操作的配置。
与
retry
的区别
retryWhen返回的是Flux.empty()retry返回的是error信号.
使用retryWhen实现一个retry(3)
Flux.<String>error(newIllegalArgumentException()).retryWhen(Retry.from(companion -> companion
.zipWith(Flux.range(1,4),(retrySignal, index)->{if(index <4)return index;elsethrowExceptions.propagate(retrySignal.failure());}))).subscribe(i ->System.out.println(System.currentTimeMillis()),System.out::println);
使用retryWhen进行延迟重试
Flux.<String>error(newIllegalArgumentException()).retryWhen(Retry.from(companion -> companion
.doOnNext(s ->System.out.println(s +" at "+LocalTime.now())).zipWith(Flux.range(1,4),(retrySignal, index)->{if(index <4)return index;elsethrowExceptions.propagate(retrySignal.failure());}).flatMap(index ->Mono.delay(Duration.ofMillis(index *100))).doOnNext(s ->System.out.println("retried at "+LocalTime.now())))).subscribe();
- 第一次重试延迟大约 100ms
- 第二次重试延迟大约 200ms
- 第三次重试延迟大约 300ms
响应式过滤操作
替换if-else为filter
命令式编程
int[] ints =newint[10]{1,2,3,4,5,6,7,8,9,10};for(int i=0;i<ints.length;i++){if(ints[i]%2==0){System.out.println("当前数据:"+ints[i])}}
响应式编程
int[] ints =newint[10]{1,2,3,4,5,6,7,8,9,10};Flux.fromXX(ints).filter(i->i%2==2).doOnNext(System.out::println)
取集合中第N个元素
过滤一个序列
filter:过滤元素
- 筛选出符合指定条件的元素,生成一个新的数据流。
- 接收一个谓词函数
(Predicate),用于判断元素是否符合条件。
//filter过滤掉奇数publicvoidfilterExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.filter(i -> i %2==0)//留下偶数,去掉奇数.subscribe(System.out::print);}//输出结果:2 4 6 8 10
filterWhen:过滤元素
filterWhen操作符和filter类似,都用于筛选出符合特定条件的元素。区别在于filterWhen是异步。
//filterWhen过滤掉奇数publicvoidfilterWhenExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.filterWhen(num ->{returnFlux.just(num %2==0);// 这里可以是任何异步操作,返回的是一个 Mono<Boolean>}).subscribe(System.out::println);}//输出结果:2 4 6 8 10
ofType:选择需要的类型
- 过滤源数据流中的元素,只保留指定类型的元素。
//ofType只留Integer类型数据publicvoidofTypeExample(){Flux<Object> flux =Flux.just(1,5,12,"abc","jetlinks",2.0,4); flux.ofType(Integer.class)//只保留Integer类型.filter(i -> i %2==0)//将过滤出来的Interger类型数据再过滤,只留偶数.subscribe(System.out::println);}//输出结果:12 4
ignoreElements:忽略所有元素
- 用于忽略源数据流中的所有元素,不需要处理元素本身,只关心数据流的结束状态的情况。
// 忽略所有元素,只保留序列的完成信号publicvoidignoreElementsExample(){Flux<Integer> flux =Flux.range(1,10);
flux.ignoreElements().doOnTerminate(()->System.out.println("成功"))//flux在成功完成时执行.subscribe();}//输出结果: 成功
distinct:元素去重
//去重并排序Flux<Integer> flux =Flux.just(1,2,3,3,5,6,7,6,9,10,1,2,5,4,5,6,11,8,9,10);
flux.distinct().sort().subscribe(num->System.out.print(num+"\t"));}//输出结果: 1 2 3 4 1 5 6 3 6 7 8 9 10 11
distinctUntilChanged:过滤连续重复的元素
// 去掉连续重复的元素publicvoiddistinctUntilChangedExample(){Flux<Integer> flux =Flux.just(1,1,2,3,4,1,5,5,6,3,6,7,8,9,9,10,11);
flux.distinctUntilChanged().subscribe(num ->System.out.print(num +"\t"));}//输出结果: 1 2 3 4 1 5 6 3 6 7 8 9 10 11
只要一部分序列
take:取指定数量元素
// 只取前面3个元素publicvoidtakeExample1(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.take(3).subscribe(num ->System.out.print(num +"\t"));}//输出结果: 2 3 4// 取一段时间内发出的元素publicvoidtakeExample2(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.take(Duration.ofSeconds(1)).subscribe(num ->System.out.println(num +"\t"));}
next:取第一个元素
//只取第一个元素publicvoidnextExample(){Flux<Integer> flux =Flux.range(1,10);
flux.next().subscribe(num ->System.out.println(num +"\t"));}//输出结果:1
limitRequest :限制请求数量
limitRequest方法用于限制在处理Flux或Mono流时的请求数量。它允许你指定每次订阅时可以处理的元素数量。
//只取前面3个publicvoidlimitRequestExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.limitRequest(3).subscribe(num ->System.out.print(num +"\t"));}// 输出结果: 1 2 3
takeLast:从末尾取指定数量元素
// 只取前面3个元素publicvoidtakeExample1(){Flux<Integer> flux =Flux.just(2,3,4,5,6,7,8,9,10);
flux.take(3).subscribe(num ->System.out.print(num +"\t"));}//输出结果: 8 9 10 // 取一段时间内发出的元素publicvoidtakeExample2(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.take(Duration.ofSeconds(1)).subscribe(num ->System.out.println(num +"\t"));}
takeUtil:取元素直到满足某个条件(包含)
//直到满足某个条件(包含)publicvoidtakeUtilExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.takeUntil(num->num==5).subscribe(num->System.out.print(num+"\t"));}//输出结果:1 2 3 4 5
takeWhile:取元素直到满足某个条件(不包含)
//直到满足某个条件(不包含)publicvoidtakeWhileExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.takeWhile(num -> num <5).subscribe(System.out::print);}//输出结果:1 2 3 4
最多只取 1 个元素
elementAt:通过下标取元素
//通过下标只取一个值publicvoidelementAtExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.elementAt(3).subscribe(System.out::print);}//输出结果:4
last:序列为空时
//如果为序列空则发出错误信号publicvoidlastExample1(){Flux.empty().last().subscribe(System.out::print);}//输出结果:报NoSuchElementException异常//如果序列为空则返回默认值publicvoidlastExample2(){Flux.empty().last("序列为空").subscribe(System.out::println);}//输出结果:序列为空
跳过一些元素
skip:跳过开始n个元素
//跳过前三个元素publicvoidskipExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.skip(3).subscribe(System.out::print);}//输出结果:4 5 6 7 8 9 10
skipLast:跳过最后的 n 个元素
//跳过最后三个元素publicvoidskipLastExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.skipLast(3).subscribe(System.out::print);}//输出结果:1 2 3 4 5 6 7
skipUntil:跳过元素直到满足条件(包含)
//跳过元素直到值等于5publicvoidskipUntilExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9,10);
flux.skipUntil(num -> num ==5).subscribe(System.out::print);}//输出结果:5 6 7 8 9 10
skipWhile:跳过元素直到满足条件(不包含)
//跳过元素直到值等于5publicvoidskipWhileExample(){Flux<Integer> flux =Flux.range(1,10);
flux.skipWhile(num -> num <=5).subscribe(System.out::print);}//输出结果: 6 7 8 9 10
采样
sample:给定采样周期来采样
//每2s采集一个元素publicvoidsampleExample(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9);
flux.delayElements(Duration.ofSeconds(1))// 延迟每个元素的发射时间.sample(Duration.ofSeconds(2)).subscribe(System.out::print);try{Thread.sleep(15000);//为了能看到结果,等待一段时间}catch(InterruptedException e){thrownewRuntimeException(e);}}//输出结果: 1 3 5 7 9
sampleFirst:取采样周期里的第一个元素
//取采样周期里的第一个元素publicvoidsampleFirstExample(){Flux<String> flux =Flux.just("A","B","C","D","E","F","G","H","I","J");
flux.sampleFirst(Duration.ofSeconds(2)).subscribe(System.out::println);}//输出结果:A
single:只取一个元素
//我只想要一个元素(如果多于一个就返回错误)publicvoidsingleExample1(){Flux<Integer> flux =Flux.just(1,2,3,4,5,6,7,8,9);
flux.single().subscribe(System.out::print);}//输出结果:返回错误publicvoidsingleExample2(){Flux<Integer> flux =Flux.just(1);
flux.single().subscribe(System.out::print);}//输出结果:1//如果序列为空,发出错误信号publicvoidsingleExample3(){Flux.empty().single().subscribe(System.out::print);}//输出结果:报NoSuchElementException异常//如果序列为空,发出一个缺省值publicvoidsingleExample4(){Flux.empty().single("序列为空").subscribe(System.out::print);}//输出结果:序列为空
singleOrEmpty:只取一个元素,序列为空就返回空序列
//只取一个元素,序列为空就返回空序列publicvoidsingleOrEmptyExample2(){Flux.just("A").singleOrEmpty().subscribe(System.out::print);}//输出结果:ApublicvoidsingleOrEmptyExample(){Flux.empty().singleOrEmpty().subscribe(System.out::print);}//输出结果:空序列
响应式编程常见问题
我写的操作看上去是正确的,但是没有执行.
有以下几种可能:上游流为空,多个流未组合在一起,在不支持响应式的地方使用了响应式
1. 没有使用return关键字返回
错误
publicMono<Response>handleRequest(Request request){// 没有returnthis.findOldData(request);}
正确
publicMono<Response>handleRequest(Request request){returnthis.findOldData(request);}
2. 上游流为空
publicMono<Response>handleRequest(Request request){returnthis.findOldData(request).flatMap(old ->{//这里为什么不执行? return....})}
🌏****说明
当
findOldData返回的流为空时,下游的
flatMap等操作符需要操作流中元素的操作符是不会执行的。 可以通过
switchIfEmpty操作符来处理空流的情况。
3. 多个流未组合在一起
- 只要方法返回值是Mono或者Flux,都不能单独行动。
- 只要方法中调用了任何响应式操作,那这个方法也应该是响应式。(返回Mono或者Flux)
classService{Mono<Void>handleRequest(request);}//错误示例,handleRequest是响应式的,但是此方法没有使用响应式操作。publicResulthandleRequest(Request request){
service.handleRequest(request);return ok;}//正确示例publicMono<Result>handleRequest(Request request){return service
//处理请求.handleRequest(request)//返回结果.thenReturn(ok);}
4. 在不支持响应式的操作符中使用响应式
publicMono<Void>saveLog(Request req,Response resp){...}publicMono<Result>handleRequest(Request request){return service
//处理请求.handleRequest(request)//记录日志 此为错误的用法,saveLog是响应式的,但是doOnNext并不支持响应式操作.doOnNext(response->saveLog(request,response))//返回结果.thenReturn(ok);}
从
doOnNext
方法的语义以及参数
Consumer<T>
可知,此方法是不支持响应式的(
Consumer<T>
只有参数没有返回值)。因此不能在此方法中使用响应式操作。
return service
//处理请求.handleRequest(request)//记录日志.flatMap(response->saveLog(request,response))//返回结果.thenReturn(ok);
5. 在流内部订阅终止了整个流
publicMono<Void>saveLog(Request req,Response resp){...}//错误publicMono<Response>handleRequest(Request request){return service
//处理请求.handleRequest(request)//记录日志 此为错误的用法.flatMap(response->{saveLog(request,response).subscribe();returnMono.emtpy();})//返回结果.thenReturn(ok);}//正确publicMono<Response>handleRequest(Request request){return service
//处理请求.handleRequest(request)//记录日志.flatMap(response->{returnsaveLog(request,response);})//返回结果.thenReturn(ok);}
6. 订阅时机不对
7. 用在 Flux 上的操作符好像没起作用,为啥?
Flux<String> flux =Flux.just("foo","chain");
flux.map(secret -> secret.replaceAll(".","*"));//①
flux.subscribe(next ->System.out.println("Received: "+ next));
错误原因:问题在①, flux 变量并没有改变。
Flux<String> flux =Flux.just("foo","chain");
flux = flux.map(secret -> secret.replaceAll(".","*"));//分开写得 这里记得返回!!
flux.subscribe(next ->System.out.println("Received: "+ next));//或者您可以尝试下面的写法Flux<String> flux =Flux.just("foo","chain").map(secret -> secret.replaceAll(".","*"));
flux.subscribe(next ->System.out.println("Received: "+ next));//如果不存在别的地方需要订阅flux 下面的方法可以尝试Flux.just("foo","chain").map(secret -> secret.replaceAll(".","*")).subscribe(next ->System.out.println("Received: "+ next));
我想获取流中的元素怎么办
不要试图从流中获取数据出来,而是先思考需要对流中元素做什么。
需要对流中的数据进行操作时,都应该使用对应操作符来处理,根据Flux或者Mono提供的操作符API进行组合操作。
publicList<Book>getAllBooks(){List<BookEntity> bookEntities = repository.findAll();List<Book> books =newArrayList(bookEntities.size());for(BookEntity entity : bookEntities){Book book = entity.copyTo(newBook());
books.add(book);}return books;}
错误示例:
publicBookgetAllBooks(){returngetRepository().createQuery().where("id",1).fetchOne().block();}
‼️****警告
在响应式编程中,在任何时候执行业务代码时都不要使用
**block()**方法,使用block()方法可能引发以下问题:
- 阻塞线程:调用block()方法会阻塞当前线程,导致无法处理其他并发请求。这会降低系统的吞吐量和响应性能。
- 死锁风险:如果在处理响应式流时使用了block()方法,而其中某些操作也依赖于同一个线程的结果,则可能导致死锁。
- 内存资源浪费:阻塞调用将持续占用线程,而每个线程都需要额外的内存资源。如果应用程序中同时有大量的阻塞操作,可能导致线程池耗尽和内存资源浪费。
正确示例:
🌏****说明
为了避免使用block(),我们应该尽可能地使用响应式操作符(如map、flatMap、filter等)对数据流进行转换和处理,并使用其他响应式方法(如subscribe())来订阅数据流并触发异步处理。
publicFlux<Book>getAllBooks(){return repository
.findAll().map(entity-> entity.copyTo(newBook()))}
在非响应式方法中如何使用响应式
publicvoidhandleRequest(Request request){//不到万不得已请勿使用block方法//logService.saveLog(request).block()//
logService
.saveLog(request).subscribe(
result->log.debug("保存成功 {}",request),
error->log.warn("保存失败 {}",request,error))}
版权归原作者 haidi8 所有, 如有侵权,请联系我们删除。