「Spring Reactive Stack」Spring WebFlux响应式Web框架入门

Spring WebFluxSpring Framework 5.0中引入的以Reactor为基础的响应式编程Web框架。

WebFlux 的异步处理是基于Reactor实现的,是将输入流适配成MonoFlux进行统一处理。

1. 响应式流(Reactive Streams)

  • Reactor 是一个响应式流,它有对应的发布者(Publisher),用两个类来表示:

    • Flux(返回0-n个元素)
    • Mono(返回0或1个元素)
  • Reactor 的订阅者(Subscriber)则是由Spring框架去完成。

  • 响应式流(Reactive Streams) 其实就是一个规范,其特点:

    • 无阻塞;
    • 一个数据流;
    • 可以异步执行;
    • 能够处理背压;
  • 背压(Backpressure) 可以简单理解为 消费决定生产,生产者可以根据消费压力进行动态调节生产速率的机制。

2. 发布者(Publisher)

由于响应流的特点,我们不能再返回一个简单的POJO对象来表示结果了。必须返回一个类似Java中的Future的概念,在有结果可用时通知消费者进行消费响应。

Reactive Stream规范中这种被定义为PublisherPublisher是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber的需求推送元素。一个Publisher可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。

可以通过下图Excel来理解,1-9行可以看作发布者Publisher提供的元素序列,10-13行的结果计算看作订阅者Subscriber

响应式的一个重要特点:当没有订阅时发布者(Publisher)什么也不做。

FluxMono都是PublisherReactor3实现。Publisher提供了subscribe方法,允许消费者在有结果可用时进行消费。如果没有消费者Publisher不会做任何事情,他根据消费情况进行响应。Publisher可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型MonoFlux

WebFlux中,你的方法只需返回Mono或Flux即可。你的代码基本也只和MonoFlux打交道。而WebFlux则会实现SubscriberonNext时将业务开发人员编写的MonoFlux转换为HTTP Response返回给客户端。

3. Mono和Flux的抽象模型

Mono和Flux都是Publisher(发布者)的实现模型。

3.1 Flux

Flux是一个发出(emit)0-N个元素组成的异步序列的Publisher<T>,可以被onComplete信号或者onError信号所终止。
在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和onError
下面这张图表示了Flux的抽象模型:

3.2 Mono

Mono是一个发出(emit)0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。
下面这张图表示了Mono的抽象模型(整体和Flux差不多,只不过这里只会发出0-1个元素):

4. Mono API

MonoFlux都是实现org.reactivestreams.Publisher接口的抽象类。

Mono代表0-1个元素的发布者(Publisher)。

  • Mono里面有很多API
    • **just()**:可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。
    • **empty()**:创建一个不包含任何元素,只发布结束消息的序列。
    • **justOrEmpty(Optional<? extends T> data)**:从一个Optional对象或可能为null的对象中创建Mono。只有Optional对象中包含值或对象不为null时,Mono序列才产生对应的元素。
    • **error(Throwable error)**:创建一个只包含错误消息的序列。
    • **never()**:创建一个不包含任何消息通知的序列。
    • **delay(Duration duration)delayMillis(long duration)**:创建一个Mono序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
    • **fromCallable()fromCompletionStage()fromFuture()fromRunnable()fromSupplier()**:分别从 CallableCompletionStageCompletableFutureRunnableSupplier中创建Mono
    • **ignoreElements(Publisher source)**:创建一个Mono序列,忽略作为源的Publisher中的所有元素,只产生结束消息。
    • **create()**:通过create()方法来使用MonoSink来创建Mono
  • API使用案例如下所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
@SpringBootTest
public class MonoTest {
@Test
public void mono() {
// 通过just直接赋值
Mono.just("my name is charles").subscribe(log::info);
// empty 创建空mono
Mono.empty().subscribe();
// ustOrEmpty 只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
Mono.justOrEmpty(null).subscribe(System.out::println);
Mono.justOrEmpty("测试justOrEmpty").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("测试justOrEmpty")).subscribe(System.out::println);
// error 创建一个只包含错误消息的序列。
Mono.error(new RuntimeException("error")).subscribe(System.out::println, System.err::println);
// never 创建一个不包含任何消息通知的序列。
Mono.never().subscribe(System.out::println);
// 延迟生成0
Mono.delay(Duration.ofMillis(2)).map(String::valueOf).subscribe(log::info);
// 通过fromRunnable创建,并实现异常处理
Mono.fromRunnable(() -> {
System.out.println("thread run");
throw new RuntimeException("thread run error");
}).subscribe(System.out::println, System.err::println);
// 通过Callable
Mono.fromCallable(() -> "callback function").subscribe(log::info);
// future
Mono.fromFuture(CompletableFuture.completedFuture("from future")).subscribe(log::info);
// 通过runnable
Mono<Void> runnableMono = Mono.fromRunnable(() -> log.warn(Thread.currentThread().getName()));
runnableMono.subscribe();
// 通过使用 Supplier
Mono.fromSupplier(() -> new Date().toString()).subscribe(log::info);
// flux中
Mono.from(Flux.just("from", "flux")).subscribe(log::info); // 只返回flux第一个
//通过 create()方法来使用 MonoSink 来创建 Mono。
Mono.create(sink -> sink.success("测试create")).subscribe(System.out::println);
}
}

