最近在学响应式编程,这里先记录下,响应式编程的一些基础内容
1.名词解释
Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。
- Reactive Streams:- Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。- Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了
java.util.concurrent.Flow
类,定义了Reactive Streams规范。 - Reactor:- Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。- Reactor 通过提供响应式的操作符,如
map
、filter
、flatMap
等,使得开发者能够方便地进行数据流的转换和处理。 - WebFlux:- WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。- Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
- 响应式编程:- 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。- 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。
综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。
2.Reactive Streams 规范
2.1.Reactive Streams规范定义
在
java.util.concurrent.Flow
类中,定义了Reactive Streams规范
- Publisher(发布者):负责生成数据流,并向订阅者发送数据。
- Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
- Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
- Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。
2.2.API方法
1. Publisher(发布者):
interfacePublisher<T>{voidsubscribe(Subscriber<?superT> subscriber);}
subscribe(Subscriber<? super T> subscriber)
: 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interfaceSubscriber<T>{voidonSubscribe(Subscription subscription);voidonNext(T item);voidonError(Throwable throwable);voidonComplete();}
onSubscribe(Subscription subscription)
: 在订阅关系建立时调用。通过这个方法,订阅者可以持有Subscription
对象,以便后续请求数据和取消订阅。onNext(T item)
: 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。onError(Throwable throwable)
: 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。onComplete()
: 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。
3. Subscription(订阅):
interfaceSubscription{voidrequest(long n);voidcancel();}
request(long n)
: 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。cancel()
: 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。
4. Processor(处理器):
interfaceProcessor<T,R>extendsSubscriber<T>,Publisher<R>{}
Processor
接口是
Subscriber
和
Publisher
的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。
Subscriber
部分的方法:onSubscribe(Subscription subscription)
,onNext(T item)
,onError(Throwable throwable)
,onComplete()
。Publisher
部分的方法:subscribe(Subscriber<? super R> subscriber)
。表示Processor
可以被其他订阅者订阅。
5.泛型T
泛型T即为数据流
这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。
2.3.工作流程
在 Reactive Streams 中,
Publisher
、
Subscriber
、
Subscription
和
Processor
之间的协作流程如下:
有时间再补流程图
- Publisher(发布者):-
Publisher
是异步产生数据流的组件,它通过subscribe
方法允许订阅者订阅。subscribe
方法会接收一个Subscriber
对象作为参数。- 当Publisher
有新数据准备好时,通过调用订阅者的onNext
方法将数据推送给订阅者。interfacePublisher<T>{voidsubscribe(Subscriber<?superT> subscriber);}
- Subscriber(订阅者):-
Subscriber
是数据流的消费者,通过实现Subscriber
接口来接收来自发布者的数据。订阅者通过调用subscription.request(n)
请求一定数量的数据,处理数据时通过onNext
方法接收元素。- 当订阅者无法处理更多的元素时,可以调用subscription.cancel()
来取消订阅。interfaceSubscriber<T>{voidonSubscribe(Subscription subscription);voidonNext(T item);voidonError(Throwable throwable);voidonComplete();}
- Subscription(订阅):-
Subscription
表示订阅关系,它在onSubscribe
方法中被传递给订阅者。通过Subscription
,订阅者可以请求数据和取消订阅。- 订阅者通过request(long n)
方法请求处理 n 个元素,通过cancel()
方法取消订阅。interfaceSubscription{voidrequest(long n);voidcancel();}
- Processor(处理器):-
Processor
是一个同时实现了Publisher
和Subscriber
接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。-Processor
既能接收数据,也能发布数据。它将onNext
、onError
和onComplete
方法委托给下游的订阅者,并将数据推送给上游的发布者。interfaceProcessor<T,R>extendsSubscriber<T>,Publisher<R>{}
这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过
onNext
方法接收元素,订阅者通过
request
方法请求处理一定数量的元素,同时可以通过
cancel
方法取消订阅。
Processor
则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。
3.自定义实现Reactive Streams规范
自己实现了一个,参考了
SubmissionPublisher
- 同步实现的
- 功能不完善
- 有bug
classMyPublisherimplementsFlow.Publisher<String>{MySubscription<String> subscription;publicint request ;publicvoidpublish(String item){
subscription.items.add(item);while(true){if(request >0){for(int i =0; i < request; i++){if(!subscription.items.isEmpty()){try{Object o = subscription.items.get(subscription.items.size()-1);
subscription.subscriber.onNext(o.toString());
subscription.items.remove(o);}catch(Exception e){
subscription.subscriber.onError(e);return;}}}}if(subscription.items.isEmpty()){break;}}}@Overridepublicvoidsubscribe(Flow.Subscriber<?superString> subscriber){System.out.println("第一步:绑定订阅者");MySubscription<String> subscription =newMySubscription<>(subscriber,this);this.subscription = subscription;
subscriber.onSubscribe(subscription);}}classMySubscriberimplementsFlow.Subscriber<String>{privateFlow.Subscription subscription;@OverridepublicvoidonSubscribe(Flow.Subscription subscription){System.out.println("第二步:接收Subscription");this.subscription = subscription;// 请求订阅者处理的元素数量
subscription.request(1);}@OverridepublicvoidonNext(String item){System.out.println("第四步:推送数据");System.out.println("MySubscriber 消费了item = "+ item);
subscription.request(1);}@OverridepublicvoidonError(Throwable throwable){System.out.println("出异常了 = "+ throwable);}@OverridepublicvoidonComplete(){}}classMySubscription<T>implementsFlow.Subscription{finalFlow.Subscriber<?superT> subscriber;finalMyPublisher publisher;List items =newArrayList();publicMySubscription(Flow.Subscriber<?superT> subscriber,MyPublisher publisher){this.subscriber = subscriber;this.publisher = publisher;}@Overridepublicvoidrequest(long n){this.publisher.request++;System.out.println("第三步:拉取请求");}@Overridepublicvoidcancel(){}}publicclassFlowDemo{publicstaticvoidmain(String[] args){MyPublisher myPublisher =newMyPublisher();MySubscriber mySubscriber =newMySubscriber();
myPublisher.subscribe(mySubscriber);
myPublisher.publish("111");
myPublisher.publish("222");
myPublisher.publish(null);}}
4.Jdk实现Reactive Streams使用示例
classSimplePublisherimplementsFlow.Publisher<Integer>{privatefinalSubmissionPublisher<Integer> publisher =newSubmissionPublisher<>();publicvoidpublishItems(){for(int i =1; i <=5; i++){
publisher.submit(i);}// 发布者完成发布
publisher.close();}@Overridepublicvoidsubscribe(Flow.Subscriber<?superInteger> subscriber){
publisher.subscribe(subscriber);}}classSimpleSubscriberimplementsFlow.Subscriber<Integer>{privateFlow.Subscription subscription;@OverridepublicvoidonSubscribe(Flow.Subscription subscription){this.subscription = subscription;// 请求订阅者处理的元素数量
subscription.request(1);}@OverridepublicvoidonNext(Integer item){System.out.println("Received item: "+ item);// 处理完一个元素后请求下一个
subscription.request(1);}@OverridepublicvoidonError(Throwable throwable){System.err.println("Error occurred: "+ throwable.getMessage());}@OverridepublicvoidonComplete(){System.out.println("Processing completed.");}}publicclassReactiveStreamsExample{publicstaticvoidmain(String[] args)throwsInterruptedException{// 创建发布者和订阅者SimplePublisher simplePublisher =newSimplePublisher();SimpleSubscriber simpleSubscriber =newSimpleSubscriber();// 订阅者订阅发布者
simplePublisher.subscribe(simpleSubscriber);// 发布者发布数据
simplePublisher.publishItems();// 睡一觉,确保数据处理完成Thread.sleep(3000);}}
学习打卡:Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范
版权归原作者 摸魚散人 所有, 如有侵权,请联系我们删除。