本文主要分析一下 Flux 的 flatMap 方法的底层源码。
flatMap 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。
测试代码如下:

上面第 14 行创建了一个 FluxArray 对象,即 flux000 指向的是一个 FluxArray 对象,这在上一篇文章里已经说过了。
上面第 15 行的 flux000.flatMap(..) 创建的是一个 FluxFlatMap 对象,其源码如下:

上面 10590 行创建了一个 FluxFlatMap 对象,第一个参数是 this ,代表 图1 中的 flux000 对象,也就是说创建的 FluxFlatMap 对象内部持有一个指向 flux000 对象的引用 source(重点)。第二个参数是 mapper ,这个指的是下图高亮的这段 Lambda 表达式:

看 图1 中的 19 行,fluxMP.subscribe(…) ,已知 fluxMP 是一个FluxFlatMap 对象,但 fluxMP 这个引用是 Flux 类型的,所以我们还是要看 Flux 中的 .subscribe(…) 方法:

点进去上面的 8475 行:

上面 8604 行第一个参数 consumer 是下图高亮的 Lambda 表达式:

而上面 图H 中的 LambdaSubscriber 的处理逻辑我们在上一篇文章已经讲过了。我们继续点击 图H 第 8608 行的 subscribeWith(…) 方法:

注意上 图J 中的第 8628 行代码,这行代码创建了一个 FluxMapMain 对象,点进去是这样的:

重点是要知道 FluxMapMain 中有 图1 中的两个 Lambda 表达式,后面会用到。
继续跟踪上面 图J 中 8642 行代码:

注意上 图K 中的 s 是 FluxMapMain 对象,上面第 53 行创建 ArraySubscription 对象时把引用 s 也传了进去。点进去上 图K 中第 53 行:

注意上 图M 中的 onSubscribe(…) 方法是在 FluxMapMain 类中的,FluxMapMain 类是 FluxFlatMap 的一个内部类。
通过 图K 可知,图M 中的 s 是 ArraySubscription 对象,actual 是 LambdaSubscriber 对象。
图M 中 370 行,调用的是 LambdaSubscriber 对象 onSubscribe(this) 方法,此处的 this 是指 FluxMapMain 对象,点进去 370 行:

上图 119 行调用了 FluxMapMain 的 request(…) 方法【如下,没看动,有点复杂】:

上面代码有点复杂,暂时略过。
继续看 图M 中的 371 行 s.request(…) 方法调用的是 ArraySubscription 对象的 request(…) 方法。图M 中的 s 是 ArraySubscription 对象

注意上面 119行 这是一个循环,会对 图1 中的数据 1,2,3 进行循环处理,127行的 s 是 FlatMapMain 即会调 FlatMapMain 的 onNext(..) 方法:

。
更直观的分析 flatMap 的异步现象:
如上图代码,可以看到第 22 行在处理 Flux 中的每个元素时都延迟了 5 秒,但是第 25 行和 28 行的输出立刻就输出了,说明 flatMap 处理 Flux 中的每个元素是异步的。代码中第 22 行创建了三个 Mono 对象这三个操作是并发执行的,等这三个 Mono 对象都生成了再把他们扁平(flat)成一个 Flux。