Flutter-Stream学习(一)

前言:在Dart库中,实现异步编程主要有两种方式Future&Stream,使用他们需要引入dart:async

本文主要介绍Stream的相关概念及基础用法和原理解析。

Stream的基本用法

Stream能通过async*StreamController产生,也能通过其他Stream转换而来,相较于async*StreamController因为灵活性高,因此更为常用,但两者在使用场景也有一定差异。

Async*

async在flutter开发中很常见,但是加上*的async*可能就未必见过,同样作为Flutter里异步处理的一环,async主要是跟Future打交道,而async*处理的对象是Streamasync*在使用上需要搭配yield。下面的代码演示了如何使用async*进行1到10的相加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Stream<int>countStream(int to) async*{
print('Stream 被监听');
for (int i = 0; i <= to; i++) {
yield i;
}
}
Future<int>sumStream(Stream<int> stream) async{
var sum = 0;
await for(final value in stream) {
sum += value;
}
return sum;
}

void main() async() {
var stream = countStream(10);
// 当注释掉下面这行,控制台不会打印出 "stream 被监听",也就表示 async* 方法体没被执行
var sum = await sumStream(stream);
print(sum); // 55
}

在上面的示例中,async*方法体里yield在每次的遍历中,都往Stream返回一个数据,通过await for 的监听拿到每次返回的值,接着执行sum操作。值得注意的是,async*这种方式产生的Stream,当stream没有被监听时,async* 方法体是不会被执行的。

如果你看着async*还有点别扭,请记住:**async返回的是一个Future,而async*返回的是一个Stream

StreamController

在flutter中StreamController是比较常用的创建Stream方法。只需要构造出StreamController对象,然后通过.stream就可以得到对应的Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

Stream<int> countStream(int to) {
//先创建StreamController
late StreamController<int> controller;
controller = StreamController<int>(onListen: () {
//当Stream被监听时会触发onListen回调
for (var i = 0; i <= to; i++) {
controller.add(i);
}
controller.close();
});
return controller.stream;
}

Future<int> listenOn(Stream<int> stream) async {
var completer = Completer<int>();
var sum = 0;
//监听Stream
stream.listen((event) {
sum += event;
print('sum:$sum, event:$event');
}, onDone: () => completer.complete(sum));
return completer.future;
}
Future<void> onInit() async {
super.onInit();
var stream = countStream(10);
var sum = await listenOn(stream);
print(sum);//55
}

Stream的组成

Flutter中的Stream处理,主要涉及三类对象,以发布订阅模式的角度去看的话,可以分为发布者StreamController数据通道Stream订阅者StreamSubscription

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Example {
var controller = StreamController<int>();

Stream<int> get stream => controller.stream;
StreamSubscription<int>? _subscription;

void initState() {
_subscription = stream.listen((event) {
print(event);
});
for (var i = 0; i <= 10; i++) {
controller.add(i);
}
}

void dispose() {
_subscription?.cancel();
_subscription = null;
controller.close();
}
}

每一个StreamController都对应着一个Stream,当Stream被订阅时,会得到一个StreamSubscription对象。上面代码简单的使用了Stream并展示了他们的关系,但是他们内部的原理是怎么样了,事件从发布到消费的过程又是怎样的,可以通过数据流向图来看到。

数据流向图

Stream (1).png

在事件处理上:Stream在被订阅时,会创建StreamSubscription,并将其中的onData等事件处理的回调传给StreamSubscription

在事件输入输出上:StreamController通过add方法输入事件后,先判断此时是否存在订阅者StreamSubscription,如果存在则调用StreamSubscriptiononData处理,不存在就先存到_pendingEvents里,等到下次StreamSubscription出现了再向它输出事件。

可以看到,StreamController 在整个事件流向的处理中肩负着最重要的使命,它控制着事件如何输入和输出,StreamSubscription负责处理输出到这里的事件,Stream在得到StreamSubscription后就完成了它的使命选择“退隐山林”。

这么讲可能还有点“干”,为了更直观的介绍他们各自的职责,接下来我们从他们定义的接口出发,去思考他们都能做哪些事件。为了方便呈现,我只取其中最关键的部分。

