在上一篇文章中我们大概了解关于Stream的一些概念和原理,并通过代码简洁说明了Stream的流程。本篇文章进一步学习Stream的其他内容同时用代码加深记忆。
Stream开发实战
通过上一篇文章对接口和分类的分析,已经对Stream有了进一步的认识,接下来通过例子来检验一下。
利用Stream实现事件的广播
事件的广播,在开发中总会遇到,尤其是在跨组件或跨页面的场景,相信大部分的开发在项目中都会引入EventBus的三方或者自研框架。例如:当我编辑资料后,保存完成通知其他页面进行刷新以展示最新的信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| class UserInfo { int uid; String name;
UserInfo(this.uid, this.name); }
class UserInfoChangeEvent {
static final _controller = StreamController<UserInfo>.broadcast();
static StreamSubscription<UserInfo> subscribe(Function(UserInfo) callback) { return _controller.stream.listen(callback); }
static void broadcast(UserInfo userInfo) { _controller.add(userInfo); } }
class UserProfileViewModel { ... void onSave(int uid, String name) { UserInfoChangeEvent.broadcast(UserInfo(uid, name)); }
}
class ViewState extends State<ViewWidget> {
StreamSubscription<UserInfo>? _subscription; UserInfo? _curUserInfo;
@override void initState() { super.initState(); _subscription = UserInfoChangeEvent.subscribe((userInfo) { setState(() { _curUserInfo = userInfo; }) }); }
@override void dispose() { super.dispose(); _subscription?.cancel(); _subscription = null; } }
|
上面的代码中,UserInfoChangeEvent
定义了广播类型的StreamController
,并且向外暴露了subscribe
和broadcast
接口,用户编辑页面后点击保存时执行onSave
方法,方法中的UserInfoChangeEvent
调用broadcast
方法向外部发送数据变化的事件;然后ViewState在初始化是注册了监听UserInfoChangeEvent
的subscription
,subscription
接收到事件并获取更新后的userInfo
赋值给_curUserInfo
,同时刷新页面。
Flutter提供了一个组件StreamBuilder,可以帮助我们方便的监听Stream并刷新Widget。例如进入一个页面时,通常会有一个数据加载的过程,此时页面会从Loading状态到Loaded/LoadError的状态变更,不同的状态呈现不同的页面,这时我们就需要定义一个LoadingState的枚举类型,在数据加载后通过StreamController发布LoadState状态,StreamBuilder监听到更新会自动触发Widget的刷新。
AsyncSnapShot是快照的意思,保存此刻最新的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| enum LoadingState { loading, error, success, } .... StreamBuilder<LoadingState>( stream: viewModel.loadingStateStrem, initialData: LoadingState.loading, builder: (context, snapshot) { var data = snapshot.data; if (data == LoadingState.loading) { return const Center(child: CircularProgressIndicator()); } else if (data == LoadingState.error) { return const Center(child: Text('加载失败')); } else if (data == LoadingState.success) { return TextWidget(text); } }, )
|
不过StreamBuilder也有坑,对于Stream来说,事件被消费了就会丢掉,无论是StreamController还是Stream都不会保存上次的值,以页面加载为例页面进来后viewModel执行数据加载完成后,想loadingStateStream里发布了Loaded的状态,如果此时StreamBuilder还没有开始布局,那Builder就无法收到这次事件,等到界面已经布局完成时就会丢失此次的事件,snapshot拿到还是初始状态,也就是loading,这样就会导致页面显示异常。
对此可能会有疑问,为什么StreamBuilder不能一开始就添加到build里呢?当然可以,但是即便如此也无法保证StreamBuilder的监听比viewModel的状态更新要早,因为如果页面的内容较长,一开始StreamBuilder还不在可视区内,它的initState方法就不会执行,也就不会监听LoadingState.
StreamBuilder会面临这样的困境,归根结底是Stream设计导致的。
Stream的变换和处理
前面在介绍Stream的接口时,我们提到Stream有很多操作方法。接下来着重挑几个名字不好理解的展开讲讲
Future<E> drain<E>([E? futureValue])
drain
意为‘‘排出,消耗’’。这里指‘‘排掉’’这条流里的所有事件,只响应结束信号,当流关闭时返回futureValue
1 2
| final result = await Stream.fromIterable([1, 2, 3]).drain(100); print(result);
|
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine)
事件迭代。根据combine合并流里面的事件,该方法可以返回指定类型的S,同时可以指定初始值initialValue。
1 2 3
| final result = await Stream<int>.fromIterable([2, 4, 6, 8, 10]) .fold<String>("0", (previous, element) => "$previous - $element"); print(result);
|
Future<T> reduce(T Function(T previous, T element) combine)
也是事件迭代。与fold不同的是,reduce无法指定初始值且它只能返回与源流相同的类型T。
1 2 3
| final result = await Stream.fromIterable<int>([2, 6, 10, 8, 2]) .reduce<int>(10, (previous, element) => previous + element); print(result);
|
Future pipe(StreamConsumer<T> streamConsumer)
流管道拼接。将当前流的事件流流向streamConsumer中,streamConsumer的子类实现通常是一个StreamController,拿到事件后通知给它的订阅者。
1 2 3 4 5 6 7 8 9
| var controller = StreamController<int>(); var stream = controller.stream;
stream.listen((event) { print(event); }); var result = await Stream<int>.fromIterable([2, 4, 6, 8, 10]).pipe(controller); print("result: $result");
|
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert)
异步展开。将原流中的事件做一次展操作,得到一个E类型的新流。
1 2 3 4 5 6 7 8 9
| var stream = Stream<int>.fromIterable([2, 4, 6, 8, 10]); var newStream = stream.asyncExpand((event) { return Stream<int>.fromIterable([event, event + 1]); }); newStream.listen((event){ print(event); });
|
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert)
异步映射。跟asyncExpand类似,只是转换操作返回的是FutureOr对象,为那些转换过程中涉及到异步处理的场景提供便利。
1 2 3 4 5 6 7 8
| var newStream = stream.asyncMap((event) async { await Future.delayed(const Duration(seconds: 1)); return event + 1; });
newStream.listen((event){ print(event); });
|
测试一下
说了这么多,现在来检验一下。假设有一段逻辑,controller会增加三个事件,分别是add(1) add(2) add(3)
,subscription
会在每次收到事件时打印output:$event
, 中间会有一次暂停,3秒后恢复,猜一下以下几种场景下最后输出的属性是什么?
1.同步流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void main() async { var controller = StreamController<int>(sync: true); var subscription = controller.stream.listen((event) { print('output: $event'); }); controller.add(1); controller.add(2); controller.add(3);
print('暂停'); subscription.pause();
Future.delayed(const Duration(seconds: 3), () { print('3秒后 -> 恢复'); subscription.resume(); }); }
|
2.异步流
保持1中其他代码不变,将sync的值设置为false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| void main() async { var controller = StreamController<int>(sync: false); var subscription = controller.stream.listen((event) { print('output: $event'); }); controller.add(1); controller.add(2); controller.add(3);
print('暂停'); subscription.pause();
Future.delayed(const Duration(seconds: 3), () { print('3秒后 -> 恢复'); subscription.resume(); }); }
|
3.异步流:使用Future.delayed延迟暂停
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Future.delayed(Duration.zero, (){ print('暂停'); subscription.pause();
Future.delayed(const Duration(seconds: 3), () { print('3秒后 -> 恢复'); subscription.resume(); }); });
|
4. 异步流:使用 scheduleMicrotask 延迟暂停
保持 3 中其它代码不变,用scheduleMicrotask
代替Future.delayed
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| scheduleMicrotask(() { print('暂停'); subscription.pause(); Future.delayed(const Duration(seconds: 3), () { print('3秒后 -> 恢复'); subscription.resume(); }); });
|
上面的输出是否如你所料?相信如果你理解了之前的介绍,对1 2 3点的输出结果是没有问题的。但是对于第 4 点:虽然同样为延迟暂停,3 和 4 中的输出完全不一样,4 中的输出在输出output: 1
后才会触发暂停。这又是为什么呢?要解释这个输出,就要从源码出发了。
结语
所以,我们首先要从概念上理解他们,其次我们还要从代码上知道具体的实现。当程度的执行不及预期,缺乏代码实现层面的理解,我们便会显得手忙脚乱。像前面出现的StreamBuilder处理中的坑和输出顺序的问题,只有阅读底层源码,才能发现原因并准确修复。下一篇文章,将从源码实现上深入分析Stream
。