Flutter-Stream学习(二)

在上一篇文章中我们大概了解关于Stream的一些概念和原理,并通过代码简洁说明了Stream的流程。本篇文章进一步学习Stream的其他内容同时用代码加深记忆。

Stream开发实战

通过上一篇文章对接口和分类的分析,已经对Stream有了进一步的认识,接下来通过例子来检验一下。

利用Stream实现事件的广播

事件的广播,在开发中总会遇到,尤其是在跨组件或跨页面的场景,相信大部分的开发在项目中都会引入EventBus的三方或者自研框架。例如:当我编辑资料后,保存完成通知其他页面进行刷新以展示最新的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// user entity
class UserInfo {
int uid;
String name;

UserInfo(this.uid, this.name);
}

// userinfo update
class UserInfoChangeEvent {

static final _controller = StreamController<UserInfo>.broadcast();

static StreamSubscription<UserInfo> subscribe(Function(UserInfo) callback) {
return _controller.stream.listen(callback);
}

static void broadcast(UserInfo userInfo) {
_controller.add(userInfo);
}
}

// 用户编辑页面
class UserProfileViewModel {
...
// 点击 save 时,会调用到 broadcast 方法向外发送事件
void onSave(int uid, String name) {
UserInfoChangeEvent.broadcast(UserInfo(uid, name));
}

}

// 其它页面状态刷新
class ViewState extends State<ViewWidget> {

StreamSubscription<UserInfo>? _subscription;
UserInfo? _curUserInfo;

@override
void initState() {
super.initState();
// 初始化时,监听 UserInfoChangeEvent
_subscription = UserInfoChangeEvent.subscribe((userInfo) {
setState(() {
_curUserInfo = userInfo;
})
});
}

@override
void dispose() {
super.dispose();
// 退出时,要取消监听。否则会有内存泄漏
_subscription?.cancel();
_subscription = null;
}
}

上面的代码中,UserInfoChangeEvent定义了广播类型的StreamController,并且向外暴露了subscribebroadcast接口,用户编辑页面后点击保存时执行onSave方法,方法中的UserInfoChangeEvent调用broadcast方法向外部发送数据变化的事件;然后ViewState在初始化是注册了监听UserInfoChangeEventsubscriptionsubscription接收到事件并获取更新后的userInfo赋值给_curUserInfo,同时刷新页面。

StreamBuilder实现Widget自动刷新

Flutter提供了一个组件StreamBuilder,可以帮助我们方便的监听Stream并刷新Widget。例如进入一个页面时,通常会有一个数据加载的过程,此时页面会从Loading状态到Loaded/LoadError的状态变更,不同的状态呈现不同的页面,这时我们就需要定义一个LoadingState的枚举类型,在数据加载后通过StreamController发布LoadState状态,StreamBuilder监听到更新会自动触发Widget的刷新。

AsyncSnapShot是快照的意思,保存此刻最新的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
enum LoadingState {
loading,
error,
success,
}
....
StreamBuilder<LoadingState>(
stream: viewModel.loadingStateStrem,
initialData: LoadingState.loading,
builder: (context, snapshot) {
//根据snapShot的数据返回处理
var data = snapshot.data;
if (data == LoadingState.loading) {
return const Center(child: CircularProgressIndicator());
} else if (data == LoadingState.error) {
return const Center(child: Text('加载失败'));
} else if (data == LoadingState.success) {
return TextWidget(text);
}
},
)

不过StreamBuilder也有坑,对于Stream来说,事件被消费了就会丢掉,无论是StreamController还是Stream都不会保存上次的值,以页面加载为例页面进来后viewModel执行数据加载完成后,想loadingStateStream里发布了Loaded的状态,如果此时StreamBuilder还没有开始布局,那Builder就无法收到这次事件,等到界面已经布局完成时就会丢失此次的事件,snapshot拿到还是初始状态,也就是loading,这样就会导致页面显示异常。

对此可能会有疑问,为什么StreamBuilder不能一开始就添加到build里呢?当然可以,但是即便如此也无法保证StreamBuilder的监听比viewModel的状态更新要早,因为如果页面的内容较长,一开始StreamBuilder还不在可视区内,它的initState方法就不会执行,也就不会监听LoadingState.

StreamBuilder会面临这样的困境,归根结底是Stream设计导致的。

Stream的变换和处理

前面在介绍Stream的接口时,我们提到Stream有很多操作方法。接下来着重挑几个名字不好理解的展开讲讲

Future<E> drain<E>([E? futureValue])

drain意为‘‘排出,消耗’’。这里指‘‘排掉’’这条流里的所有事件,只响应结束信号,当流关闭时返回futureValue

