blog/source/_posts/successor-of-observable.md
2021-05-13 10:19:30 +08:00

144 lines
7.9 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: Observable的后来者响应式接口
date: 2021-04-10 20:26:21
tags: [JVM, Java, Observable, Concurrent, Flow, Publisher, Subscriber]
categories:
- [JVM, Java]
keywords: JVM,Java,Observable,Concurrent,Flow,Publisher,Subscriber
---
在包`java.util`中的接口`Observer`和类`Observable`提供了非常常用的观察者模式实现而且这一实现在许多项目中都有着广泛的应用。但是随着Java 9的发布这一著名实现却被`@Deprecated`标记了,难道观察者模式已经过时了吗?然而并不是。<!--more-->`Observable`类被废弃,其主要原因是这个类并没有实现线程安全,而观察者模式又常常用在多线程项目里,这样的一对矛盾就给项目埋下了不少隐患。
当一个常用的接口被废弃那么一定会有一个更优化的继任者出现。自Java 9开始`Observable`的继任者就是在包`java.util.concurrent`中定义的`Flow`系列接口和类,`Flow`中的四个接口定义了一套响应流式处理的过程。一提到响应流式处理我们就会自然的想到Rx系列类库而`Flow`中定义的系列接口也的确类似于Rx类库但是形态比Rx类库要简单使用却要复杂一些。
`Flow`中的接口主要有以下这四个:
- `Flow.Publisher<T>`:生产者。
- `Flow.Subscriber<T>`:消费者。
- `Flow.Processor<T, R>`:即作为生产者,又作为消费者的中间处理组件。
- `Flow.Subscription`:生产者对于消费者的订阅,是两者之间沟通的桥梁。
## 消费者
消费者的概念是比较简单的,`Flow.Subscriber`接口只需要实现以下四个方法。
- `void onSubscribe(Subscription)`,当消费者被生产者订阅时,即调用这个方法通知消费者订阅已经开始,消费者可以在这里通过`Subscription`来对订阅做一些操作。
- `void onComplete()`,当生产者所产生的内容结束时,可以调用这个方法以示内容生产完结,订阅正常结束。
- `void onError(Throwable)`,当生产者出现异常时,异常可以通过这个方法抛出给消费者。当这个方法被调用的时候,订阅将会异常结束。
- `void onNext(T)`,这是消费者处理生产者发送来的数据的主要方法,这个方法每次只处理一个来自于生产者的数据内容。
以下是一个专门接受整型数据并将其打印出来的简单示例。
```java
class NumberPrinter implements Flow.Subscriber<Long> {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(50);
}
@Override
public void onComplete() {
System.out.println("Complete");
}
@Override
public void onError(Throwable throwable) {
throwable.printStacktrace();
}
@Override
public void onNext(Long item) {
System.out.println(item);
}
}
```
这个示例已经十分的简单了,里面唯一需要进行额外解释的部分是`subscription.request(n)`。与Rx库不同Flow中的接口天生带有回压控制也就是说作为消费者是可以通过Subecription来控制生产者发送数据的频率的。消费者每次调用`.request(n)`方法,其意义都是向生产者发出一个请求,请求生产者发送至多`n`个数据或者仅仅是传递一个控制信号等。Subscription中的`.request(n)`方法是整个Flow的响应式接口中比较核心的部分在下文中将会详细的说明。
## 生产者
Flow中的生产者接口`Flow.Publisher`只需要实现一个方法:`void subscribe(Subscriber<T>)`。这个方法允许生产者订阅消费者这与我们平时的理解是不一样的但是与Rx库中的习惯是相同的。虽然生产者中的接口直接提供了消费者的实例但是我们却不能在在生产者中直接调用消费者中的方法这样调用不会产生任何效果。
所以从这个角度来看任何实现了生产者接口的类实际上都只是一个订阅调度器其主要作用是为它订阅的消费者分配订阅Subscription。而依据JDK官方文档的说明每个实现了生产者接口的类一般都会包含一个实现了`Flow.Subscription`的静态内部类或者能够为消费者构建不同Subscription接口实例的工厂。
至此我们又回到了Subscription接口所以说Subscription接口是整个Flow系列接口的核心。前面也提到了Subscription接口是沟通生产者和消费者的桥梁所以对于生产者来说它并不直接持有所有消费者实例而是持有消费者对应的Subscription实例对于消费者来说它也并不直接与生产者打交道而是通过它所持有的Subscription实例来实施间接的控制。
Subscription接口需要实现两个方法`void request(Long)`和`void cancel()`。其中`void cancel()`十分容易理解,当调用这个方法的时候,代表消费者或者生产者将要取消订阅了,在这个方法里可以放置一些资源清理或者需要其他额外通知的任务。
方法`void request(Long)`在前面已经提到过了,在消费者中调用可以向生产者发出请求。这个方法实际上本质设计意图是让消费者向生产者声明自己的缓冲区大小,让生产者一次性推送的内容不至于溢出缓冲区。在实际的操作中,`void request(Long)`里除了包含对于缓冲区的控制以外,还担负着向消费者发送数据的功能,或者控制向消费者发送数据的过程。
这里用一个可以产生斐波那契数列的生产者来做示例这个生产者使用一个静态内部类来定义Subscription每次只发送请求数量的数字。
```java
class FibonacciProducer implements Flow.Publisher<Long> {
@Override
public void subscribe(Flow.Subscribe<? super Long> subscriber) {
subscriber.onSubscribe(new FibonacciSubscription(subscriber));
}
public static class FibonacciSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super Long> subscriber;
private long a = 0;
private long b = 1;
public FibonacciSubscription(Flow.Subscriber<? super Long> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
if (n > 0) {
long counter = 0;
while (counter < n) {
this.subscriber.onNext(b);
long temp = a;
a = b;
b += temp;
counter++;
}
} else {
subscriber.onError(new IllegalArgumentException());
}
}
@Override
public void cancel() {
this.subscriber.onComplete();
}
}
}
```
要试验这个示例,可以在`main()`函数中完成订阅以后,多次调用消费者的`request()`方法就可以观察到生产者和消费者之间的联动关系。这个示例中没有涉及任何多线程的内容也没有展示Flow系列接口的线程安全能力读者可自行在示例中加入多线程设计来体验一下。
至于`Flow.Processor<T, R>`是更加简单的可以看一下Java的文档这个接口直接继承了`Flow.Publisher`和`Flow.Subscriber`,所以这个中间处理接口就是两者的简单结合体,在使用时也只需要结合两者的实现即可。
## 生产者与消费者的交互示意图
```plantuml
@startuml
autonumber
control Main as m
control Publisher as pub
control Subscriber as sub
m -> pub : 使用subscribe方法订阅
pub -> sub : 调用onSubscribe方法
create control Subscription as ss
pub -> ss : 创建Subscription
ss -> sub : 交予Subscription
activate ss
sub -> ss : 调用request
alt 请求合法
ss -> sub : 控制调用onNext
ss -> sub : 调用onComplete结束
else 请求不合法
ss -> sub : 调用onError传递错误
else 主动结束订阅
sub -> ss : 调用cancel
end
deactivate ss
@enduml
```