StreamController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
abstract interface class StreamController<T> implements StreamSink<T> {

// stream 流
Stream<T> get stream;

// 流状态的回调
abstract void Function()? onListen; // 被监听
abstract void Function()? onPause; // 流暂停
abstract void Function()? onResume; // 流恢复
abstract void Function()? onCancel; // 流取消/关闭

// 流状态
bool get isClosed;
bool get isPaused;
bool get hasLitener; // 当前流是否有订阅者

// 监听 source,转发给 stream
Future addStream(Stream<T> source, {bool? cancelOnError});

// 往流里面添加事件
void add(T event);
void addError(Object error, [StackTrace? stackTrace]);
Future close(); // 关闭流

// 输出事件
// 以下这三个接口在 _StreamControllerBase 中
void _sendData(T data);
void _sendError(Object error, StackTrace stackTrace);
void _sendDone();
}
  • 1.StreamController负责管理事件流的状态,当状态变化时,会触发到相应的回调(onListen/onPause等)。

  • 2.StreamController负责事件的输入,输入的方式有两种,一种是事件接口addaddError;另外一种是通过监听其它的Stream;同时事件也分为两种,一种是正常事件,一种是错误事件。

  • 3.StreamController能关闭这个事件流通道,会产生一个onDone事件。

  • 4.StreamController负责事件的输出,不同的输入对应不同的输出。

Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
abstract mixin class Stream<T> {

// 是否地广播流,广播流允许多订阅
bool get isBroadcast => false;

// 监听流变化,返回订阅者
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});

// 一系列 Stream 处理和变换操作

Stream<T> where(bool test(T event)) { ... }
Stream<S> map<S>(S convert(T event)) { ... }
Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { ...}
Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { ... }
Stream<T> handleError(Function onError, {bool test(error)?}) { ... }
...
}
  • 1.listen方法可以订阅Stream事件流,返回当前订阅者,并把listen方法中的onData等方法注册到当前订阅者中。

  • 2.Stream有很多过滤转换等语法糖方法。

StreamSubscription

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
abstract interface class StreamSubscription<T> {

// 监听数据变化
void onData(void handleData(T data)?);
void onError(Function? handleError);
void onDone(void handleDone()?);

// 暂停/恢复 监听
void pause([Future<void>? resumeSignal]);
void resume();
bool get isPaused;

// 取消监听
Future<void> cancel();
// 转成 Future 对象,监听流结束事件
Future<E> asFuture<E>([E? futureValue]);
}
  • 1.StreamSubscription作为事件输出端,负责事件的输出处理。
  • 2.StreamSubscription也能对自己的订阅行为进行暂停、恢复或取消等动作。

Stream的分类

同步和异步

Stream在输入端可以分为同步流和异步流:

StreamController的工厂方法中,通过sync可以指定同步或者异步。同步和异步的区别是:事件输入后是否会立即输出。

同步流在事件输入后立刻执行onData,异步流在事件输入后注册一个异步事件,等到当前EventLoop中的同步事件处理后触发onData

1
2
3
4
5
6
7
8
9
10
factory StreamController(
{void onListen()?,
void onPause()?,
void onResume()?,
FutureOr<void> onCancel()?,
bool sync = false}) {
return sync
? _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}

在实现上看,_SyncStreamController最终输出时使用的是_SyncStreamControllerDispatch_AsyncStreamController使用的是_AsyncStreamControllerDispatch

两者在输出处理不同,_SyncStreamControllerDispatch调用的是subscription_add方法,_AsyncStreamControllerDispatch调用的是subscription_addPending方法。_addPending会先将事件存到队列里,同时如果队列没有在跑就开启队列的处理,通过scheduleMicrotask对事件进行异步处理,处理完当前事件继续处理队列时的其它事件,直到队列清空。

广播和非广播

Stream在输出端可以分为两类:

  • 单订阅流或者叫非广播流(Single Subscription),这种Stream最多只能有一个监听器(Listener)
  • 多订阅流或者叫广播流(Broadcast),这种流可以有多个监视器监听(Listener)

上述代码中生产的是非广播流,广播流通过StreamController.broadcast方法创建。广播和非广播的区别是是否允许多次订阅。

1
2
3
4
5
6
factory StreamController.broadcast(
{void onListen()?, void onCancel()?, bool sync = false}) {
return sync
? _SyncBroadcastStreamController<T>(onListen, onCancel)
: _AsyncBroadcastStreamController<T>(onListen, onCancel);
}

非广播StreamController继承自_StreamController,广播StreamController继承自_BroadcastStreamController,两者的区别可以通过_subscribe的实现体现。_StreamController的实现如下,当重复订阅后会直接抛出 StateError 异常。

1
2
3
4
5
6
7
8
9
10
StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError) {
if (!_isInitialState) {
throw StateError("Stream has already been listened to.");
}
_ControllerSubscription<T> subscription = _ControllerSubscription<T>(
this, onData, onError, onDone, cancelOnError);
// ...
return subscription;
}

_BroadcastStreamController里面有两个对象_firstSubscription_lastSubscription_BroadcastSubscription是双向链表结构。当需要输出事件时,通过整个链表,通知所有的订阅进行消息的处理。

1
2
_BroadcastSubscription<T>? _firstSubscription;
_BroadcastSubscription<T>? _lastSubscription;

Flutter-Stream学习(一)
https://ilittle.fun/2024/06/21/Flutter-Stream学习(一)/
作者
Leelt
发布于
2024年6月21日
许可协议