运行结果:

5. Flux API

MonoFlux都是实现org.reactivestreams.Publisher接口的抽象类。

Flux表示连续序列,和Mono的创建方法有些不同,MonoFlux的简化版,Flux可以用来表示流。

  • Flux API
    • **just()**:可以指定序列中包含的全部元素。
    • **range()**:可以用来创建连续数值。
    • **empty()**:创建一个不包含任何元素。
    • **error(Throwable error)**:创建一个只包含错误消息的序列。
    • **fromIterable()**:通过迭代器创建如list,set
    • **fromStream()**:通过流创建
    • **fromArray(T[])**:通过列表创建 如 String[], Integer[]
    • **merge()**:通过将两个flux合并得到新的flux
    • **interval()**:每隔一段时间生成一个数字,从1开始递增
  • API使用案例如下所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@SpringBootTest
public class FluxTest {
@Test
public void flux () throws InterruptedException {
// 通过just赋值
Flux<Integer> intFlux = Flux.just(1, 2, 3, 4, 5);
// 以6开始,取4个值:6,7,8,9
Flux<Integer> rangeFlux = Flux.range(6, 4);
// 通过merge合并
Flux<Integer> intMerge = Flux.merge(intFlux, rangeFlux);
intMerge.subscribe(System.out::print);
System.out.println();//换行
// 通过fromArray构建
Flux.fromArray(new Integer[]{1,3,5,7,9}).subscribe(System.out::print);
System.out.println();//换行
// 通过流和迭代器创建
Flux<String> strFluxFromStream = Flux.fromStream(Stream.of(" just", " test", " reactor", " Flux", " and", " Mono"));
Flux<String> strFluxFromList = Flux.fromIterable(Arrays.asList(" just", " test", " reactor", " Flux", " and", " Mono"));
// 通过merge合并
Flux<String> strMerge = Flux.merge(strFluxFromStream, strFluxFromList);
strMerge.subscribe(System.out::print);
System.out.println();
// 通过interval创建流数据
Flux.interval(Duration.ofMillis(100)).map(String::valueOf)
.subscribe(System.out::print);
Thread.sleep(2000);
}
}

运行结果:

1
2
3
4
123456789
13579
just test reactor Flux and Mono just test reactor Flux and Mono
012345678910111213141516171819

6. subscribe方法

subscribe()方法表示对数据流的订阅动作,subscribe()方法有多个重载的方法,最多可以传入四个参数;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Test
public void subscribe () throws InterruptedException {
// 测试Mono
Mono.just(1).subscribe(System.out::println);
// 测试Flux
Flux.just('a', 'b').subscribe(System.out::println);
// 测试2个参数的subscribe方法
Flux.just('i', 'j').map(chr -> {
if ('j'== chr) throw new RuntimeException("test 2 parameters");
else return String.valueOf(chr);
})
.subscribe(System.out::println, // 参数1,接受内容
err -> log.error(err.getMessage())); // 参数2,对err处理的lambda函数
// 测试3个参数的subscribe方法
Flux.just("你", "我", "他", "它", "ta")
.subscribe(System.out::print, // 参数1,接受内容
System.err::println, // 参数2,对err处理的lambda函数
() -> System.out.println("complete for 3"));// 参数3,完成subscribe之后执行的lambda函数
// 测试4个参数的subscribe方法
Flux.interval(Duration.ofMillis(100))
.map(i -> {
if (i == 3) throw new RuntimeException("fake a mistake");
else return String.valueOf(i);
})
.subscribe(info -> log.info("info: {}", info), // 参数1,接受内容
err -> log.error("error: {}", err.getMessage()),// 参数2,对err处理的lambda函数
() -> log.info("Done"), // 参数3,完成subscribe之后执行的lambda函数
sub -> sub.request(10)); // 参数4,Subscription操作,设定从源头获取元素的个数
Thread.sleep(2000);
}

运行结果:

7. 使用StepVerifier测试响应式异步代码

通过expectNext执行类似断言的功能,如果断言不符合实际情况,就会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void StepVerifier () {
// 使用StepVerifier测试Flux,正常
Flux flux = Flux.just(1, 2, 3, 4, 5, 6);
StepVerifier.create(flux)
// 测试下一个期望的数据元素
.expectNext(1, 2, 3, 4, 5, 6)
// 测试下一个元素是否为完成信号
.expectComplete()
.verify();
// 使用StepVerifier测试Mono,报错
Mono<String> mono = Mono.just("charles").log();
StepVerifier.create(mono)
.expectNext("char")
.verifyComplete();
}

运行结果:

1
2
java.lang.AssertionError: expectation "expectNext(char)" failed (expected value: char; actual value: charles)
...

参考连接:https://mp.weixin.qq.com/s/O1VGS7d1TLQhgrCaQ-UQCw