探索Flutter与RxDart的完美结合

发表时间: 2024-01-18 14:24

RxDart通过在响应式扩展规范的基础上添加功能,扩展了Dart Streams和StreamController的功能。如果你还不熟悉“流”,你可以阅读本系列的前几部分。

向项目添加一个包

软件包链接:
https://pub.dev/packages/rxdart

将包添加到文件pubspec.yaml,然后运行flutter pub get:

rxdart: 0.27.7

RxDart概述

RxDart是一个为Stream类提供大量类和函数的库。

RxDart通过三种方式向Dart Streams添加功能:

  • Stream Classes--创建具有特定功能的流,例如组合或合并多个流。
  • Extension Methods--将源流转换为具有不同功能的新流,例如节流或缓冲事件。
  • Subjects--流控制器与额外的能力。

Stream Classes

在这里,只介绍一些基本的类,然后您可以继续探索其余的类。

通过在流中移动数据(标记为1、2、3、4等),这个网站(https://rxmarbles.com)介绍了各个函数如何工作,尽管它是RxJs,但RxDart的操作符与之相同。

MergeStream

在这里,我们有两个流称为stream1和stream2同时运行,这两个流被合并为一个MergeStream。MergeStream中的事件以与stream1和stream2中的时间轴相同的顺序发出。事实上,你并不局限于两种流。我在这里合并了两条流,所以它和图像相似。

void main() {     var currentTime = DateTime.now();     var mergeStream = MergeStream([firstStream(), secondStream()]);     mergeStream.listen((event) => println(event, currentTime));}// stream 1: the first event is emitted after 1 secondStream<int> firstStream() async* {    await Future.delayed(Duration(seconds: 1));    yield 20;    await Future.delayed(Duration(seconds: 1));    yield 40;    await Future.delayed(Duration(seconds: 2));    yield 60;    await Future.delayed(Duration(seconds: 6));    yield 80;    await Future.delayed(Duration(seconds: 3));    yield 100;}// stream 2 - the first event is emitted after 7 secondsStream<int> secondStream() async* {    await Future.delayed(Duration(seconds: 7));    yield 1;    await Future.delayed(Duration(seconds: 9));    yield 1;}void println(Object value, DateTime currentTime) {    print('Emit $value after ${DateTime.now().difference(currentTime).inSeconds} seconds');}

输出:

Emit 20 after 1 seconds
Emit 40 after 2 seconds
Emit 60 after 4 seconds
Emit 1 after 7 seconds
Emit 80 after 10 seconds
Emit 100 after 13 seconds
Emit 1 after 16 seconds

除了这个方法之外,RxDart实际上还附带了一个静态函数:Rx.merge(),因此您也可以使用它,输出是相同的,试着替换这行代码

var mergeStream = MergeStream([firstStream(), secondStream()]);

为下面这一行代码

var mergeStream = Rx.merge([firstStream(), secondStream()]);

ZipStream

从这个图像中,你可以看出stream1和stream12同时运行,并且这两个流合并为ZipStream。

ZipStream使用给定的zipper函数将指定的流合并为一个流序列,每当所有的流序列在相应的索引处产生一个元素时。

void main() {    ZipStream(        [            Stream.fromIterable(['1', '2', '3', '4', '5']),            Stream.fromIterable(['A', 'B', 'C', 'D']),        ], (values) => values.join(), // zipper function    ).listen(print);}

输出:

1A
2B
3C
4D

此外,如果您确切地知道有多少流需要被压缩,您可以使用ZipStream的其他构造函数,如ZipStream.zip2 , ZipStream.zip3,…,ZipStream.zip9。这里我将压缩3个流:

void main() {    ZipStream.zip3(        Stream.fromIterable([1, 2]),        Stream.fromIterable([3, 4, 5]),        Stream.fromIterable([6, 7, 8]),        (a, b, c) => a + b + c, // sum 3 numbers    ).listen(print);}

输出:

10 // 1 + 3 + 6
13 // 2 + 4 + 7

我建议这样做,而不是上面的方法。通过这个,(a, b, c) => a + b + c将得到3个自变量abc你可以随意使用它们。在上面的方法中,lambda只有一个List类型的变量值,这有点限制。

与merge类似,RxDart也提供了一个静态函数Rx.zip:

Rx.zip(    [        Stream.fromIterable(['1', '2', '3', '4', '5']),        Stream.fromIterable(['A', 'B', 'C', 'D']),    ],     (values) => values.join(), ).listen(print); // prints 1A, 2B, 3C, 4D
Rx.zip2(    Stream.fromIterable(['1', '2', '3', '4', '5']),    Stream.fromIterable(['A', 'B', 'C', 'D']),     (a, b) => a + b).listen(print); // prints 1A, 2B, 3C, 4D

TimerStream

传入一个值和一个持续时间,在持续时间过去后,流将发出该值。例如,如果我们想在3秒后触发一个事件:

void main() {    TimerStream('Hello', Duration(seconds: 3)).listen((i) => print(i));}

类似地,也有静态函数:Rx.timer。

RangeStream

传入两个变量:startInclusive和endInclusive来创建一个范围。RangeStream将在该范围内发出一个连续整数范围。这里我将发出从1到6的整数作为例子。

void main() {    RangeStream(1, 6).listen((i) => print(i));}

输出:

1
2
3
4
5
6

类似地,也有静态函数:Rx.range。

RetryStream

创建一个流,该流将重新创建并重新侦听源流指定的次数,直到流成功终止。如果未指定重试计数,则无限重试。如果满足重试计数,但流没有成功终止,则会触发导致失败的所有错误和StackTraces。

void main() {     RetryStream(        () {            return errorStream();        },       2, // retry 2 times    ).listen(print, onError: print);}Stream<int> errorStream() async* {    yield 10;    throw FormatException('wrong format'); // error    yield 11; // 11 will not be emitted}

输出:

10
10
10
Unhandled exception:
FormatException: wrong format
#0 errorStream
<asynchronous suspension>

类似地,也有静态函数:Rx.retry。

Extension Methods

将源流转换为具有不同功能的新流,例如节流或缓冲事件。

debounceTime

转换一个流,以便只在duration定义的时间跨度过去时从源序列发出项,而不从源序列发出另一个项,此时间范围从最后一次释放事件发出后开始。简而言之,这个函数允许我们传入一个Duration,在本例中是10秒。如果源流发出一个事件,并且在10秒后没有发出另一个事件,则输出流将发出该事件。

void main() {     // If the demoStream emits an event and doesn't emit another after 500ms,      // the Output Stream will emit that event    demoStream().debounceTime(Duration(milliseconds: 500)).listen((event) {          // call API search        print('Search with keyword: $event');    });}// emit events when user typed something in a TextFieldStream<String> demoStream() async* {     yield 'L'; // ignored     yield 'Le'; // ignored    yield 'Lea'; // ignored    yield 'Lear'; // ignored    yield 'Learn'; // ignored    await Future.delayed(Duration(seconds: 1)); // emit 'Learn'    yield 'Learn R'; // ignored    yield 'Learn Rx'; // ignored    yield 'Learn RxD'; // ignored    yield 'Learn RxDa'; // ignored    yield 'Learn RxDar'; // ignored    yield 'Learn RxDart'; // emit 'Learn RxDart'}

输出:

Search with keyword: Learn
Search with keyword: Learn RxDart

throttleTime

Stream<T> throttleTime(Duration duration, {bool trailing = false, bool leading = true})

从源流发出一个值,然后在一段时间内忽略后续的源值,然后重复此过程。

如果leading为true,则发出每个窗口中的第一项。上图描述了这种情况。这个功能在后反应功能(点击喜欢或心形按钮)中很有用,因为应用程序可以防止点击垃圾信息。

void main() {     var currentTime = DateTime.now();     demoStream()         .throttleTime(Duration(seconds: 5), leading: true, trailing: false)         .listen((event) => println(event, currentTime));}Stream<String> demoStream() async* {    yield 'A'; // emit 'A'    await Future.delayed(Duration(seconds: 1));    yield 'B'; // ignored    await Future.delayed(Duration(seconds: 1));    yield 'C'; // ignored    await Future.delayed(Duration(seconds: 5));    yield 'D'; // emit 'D'    await Future.delayed(Duration(seconds: 2));    yield 'E'; // ignored}void println(Object value, DateTime currentTime) {    print('Emit $value after ${DateTime.now().difference(currentTime).inSeconds} seconds');}

输出:

Emit A after 0 seconds
Emit D after 7 seconds

如果trailing为true,则发出最后一项。输出:

Emit C after 5 seconds
Emit E after 12 seconds

onErrorResumeNext

这个函数在流中传递,我们将调用recoveryStream。当Source Streams遇到错误时,它不会发出错误事件,而是会在recoveryStream中发出元素。请注意,在下面的示例中,onError不打印任何内容。

void main() {    errorStream()        .onErrorResumeNext(Stream.fromIterable([             '1 from recoveryStream',             '2 from recoveryStream',             '3 from recoveryStream',      ])).listen(print, onError: (e) => print('Error: $e'));}Stream<String> errorStream() async* {    yield 'a';    yield 'b';    throw FormatException('wrong format');    yield 'c';}

输出:

a
b
1 from recoveryStream
2 from recoveryStream
3 from recoveryStream

interval

创建一个流,该流在给定的持续时间后发出流中的每个项。

Stream.fromIterable([1, 2, 3])     .interval(Duration(seconds: 1))    .listen((i) => print('$i sec'); // prints 1 sec, 2 sec, 3 sec

concatWith

连接所有指定的流序列,只要前一个流序列成功终止。它通过一个接一个地订阅每个流,发出所有项,并在订阅下一个流之前完成。如果提供的流为空,则结果序列立即完成,不发出任何项。

以上图为例,在stream1中发出所有事件后,ConcatStream等待x秒(在示例中)才发出stream2中的第一个事件,而不是立即执行。这就是为什么t1(stream1停止的时间)+ t2(stream2停止的时间)= t3 (ConcatStream停止的时间)的总和。

void main() {     var currentTime = DateTime.now();     var concatStream = firstStream().concatWith([secondStream()]);     concatStream.listen((event) => println(event, currentTime));}// stream 1 takes 13 seconds to emit all eventsStream<int> firstStream() async* {    await Future.delayed(Duration(seconds: 1));    yield 20;    await Future.delayed(Duration(seconds: 1));    yield 40;    await Future.delayed(Duration(seconds: 2));    yield 60;    await Future.delayed(Duration(seconds: 6));    yield 80;    await Future.delayed(Duration(seconds: 3));    yield 100;}// stream 2 takes 16 seconds to emit all eventsStream<int> secondStream() async* {    await Future.delayed(Duration(seconds: 7));    yield 1;    await Future.delayed(Duration(seconds: 9));    yield 1;}void println(Object value, DateTime currentTime) {    print('Emit $value after ${DateTime.now().difference(currentTime).inSeconds} seconds');}

stream1中的所有事件需要13秒,stream2则需要16秒,总共是29秒。输出:

Emit 20 after 1 seconds
Emit 40 after 2 seconds
Emit 60 after 4 seconds
Emit 80 after 10 seconds
Emit 100 after 13 seconds
Emit 1 after 20 seconds
Emit 1 after 29 seconds

distinctUnique

RxDart中的distinctUnique函数不同于Stream类:

  • Dart的distinct用于跳过与前一个元素相等的元素。注意,它只将自己与前一个元素进行比较。
  • RxDart的distinctUnique用于在源流中发出唯一的元素。

下面的代码来说明distinct和distinctUnique:

void main() {     // distinct    Stream.fromIterable([1, 2, 2, 1, 3]).distinct()         .listen(print); // print: 1, 2, 1, 3     // distinctUnique    Stream.fromIterable([1, 2, 2, 1, 3]).distinctUnique()         .listen(print); // print: 1, 2, 3}

Subjects

Subjects本质上是Dart API中的StreamController,具有更多功能。在RxDart中,有3种类型的主题:BehaviorSubject, ReplaySubject和PublishSubject。它们默认处理广播流。让我们更详细地看看它们。

BehaviorSubject

一个特殊的StreamController,它捕获已添加到控制器的最新项,并将其作为第一个项发送到任何新的侦听器。
这个主题允许向侦听器发送数据、错误和完成事件。添加到主题的最新项将发送给该主题的任何新侦听器。之后,任何新事件都将被适当地发送到侦听器。
默认情况下,BehaviorSubject是一个广播控制器。这意味着主题的流可以被多次收听。

void main() {     // create BehaviorSubject    final behaviorSubject = BehaviorSubject();     // the first subscription    behaviorSubject.stream.listen((event) {        println('First subscription: $event');    });    // the second subscription subsribes the above stream after 7 seconds    Future.delayed(const Duration(seconds: 7), () {        println('Create second subscription');        behaviorSubject.stream.listen((event) {            println('Second subscription: $event');        });    });    // Push events every 2 seconds    behaviorSubject.sink        .addStream(RangeStream(0, 4).interval(const Duration(seconds: 2)));}void println(Object value) {    print('${DateTime.now()}: $value');}

输出:

2023-09-09 16:19:17.224566: First subscription: 0
2023-09-09 16:19:19.225456: First subscription: 1
2023-09-09 16:19:21.227959: First subscription: 2
2023-09-09 16:19:22.213850: Create second subscription
2023-09-09 16:19:22.220363: Second subscription: 2
2023-09-09 16:19:23.229872: First subscription: 3
2023-09-09 16:19:23.230100: Second subscription: 3
2023-09-09 16:19:25.231686: First subscription: 4
2023-09-09 16:19:25.231726: Second subscription: 4

输出告诉我们,在16:19:22创建第二个订阅时,它立即收到了最新的项目(数据号为2)。之后,所有新事件都被适当地发送到两个侦听器。

此外,BehaviorSubject还有另一个构造函数BehaviorSubject. seeds()。当使用构造函数时,可以提供一个种子值,如果没有向主题添加任何项,则该种子值将被触发。

void main() {    // create BehaviorSubject    final behaviorSubject = BehaviorSubject.seeded(-1);    // the first subscription    behaviorSubject.stream.listen((event) {        println('First subscription: $event');    });    // the second subscription subsribes the above stream after 7 seconds    Future.delayed(const Duration(seconds: 7), () {        println('Create second subscription');        behaviorSubject.stream.listen((event) {            println('Second subscription: $event');        });    });    // Push events every 2 seconds    behaviorSubject.sink        .addStream(RangeStream(0, 4).interval(const Duration(seconds: 2)));}void println(Object value) {    print('${DateTime.now()}: $value');}

输出:

2023-09-09 16:19:15.223456: First subscription: -1

2023-09-09 16:19:17.224566: First subscription: 0
2023-09-09 16:19:19.225456: First subscription: 1
2023-09-09 16:19:21.227959: First subscription: 2
2023-09-09 16:19:22.213850: Create second subscription
2023-09-09 16:19:22.220363: Second subscription: 2
2023-09-09 16:19:23.229872: First subscription: 3
2023-09-09 16:19:23.230100: Second subscription: 3
2023-09-09 16:19:25.231686: First subscription: 4
2023-09-09 16:19:25.231726: Second subscription: 4

ReplaySubject

这也是一个广播流控制器。这与BehaviorSubject的不同之处在于,它捕获已添加到控制器的所有项,并将它们作为第一个项发送给任何新的侦听器。
这个主题允许向侦听器发送数据、错误和完成事件。当项目被添加到主题时,ReplaySubject将存储它们。当流被监听时,那些记录的项将被发送到监听器。之后,所有新事件都被适当地发送到两个侦听器。

void main() {     // create ReplaySubject    final behaviorSubject = ReplaySubject();     // the first subscription    behaviorSubject.stream.listen((event) {        println('First subscription: $event');    });     // the second subscription subsribes the above stream after 7 seconds    Future.delayed(const Duration(seconds: 7), () {        println('Create second subscription');        behaviorSubject.stream.listen((event) {            println('Second subscription: $event');        });    });     // Push events every 2 seconds    behaviorSubject.sink        .addStream(RangeStream(0, 4).interval(const Duration(seconds: 2)));}void println(Object value) {    print('${DateTime.now()}: $value');}

输出:

2023-09-09 16:36:38.206317: First subscription: 0
2023-09-09 16:36:40.206706: First subscription: 1
2023-09-09 16:36:42.209323: First subscription: 2
2023-09-09 16:36:43.193860: Create second subscription
2023-09-09 16:36:43.200865: Second subscription: 0
2023-09-09 16:36:43.201115: Second subscription: 1
2023-09-09 16:36:43.201170: Second subscription: 2
2023-09-09 16:36:44.211727: First subscription: 3
2023-09-09 16:36:44.211899: Second subscription: 3
2023-09-09 16:36:46.215944: First subscription: 4
2023-09-09 16:36:46.216120: Second subscription: 4

在“创建第二个订阅”时,它检索从0到2的所有数据。之后,所有新事件都被适当地发送到两个侦听器。

可以通过设置maxSize值来限制存储事件的数量。

void main() {    // create ReplaySubject    final behaviorSubject = ReplaySubject(maxSize: 1);    // the first subscription    behaviorSubject.stream.listen((event) {        println('First subscription: $event');    });    // the second subscription subsribes the above stream after 7 seconds    Future.delayed(const Duration(seconds: 7), () {        println('Create second subscription');        behaviorSubject.stream.listen((event) {            println('Second subscription: $event');        });    });    // Push events every 2 seconds    behaviorSubject.sink        .addStream(RangeStream(0, 4).interval(const Duration(seconds: 2)));}void println(Object value) {    print('${DateTime.now()}: $value');}

输出:

2023-09-09 16:40:32.442972: First subscription: 0
2023-09-09 16:40:34.445597: First subscription: 1
2023-09-09 16:40:36.447013: First subscription: 2
2023-09-09 16:40:37.433808: Create second subscription
2023-09-09 16:40:37.441369: Second subscription: 2
2023-09-09 16:40:38.449134: First subscription: 3
2023-09-09 16:40:38.449477: Second subscription: 3
2023-09-09 16:40:40.451214: First subscription: 4
2023-09-09 16:40:40.451255: Second subscription: 4

在“创建第二个订阅”时,它只检索最新的项(数据号2)。

PublishSubject

这实际上非常类似于广播流控制器。可以使用PublishSubject(var streamController = PublishSubject();),你可以用Broadcast streamController (var streamController = streamController . Broadcast();)来做这件事,两者都给出相同的输出:

void main() {     // create PublishSubject    final behaviorSubject = PublishSubject();     // the first subscription    behaviorSubject.stream.listen((event) {        println('First subscription: $event');    });    // the second subscription subsribes the above stream after 7 seconds    Future.delayed(const Duration(seconds: 7), () {        println('Create second subscription');        behaviorSubject.stream.listen((event) {            println('Second subscription: $event');       });    });    // Push events every 2 seconds    behaviorSubject.sink        .addStream(RangeStream(0, 4).interval(const Duration(seconds: 2)));}void println(Object value) {    print('${DateTime.now()}: $value');}

输出:

2023-09-10 08:56:06.593458: First subscription: 0
2023-09-10 08:56:08.593156: First subscription: 1
2023-09-10 08:56:10.595600: First subscription: 2
2023-09-10 08:56:11.579242: Create second subscription
2023-09-10 08:56:12.598082: First subscription: 3
2023-09-10 08:56:12.598404: Second subscription: 3
2023-09-10 08:56:14.602879: First subscription: 4
2023-09-10 08:56:14.603005: Second subscription: 4

CompositeSubscription

compositessubscription类似于多个订阅的容器,这使得一次取消所有订阅变得更加容易。

void main() async {     // create 3 subscriptions     var subscription1 = testStream().listen(print);     var subscription2 = testStream().listen(print);     var subscription3 = testStream().listen(print);     // create a compositeSubscription     var compositeSubscription = CompositeSubscription();     // add 3 subscriptions to the compositeSubscription    compositeSubscription.add(subscription1);    compositeSubscription.add(subscription2);    compositeSubscription.add(subscription3);     // Cancel all 3 subscriptions after 2 seconds    await Future.delayed(const Duration(seconds: 2));    compositeSubscription.clear();     // we can use the dispose method to cancel all 3 subscriptions too    compositeSubscription.dispose();}Stream<int> testStream() async* {     yield 10;     await Future.delayed(const Duration(seconds: 2));     yield 11;}

输出:

10
10
10

clear()函数类似于dispose(),因为它们都取消所有订阅。然而,通过使用clear,我们可以重用复合订阅,而dispose将不再允许我们重用它。

小节

已经介绍了RxDart的一些功能强大的类。你可以在这里(
https://pub.dev/packages/rxdart#stream-classes)了解更多关于其他课程的信息。