1
2
final result = await Stream.fromIterable([1, 2, 3]).drain(100);
print(result); // Outputs: 100.

Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine)

事件迭代。根据combine合并流里面的事件,该方法可以返回指定类型的S,同时可以指定初始值initialValue。

1
2
3
final result = await Stream<int>.fromIterable([2, 4, 6, 8, 10])
.fold<String>("0", (previous, element) => "$previous - $element");
print(result); // 0 - 2 - 4 - 6 - 8 - 10

Future<T> reduce(T Function(T previous, T element) combine)

也是事件迭代。与fold不同的是,reduce无法指定初始值且它只能返回与源流相同的类型T。

1
2
3
final result = await Stream.fromIterable<int>([2, 6, 10, 8, 2])
.reduce<int>(10, (previous, element) => previous + element);
print(result); // 38

Future pipe(StreamConsumer<T> streamConsumer)

流管道拼接。将当前流的事件流流向streamConsumer中,streamConsumer的子类实现通常是一个StreamController,拿到事件后通知给它的订阅者。

1
2
3
4
5
6
7
8
9
var controller = StreamController<int>();  
var stream = controller.stream;

stream.listen((event) {
print(event); // 2 4 6 8 10
});

var result = await Stream<int>.fromIterable([2, 4, 6, 8, 10]).pipe(controller);
print("result: $result"); // null

Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert)

异步展开。将原流中的事件做一次展操作,得到一个E类型的新流。

1
2
3
4
5
6
7
8
9
var stream = Stream<int>.fromIterable([2, 4, 6, 8, 10]);

var newStream = stream.asyncExpand((event) {
return Stream<int>.fromIterable([event, event + 1]);
});

newStream.listen((event){
print(event); // 2 3 4 5 6 7 8 9 10
});

Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert)

异步映射。跟asyncExpand类似,只是转换操作返回的是FutureOr对象,为那些转换过程中涉及到异步处理的场景提供便利。

1
2
3
4
5
6
7
8
var newStream = stream.asyncMap((event) async {
await Future.delayed(const Duration(seconds: 1));
return event + 1;
});

newStream.listen((event){
print(event); // 3 5 7 9 11
});

测试一下

说了这么多,现在来检验一下。假设有一段逻辑,controller会增加三个事件,分别是add(1) add(2) add(3)subscription会在每次收到事件时打印output:$event, 中间会有一次暂停,3秒后恢复,猜一下以下几种场景下最后输出的属性是什么?

1.同步流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void main() async {
// 同步流:sync 为 true
var controller = StreamController<int>(sync: true);
var subscription = controller.stream.listen((event) {
print('output: $event');
});

controller.add(1);
controller.add(2);
controller.add(3);

print('暂停');
subscription.pause();

Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
}

// will print:
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 恢复

2.异步流

保持1中其他代码不变,将sync的值设置为false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void main() async {
var controller = StreamController<int>(sync: false);
var subscription = controller.stream.listen((event) {
print('output: $event');
});

controller.add(1);
controller.add(2);
controller.add(3);

print('暂停');
subscription.pause();

Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
}

// will print:
// 暂停
// 3秒后 -> 恢复
// output: 1
// output: 2
// output: 3

3.异步流:使用Future.delayed延迟暂停

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Future.delayed(Duration.zero, (){
print('暂停');
subscription.pause();

Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
});

// will print
// output: 1
// output: 2
// output: 3
// 暂停
// 3秒后 -> 恢复

4. 异步流:使用 scheduleMicrotask 延迟暂停

保持 3 中其它代码不变,用scheduleMicrotask代替Future.delayed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  scheduleMicrotask(() {
print('暂停');
subscription.pause();
Future.delayed(const Duration(seconds: 3), () {
print('3秒后 -> 恢复');
subscription.resume();
});
});

// will print
// output: 1
// 暂停
// 3秒后 -> 恢复
// output: 2
// output: 3

上面的输出是否如你所料?相信如果你理解了之前的介绍,对1 2 3点的输出结果是没有问题的。但是对于第 4 点:虽然同样为延迟暂停,3 和 4 中的输出完全不一样,4 中的输出在输出output: 1后才会触发暂停。这又是为什么呢?要解释这个输出,就要从源码出发了。

结语

所以,我们首先要从概念上理解他们,其次我们还要从代码上知道具体的实现。当程度的执行不及预期,缺乏代码实现层面的理解,我们便会显得手忙脚乱。像前面出现的StreamBuilder处理中的坑输出顺序的问题,只有阅读底层源码,才能发现原因并准确修复。下一篇文章,将从源码实现上深入分析Stream


Flutter-Stream学习(二)
https://ilittle.fun/2024/06/24/Flutter-Stream学习(二)/
作者
Leelt
发布于
2024年6月24日
许可协议