flatMap
Both Flux and Mono have a flatMap method. What does it do? As I understand it, flatMap processes the elements of a Flux or Mono one by one, transforms each element into another Publisher, and then flattens these inner publishers into a single result that it returns: a Flux for Flux.flatMap, a Mono for Mono.flatMap.
One important point: the flatMap transformation is triggered by subscribe(…). If nothing subscribes, the transformation never runs.
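The laziness described above can be checked with a small counter. This is only a sketch: the class name FlatMapLazinessDemo and the method countMapperCalls are made up for illustration, and block() is used here as a convenient way to subscribe and wait.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class FlatMapLazinessDemo {

    /** Returns {mapper calls before subscribing, mapper calls after subscribing}. */
    static int[] countMapperCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Assembling the pipeline only describes the work; the lambda has not run yet.
        Flux<Integer> doubled = Flux.just(1, 2, 3).flatMap(n -> {
            calls.incrementAndGet();
            return Flux.just(n * 2);
        });
        int before = calls.get();

        // block() subscribes and waits, which runs the flatMap lambda once per element.
        List<Integer> values = doubled.collectList().block();
        System.out.println("values = " + values); // values = [2, 4, 6]

        return new int[] {before, calls.get()};
    }

    public static void main(String[] args) {
        int[] counts = countMapperCalls();
        // before subscribe: 0, after subscribe: 3
        System.out.println("before subscribe: " + counts[0] + ", after subscribe: " + counts[1]);
    }
}
```

The counter stays at 0 until something subscribes, then reaches 3, one call per source element.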
Flux's flatMap method:

    import reactor.core.publisher.Flux;

    public static void test2() {
        Flux<Integer> flux000 = Flux.just(1, 2, 3);
        Flux<Integer> flux000New = flux000.flatMap(ele -> {
            ele = ele * 2;
            return Flux.just(ele); // the result is still a Flux
        });
        flux000.subscribe(ele -> {
            // Prints 1, 2, 3: flatMap does not change the values in the original Flux
            System.out.println("========:" + ele);
        });
        flux000New.subscribe(ele -> {
            // Prints 2, 4, 6
            System.out.println("========:" + ele);
        });
        System.out.println("===========end=============");
    }
Mono's flatMap method:

    import reactor.core.publisher.Mono;

    public static void test3() {
        Mono<Integer> mono1 = Mono.just(100);
        Mono<Integer> monoNew = mono1.flatMap(ele -> {
            ele = ele * 2;
            return Mono.just(ele); // the result is still a Mono
        });
        mono1.subscribe(ele -> {
            // Still prints 100: flatMap does not change the value in the original Mono
            System.out.println("========:" + ele);
        });
        monoNew.subscribe(ele -> {
            // Prints 200
            System.out.println("========:" + ele);
        });
    }
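A key question: is flatMap asynchronous?
Nearly every answer online says that flatMap is asynchronous, but in my own tests I have never observed asynchronous behavior from it. Take test2() above: when I set a breakpoint on the final println, execution never reaches it before the preceding code has finished. This is not a rigorous way to test for asynchrony, but I have not once seen an exception to it.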
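One way to make sense of this observation: flatMap does not switch threads by itself. It subscribes to each inner publisher and forwards whatever that publisher emits, whenever it emits. With inner publishers like Flux.just(...), which emit immediately, everything runs on the subscribing thread, which matches what test2() shows. Asynchrony only appears when the inner publishers themselves emit on another thread. The sketch below records the thread name in both cases; the class and method names are made up for illustration, and the 50 ms delay is an arbitrary choice.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapThreadDemo {

    /** Inner publishers emit immediately, so the work runs on the subscribing thread. */
    static List<String> threadsWithImmediateInner() {
        return Flux.just(1, 2, 3)
                .flatMap(n -> Mono.just(n).map(v -> Thread.currentThread().getName()))
                .collectList()
                .block();
    }

    /** Inner publishers delay their element, so it is delivered on a scheduler thread. */
    static List<String> threadsWithDelayedInner() {
        return Flux.just(1, 2, 3)
                .flatMap(n -> Mono.just(n)
                        .delayElement(Duration.ofMillis(50))
                        .map(v -> Thread.currentThread().getName()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("caller:    " + Thread.currentThread().getName());
        System.out.println("immediate: " + threadsWithImmediateInner());
        System.out.println("delayed:   " + threadsWithDelayedInner());
    }
}
```

In the immediate case every recorded name should equal the caller's thread name; in the delayed case the names belong to Reactor's timer scheduler (typically something like parallel-1), not to the caller.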
flatMapMany
flatMapMany is a method on Mono; Flux does not have it.
flatMapMany in Reactor transforms the item emitted by a Mono into a Publisher and then flattens that publisher into a single Flux. This can be particularly useful for transforming data in a reactive stream or chaining asynchronous operations.
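To make the difference from Mono.flatMap concrete, here is a minimal sketch that applies both operators to the same Mono. The class name, the helper names lengthOf and wordsOf, and the sample string are illustrative only.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapVsFlatMapManyDemo {

    /** Mono.flatMap: the function returns a Mono, so the result is still a Mono. */
    static Mono<Integer> lengthOf(Mono<String> source) {
        return source.flatMap(s -> Mono.just(s.length()));
    }

    /** Mono.flatMapMany: the function returns a Publisher, and the result is a Flux. */
    static Flux<String> wordsOf(Mono<String> source) {
        return source.flatMapMany(s -> Flux.fromArray(s.split(" ")));
    }

    public static void main(String[] args) {
        Mono<String> source = Mono.just("hello reactive world");
        System.out.println(lengthOf(source).block());   // 20
        List<String> words = wordsOf(source).collectList().block();
        System.out.println(words);                      // [hello, reactive, world]
    }
}
```

Use flatMap when one value leads to at most one result, and flatMapMany when one value fans out into several.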
Here are some examples to demonstrate how to use flatMapMany.
Example 1: Simple flatMapMany Usage
In this example, we use flatMapMany to transform a Mono of a list of items into a Flux of individual items. In other words, flatMapMany can convert a Mono<List<E>> into a Flux<E> that emits the list's elements one at a time.
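    import java.util.Arrays;
    import java.util.List;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public static void main(String[] args) {
        Mono<List<String>> monoList = Mono.just(Arrays.asList("apple", "banana", "cherry"));
        // Unpack the list: each element becomes its own item in the Flux
        Flux<String> flux = monoList.flatMapMany(Flux::fromIterable);
        flux.subscribe(System.out::println);
    }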
Example 2: flatMapMany with Asynchronous Operations
In this example, we use flatMapMany to transform a Mono into a Flux of items that each undergo an asynchronous operation (simulated here with a delay).
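    import java.time.Duration;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public static void main1(String[] args) {
        Mono<String> mono = Mono.just("Reactive Programming");
        Flux<String> flux = mono.flatMapMany(value ->
                Flux.fromArray(value.split(" "))
                    .delayElements(Duration.ofMillis(500))); // simulates an async operation
        // delayElements emits on another thread, so wait for completion here;
        // with a bare subscribe(...) main could return before anything is printed.
        flux.doOnNext(System.out::println).blockLast();
    }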
Example 3: flatMapMany for Network Calls
In this example, flatMapMany unpacks a list of URLs held in a Mono, and a nested flatMap performs one (simulated) network call per URL.
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public static void main2(String[] args) {
        List<String> urls = Arrays.asList("http://example.com/1", "http://example.com/2", "http://example.com/3");
        Mono<List<String>> monoUrls = Mono.just(urls);
        Flux<String> fluxResponses = monoUrls.flatMapMany(urlList ->
                Flux.fromIterable(urlList)
                    .flatMap(url -> makeNetworkCall(url))
        );
        // The simulated calls complete on another thread, so wait for the last response
        fluxResponses.doOnNext(System.out::println).blockLast();
    }

    private static Mono<String> makeNetworkCall(String url) {
        // Simulates a network call with a fixed 300 ms delay
        return Mono.just("Response from " + url).delayElement(Duration.ofMillis(300));
    }
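The example above sends its calls through flatMap, which forwards responses as they arrive and so does not guarantee that they come back in the order of the source list. The sketch below makes this visible by giving each simulated call a different delay, and compares flatMap with flatMapSequential, which also subscribes eagerly but emits results in source order. The fakeCall helper, its delays, and the class name are hypothetical and exist only for this illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapOrderingDemo {

    /** Hypothetical stand-in for a remote call: lower ids take longer to answer. */
    static Mono<String> fakeCall(int id) {
        long delayMs = (4 - id) * 100L; // id 1 -> 300 ms, id 2 -> 200 ms, id 3 -> 100 ms
        return Mono.just("response-" + id).delayElement(Duration.ofMillis(delayMs));
    }

    /** flatMap emits each result as soon as its inner publisher produces it. */
    static List<String> withFlatMap() {
        return Flux.just(1, 2, 3)
                .flatMap(FlatMapOrderingDemo::fakeCall)
                .collectList()
                .block();
    }

    /** flatMapSequential runs the calls concurrently but emits results in source order. */
    static List<String> withFlatMapSequential() {
        return Flux.just(1, 2, 3)
                .flatMapSequential(FlatMapOrderingDemo::fakeCall)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:           " + withFlatMap());
        System.out.println("flatMapSequential: " + withFlatMapSequential());
    }
}
```

With these delays, flatMap is expected to yield the responses in completion order (3, 2, 1), while flatMapSequential yields them in source order (1, 2, 3). If the order of responses matters, flatMapSequential or concatMap is usually the safer choice.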
Example 4: Error Handling with flatMapMany
In this example, we demonstrate how to handle errors raised in a flatMapMany pipeline by attaching onErrorResume after it.
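    import java.time.Duration;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public static void main3(String[] args) {
        Mono<String> mono = Mono.just("Reactive Programming");
        Flux<String> flux = mono.flatMapMany(value ->
                Flux.fromArray(value.split(" "))
                    .delayElements(Duration.ofMillis(500)))
                // If anything upstream fails, switch to a single fallback message
                .onErrorResume(e -> Flux.just("An error occurred: " + e.getMessage()));
        // Wait for completion so the delayed elements are printed before main returns
        flux.doOnNext(System.out::println).blockLast();
    }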
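In the example above no element actually fails, so the fallback never runs. The sketch below forces a failure with an unparseable token and contrasts two placements of onErrorResume: after flatMapMany, where the first error ends the stream and the fallback replaces everything that would have followed, and inside a per-element flatMap, where only the failing element is replaced. The class name, method names, input string, and the -1 fallback value are assumptions made for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapManyErrorDemo {

    /** Fallback after flatMapMany: the first error terminates the stream. */
    static List<Integer> recoverDownstream(String csv) {
        return Mono.just(csv)
                .flatMapMany(s -> Flux.fromArray(s.split(",")).map(Integer::parseInt))
                .onErrorResume(NumberFormatException.class, e -> Flux.just(-1))
                .collectList()
                .block();
    }

    /** Fallback inside the inner publisher: only the failing element is replaced. */
    static List<Integer> recoverPerElement(String csv) {
        return Mono.just(csv)
                .flatMapMany(s -> Flux.fromArray(s.split(",")))
                .flatMap(token -> Mono.fromCallable(() -> Integer.parseInt(token))
                        .onErrorResume(NumberFormatException.class, e -> Mono.just(-1)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(recoverDownstream("1,2,x,4")); // [1, 2, -1]
        System.out.println(recoverPerElement("1,2,x,4")); // [1, 2, -1, 4]
    }
}
```

The downstream version loses the trailing 4 because the error has already terminated the sequence by the time the fallback takes over.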
Example 5: Complex Data Processing with flatMapMany
In this example, we use flatMapMany to process complex data structures.
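    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public static void main4(String[] args) {
        // Map.of requires Java 9+; its iteration order is unspecified,
        // so the two groups may be printed in either order
        Map<String, List<String>> data = Map.of(
            "fruits", Arrays.asList("apple", "banana"),
            "vegetables", Arrays.asList("carrot", "potato")
        );
        Mono<Map<String, List<String>>> monoData = Mono.just(data);
        // Unpack the map entries, then flatten each entry's list into the Flux
        Flux<String> flux = monoData.flatMapMany(map ->
            Flux.fromIterable(map.entrySet()).flatMap(entry -> Flux.fromIterable(entry.getValue()))
        );
        flux.subscribe(System.out::println);
    }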
Explanation:
- Example 1: Demonstrates how to convert a Mono containing a list into a Flux of individual items.
- Example 2: Demonstrates asynchronous processing of each item in a Mono.
- Example 3: Demonstrates making network calls for each item in a Mono.
- Example 4: Demonstrates error handling within flatMapMany.
- Example 5: Demonstrates processing complex data structures with flatMapMany.
These examples should help you understand how to use flatMapMany effectively in different scenarios.