Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Added Condition and boundedForEach #249

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
## 2.12.0-wip

- Require Dart 2.19
- Add `Notifier`, allowing micro-tasks to wait for `Notifier.wait`,
which is completed the next time `Notifier.notify()` is called.
- Added `StreamExtensions.parallelForEach(N, each)` to enable concurrently
running a micro-task for each item in a stream, while never running more
than `N` micro-tasks concurrently.

## 2.11.0

Expand Down
1 change: 1 addition & 0 deletions lib/async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export 'src/delegate/stream_sink.dart';
export 'src/delegate/stream_subscription.dart';
export 'src/future_group.dart';
export 'src/lazy_stream.dart';
export 'src/notifier.dart';
export 'src/null_stream_sink.dart';
export 'src/restartable_timer.dart';
export 'src/result/error.dart';
Expand Down
67 changes: 67 additions & 0 deletions lib/src/notifier.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:meta/meta.dart';

/// A [Notifier] allows micro-tasks to [wait] for other micro-tasks to
/// [notify].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't mention the microtask!

(Also, makes no sense. If a microtask is waiting for something, it's no longer the same microtask.)

So:

/// An asynchronous synchonization primitive.
///
/// Any number of asynchronous computations can
/// wait on the [wait] future, which will be
/// completed when someone calls [notify()].
/// After that, further reads of [wait] will provide
/// a new future which is then completed by the next
/// following call to [notify()].

///
/// [Notifier] is a concurrency primitive that allows one micro-task to
/// wait for notification from another micro-task. The [Future] return from
/// [wait] will be completed the next time [notify] is called.
///
/// ```dart
/// var weather = 'rain';
/// final notifier = Notifier();
///
/// // Create a micro task to fetch the weather
/// scheduleMicrotask(() async {
/// // Infinitely loop that just keeps the weather up-to-date
/// while (true) {
/// weather = await getWeather();
/// notifier.notify();
///
/// // Sleep 5s before updating the weather again
/// await Future.delayed(Duration(seconds: 5));
/// }
/// });
///
/// // Wait for sunny weather
/// while (weather != 'sunny') {
/// await notifier.wait;
/// }
/// ```
// TODO: Apply `final` when language version for this library is bumped to 3.0
@sealed
class Notifier {
var _completer = Completer<void>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd make this nullable, setting it to non-null on the first wait, and setting it back to null on notify.

That way, if the first call is notify, you're not wasting a completer.


/// Notify everybody waiting for notification.
///
/// This will complete all futures previously returned by [wait].
/// Calls to [wait] after this call, will not be resolved, until the
/// next time [notify] is called.
void notify() {
if (!_completer.isCompleted) {
_completer.complete();
}
}

/// Wait for notification.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noun phrase for getters.

Maybe call it notification, since wait is also a verb.
But long, wait is short.
Maybe next

var tick = Notifier();
....
   tick.notify();
   ...
do {
  await tick.next;
} while (result != "success");
``  

///
/// Returns a [Future] that will complete the next time [notify] is called.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use "Returns" about properties.

/// The `wait` [Future] is completed the next time [notify] is called.

///
/// The [Future] returned will always be unresolved, and it will never throw.
/// Once [notify] is called the future will be completed, and any new calls
/// to [wait] will return a new future. This new future will also be
/// unresolved, until [notify] is called.
Future<void> get wait {
if (_completer.isCompleted) {
_completer = Completer();
}
return _completer.future;
}
}
88 changes: 88 additions & 0 deletions lib/src/stream_extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import 'dart:async';

import 'notifier.dart';

/// Utility extensions on [Stream].
extension StreamExtensions<T> on Stream<T> {
/// Creates a stream whose elements are contiguous slices of `this`.
Expand Down Expand Up @@ -78,4 +80,90 @@ extension StreamExtensions<T> on Stream<T> {
..onCancel = subscription.cancel;
return controller.stream;
}

/// Call [each] for each item in this stream with [maxParallel] invocations.
///
/// This method will invoke [each] for each item in this stream, and wait for
/// all futures from [each] to be resolved. [parallelForEach] will call [each]
/// in parallel, but never more then [maxParallel].
///
/// If [each] throws and [onError] rethrows (default behavior), then
/// [parallelForEach] will wait for ongoing [each] invocations to finish,
/// before throw the first error.
///
/// If [onError] does not throw, then iteration will not be interrupted and
/// errors from [each] will be ignored.
///
/// ```dart
/// // Count size of all files in the current folder
/// var folderSize = 0;
/// // Use parallelForEach to read at-most 5 files at the same time.
/// await Directory.current.list().parallelForEach(5, (item) async {
/// if (item is File) {
/// final bytes = await item.readAsBytes();
/// folderSize += bytes.length;
/// }
/// });
/// print('Folder size: $folderSize');
/// ```
Future<void> parallelForEach(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could return a Stream<void>, if you want to know how many operations have completed, and see all their errors.

Or make it:

Stream<R> parallelMap<R>(int maxParallel, FutureOr<R> mapData(T), {FutureOr<R> Function(Object error, StackTrace stack)? mapError});

which just calls mapData on each element and mapError on each error (or forwards it unchanged if omitted), and does not guarantee preserving order.

int maxParallel,
FutureOr<void> Function(T item) each, {
FutureOr<void> Function(Object e, StackTrace? st) onError = Future.error,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to:

FutureOr<bool> Function(Exception e) continueIf

On the assumption that:

  • You should never ignore an Error (I could also be convince to be less opinionated).
  • You should never consider the stack trace when deciding to continue.

But I could use help choosing a better name? breakIf or breakOnError or ignoreErrorIf or filterError or errorIf?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not open to special-casing Exception. Especially not the raw Exception type.

If the user knows that each can throw a specific subtype of Exception, then each should be made to handle that, rather than failing and having a second function handle it (but not break iteration?).

I'd just not have this parameter at all.
And I'd make the first each to throw cancel the entire operation (probably still need to wait for the rest of the pending futures to complete, but I'd cancel the stream subscription.

}) async {
// Track the first error, so we rethrow when we're done.
Object? firstError;
StackTrace? firstStackTrace;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd store this as (Object, StackTrace)? now, so we won't need to do if (firstError != null) ... (firstError, firstStackTrace!).


// Track number of running items.
var running = 0;
final itemDone = Notifier();

try {
var doBreak = false;
await for (final item in this) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This throws if the stream contains an error. User might expect that their onError would handle it.
(If anything, I'd expect the onError argument to apply to these errors, not errors created by each. The callbacks are provide at the same level, so one of them should not apply to the result of the other.)

// For each item we increment [running] and call [each]
running += 1;
unawaited(() async {
try {
await each(item);
} catch (e, st) {
try {
// If [onError] doesn't throw, we'll just continue.
await onError(e, st);
} catch (e, st) {
doBreak = true;
if (firstError == null) {
firstError = e;
firstStackTrace = st;
}
}
} finally {
// When [each] is done, we decrement [running] and notify
running -= 1;
itemDone.notify();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't really need notify for this.

Future<void> parallelForEach(
    int maxParallel,
    FutureOr<void> Function(T item) each) {
  var pending = 0;
  var done = false;
  var result = Completer<void>.sync();
  (Object, StackTrace)? firstError;
  void complete() {
    assert(pending == 0);
    assert(done);
    if (firstError case (var error, var stack)) {
      result.completeError(error, stack);
    } else {
      result.complete(null);
    }
 }
  var subscription = stream.listen(null, onDone: () {
    done = true;
    if (pending == 0) {
      complete();
    }
  });
  subscription
    ..onError((Object error, StackTrace stack) {
      subscription.cancel().ignore();
      done = true;
      firstError = (error, stack); // Takes precedence over user errors.
      if (pending == 0) complete();
    })
    ..onData((T value) {
        try {
          var computation = each(value);
          if (computation is Future) {
            if (++pending >= maxParallel) subscription.pause();
            computation.then((_) {
              if (--pending == 0 && done) complete();
              subscription.resume();
            }, onError: (Object error, StackTrace stack) {
              subscription.cancel().ignore();
              done = true;
              firstError ??= (error, stack);
              if (--pending == 0) complete();
           });
        } catch (error, stack) {
          subscription.cancel().ignore();
          done = true;
          firstError ??= (error, stack);
          if (pending == 0) complete();
        }
    });
  return result.future;
}

}
}());

if (running >= maxParallel) {
await itemDone.wait;
}
if (doBreak) {
break;
}
}
} finally {
// Wait for all items to be finished
while (running > 0) {
await itemDone.wait;
}
}

// If an error happened, then we rethrow the first one.
final firstError_ = firstError;
final firstStackTrace_ = firstStackTrace;
if (firstError_ != null && firstStackTrace_ != null) {
Error.throwWithStackTrace(firstError_, firstStackTrace_);
}
}
jonasfj marked this conversation as resolved.
Show resolved Hide resolved
}
53 changes: 53 additions & 0 deletions test/notifier_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';
import 'package:test/test.dart';

void main() {
test('Notifier.wait/notify', () async {
final notified = Completer<void>();

final notifier = Notifier();
notifier.wait.then((value) => notified.complete());
expect(notified.isCompleted, isFalse);

notifier.notify();
expect(notified.isCompleted, isFalse);

await notified.future;
expect(notified.isCompleted, isTrue);
});

test('Notifier.wait is never resolved', () async {
var count = 0;

final notifier = Notifier();
notifier.wait.then((value) => count++);
expect(count, 0);

await Future.delayed(Duration.zero);
expect(count, 0);

notifier.notify();
expect(count, 0);

await Future.delayed(Duration.zero);
expect(count, 1);

notifier.wait.then((value) => count++);
notifier.wait.then((value) => count++);

await Future.delayed(Duration.zero);
expect(count, 1);

notifier.notify();
expect(count, 1);

await Future.delayed(Duration.zero);
expect(count, 3);
});
}
116 changes: 116 additions & 0 deletions test/stream_extensions_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE filevents.

import 'dart:async';
import 'dart:math';

import 'package:async/async.dart';
import 'package:test/test.dart';
Expand Down Expand Up @@ -182,4 +183,119 @@ void main() {
expect(cancelCompleted, isTrue);
});
});

