目录💻
前言
响应式编程 (Reactive Programming) 是一种声明式编程范式,专注于数据流和变化的传播。随着软件系统日益复杂,对高并发、实时性和弹性的需求不断增长,响应式编程正逐渐成为主流。特别从Spring Boot3开始逐渐越来越重视使用,并且Spring框架为了全面拥抱响应式编程,提供了Spring WebFlux、Spring Data Reactive等模块,为Java开发者构建响应式应用提供了强大的支持。
一、简介
1、响应式编程概述
背景知识
为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。
在2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor,也进入了里程碑式的3.1.0 版本。
什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
响应式编程基于reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。
电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。
响应式传播核心特点之一:变化传播:一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。
具体概述
响应式编程 (Reactive Programming) 是一种声明式的编程范式,它关注于数据流和变化的传播。这意味着可以通过定义数据流和它们之间的关系来构建应用程序,当数据发生变化时,应用程序会自动做出响应。
核心概念:
- Publisher:发布者;产生数据流
- Subscriber:订阅者;消费数据流
- Subscription:订阅关系; - 订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。
- Processor:处理器 - 处理器是同时实现了发布者和订阅者接口的组件,它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。
这种模型遵循Reactive Streams规范,确保了异步流的一致性和可靠性。
应用场景:
- 实时数据流处理: 例如股票交易系统、传感器数据监控、网络游戏等。
- 用户界面开发: 例如响应式Web应用、移动应用等。
- 微服务架构: 响应式编程可以帮助构建更加弹性和可扩展的微服务系统。
- 大数据处理: 响应式编程可以用于处理大规模数据集,例如使用Spark Streaming或Apache Flink。
常用的库和框架
- RxJava: Java的响应式扩展库,提供了丰富的操作符和工具。
- Reactor: Java的响应式编程框架,由Pivotal开发,是Spring WebFlux的基础。
- Kotlin Coroutines: Kotlin的协程库,提供了轻量级的异步编程模型,可以与响应式编程框架集成。
- Spring WebFlux: Spring框架的响应式Web框架,用于构建响应式Web应用。
二、 Reactor实现响应式编程
目前java要实现响应式编程主要就是使用Reactor进行使用, Reactor 也是 Spring WebFlux 的基础,所以在使用上可以与 Spring 框架无缝集成,构建响应式 Web 应用。使用下面我们要介绍的就是Reactor的常用API的使用,基本上学会了Reactor,就可以直接使用Spring WebFlux 构建高性能、可扩展的 Web 应用。
并且使用Spring WebFlux 构建的非阻塞 I/O 和事件驱动模型可以充分利用系统资源,可以有效的提高应用程序的性能和效率。相对于Spring MVC 这种阻塞式来说在性能上会得到很大的提升,
而且响应式编程可以更好地处理并发和异步操作,提高系统的弹性和可扩展性。
1、Flux 和 Mono介绍
在 Reactor 中,Flux 和 Mono 是两个核心组件,用于表示异步数据流。它们都实现了 Reactive Streams 规范中的 Publisher 接口,可以发出零个或多个元素。
Flux:
- 表示一个可以发出零个或多个元素的异步序列。
- 可以用于表示任何类型的数据流,例如用户输入事件、传感器数据、数据库查询结果等。
- 提供了丰富的操作符,用于对数据流进行转换、过滤、合并、延迟等操作。
Flux<String> names =Flux.just("Alice","Bob","Charlie");names.subscribe(System.out::println);// 输出: Alice Bob Charlie
Mono:
- 表示一个最多发出一个元素的异步序列(1个或者0个)。
- 通常用于表示单个结果,例如数据库查询结果、HTTP 请求响应等。
- 也提供了一些操作符,用于对结果进行转换和处理。
Mono<String> name =Mono.just("Alice");
name.subscribe(System.out::println);// 输出: Alice
Flux 和 Mono 的区别:
特性FluxMono发射元素个数零个或多个零个或一个使用场景表示数据流表示单个结果操作符提供丰富的操作符提供一些操作符
Flux 和 Mono 的关系:
- Mono 可以看作是 Flux 的特例,它最多只发出一个元素。
- 可以使用 Flux.from(mono) 将 Mono 转换为 Flux。
- 可以使用 mono.flux() 将 Mono 转换为 Flux。
- 可以使用 flux.single() 或 flux.singleOrEmpty() 将 Flux 转换为 Mono,但前提是 Flux 必须只包含一个元素或为空。
2、常用API使用
在具体使用上Flux和Mono的API使用都差不多,在分类上,我们可以大致分为三类,用来产生生产流的和中间做转换的,还有结束流的
添加依赖
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency>
2.1、生产流
也就是创建Flux和Mono的API
常用汇总
方法作用
just(T... data)
创建一个发射指定元素的 Flux
fromIterable(Iterable<? extends T> it)
从 Iterable(集合) 创建一个 Flux
fromArray(T[] array)
从数组创建 Flux
fromStream(Stream<? extends T> s)
从Stream中创建 Flux,意味着我们可以把jdk8的Stream也直接进行无缝衔接转为Flux
range(int start, int count)
创建发射指定范围的 Flux,一般可以用作计数器或者读秒配合
delayElements()
。
empty()
创建一个不发射任何元素的 Flux,mono也一样。
Flux<T> error(Throwable error
创建一个发射错误信号的 Flux
2.1.1、直接创建
Flux<T> just(T... data)
:创建一个发射指定元素的 Fluxpublicvoidjust()throwsIOException{Flux<String> just =Flux.just("a","b","c","d","e"); just.subscribe(System.out::println);System.in.read();}/*得到的结果 abcde
Flux<T> fromIterable(Iterable<? extends T> it)
:从 Iterable(集合) 创建一个 FluxpublicvoidfromIterable()throwsIOException{List<String> list =Arrays.asList("a","b","c","d","e");Flux<String> just =Flux.fromIterable(list); just.subscribe(System.out::println);System.in.read();}/*得到的结果 abcde
Flux<T> fromArray(T[] array)
:从数组创建 FluxpublicvoidfromArray()throwsIOException{String[] arrays ={"a","b","c","d","e"};Flux<String> just =Flux.fromArray(arrays); just.subscribe(System.out::println);System.in.read();}/*得到的结果 abcde
Flux<T> fromStream(Stream<? extends T> s)
:从Stream中创建 Flux,意味着我们可以把jdk8的Stream也直接进行无缝衔接转为FluxpublicvoidfromStream()throwsIOException{Stream<String> stream =Stream.of("a","b","c","d","e");Flux<String> just =Flux.fromStream(stream); just.subscribe(System.out::println);System.in.read();}/*得到的结果 abcde
Flux<Integer> range(int start, int count)
:创建发射指定范围的 Flux,一般可以用作计数器或者读秒配合delayElements()
。publicvoidrange()throwsIOException{Flux<Integer> just =Flux.range(0,5).delayElements(Duration.ofSeconds(1));//一秒发射一次 just.subscribe(System.out::println);System.in.read();}/* 得到的结果01234
Flux<T> empty()
:创建一个不发射任何元素的 Flux,mono也一样。publicvoidempty()throwsIOException{Mono<Integer> mono =Mono.empty();Flux<Integer> flux =Flux.empty(); flux.subscribe(System.out::println);System.in.read();}/* 订阅者不会接受到任何结果
Flux<T> error(Throwable error)
:创建一个发射错误信号的 Fluxpublicvoiderror()throwsIOException{Flux<Integer> just =Flux.error(newRuntimeException("执行错误!")); just.subscribe(System.out::println);System.in.read();}/*reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 执行错误!
2.1.2、使用Sinks工具类
Sinks 是一个用于创建各种类型的 Sink工厂类,然后在通过asFlux方法把Sink转为Flux。具体分别为:
One<T> one()
:创建一个只接收一个数据的 Sink。publicvoidone(){Sinks.One<Integer> one =Sinks.one(); one.tryEmitValue(1); one.tryEmitValue(2);Mono<Integer> mono = one.asMono(); mono.subscribe(System.out::println);}/* 不管发射多少个消息到one里都只会读取一个1
many()
:创建一个可以接收多个数据的 Sink。-unicast()
:单播,只能有一个消费者订阅转换后到flux,第二个去订阅时会报错publicvoidunicast()throwsIOException{Sinks.Many<Integer> sink =Sinks.many().unicast().onBackpressureBuffer();// 生产消息 sink.tryEmitNext(1); sink.tryEmitNext(2);Flux<Integer> flux = sink.asFlux(); flux.subscribe(v->System.out.println("p1:"+v));//如果有第二个消费者来消费消息,会直接抛出错误 flux.subscribe(v->System.out.println("p2:"+v)); sink.tryEmitNext(3); sink.tryEmitNext(4);System.in.read();}/* 执行到订阅者2时会报错
-multicast()
:多播。Sinks.many()
方法创建的多播 Sink 可以选择不同的策略解决背压问题。不同策略之间的区别主要体现在 如何处理下游消费者无法及时处理数据的情况。> 背压 (Backpressure):在响应式编程中,背压指的是下游消费者无法及时处理上游生产者发送的数据,导致数据堆积,最终可能导致内存溢出等问题。也就是生产者生产的速度大于消费者消费的速度-onBackpressureBuffer()
:如果下游消费者无法及时处理数据,则 Sink 会将数据缓存到一个缓冲区中,直到消费者能够处理为止,缓冲区可以设置大小publicvoidonBackpressureBuffer()throwsIOException{Sinks.Many<Integer> sink =Sinks.many().multicast().onBackpressureBuffer();//开启缓冲区 sink.tryEmitNext(1); sink.tryEmitNext(2);Flux<Integer> flux = sink.asFlux();//在订阅之前发送的消息会先被放在缓冲区 flux.subscribe(v->System.out.println("p1:"+v)); sink.tryEmitNext(3); sink.tryEmitNext(4); flux.subscribe(v->System.out.println("p2:"+v)); sink.tryEmitNext(5); sink.tryEmitNext(6);System.in.read();}/* 第二个订阅的消费者会从订阅的时间开始接收消息,前面存放缓冲区的消息无法接收到p1:1p1:2p1:3p1:4p1:5p2:5p1:6p2:6
-directBestEffort()
:如果下游消费者无法及时处理数据,则 Sink 会尽力将数据发送给消费者,但可能会丢弃一些数据。publicvoiddirectBestEffort()throwsIOException{Sinks.Many<Integer> sink =Sinks.many().multicast().directBestEffort(); sink.tryEmitNext(1); sink.tryEmitNext(2); sink.tryEmitNext(3); sink.tryEmitNext(4);Flux<Integer> flux = sink.asFlux(); flux.subscribe(v->System.out.println("p1:"+v)); sink.tryEmitNext(5); sink.tryEmitNext(6); flux.subscribe(v->System.out.println("p2:"+v)); sink.tryEmitNext(7); sink.tryEmitNext(8);System.in.read();}/*p1:5p1:6p1:7p2:7p1:8p2:8
-directAllOrNothing()
:如果下游消费者无法及时处理数据,则 Sink 会直接丢弃所有后续数据,并发出一个 onError 信号。操作同上empty()
:创建一个空的 Sink
2.2、中间操作
常用汇总
方法****作用
map()
将每个元素映射到另一个类型的值。
flatMap()
将每个元素映射到一个新的 Publisher,并将其扁平化成一个新的 Flux
transform()
transformer会立即执行传递给它的转换函数,并生成新的 Flux。
filter()
根据指定条件过滤
take(long n)
获取到指定长度的结果结束
skip(long skipped)
从第几个元素开始获取
distinct()
去重,去除重复元素
contextWrite()
在整个响应式链中传递上下文信息
merge()
合并多个flux,按照源里每个元素的写入的时间排序,不会保证顺序
zip()
压缩多个flux,按照下标把每个flux的元素都放入一个数组
combineLatest()
把多个flux按照下标把值压缩再一起进行处理
publishOn(Scheduler scheduler)
并发处理,指定后续操作的执行线程池
doOnComplete(Runnable onComplete)
当数据流完成时执行指定的操作,可以用于清理资源、记录日志等。
doOnEach(Consumer<? super Signal<T>> signalConsumer)
每个元素(流的数据和信号)到达的时候触发
onErrorReturn()
吃掉异常,消费者无异常感知,返回一个兜底默认值,并结束flux流
onErrorContinue()
忽略当前异常,仅通知记录,继续推进
retry()
当发生错误时,重新订阅 Flux 流,最多尝试指定次数
buffer()
缓冲指定元素再消费
cache()
缓存数据,把订阅的数据缓存,可以设置缓存大小。当第二个订阅者来获取数据的时候就只能到缓存区去取数据
handle()
自定义流中元素处理规则,内部可以自定义设置处理规则,然后再通过sink.next(),把处理后的数据发送到下一个节点
switchIfEmpty()
如果流数据为空则动态兜底获取数据
2.2.1、变换数据
map()
:将每个元素映射到另一个类型的值。publicvoidmap(){Flux<String> flux =Flux.just("a","b","c","d","e").map(String::toUpperCase).log(); flux.subscribe(System.out::println);}
flatMap()
:将每个元素映射到一个新的 Publisher,并将其扁平化成一个新的 FluxpublicvoidflatMap()throwsIOException{Flux<String> flux =Flux.just("a1","b2","c3","d4","e5").flatMap(v->Flux.just(v.split(""))); flux.subscribe(System.out::println);System.in.read();}
transformDeferred
和
transform
都是用来转换 Flux 数据流的运算符。区别在于 转换逻辑的执行时机
transformDeferred()
运算符会延迟执行传递给它的转换函数,直到下游订阅者订阅时才会执行。每个订阅者都会独立地执行转换逻辑,生成自己的转换结果。适用于复杂、有状态的转换publicvoidtransformDeferred()throwsIOException{AtomicInteger atomicInteger =newAtomicInteger();Flux<String> flux =Flux.just("a","b","c").transformDeferred(v->{//int andIncrement = atomicInteger.getAndIncrement();//把值加一return v.map(it->it+andIncrement);});//transformDeferred中,每个订阅者都会去执行一次transformDeferred,等于每个订阅者都会有自己独立的结果 flux.subscribe(v->System.out.println("订阅者1:"+v)); flux.subscribe(v->System.out.println("订阅者2:"+v));System.in.read();}/* transformDeferred每个订阅者都会独立生成结果订阅者1:a0订阅者1:b0订阅者1:c0订阅者2:a1订阅者2:b1订阅者2:c1
transform()
:transformer会立即执行传递给它的转换函数,并生成新的 Flux。由于转换逻辑立即执行,所有订阅者都会共享同一个转换结果。适用于简单、无状态的转换publicvoidtransform()throwsIOException{AtomicInteger atomicInteger =newAtomicInteger();Flux<String> flux =Flux.just("a","b","c").transform(v->{int andIncrement = atomicInteger.getAndIncrement();return v.map(it->it+andIncrement);});//不管多少个订阅者都会共享transform这一个转换的结果,等于transform只会执行一次 flux.subscribe(v->System.out.println("订阅者1:"+v)); flux.subscribe(v->System.out.println("订阅者2:"+v));System.in.read();}/*transform所有订阅者都会共享同一个转换结果订阅者1:a0订阅者1:b0订阅者1:c0订阅者2:a0订阅者2:b0订阅者2:c0
filter()
:根据指定条件过滤publicvoidfilter()throwsIOException{Flux<String> flux =Flux.just("a","b","c").filter(v-> v.equals("b")); flux.subscribe(System.out::println);System.in.read();}/*生成结果b
take(long n)
:获取到指定长度的结果结束publicvoidtake()throwsIOException{Flux<String> flux =Flux.just("a","b","c","d","e").take(3); flux.subscribe(System.out::println);System.in.read();}/*生成结果abc
skip(long skipped)
:从第几个元素开始获取publicvoidskip()throwsIOException{Flux<String> flux =Flux.just("a","b","c","d","e").skip(3); flux.subscribe(System.out::println);System.in.read();}/*生成结果de
distinct()
:去重,去除重复元素publicvoiddistinct()throwsIOException{Flux<String> flux =Flux.just("a","a","c","a","e").distinct(); flux.subscribe(System.out::println);System.in.read();}/*ace
2.2.2、Context API
在Flux响应式编程中,Context API用于在整个响应式链中传递上下文信息,例如订阅者信息、调度器、钩子等。它类似于线程本地存储,但作用于响应式流。
可以使用Context API添加钩子函数,例如在响应式流的每个元素处理前后执行特定操作。
用法:
transformDeferredContextual()
:两个参数一个是当前的flux对象,一个是Context上下文对象contextWrite()
:一个参数,就是Context对象Context
:通过of方法把数据通过key,value的方式写入到flux中 -of(Object key1, Object value1, Object key2, Object value2)
:如果有多组参数要传入,通过of的(k, v, k, v)的方式这样写publicvoidcontext()throwsIOException{Flux<String> fluxs =Flux.just(1,2,3).transformDeferredContextual((flux, context)->{//它会在每个元素处理之前,提供当前的 Flux 和下面添加的上下文context对象信息。System.out.println("flux:"+ flux);System.out.println("context:"+ context);return flux.map(i -> i +"====>"+ context.get("key1"));//对每个元素进行处理,将元素与上下文中的 "key" 值拼接在一起。}).map(String::toUpperCase);// 可以在后面把context通过contextWrite添加到上下文中Context context =Context.of("key1","zhangsan","key2","list"); fluxs.contextWrite(context)//设置上下文中的 "key" 为 "zhangsan"。.subscribe(v->System.out.println("v = "+ v));System.in.read();}/* 返回的结果flux:FluxArray 当前fluxcontext:Context2{key1=zhangsan, key2=list} Context对象,几组k,v,名字就是几v = 1====>ZHANGSANv = 2====>ZHANGSANv = 3====>ZHANGSAN
2.2.3、合并组合Flux
merge()
:合并多个flux,按照源里每个元素的写入的时间排序,不会保证顺序publicvoidmerge()throwsInterruptedException{Flux<String> flux =Flux.just("a","b");Mono<String> mono =Mono.just("c");Flux.merge(mono,flux).subscribe(System.out::println);Thread.sleep(5000);}/* 输出结果,可能会出现a、c、babc
concat()
:合并多个flux,会严格保证元素的顺序。publicvoidconcat()throwsInterruptedException{Flux<String> flux1 =Flux.just("a","b");Flux<String> flux2 =Flux.just("c","d");Flux.concat(flux1,flux2).subscribe(System.out::println);Thread.sleep(5000);}/* 输出结果,一定会说先输出完flux1,再到flux2abcd
zip()
:压缩多个flux,按照下标把每个flux的元素都放入一个数组,如果多个flux的长度不一样,以最短的为长度publicvoidzip()throwsIOException{Flux<String> flux1 =Flux.just("a","b","c");Flux<Integer> flux2 =Flux.just(1,2,3,4);Flux<String> flux3 =Flux.just("A","B","C","D","E");Flux.zip(flux1, flux2, flux3).map(Tuple2::toString).subscribe(System.out::println);System.in.read();}/* 输出结果。会保证每个数组的长度都一样[a,1,A][b,2,B][c,3,C]
zipWith
:压缩拼接,作用同上,只不过是只能一个一个的压缩。publicvoidzipWith()throwsIOException{Flux<String> flux1 =Flux.just("a","b","c");Flux.just(1,2,3,4).zipWith(flux1).subscribe(System.out::println);System.in.read();}/* 输出结果。[1,a][2,b][3,c]
combineLatest()
:把多个flux按照下标把值压缩再一起进行处理publicvoidcombineLatest()throwsIOException{Flux<String> flux1 =Flux.just("a","b","c");Flux<Integer> flux2 =Flux.just(1,2,3,4);Flux<String> flux3 =Flux.just("A","B","C");// 两个的写法// Flux.combineLatest(flux1, flux2, (f1,f2)->f1+"-"+f2)// .map(String::toUpperCase)// .subscribe(System.out::println);// 多个的写法Iterable<Publisher<?>> publishers =Arrays.asList(flux1, flux2,flux3);Flux.combineLatest(publishers,fs->fs[0]+"-"+fs[1]+"-"+fs[2]).map(String::toUpperCase).subscribe(System.out::println);System.in.read();}/* 获取按照下标拼接后的值C-4-AC-4-BC-4-C
2.2.3、并发控制Flux
Scheduler:调度器
Reactor 中,Schedulers 提供了多种指定线程的方式,可以根据不同的场景选择合适的 Scheduler 来执行任务。下面是常用的Scheduler
Schedulers.immediate()
:默认,无执行上下文,当前线程运行所有操作Schedulers.single()
:使用固定的单线程Schedulers.boundedElastic()
:使用一个有界弹性线程池执行任务。最大线程数为 CPU 核心数 * 10。Schedulers.parallel()
:使用一个固定大小的线程池执行任务,线程池的大小默认为 CPU 核心数Schedulers.fromExecutor(Executor executor)
:使用自定义的 Executor 执行任务Schedulers.newParallel(String name, int parallelism)
:也可以通过new的方式自定义Scheduler ,直接指定线程池大小,作用同上
parallel(int parallelism)
:并行处理,将 Flux 的数据流分成多个并行的轨道runOn()
:指定后续操作的执行线程,一般会配合parallel()
一起使用,使用parallel控制并发数,再通过runOn绑定线程池publicvoidparallel()throwsIOException{Flux.just("a","b","c","d","e").log().parallel(4)//指定并行数.runOn(Schedulers.newParallel("yy"))//指定线程名称.map(String::toUpperCase).log().subscribe();System.in.read();}/* 通过log查看使用runOn前后线程,可以看到Map操作都是使用的runOn指定的yy名称的线程执行的 [ main] reactor.Flux.Array.1 : | onNext(a) [ main] reactor.Flux.Array.1 : | onNext(b) [ yy-2] reactor.Parallel.Map.2 : onNext(A) [ main] reactor.Flux.Array.1 : | onNext(c) [ main] reactor.Flux.Array.1 : | onNext(d) [ yy-3] reactor.Parallel.Map.2 : onNext(B) [ yy-5] reactor.Parallel.Map.2 : onNext(D) [ main] reactor.Flux.Array.1 : | onNext(e) [ yy-4] reactor.Parallel.Map.2 : onNext(C) [ yy-2] reactor.Parallel.Map.2 : onNext(E)
publishOn(Scheduler scheduler)
:指定下游操作执行的线程,作用差不多相当于上面两个的组合。不会影响上游操作的执行线程,只影响其后的操作符publicvoidpublishOn()throwsInterruptedException{//自定义线程池Scheduler scheduler =Schedulers.fromExecutor(newThreadPoolExecutor(4,8,60,TimeUnit.SECONDS,newLinkedBlockingQueue<>(100),(r)->{Thread thread =newThread(r); thread.setName("yy:"+ thread.getName());return thread;}));Flux.just("a","b","c","d","e").log().publishOn(scheduler)//指定.map(String::toUpperCase).log().subscribe();Thread.sleep(1000);}/* 执行结果,因为比较快所以只用了一条线程 [ main] reactor.Flux.Array.1 : | onNext(a) [ main] reactor.Flux.Array.1 : | onNext(b) [ main] reactor.Flux.Array.1 : | onNext(c) [ main] reactor.Flux.Array.1 : | onNext(d) [ main] reactor.Flux.Array.1 : | onNext(e) [ main] reactor.Flux.Array.1 : | onComplete() [ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(A) [ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(B) [ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(C) [ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(D) [ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(E)
subscribeOn(Scheduler scheduler)
:指定订阅发生和上游操作执行的线程publicvoidsubscribeOn()throwsInterruptedException{Scheduler scheduler =Schedulers.fromExecutor(newThreadPoolExecutor(4,8,60,TimeUnit.SECONDS,newLinkedBlockingQueue<>(100),(r)->{Thread thread =newThread(r); thread.setName("yy:"+ thread.getName());return thread;}));Flux.just("a","b","c","d","e").log().subscribeOn(scheduler).map(String::toUpperCase).log().subscribe();Thread.sleep(1000);}/* 使用subscribeOn后,会把上游的操作也全部使用指定的线程 [ yy:Thread-1] reactor.Flux.Array.1 : | onNext(a) [ yy:Thread-1] reactor.Flux.Map.2 : onNext(A) [ yy:Thread-1] reactor.Flux.Array.1 : | onNext(b) [ yy:Thread-1] reactor.Flux.Map.2 : onNext(B) [ yy:Thread-1] reactor.Flux.Array.1 : | onNext(c) [ yy:Thread-1] reactor.Flux.Map.2 : onNext(C) [ yy:Thread-1] reactor.Flux.Array.1 : | onNext(d) [ yy:Thread-1] reactor.Flux.Map.2 : onNext(D) [ yy:Thread-1] reactor.Flux.Array.1 : | onNext(e) [ yy:Thread-1] reactor.Flux.Map.2 : onNext(E)
2.2.4、doOnxxx:感知事件相关的 API
在 Reactor 中,感知事件相关的 API 主要用于处理数据流中的特殊事件,例如数据流的完成、错误以及取消等。这些 API 可以帮助我们更好地控制数据流的行为,并对不同的事件做出相应的处理。
doOnComplete(Runnable onComplete)
:当数据流完成时执行指定的操作,可以用于清理资源、记录日志等。publicvoiddoOnComplete()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(String::toUpperCase).doOnComplete(()->System.out.println("执行完成!")); flux.subscribe(v->System.out.println("v1:"+v)); flux.subscribe(v->System.out.println("v2:"+v));Thread.sleep(1000);}/* 输出结果,会再消费完成时感知v1:Av1:Bv1:C执行完成!v2:Av2:Bv2:C执行完成!
doOnError(Consumer<? super Throwable> onError)
:当数据流发生错误时执行指定的操作,用作处理异常publicvoiddoOnError()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(v->{if(v.equals("b")){thrownewRuntimeException("b");}return v;}).doOnError((v)->System.out.println("发生异常==>"+v)); flux.subscribe(v->System.out.println("v1:"+v));Thread.sleep(1000);}/*v1:a发生异常==>java.lang.RuntimeException: b
doOnEach(Consumer<? super Signal<T>> signalConsumer)
:每个元素(流的数据和信号)到达的时候触发doOnNext(Consumer<? super T> onNext)
:每个数据(流的数据)到达的时候触发publicvoiddoOnNext()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(String::toUpperCase).doOnEach((v)->System.out.println("Each读取到==>"+v)).doOnNext((v)->System.out.println("Next读取到==>"+v)); flux.subscribe(v->System.out.println("v1:"+v));Thread.sleep(1000);}/* doOnNext只能读取到元素,doOnEach可以获取到信号Each读取到==>doOnEach_onNext(A)Next读取到==>Av1:AEach读取到==>doOnEach_onNext(B)Next读取到==>Bv1:BEach读取到==>doOnEach_onNext(C)Next读取到==>Cv1:CEach读取到==>onComplete()
doOnCancel(Runnable onCancel)
:流被取消时触发,如通过take再还没读取完就取消。正常结束不会被触发doFinally(Consumer<SignalType> onFinally)
:流被订阅执行完成终止时触发,包括正常结束和异常结束publicvoiddoOnCancel()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").doOnCancel(()->System.out.println("流被取消")).doFinally((v)->System.out.println("执行结束"+v)).take(2); flux.subscribe(v->System.out.println("v1:"+v)); flux.subscribe(v->System.out.println("v2:"+v));Thread.sleep(1000);}/* 正常结束不会有doOnCancel的触发v1:av1:b流被取消执行结束cancelv2:av2:b流被取消执行结束cancel
doOnRequest(LongConsumer consumer)
:流被订阅者请求数据时触发doOnSubscribe(Consumer<? super Subscription> onSubscribe)
:流被订阅开始订阅时触发publicvoiddoOnRequest()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(String::toUpperCase).doOnRequest((n)->System.out.println("流被请求"+n)).doOnSubscribe((n)->System.out.println("流被订阅"+n)); flux.subscribe(v->System.out.println("v1:"+v)); flux.subscribe(v->System.out.println("v2:"+v));Thread.sleep(1000);}/* 订阅者会需要先订阅在请求数据,所以订阅会被触发在前面流被订阅reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@60e06f7d流被请求9223372036854775807v1:Av1:Bv1:C流被订阅reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@59b32539流被请求9223372036854775807v2:Av2:Bv2:C
2.2.4、onErrorXXX:异常处理相关的 API
onErrorReturn()
:吃掉异常,消费者无异常感知,返回一个兜底默认值,并结束flux流publicvoidonErrorReturn()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(v->{if(v.equals("b"))thrownewRuntimeException("b");elsereturn v;}).onErrorReturn("Error");//发生异常返回兜底 flux.subscribe(v->System.out.println("v1:"+v));Thread.sleep(1000);}/* 会返回兜底的异常输出v1:av1:Error
onErrorResume()
:当发生错误时,使用另一个 Flux 继续执行流程。相当于一个补偿结果publicvoidonErrorResume()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(v->{if(v.equals("b"))thrownewRuntimeException("b");elsereturn v;}).onErrorResume(e->{System.err.println("发生异常==>"+e);returnFlux.just("D","E");}); flux.subscribe(v->System.out.println("v1:"+v));Thread.sleep(1000);}/* 发生异常后,原始的flux会停止,会去执行onErrorResume内部的fluxv1:a发生异常==>java.lang.RuntimeException: bv1:Dv1:E
onErrorMap()
:捕获并包装成一个业务异常,并重新抛出,消费者有感知publicvoidonErrorMap()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(v->{if(v.equals("b"))thrownewRuntimeException("b");elsereturn v;}).onErrorMap(RuntimeException.class,e ->newIllegalArgumentException("计算失败:"+e.getMessage())); flux.subscribe(v->System.out.println("v1:"+v));Thread.sleep(1000);}/** 会捕获异常,然后在转换为onErrorMap内部定义的异常返回v1:areactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: 计算失败:bCaused by: java.lang.IllegalArgumentException: 计算失败:b
onErrorContinue()
:忽略当前异常,仅通知记录,继续推进publicvoidonErrorContinue()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(v->{if(v.equals("b"))thrownewRuntimeException("b");elsereturn v;}).onErrorContinue((e, obj)-> 忽略错误,并打印错误信息和当前元素System.err.println("发生异常: "+ e +", 数据: "+ obj)); flux.subscribe(v->System.out.println("v1:"+v));Thread.sleep(1000);}/**v1:a发生异常: java.lang.RuntimeException: b, 数据: bv1:c
retry()
:当发生错误时,重新订阅 Flux 流,最多尝试指定次数publicvoidretry()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c").map(v->{if(v.equals("b"))thrownewRuntimeException("b");elsereturn v;}).retry(2);//最多尝试两次 flux.subscribe(v->System.out.println("v1:"+v));Thread.sleep(1000);}/* 发生错误时尝试重新订阅,如果超过设置的次数,则抛出错误v1:av1:av1:areactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: b
2.2.4、其他工具 API
buffer()
:缓冲指定元素再消费,缓冲区:缓冲n个元素: 消费一次最多可以拿到n个元素; 凑满数批量发给消费者。blockFirst()
:阻塞当前线程,直到 Flux 发出其第一个元素 ,然后返回该元素publicvoidblockFirst()throwsInterruptedException{List<String> block =Flux.just("a","b","c","d","e").map(String::toUpperCase).buffer(3)//创建缓冲区,接收到3个元素转为集合在一起返回订阅者.blockFirst();//阻塞当前线程,收到第一个元素就结束阻塞System.out.println(block);Thread.sleep(1000);}/* 首先是通过buffer设置缓冲区,然后把前三个元素一起发生过来到blockFirst,所以接受到的第一个元素就是被buffer转为数组的元素[A, B, C]
bufferUntilChanged()
:作用和buffer一样,也是设置缓冲区,只不过不是固定的长度,而是根据条件分隔publicvoidbufferUntilChanged()throwsInterruptedException{Flux<List<String>> flux =Flux.just("a1","b1","c2","d2","e1").map(String::toUpperCase).bufferUntilChanged(i -> i.contains("2"));//设置分割缓冲区的条件 flux.subscribe(v->System.out.println("v1:"+v));Thread.sleep(1000);}/** 包含2则拆分,然后2结束拆分,不会去跳着拆分v1:[A1, B1]v1:[C2, D2]v1:[E1]
cache()
:缓存数据,把订阅的数据缓存,可以设置缓存大小,当第二个订阅者来获取数据的时候就只能到缓存区去取数据publicvoidcache()throwsIOException{Flux<String> cache =Flux.just("a","b","c","d","e").cache(2); cache.subscribe(v->System.out.println("v1:"+v)); cache.subscribe(v->System.out.println("v2:"+v));System.in.read();}/* 订阅者1直接获取到全部数据,然后把数据缓存到缓冲区,但只要两个大小,所以订阅者2就只拿到了最后放入缓冲区的两个数据v1:av1:bv1:cv1:dv1:ev2:dv2:e
handle()
:自定义流中元素处理规则,内部可以自定义设置处理规则,然后再通过sink.next(),把处理后的数据发送到下一个节点publicvoidhandle()throwsInterruptedException{Flux.just("a","b","c").delayElements(Duration.ofSeconds(1)).handle((v, sink)->{ sink.next("自定义增强:"+v);}).subscribe(System.out::println);Thread.sleep(5000);}/* 获取结果自定义增强:a自定义增强:b自定义增强:c
switchIfEmpty()
:如果为空则动态兜底获取数据publicvoidswitchIfEmpty()throwsInterruptedException{Mono.just("add").switchIfEmpty(Mono.just("xx"))//如果为空,则动态兜底去调取方法.log().subscribe(System.out::println);Thread.sleep(5000);}
defaultIfEmpty()
:如果为空,则调用当前的兜底的默认数据,作用同上类似publicvoiddefaultIfEmpty()throwsInterruptedException{Mono.just("add").defaultIfEmpty("xx")//如果为空,则动态兜底去调取方法.log().subscribe(System.out::println);Thread.sleep(5000);}
2.3、订阅流
常用汇总
方法****作用
collectList()
把flux转为Mono的list。
block()
阻塞当前线程等待完成。
subscribe()
订阅流,订阅的时候可以有多种方式订阅。除了直接订阅的,还可以捕获异常,以及监听结束等。
详情使用
collectList()
:把flux转为Mono的list。这个开发中会很常用publicvoidcollectList()throwsInterruptedException{Mono<List<String>> mono =Flux.just("a","b","c","d","e").collectList(); mono.subscribe(v->System.out.println("v:"+v));Thread.sleep(1000);}/* 结果v:[a, b, c, d, e]
block()
:阻塞当前线程等待完成。一般不建议这样写,这样就丧失了响应式编程的本质了publicvoidblock()throwsInterruptedException{Mono<List<String>> mono =Flux.just("a","b","c","d","e").collectList();List<String> block = mono.block();System.out.println(block);Thread.sleep(1000);}/* 直接阻塞获取到结果[a, b, c, d, e]
subscribe()
:订阅流,订阅的时候可以有多种方式订阅。除了前面用的,直接订阅的,还可以捕获异常,以及监听结束等。publicvoidsubscribe()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c","d","e").map(String::toUpperCase); flux.subscribe(v->System.out.println("v:"+v), throwable ->System.out.println("异常"+throwable.getMessage()),()->System.out.println("流执行完成!!"));Thread.sleep(1000);}/* 正常结束,如果我异常结束会被捕获到v:Av:Bv:Cv:Dv:E流执行完成!!
BaseSubscriber()
:subscribe还可以通过BaseSubscriber来自定义消费者。publicvoidBaseSubscriber()throwsInterruptedException{Flux<String> flux =Flux.just("a","b","c","d","e").map(String::toUpperCase);// Flux<String> flux2 = Flux.just("a", "b", "c", "d", "e")// .map(v->{// if (v.equals("b")) {throw new RuntimeException("b");}// return v;// }); flux.subscribe(newBaseSubscriber<String>(){@OverrideprotectedvoidhookOnSubscribe(Subscription subscription){System.out.println(Thread.currentThread()+"流开始了:"+subscription);request(1);}@OverrideprotectedvoidhookOnNext(String value){System.out.println(Thread.currentThread()+"开始接收元素:"+value);request(1);//持续接收元素}@OverrideprotectedvoidhookOnComplete(){System.out.println(Thread.currentThread()+"流正常结束");}@OverrideprotectedvoidhookOnError(Throwable throwable){System.out.println(Thread.currentThread()+"流错误结束:"+throwable);}@OverrideprotectedvoidhookOnCancel(){System.out.println(Thread.currentThread()+"流被取消");}@OverrideprotectedvoidhookFinally(SignalType type){System.out.println(Thread.currentThread()+"最终回调");}});Thread.sleep(6000);}/* 到对应的节点时会 触发对应的回调方法Thread[main,5,main]流开始了:reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@782e6b40Thread[main,5,main]开始接收元素:AThread[main,5,main]开始接收元素:BThread[main,5,main]开始接收元素:CThread[main,5,main]开始接收元素:DThread[main,5,main]开始接收元素:EThread[main,5,main]流正常结束Thread[main,5,main]最终回调
版权归原作者 方渐鸿 所有, 如有侵权,请联系我们删除。