前言:在Dart库中,实现异步编程主要有两种方式Future&Stream,使用他们需要引入dart:async
。
本文主要介绍Stream的相关概念及基础用法和原理解析。
Stream的基本用法 Stream
能通过async*
和StreamController
产生,也能通过其他Stream
转换而来,相较于async*
,StreamController
因为灵活性高,因此更为常用,但两者在使用场景也有一定差异。
Async* async
在flutter开发中很常见,但是加上*的async*
可能就未必见过,同样作为Flutter里异步处理的一环,async
主要是跟Future
打交道,而async*
处理的对象是Stream
,async*
在使用上需要搭配yield
。下面的代码演示了如何使用async*
进行1到10的相加。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Stream<int >countStream(int to) async *{ print ('Stream 被监听' ); for (int i = 0 ; i <= to; i++) { yield i; } } Future<int >sumStream(Stream<int > stream) async { var sum = 0 ; await for (final value in stream) { sum += value; } return sum; }void main() async () { var stream = countStream(10 ); var sum = await sumStream(stream); print (sum); }
在上面的示例中,async*
方法体里yield
在每次的遍历中,都往Stream
返回一个数据,通过await for
的监听拿到每次返回的值,接着执行sum
操作。值得注意的是,async*
这种方式产生的Stream,当stream
没有被监听时,async*
方法体是不会被执行的。
如果你看着async*
还有点别扭,请记住:**async
返回的是一个Future
,而async*
返回的是一个Stream
。
StreamController 在flutter中StreamController
是比较常用的创建Stream
方法。只需要构造出StreamController
对象,然后通过.stream
就可以得到对应的Stream
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 Stream<int > countStream(int to) { late StreamController<int > controller; controller = StreamController<int >(onListen: () { for (var i = 0 ; i <= to; i++) { controller.add(i); } controller.close(); }); return controller.stream; } Future<int > listenOn(Stream<int > stream) async { var completer = Completer<int >(); var sum = 0 ; stream.listen((event) { sum += event; print ('sum:$sum , event:$event ' ); }, onDone: () => completer.complete(sum)); return completer.future; } Future<void > onInit() async { super .onInit(); var stream = countStream(10 ); var sum = await listenOn(stream); print (sum); }
Stream的组成 Flutter中的Stream处理,主要涉及三类对象,以发布订阅模式的角度去看的话,可以分为发布者StreamController 、数据通道Stream 、订阅者StreamSubscription 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class Example { var controller = StreamController<int >(); Stream<int > get stream => controller.stream; StreamSubscription<int >? _subscription; void initState() { _subscription = stream.listen((event) { print (event); }); for (var i = 0 ; i <= 10 ; i++) { controller.add(i); } } void dispose() { _subscription?.cancel(); _subscription = null ; controller.close(); } }
每一个StreamController
都对应着一个Stream
,当Stream
被订阅时,会得到一个StreamSubscription
对象。上面代码简单的使用了Stream并展示了他们的关系,但是他们内部的原理是怎么样了,事件从发布到消费的过程又是怎样的,可以通过数据流向图来看到。
数据流向图
在事件处理上:Stream
在被订阅时,会创建StreamSubscription
,并将其中的onData
等事件处理的回调传给StreamSubscription
。
在事件输入输出上:StreamController
通过add
方法输入事件后,先判断此时是否存在订阅者StreamSubscription
,如果存在则调用StreamSubscription
的onData
处理,不存在就先存到_pendingEvents
里,等到下次StreamSubscription
出现了再向它输出事件。
可以看到,StreamController
在整个事件流向的处理中肩负着最重要的使命,它控制着事件如何输入和输出,StreamSubscription
负责处理输出到这里的事件,Stream
在得到StreamSubscription
后就完成了它的使命选择“退隐山林”。
这么讲可能还有点“干”,为了更直观的介绍他们各自的职责,接下来我们从他们定义的接口出发,去思考他们都能做哪些事件。为了方便呈现,我只取其中最关键的部分。
StreamController 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 abstract interface class StreamController <T > implements StreamSink <T > { Stream<T> get stream; abstract void Function ()? onListen; abstract void Function ()? onPause; abstract void Function ()? onResume; abstract void Function ()? onCancel; bool get isClosed; bool get isPaused; bool get hasLitener; Future addStream(Stream<T> source, {bool? cancelOnError}); void add(T event); void addError(Object error, [StackTrace? stackTrace]); Future close(); void _sendData(T data); void _sendError(Object error, StackTrace stackTrace); void _sendDone(); }
1.StreamController
负责管理事件流的状态,当状态变化时,会触发到相应的回调(onListen/onPause等)。
2.StreamController
负责事件的输入,输入的方式有两种,一种是事件接口add
、addError
;另外一种是通过监听其它的Stream
;同时事件也分为两种,一种是正常事件,一种是错误事件。
3.StreamController
能关闭这个事件流通道,会产生一个onDone
事件。
4.StreamController
负责事件的输出,不同的输入对应不同的输出。
Stream 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 abstract mixin class Stream <T > { bool get isBroadcast => false ; StreamSubscription<T> listen(void onData(T event)?, {Function? onError, void onDone()?, bool? cancelOnError}); Stream<T> where(bool test(T event)) { ... } Stream<S> map<S>(S convert(T event)) { ... } Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { ...} Stream<E> asyncExpand<E>(Stream<E>? convert(T event)) { ... } Stream<T> handleError(Function onError, {bool test(error)?}) { ... } ... }
StreamSubscription 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 abstract interface class StreamSubscription <T > { void onData(void handleData(T data)?); void onError(Function? handleError); void onDone(void handleDone()?); void pause([Future<void >? resumeSignal]); void resume(); bool get isPaused; Future<void > cancel(); Future<E> asFuture<E>([E? futureValue]); }
1.StreamSubscription
作为事件输出端,负责事件的输出处理。
2.StreamSubscription
也能对自己的订阅行为进行暂停、恢复或取消等动作。
Stream的分类 同步和异步 Stream在输入端可以分为同步流和异步流:
StreamController
的工厂方法中,通过sync
可以指定同步或者异步。同步和异步的区别是:事件输入后是否会立即输出。
同步流在事件输入后立刻执行onData
,异步流在事件输入后注册一个异步事件,等到当前EventLoop
中的同步事件处理后触发onData
。
1 2 3 4 5 6 7 8 9 10 factory StreamController( {void onListen()?, void onPause()?, void onResume()?, FutureOr<void > onCancel()?, bool sync = false }) { return sync ? _SyncStreamController<T>(onListen, onPause, onResume, onCancel) : _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); }
在实现上看,_SyncStreamController
最终输出时使用的是_SyncStreamControllerDispatch
,_AsyncStreamController
使用的是_AsyncStreamControllerDispatch
。
两者在输出处理不同,_SyncStreamControllerDispatch
调用的是subscription
的_add
方法,_AsyncStreamControllerDispatch
调用的是subscription
的_addPending
方法。_addPending
会先将事件存到队列里,同时如果队列没有在跑就开启队列的处理,通过scheduleMicrotask
对事件进行异步处理,处理完当前事件继续处理队列时的其它事件,直到队列清空。
广播和非广播 Stream在输出端可以分为两类:
单订阅流或者叫非广播流(Single Subscription),这种Stream最多只能有一个监听器(Listener)
多订阅流或者叫广播流(Broadcast),这种流可以有多个监视器监听(Listener)
上述代码中生产的是非广播流,广播流通过StreamController.broadcast
方法创建。广播和非广播的区别是是否允许多次订阅。
1 2 3 4 5 6 factory StreamController.broadcast( {void onListen()?, void onCancel()?, bool sync = false }) { return sync ? _SyncBroadcastStreamController<T>(onListen, onCancel) : _AsyncBroadcastStreamController<T>(onListen, onCancel); }
非广播StreamController
继承自_StreamController
,广播StreamController
继承自_BroadcastStreamController
,两者的区别可以通过_subscribe
的实现体现。_StreamController
的实现如下,当重复订阅后会直接抛出 StateError 异常。
1 2 3 4 5 6 7 8 9 10 StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError, void onDone()?, bool cancelOnError) { if (!_isInitialState) { throw StateError("Stream has already been listened to." ); } _ControllerSubscription<T> subscription = _ControllerSubscription<T>( this , onData, onError, onDone, cancelOnError); return subscription; }
_BroadcastStreamController
里面有两个对象_firstSubscription
、_lastSubscription
,_BroadcastSubscription
是双向链表结构。当需要输出事件时,通过整个链表,通知所有的订阅进行消息的处理。
1 2 _BroadcastSubscription<T>? _firstSubscription; _BroadcastSubscription<T>? _lastSubscription;