group('parallelForEach', () {
test('sum (maxParallel: 1)', () async {
var sum = 0;
await Stream.fromIterable([1, 2, 3]).parallelForEach(1, (item) {
sum += item;
});
expect(sum, 6);
});

test('sum (maxParallel: 2)', () async {
var sum = 0;
var active = 0;
var maxActive = 0;
await Stream.fromIterable([1, 2, 3]).parallelForEach(2, (item) async {
active++;
expect(active, lessThanOrEqualTo(2));
maxActive = max(active, maxActive);
await Future.delayed(Duration(milliseconds: 50));
expect(active, lessThanOrEqualTo(2));
maxActive = max(active, maxActive);
sum += item;
active--;
});
expect(sum, 6);
expect(maxActive, 2);
});

test('abort when error is thrown (maxParallel: 1)', () async {
var sum = 0;
await expectLater(
Stream.fromIterable([1, 2, 3]).parallelForEach(1, (item) async {
sum += item;
if (sum > 2) {
throw Exception('abort');
}
}),
throwsException,
);
expect(sum, 3);
});

test('abort will not comsume the entire stream', () async {
var countedTo = 0;
Stream<int> countToN(int N) async* {
for (var i = 1; i <= N; i++) {
await Future.delayed(Duration.zero);
yield i;
countedTo = i;
}
}

var sum = 0;
await countToN(20).parallelForEach(2, (item) async {
sum += item;
});
expect(sum, greaterThan(20));
expect(countedTo, 20);

countedTo = 0;
await expectLater(
countToN(20).parallelForEach(2, (item) async {
if (item > 10) throw Exception('abort');
}),
throwsException,
);
expect(countedTo, greaterThanOrEqualTo(10));
expect(countedTo, lessThan(20));
});

test('onError can ignore errors', () async {
var countedTo = 0;
Stream<int> countToN(int N) async* {
for (var i = 1; i <= N; i++) {
await Future.delayed(Duration.zero);
yield i;
countedTo = i;
}
}

var sum = 0;
await countToN(20).parallelForEach(2, (item) async {
sum += item;
if (sum > 10) {
throw Exception('ignore this');
}
}, onError: (_, __) => null);
expect(sum, greaterThan(20));
expect(countedTo, 20);

countedTo = 0;
await expectLater(
countToN(20).parallelForEach(
2,
(item) async {
sum += item;
if (countedTo > 15) {
throw Exception('break');
}
if (countedTo > 10) {
throw Exception('ignore this');
}
},
onError: (e, st) {
if (e.toString().contains('break')) {
throw e;
}
},
),
throwsException,
);
expect(countedTo, greaterThanOrEqualTo(10));
expect(countedTo, lessThan(20));
});
});
}