diff --git a/CHANGELOG.md b/CHANGELOG.md index ccda9d8..e8a452d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ ## 2.12.0-wip - Require Dart 2.19 +- Add `Notifier`, allowing micro-tasks to wait for `Notifier.wait`, + which is completed the next time `Notifier.notify()` is called. +- Added `StreamExtensions.parallelForEach(N, each)` to enable concurrently + running a micro-task for each item in a stream, while never running more + than `N` micro-tasks concurrently. ## 2.11.0 diff --git a/lib/async.dart b/lib/async.dart index 1d7b797..1b40fcc 100644 --- a/lib/async.dart +++ b/lib/async.dart @@ -22,6 +22,7 @@ export 'src/delegate/stream_sink.dart'; export 'src/delegate/stream_subscription.dart'; export 'src/future_group.dart'; export 'src/lazy_stream.dart'; +export 'src/notifier.dart'; export 'src/null_stream_sink.dart'; export 'src/restartable_timer.dart'; export 'src/result/error.dart'; diff --git a/lib/src/notifier.dart b/lib/src/notifier.dart new file mode 100644 index 0000000..1aad9be --- /dev/null +++ b/lib/src/notifier.dart @@ -0,0 +1,67 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:meta/meta.dart'; + +/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to +/// [notify]. +/// +/// [Notifier] is a concurrency primitive that allows one micro-task to +/// wait for notification from another micro-task. The [Future] return from +/// [wait] will be completed the next time [notify] is called. +/// +/// ```dart +/// var weather = 'rain'; +/// final notifier = Notifier(); +/// +/// // Create a micro task to fetch the weather +/// scheduleMicrotask(() async { +/// // Infinitely loop that just keeps the weather up-to-date +/// while (true) { +/// weather = await getWeather(); +/// notifier.notify(); +/// +/// // Sleep 5s before updating the weather again +/// await Future.delayed(Duration(seconds: 5)); +/// } +/// }); +/// +/// // Wait for sunny weather +/// while (weather != 'sunny') { +/// await notifier.wait; +/// } +/// ``` +// TODO: Apply `final` when language version for this library is bumped to 3.0 +@sealed +class Notifier { + var _completer = Completer(); + + /// Notify everybody waiting for notification. + /// + /// This will complete all futures previously returned by [wait]. + /// Calls to [wait] after this call, will not be resolved, until the + /// next time [notify] is called. + void notify() { + if (!_completer.isCompleted) { + _completer.complete(); + } + } + + /// Wait for notification. + /// + /// Returns a [Future] that will complete the next time [notify] is called. + /// + /// The [Future] returned will always be unresolved, and it will never throw. + /// Once [notify] is called the future will be completed, and any new calls + /// to [wait] will return a new future. This new future will also be + /// unresolved, until [notify] is called. + Future get wait { + if (_completer.isCompleted) { + _completer = Completer(); + } + return _completer.future; + } +} diff --git a/lib/src/stream_extensions.dart b/lib/src/stream_extensions.dart index 4ba9254..30593bb 100644 --- a/lib/src/stream_extensions.dart +++ b/lib/src/stream_extensions.dart @@ -4,6 +4,8 @@ import 'dart:async'; +import 'notifier.dart'; + /// Utility extensions on [Stream]. extension StreamExtensions on Stream { /// Creates a stream whose elements are contiguous slices of `this`. @@ -78,4 +80,90 @@ extension StreamExtensions on Stream { ..onCancel = subscription.cancel; return controller.stream; } + + /// Call [each] for each item in this stream with [maxParallel] invocations. + /// + /// This method will invoke [each] for each item in this stream, and wait for + /// all futures from [each] to be resolved. [parallelForEach] will call [each] + /// in parallel, but never more then [maxParallel]. + /// + /// If [each] throws and [onError] rethrows (default behavior), then + /// [parallelForEach] will wait for ongoing [each] invocations to finish, + /// before throw the first error. + /// + /// If [onError] does not throw, then iteration will not be interrupted and + /// errors from [each] will be ignored. + /// + /// ```dart + /// // Count size of all files in the current folder + /// var folderSize = 0; + /// // Use parallelForEach to read at-most 5 files at the same time. + /// await Directory.current.list().parallelForEach(5, (item) async { + /// if (item is File) { + /// final bytes = await item.readAsBytes(); + /// folderSize += bytes.length; + /// } + /// }); + /// print('Folder size: $folderSize'); + /// ``` + Future parallelForEach( + int maxParallel, + FutureOr Function(T item) each, { + FutureOr Function(Object e, StackTrace? st) onError = Future.error, + }) async { + // Track the first error, so we rethrow when we're done. + Object? firstError; + StackTrace? firstStackTrace; + + // Track number of running items. + var running = 0; + final itemDone = Notifier(); + + try { + var doBreak = false; + await for (final item in this) { + // For each item we increment [running] and call [each] + running += 1; + unawaited(() async { + try { + await each(item); + } catch (e, st) { + try { + // If [onError] doesn't throw, we'll just continue. + await onError(e, st); + } catch (e, st) { + doBreak = true; + if (firstError == null) { + firstError = e; + firstStackTrace = st; + } + } + } finally { + // When [each] is done, we decrement [running] and notify + running -= 1; + itemDone.notify(); + } + }()); + + if (running >= maxParallel) { + await itemDone.wait; + } + if (doBreak) { + break; + } + } + } finally { + // Wait for all items to be finished + while (running > 0) { + await itemDone.wait; + } + } + + // If an error happened, then we rethrow the first one. + final firstError_ = firstError; + final firstStackTrace_ = firstStackTrace; + if (firstError_ != null && firstStackTrace_ != null) { + Error.throwWithStackTrace(firstError_, firstStackTrace_); + } + } } diff --git a/test/notifier_test.dart b/test/notifier_test.dart new file mode 100644 index 0000000..60c378d --- /dev/null +++ b/test/notifier_test.dart @@ -0,0 +1,53 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; +import 'package:test/test.dart'; + +void main() { + test('Notifier.wait/notify', () async { + final notified = Completer(); + + final notifier = Notifier(); + notifier.wait.then((value) => notified.complete()); + expect(notified.isCompleted, isFalse); + + notifier.notify(); + expect(notified.isCompleted, isFalse); + + await notified.future; + expect(notified.isCompleted, isTrue); + }); + + test('Notifier.wait is never resolved', () async { + var count = 0; + + final notifier = Notifier(); + notifier.wait.then((value) => count++); + expect(count, 0); + + await Future.delayed(Duration.zero); + expect(count, 0); + + notifier.notify(); + expect(count, 0); + + await Future.delayed(Duration.zero); + expect(count, 1); + + notifier.wait.then((value) => count++); + notifier.wait.then((value) => count++); + + await Future.delayed(Duration.zero); + expect(count, 1); + + notifier.notify(); + expect(count, 1); + + await Future.delayed(Duration.zero); + expect(count, 3); + }); +} diff --git a/test/stream_extensions_test.dart b/test/stream_extensions_test.dart index c922923..c32fa97 100644 --- a/test/stream_extensions_test.dart +++ b/test/stream_extensions_test.dart @@ -3,6 +3,7 @@ // BSD-style license that can be found in the LICENSE filevents. import 'dart:async'; +import 'dart:math'; import 'package:async/async.dart'; import 'package:test/test.dart'; @@ -182,4 +183,119 @@ void main() { expect(cancelCompleted, isTrue); }); }); + + group('parallelForEach', () { + test('sum (maxParallel: 1)', () async { + var sum = 0; + await Stream.fromIterable([1, 2, 3]).parallelForEach(1, (item) { + sum += item; + }); + expect(sum, 6); + }); + + test('sum (maxParallel: 2)', () async { + var sum = 0; + var active = 0; + var maxActive = 0; + await Stream.fromIterable([1, 2, 3]).parallelForEach(2, (item) async { + active++; + expect(active, lessThanOrEqualTo(2)); + maxActive = max(active, maxActive); + await Future.delayed(Duration(milliseconds: 50)); + expect(active, lessThanOrEqualTo(2)); + maxActive = max(active, maxActive); + sum += item; + active--; + }); + expect(sum, 6); + expect(maxActive, 2); + }); + + test('abort when error is thrown (maxParallel: 1)', () async { + var sum = 0; + await expectLater( + Stream.fromIterable([1, 2, 3]).parallelForEach(1, (item) async { + sum += item; + if (sum > 2) { + throw Exception('abort'); + } + }), + throwsException, + ); + expect(sum, 3); + }); + + test('abort will not comsume the entire stream', () async { + var countedTo = 0; + Stream countToN(int N) async* { + for (var i = 1; i <= N; i++) { + await Future.delayed(Duration.zero); + yield i; + countedTo = i; + } + } + + var sum = 0; + await countToN(20).parallelForEach(2, (item) async { + sum += item; + }); + expect(sum, greaterThan(20)); + expect(countedTo, 20); + + countedTo = 0; + await expectLater( + countToN(20).parallelForEach(2, (item) async { + if (item > 10) throw Exception('abort'); + }), + throwsException, + ); + expect(countedTo, greaterThanOrEqualTo(10)); + expect(countedTo, lessThan(20)); + }); + + test('onError can ignore errors', () async { + var countedTo = 0; + Stream countToN(int N) async* { + for (var i = 1; i <= N; i++) { + await Future.delayed(Duration.zero); + yield i; + countedTo = i; + } + } + + var sum = 0; + await countToN(20).parallelForEach(2, (item) async { + sum += item; + if (sum > 10) { + throw Exception('ignore this'); + } + }, onError: (_, __) => null); + expect(sum, greaterThan(20)); + expect(countedTo, 20); + + countedTo = 0; + await expectLater( + countToN(20).parallelForEach( + 2, + (item) async { + sum += item; + if (countedTo > 15) { + throw Exception('break'); + } + if (countedTo > 10) { + throw Exception('ignore this'); + } + }, + onError: (e, st) { + if (e.toString().contains('break')) { + throw e; + } + }, + ), + throwsException, + ); + expect(countedTo, greaterThanOrEqualTo(10)); + expect(countedTo, lessThan(20)); + }); + }); }