Spring WebFlux
是Spring Framework 5.0
中引入的以Reactor
为基础的响应式编程Web
框架。
WebFlux 的异步处理是基于Reactor
实现的,是将输入流适配成Mono
或Flux
进行统一处理。
1. 响应式流(Reactive Streams)
Reactor 是一个响应式流,它有对应的发布者(
Publisher
),用两个类来表示:Flux
(返回0-n个元素)Mono
(返回0或1个元素)
Reactor 的订阅者(
Subscriber
)则是由Spring
框架去完成。响应式流(Reactive Streams) 其实就是一个规范,其特点:
- 无阻塞;
- 一个数据流;
- 可以异步执行;
- 能够处理背压;
背压(Backpressure) 可以简单理解为 消费决定生产,生产者可以根据消费压力进行动态调节生产速率的机制。
2. 发布者(Publisher)
由于响应流的特点,我们不能再返回一个简单的POJO
对象来表示结果了。必须返回一个类似Java
中的Future
的概念,在有结果可用时通知消费者进行消费响应。
Reactive Stream
规范中这种被定义为Publisher
,Publisher
是一个可以提供0-N
个序列元素的提供者,并根据其订阅者Subscriber
的需求推送元素。一个Publisher
可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。
可以通过下图Excel来理解,1-9
行可以看作发布者Publisher
提供的元素序列,10-13
行的结果计算看作订阅者Subscriber
。
响应式的一个重要特点:当没有订阅时发布者(Publisher
)什么也不做。
而Flux
和Mono
都是Publisher
在Reactor3
实现。Publisher
提供了subscribe
方法,允许消费者在有结果可用时进行消费。如果没有消费者Publisher
不会做任何事情,他根据消费情况进行响应。Publisher
可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型Mono
和Flux
。
在WebFlux
中,你的方法只需返回Mono或Flux
即可。你的代码基本也只和Mono
或Flux
打交道。而WebFlux
则会实现Subscriber
,onNext
时将业务开发人员编写的Mono
或Flux
转换为HTTP Response
返回给客户端。
3. Mono和Flux的抽象模型
Mono和Flux都是Publisher(发布者)的实现模型。
3.1 Flux
Flux
是一个发出(emit
)0-N
个元素组成的异步序列的Publisher<T>
,可以被onComplete
信号或者onError
信号所终止。
在响应流规范中存在三种给下游消费者调用的方法 onNext
, onComplete
, 和onError
。
下面这张图表示了Flux
的抽象模型:
3.2 Mono
Mono
是一个发出(emit
)0-1
个元素的Publisher<T>
,可以被onComplete
信号或者onError
信号所终止。
下面这张图表示了Mono
的抽象模型(整体和Flux
差不多,只不过这里只会发出0-1
个元素):
4. Mono API
Mono
和Flux
都是实现org.reactivestreams.Publisher
接口的抽象类。
Mono
代表0-1
个元素的发布者(Publisher
)。
Mono
里面有很多API
:- **just()**:可以指定序列中包含的全部元素。创建出来的
Mono
序列在发布这些元素之后会自动结束。 - **empty()**:创建一个不包含任何元素,只发布结束消息的序列。
- **justOrEmpty(Optional<? extends T> data)**:从一个
Optional
对象或可能为null
的对象中创建Mono
。只有Optional
对象中包含值或对象不为null
时,Mono
序列才产生对应的元素。 - **error(Throwable error)**:创建一个只包含错误消息的序列。
- **never()**:创建一个不包含任何消息通知的序列。
- **delay(Duration duration)和delayMillis(long duration)**:创建一个
Mono
序列,在指定的延迟时间之后,产生数字0
作为唯一值。 - **fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和fromSupplier()**:分别从
Callable
、CompletionStage
、CompletableFuture
、Runnable
和Supplier
中创建Mono
。 - **ignoreElements(Publisher source)**:创建一个
Mono
序列,忽略作为源的Publisher
中的所有元素,只产生结束消息。 - **create()**:通过
create()
方法来使用MonoSink
来创建Mono
。
- **just()**:可以指定序列中包含的全部元素。创建出来的
API
使用案例如下所示。
1 |
|
运行结果:
5. Flux API
Mono
和Flux
都是实现org.reactivestreams.Publisher
接口的抽象类。
Flux
表示连续序列,和Mono
的创建方法有些不同,Mono
是Flux
的简化版,Flux
可以用来表示流。
Flux API
:- **just()**:可以指定序列中包含的全部元素。
- **range()**:可以用来创建连续数值。
- **empty()**:创建一个不包含任何元素。
- **error(Throwable error)**:创建一个只包含错误消息的序列。
- **fromIterable()**:通过迭代器创建如list,set
- **fromStream()**:通过流创建
- **fromArray(T[])**:通过列表创建 如 String[], Integer[]
- **merge()**:通过将两个flux合并得到新的flux
- **interval()**:每隔一段时间生成一个数字,从1开始递增
API
使用案例如下所示。
1 |
|
运行结果:
1 | 123456789 |
6. subscribe方法
subscribe()方法表示对数据流的订阅动作,subscribe()方法有多个重载的方法,最多可以传入四个参数;
1 |
|
运行结果:
7. 使用StepVerifier测试响应式异步代码
通过expectNext执行类似断言的功能,如果断言不符合实际情况,就会报错。
1 |
|
运行结果:
1 | java.lang.AssertionError: expectation "expectNext(char)" failed (expected value: char; actual value: charles) |
参考连接:
https://mp.weixin.qq.com/s/O1VGS7d1TLQhgrCaQ-UQCw