Flux 源码分析(一)

这两天因为要解决 springcloud gateway 网关中的一些问题,去学习了一些 Flux、Mono 等知识点,本文记录一下 Flux 源码的简单分析。

Flux 是什么?

官网的解释是:

A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

– Flux 表示的是包含零到多个元素的异步序列。

– 它可以发出 0 到 N 个元素,并在完成时发出完成信号或错误信号。

– Flux 可以用于表示异步的多个值的序列,比如集合、数组、数据库查询、网络数据等。

– Flux 可以被订阅,当有数据产生时,会将数据推送给订阅者。

Flux 实现了 Publisher 接口,Flux 就是一个 Publisher 。

我们看看 Publisher 接口:

Publisher 接口

Publisher 接口中只有一个 subscribe 方法,subscribe 是一个动词,意思是通过该方法,可以往 Publisher 上面订阅事件(”订阅事件”的意思就是想对 Publisher 中的数据做哪些处理)。

订阅什么事件呢?你自己实现 Subscriber 接口,Subscriber 就是”要订阅的事件逻辑” ,你的逻辑写这里。然后 Publisher 会调用 Subscriber 中的 onSubcribe(Subscription) 方法!其实准确点可以说:Subscriber 中的 onSubcribe(Subscription) 方法就是”要订阅的事件逻辑”,因为事件逻辑就是在 onSubcribe(Subscription) 方法中写的

Subscriber

Subscriber 是 Publisher 接口中 subscribe 方法的入参类型。当你写了:flux0.subscribe(ele -> { System.out.println(ele * 2); }); 后, ele -> { System.out.println(ele * 2); } 会被封装成 Subscriber 对象。所以拿到 Subscriber 对象就可以对数据流进行处理。

Subscription

Subscriber 类中的 onSubcribe(Subscription) 方法的入参是 Subscription ,Subscription又是什么呢?

上面说了,Subscriber 就是”要订阅的事件逻辑” 事件逻辑就是在 onSubcribe(Subscription) 方法中写的,其实再准确点说,“事件的处理逻辑” 是在 Subscription 中实现的

Subscription 就是处理数据的,它为什么能处理数据呢?

因为:(1)Subscription 内部有上面的 1,2,3 数据。(2)Subscription 内部拿到了Subscriber 对象,上面说了拿到 Subscriber 对象就可以对数据流进行处理

Subscription 它的 request 方法会循环处理这些数据,每次处理数据都会调用 Subscriber 对象中的 onNext(..) 方法,而 Subscriber 对象的 onNext(..) 会调用 ele -> { System.out.println(ele * 2); } 处理数据。

所以 Subscription 的 request 方法是一个”时间点”,从此刻开始,处理数据。

Flux、Publisher、Subscriber、Subscription 的关系

Flux 实现了 Publisher 接口。Subscriber 是 Publisher 接口中的 subscribe 方法的入参类型。

当你写了:flux0.subscribe(ele -> { System.out.println(ele * 2); }); 后,.subscribe(ele -> { System.out.println(ele * 2); }) 就是对 Publisher 接口中的 subscribe 方法的实现。

ele -> { System.out.println(ele * 2); } 会被封装成 Subscriber 对象,当执行 flux0.subscribe(ele -> { System.out.println(ele * 2); }); 的时候,就会触发中的 Subscriber 中的 onSubscibe(Subscription) 方法的执行。

举例说明

先从一个最简单的使用 Flux 的 demo 说起:

如上代码。首先 Flux 是一个抽象类,它提供了数据处理的通用方法,FluxArray 是继承自 Flux 的一个类,Flux.just(1, 2, 3)其实就是创建了一个 FluxArray 对象,FluxArray 中有一个数组,用于存放比如上面的 1,2,3 这些数据:

第一步(位于 Flux 类中):创建 Flux 对象:

第二步(位于 Flux 类中):实际创建的是 FluxArray 对象

可以看到上面 1110 行是创建了一个 FluxArray 对象作为参数。

上面 10556 行可以看到返回了在第二步中创建的 FluxArray 对象 。所以上面第一步中创建的对象就是 FluxArray 对象。

第三步(位于 Flux 类中):调用 Flux 的 subscribe(…) 方法:

第四步:

第三步中虽然我们写的是:

但在上面第 8608 行可以看到,这个 Consumer 被包装了 LambdaSubscriber 对象,与此同时,LambdaSubscriber 的构造器中还有其他一些参数比如 errorConsumer 等,但在上面 8475 行传递是 null 。

LambdaSubscriber 对象继承了 Subscriber 接口,Subscriber 接口中有这些方法:

当你调用了 Publisher(如Flux)的 subscribe 方法之后,就会触发 Subscriber 中的 onSubscribe(..) 方法。上面说了我们写的 Consumer 被包装了 LambdaSubscriber 对象,所以第三步调用了 Publisher(如Flux)的 subscribe 方法,就会触发 LambdaSubscriber 对象的 onSubscribe(..) 方法:

而上面 105 行的 Subscription 是什么,其实是 ArraySubscription:

它位于 FluxArray 中是 FluxArray 的一个内部类:

第五步 :

第四步已经说了,调用了 Publisher(如Flux)的 subscribe 方法,就会触发 LambdaSubscriber 对象的 onSubscribe(..) 方法,而 LambdaSubscriber 对象的 onSubscribe(..) 方法的入参是 ArraySubscription , LambdaSubscriber 对象的 onSubscribe(..) 方法中 119 行调用 s.request(..) :

上面 119 行的 s 其实就是 ArraySubscription 的 request 方法(如下图),这个 reqeust 方法会循环处理数据,最终会 LambdaSubscriber 的 Conumer(其实就是我们写的 ele -> { System.out.println(ele * 2); })处理数据:

处理完之后,调用上面 177 行的 onComplete() 方法。

注意,你对 Flux 中数据进行订阅、或者使用 .map(..) 方法进行处理,都不会改变 Flux 中的原始数据,.map(..) 方法会返回一个新的 Flux 对象。

码先生
Author: 码先生

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注