这两天因为要解决 springcloud gateway 网关中的一些问题,去学习了一些 Flux、Mono 等知识点,本文记录一下 Flux 源码的简单分析。
Flux 是什么?
官网的解释是:
A Reactive Streams
Publisher
with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
– Flux 表示的是包含零到多个元素的异步序列。
– 它可以发出 0 到 N 个元素,并在完成时发出完成信号或错误信号。
– Flux 可以用于表示异步的多个值的序列,比如集合、数组、数据库查询、网络数据等。
– Flux 可以被订阅,当有数据产生时,会将数据推送给订阅者。
Flux 实现了 Publisher 接口,Flux 就是一个 Publisher 。
我们看看 Publisher 接口:
Publisher 接口
1 2 3 4 |
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } |
Publisher 接口中只有一个 subscribe 方法,subscribe 是一个动词,意思是通过该方法,可以往 Publisher 上面订阅事件(”订阅事件”的意思就是想对 Publisher 中的数据做哪些处理)。
订阅什么事件呢?你自己实现 Subscriber 接口,Subscriber 就是”要订阅的事件逻辑” ,你的逻辑写这里。然后 Publisher 会调用 Subscriber 中的 onSubcribe(Subscription)
方法!其实准确点可以说:Subscriber 中的 onSubcribe(Subscription) 方法就是”要订阅的事件逻辑”,因为事件逻辑就是在 onSubcribe(Subscription)
方法中写的。
Subscriber
Subscriber 是 Publisher 接口中 subscribe 方法的入参类型。当你写了:flux0.subscribe(ele -> { System.out.println(ele * 2); });
后, ele -> { System.out.println(ele * 2); }
会被封装成 Subscriber 对象。所以拿到 Subscriber 对象就可以对数据流进行处理。
Subscription
Subscriber 类中的 onSubcribe(Subscription) 方法的入参是 Subscription ,Subscription又是什么呢?
上面说了,Subscriber 就是”要订阅的事件逻辑” ,事件逻辑就是在 onSubcribe(Subscription)
方法中写的,其实再准确点说,“事件的处理逻辑” 是在 Subscription
中实现的。
Subscription 就是处理数据的,它为什么能处理数据呢?
因为:(1)Subscription 内部有上面的 1,2,3 数据。(2)Subscription 内部拿到了Subscriber 对象,上面说了拿到 Subscriber 对象就可以对数据流进行处理。
Subscription 它的 request 方法会循环处理这些数据,每次处理数据都会调用 Subscriber 对象中的 onNext(..) 方法,而 Subscriber 对象的 onNext(..) 会调用 ele -> { System.out.println(ele * 2); }
处理数据。
所以 Subscription 的 request 方法是一个”时间点”,从此刻开始,处理数据。
Flux、Publisher、Subscriber、Subscription 的关系
Flux 实现了 Publisher 接口。Subscriber 是 Publisher 接口中的 subscribe 方法的入参类型。
当你写了:flux0.subscribe(ele -> { System.out.println(ele * 2); });
后,.subscribe(ele -> { System.out.println(ele * 2); })
就是对 Publisher 接口中的 subscribe 方法的实现。
ele -> { System.out.println(ele * 2); } 会被封装成 Subscriber 对象,当执行 flux0.subscribe(ele -> { System.out.println(ele * 2); });
的时候,就会触发中的 Subscriber 中的 onSubscibe(Subscription)
方法的执行。
举例说明
先从一个最简单的使用 Flux 的 demo 说起:
1 2 3 4 |
Flux<Integer> flux0 = Flux.just(1, 2, 3); flux0.subscribe(ele -> { System.out.println(ele * 2); });// 这个 Consumer 会被包装成 Publisher 接口中 subscribe 方法的入参 Subscriber |
如上代码。首先 Flux 是一个抽象类,它提供了数据处理的通用方法,FluxArray 是继承自 Flux 的一个类,Flux.just(1, 2, 3)
其实就是创建了一个 FluxArray 对象,FluxArray 中有一个数组,用于存放比如上面的 1,2,3 这些数据:
第一步(位于 Flux 类中):创建 Flux 对象:
第二步(位于 Flux 类中):实际创建的是 FluxArray 对象
可以看到上面 1110 行是创建了一个 FluxArray 对象作为参数。
上面 10556 行可以看到返回了在第二步中创建的 FluxArray 对象 。所以上面第一步中创建的对象就是 FluxArray 对象。
第三步(位于 Flux 类中):调用 Flux 的 subscribe(…) 方法:
第四步:
第三步中虽然我们写的是:
1 2 3 |
ele -> { System.out.println(ele * 2); } |
但在上面第 8608 行可以看到,这个 Consumer 被包装了 LambdaSubscriber 对象,与此同时,LambdaSubscriber 的构造器中还有其他一些参数比如 errorConsumer 等,但在上面 8475 行传递是 null 。
LambdaSubscriber 对象继承了 Subscriber 接口,Subscriber 接口中有这些方法:
当你调用了 Publisher(如Flux)的 subscribe 方法之后,就会触发 Subscriber 中的 onSubscribe(..) 方法。上面说了我们写的 Consumer 被包装了 LambdaSubscriber 对象,所以第三步调用了 Publisher(如Flux)的 subscribe 方法,就会触发 LambdaSubscriber 对象的 onSubscribe(..) 方法:
而上面 105 行的 Subscription 是什么,其实是 ArraySubscription:
它位于 FluxArray 中是 FluxArray 的一个内部类:
第五步 :
第四步已经说了,调用了 Publisher(如Flux)的 subscribe 方法,就会触发 LambdaSubscriber 对象的 onSubscribe(..) 方法,而 LambdaSubscriber 对象的 onSubscribe(..) 方法的入参是 ArraySubscription , LambdaSubscriber 对象的 onSubscribe(..) 方法中 119 行调用 s.request(..) :
上面 119 行的 s 其实就是 ArraySubscription 的 request 方法(如下图),这个 reqeust 方法会循环处理数据,最终会 LambdaSubscriber 的 Conumer(其实就是我们写的 ele -> { System.out.println(ele * 2); }
)处理数据:
处理完之后,调用上面 177 行的 onComplete() 方法。
注意,你对 Flux 中数据进行订阅、或者使用 .map(..) 方法进行处理,都不会改变 Flux 中的原始数据,.map(..) 方法会返回一个新的 Flux 对象。