前言:在Dart库中,实现异步编程主要有两种方式Future&Stream,使用他们需要引入dart:async。
本文主要介绍Stream的相关概念及基础用法和原理解析。
Stream的基本用法 Stream能通过async*和StreamController产生,也能通过其他Stream转换而来,相较于async*,StreamController因为灵活性高,因此更为常用,但两者在使用场景也有一定差异。
Async* async在flutter开发中很常见,但是加上*的async*可能就未必见过,同样作为Flutter里异步处理的一环,async主要是跟Future打交道,而async*处理的对象是Stream,async*在使用上需要搭配yield。下面的代码演示了如何使用async*进行1到10的相加。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Stream<int >countStream(int to) async *{ print ('Stream 被监听' ); for (int i = 0 ; i <= to; i++) { yield i; } } Future<int >sumStream(Stream<int > stream) async { var sum = 0 ; await for (final value in stream) { sum += value; } return sum; }void main() async () { var stream = countStream(10 ); var sum = await sumStream(stream); print (sum); }
在上面的示例中,async*方法体里yield在每次的遍历中,都往Stream返回一个数据,通过await for 的监听拿到每次返回的值,接着执行sum操作。值得注意的是,async*这种方式产生的Stream,当stream没有被监听时,async* 方法体是不会被执行的。
如果你看着async*还有点别扭,请记住:**async返回的是一个Future,而async*返回的是一个Stream。
StreamController 在flutter中StreamController是比较常用的创建Stream方法。只需要构造出StreamController对象,然后通过.stream就可以得到对应的Stream。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 Stream<int > countStream(int to) { late StreamController<int > controller; controller = StreamController<int >(onListen: () { for (var i = 0 ; i <= to; i++) { controller.add(i); } controller.close(); }); return controller.stream; } Future<int > listenOn(Stream<int > stream) async { var completer = Completer<int >(); var sum = 0 ; stream.listen((event) { sum += event; print ('sum:$sum , event:$event ' ); }, onDone: () => completer.complete(sum)); return completer.future; } Future<void > onInit() async { super .onInit(); var stream = countStream(10 ); var sum = await listenOn(stream); print (sum); }
Stream的组成 Flutter中的Stream处理,主要涉及三类对象,以发布订阅模式的角度去看的话,可以分为发布者StreamController 、数据通道Stream 、订阅者StreamSubscription 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class Example { var controller = StreamController<int >(); Stream<int > get stream => controller.stream; StreamSubscription<int >? _subscription; void initState() { _subscription = stream.listen((event) { print (event); }); for (var i = 0 ; i <= 10 ; i++) { controller.add(i); } } void dispose() { _subscription?.cancel(); _subscription = null ; controller.close(); } }
每一个StreamController都对应着一个Stream,当Stream被订阅时,会得到一个StreamSubscription对象。上面代码简单的使用了Stream并展示了他们的关系,但是他们内部的原理是怎么样了,事件从发布到消费的过程又是怎样的,可以通过数据流向图来看到。
数据流向图
在事件处理上:Stream在被订阅时,会创建StreamSubscription,并将其中的onData等事件处理的回调传给StreamSubscription。
在事件输入输出上:StreamController通过add方法输入事件后,先判断此时是否存在订阅者StreamSubscription,如果存在则调用StreamSubscription的onData处理,不存在就先存到_pendingEvents里,等到下次StreamSubscription出现了再向它输出事件。
可以看到,StreamController 在整个事件流向的处理中肩负着最重要的使命,它控制着事件如何输入和输出,StreamSubscription负责处理输出到这里的事件,Stream在得到StreamSubscription后就完成了它的使命选择“退隐山林”。
这么讲可能还有点“干”,为了更直观的介绍他们各自的职责,接下来我们从他们定义的接口出发,去思考他们都能做哪些事件。为了方便呈现,我只取其中最关键的部分。
StreamController 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 abstract interface class StreamController <T > implements StreamSink <T > { Stream<T> get stream; abstract void Function ()? onListen; abstract void Function ()? onPause; abstract void Function ()? onResume; abstract void Function ()? onCancel; bool get isClosed; bool get isPaused; bool get hasLitener; Future addStream(Stream<T> source, {bool? cancelOnError}); void add(T event); void addError(Object error, [StackTrace? stackTrace]); Future close(); void _sendData(T data); void _sendError(Object error, StackTrace stackTrace); void _sendDone(); }
1.StreamController负责管理事件流的状态,当状态变化时,会触发到相应的回调(onListen/onPause等)。
2.StreamController负责事件的输入,输入的方式有两种,一种是事件接口add、addError;另外一种是通过监听其它的Stream;同时事件也分为两种,一种是正常事件,一种是错误事件。
3.StreamController能关闭这个事件流通道,会产生一个onDone事件。
4.StreamController负责事件的输出,不同的输入对应不同的输出。
Stream 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 abstract mixin class Stream <T > { bool get isBroadcast => false ; StreamSubscription<T> listen(void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError}); Stream<T> where(bool test(T event)) { ... } Stream<S> map<S>(S convert(T event)) { ... } Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { ...} Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { ... } Stream<T> handleError(Function onError, {bool test(error)?}) { ... } ... }
StreamSubscription 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 abstract interface class StreamSubscription <T > { void onData(void handleData(T data)?); void onError(Function? handleError); void onDone(void handleDone()?); void pause([Future<void >? resumeSignal]); void resume(); bool get isPaused; Future<void > cancel(); Future<E> asFuture<E>([E? futureValue]); }
1.StreamSubscription作为事件输出端,负责事件的输出处理。
2.StreamSubscription也能对自己的订阅行为进行暂停、恢复或取消等动作。
Stream的分类 同步和异步 Stream在输入端可以分为同步流和异步流:
StreamController的工厂方法中,通过sync可以指定同步或者异步。同步和异步的区别是:事件输入后是否会立即输出。
同步流在事件输入后立刻执行onData,异步流在事件输入后注册一个异步事件,等到当前EventLoop中的同步事件处理后触发onData。
1 2 3 4 5 6 7 8 9 10 factory StreamController( {void onListen()?, void onPause()?, void onResume()?, FutureOr<void > onCancel()?, bool sync = false }) { return sync ? _SyncStreamController<T>(onListen, onPause, onResume, onCancel) : _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); }
在实现上看,_SyncStreamController最终输出时使用的是_SyncStreamControllerDispatch,_AsyncStreamController使用的是_AsyncStreamControllerDispatch。
两者在输出处理不同,_SyncStreamControllerDispatch调用的是subscription的_add方法,_AsyncStreamControllerDispatch调用的是subscription的_addPending方法。_addPending会先将事件存到队列里,同时如果队列没有在跑就开启队列的处理,通过scheduleMicrotask对事件进行异步处理,处理完当前事件继续处理队列时的其它事件,直到队列清空。
广播和非广播 Stream在输出端可以分为两类:
单订阅流或者叫非广播流(Single Subscription),这种Stream最多只能有一个监听器(Listener)
多订阅流或者叫广播流(Broadcast),这种流可以有多个监视器监听(Listener)
上述代码中生产的是非广播流,广播流通过StreamController.broadcast方法创建。广播和非广播的区别是是否允许多次订阅。
1 2 3 4 5 6 factory StreamController.broadcast( {void onListen()?, void onCancel()?, bool sync = false }) { return sync ? _SyncBroadcastStreamController<T>(onListen, onCancel) : _AsyncBroadcastStreamController<T>(onListen, onCancel); }
非广播StreamController继承自_StreamController,广播StreamController继承自_BroadcastStreamController,两者的区别可以通过_subscribe的实现体现。_StreamController的实现如下,当重复订阅后会直接抛出 StateError 异常。
1 2 3 4 5 6 7 8 9 10 StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { if (!_isInitialState) { throw StateError("Stream has already been listened to." ); } _ControllerSubscription<T> subscription = _ControllerSubscription<T>( this , onData, onError, onDone, cancelOnError); return subscription; }
_BroadcastStreamController里面有两个对象_firstSubscription、_lastSubscription,_BroadcastSubscription是双向链表结构。当需要输出事件时,通过整个链表,通知所有的订阅进行消息的处理。
1 2 _BroadcastSubscription<T>? _firstSubscription; _BroadcastSubscription<T>? _lastSubscription;