Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Add additional CancelableOperation utilities #194

Merged
merged 6 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

* Add `StreamExtensions.firstOrNull`.

* Add a `CancelableOperation.fromSubscription()` static factory.

* Add a `CancelableOperation.race()` static method.

## 2.8.2

* Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
Expand Down
56 changes: 56 additions & 0 deletions lib/src/cancelable_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,62 @@ class CancelableOperation<T> {
return completer.operation;
}

/// Creates a [CancelableOperation] wrapping [subscription].
///
/// This overrides [subscription.onDone] and [subscription.onError] so that
/// the returned operation will complete when the subscription completes or
/// emits an error. When this operation is canceled or when it emits an error,
/// the subscription will be canceled (unlike
/// `CancelableOperation.fromFuture(subscription.asFuture())`).
static CancelableOperation<void> fromSubscription(
StreamSubscription<void> subscription) {
var completer = CancelableCompleter<void>(onCancel: subscription.cancel);
subscription.onDone(completer.complete);
subscription.onError((Object error, StackTrace stackTrace) {
subscription.cancel().whenComplete(() {
completer.completeError(error, stackTrace);
});
});
return completer.operation;
}

/// Returns a [CancelableOperation] that completes with the value of the first
/// of [operations] to complete.
///
/// Once any of [operations] completes, its result is forwarded to the
/// returned [CancelableOperation] and the rest are cancelled. When the
/// returned operation is cancelled, all the [operations] are cancelled as
/// well.
static CancelableOperation<T> race<T>(
Iterable<CancelableOperation<T>> operations) {
operations = operations.toList();
if (operations.isEmpty) {
throw ArgumentError.value("May not be empty", "operations");
}

nex3 marked this conversation as resolved.
Show resolved Hide resolved
var done = false;
// Note: if one of the completers has already completed, it's not actually
// cancelled by this.
Future<void> _cancelAll() {
done = true;
return Future.wait(operations.map((operation) => operation.cancel()));
}

var completer = CancelableCompleter<T>(onCancel: _cancelAll);
for (var operation in operations) {
operation.then((value) {
if (!done) completer.complete(_cancelAll().then((_) => value));
lrhn marked this conversation as resolved.
Show resolved Hide resolved
}, onError: (error, stackTrace) {
if (!done) {
completer.complete(
_cancelAll().then((_) => Future.error(error, stackTrace)));
lrhn marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

return completer.operation;
}

/// The value returned by the operation.
Future<T> get value => _completer._inner.future;

Expand Down
116 changes: 116 additions & 0 deletions test/cancelable_operation_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,30 @@ void main() {
expect(operation.value, throwsA('error'));
});
});

group('CancelableOperation.fromSubscription', () {
test('forwards a done event once it completes', () async {
var controller = StreamController<void>();
var operationCompleted = false;
CancelableOperation.fromSubscription(controller.stream.listen(null))
.then((_) {
operationCompleted = true;
});

await flushMicrotasks();
expect(operationCompleted, isFalse);

controller.close();
await flushMicrotasks();
expect(operationCompleted, isTrue);
});

test('forwards errors', () {
var operation = CancelableOperation.fromSubscription(
Stream.error('error').listen(null));
expect(operation.value, throwsA('error'));
});
});
});

group('when canceled', () {
Expand Down Expand Up @@ -237,6 +261,37 @@ void main() {
await flushMicrotasks();
expect(fired, isTrue);
});

test('CancelableOperation.fromSubscription() cancels the subscription',
() async {
var cancelCompleter = Completer<void>();
var canceled = false;
var controller = StreamController<void>(onCancel: () {
canceled = true;
return cancelCompleter.future;
});
var operation =
CancelableOperation.fromSubscription(controller.stream.listen(null));

await flushMicrotasks();
expect(canceled, isFalse);

// The `cancel()` call shouldn't complete until
// `StreamSubscription.cancel` completes.
var cancelCompleted = false;
expect(
operation.cancel().then((_) {
cancelCompleted = true;
}),
completes);
await flushMicrotasks();
expect(canceled, isTrue);
expect(cancelCompleted, isFalse);

cancelCompleter.complete();
await flushMicrotasks();
expect(cancelCompleted, isTrue);
});
});

group('asStream()', () {
Expand Down Expand Up @@ -440,4 +495,65 @@ void main() {
});
});
});

group('race()', () {
late bool canceled1;
late CancelableCompleter<int> completer1;
late bool canceled2;
late CancelableCompleter<int> completer2;
late bool canceled3;
late CancelableCompleter<int> completer3;
late CancelableOperation<int> operation;
setUp(() {
canceled1 = false;
completer1 = CancelableCompleter<int>(onCancel: () {
canceled1 = true;
});

canceled2 = false;
completer2 = CancelableCompleter<int>(onCancel: () {
canceled2 = true;
});

canceled3 = false;
completer3 = CancelableCompleter<int>(onCancel: () {
canceled3 = true;
});

operation = CancelableOperation.race(
[completer1.operation, completer2.operation, completer3.operation]);
});

test('returns the first value to complete', () {
completer1.complete(1);
completer2.complete(2);
completer3.complete(3);

expect(operation.value, completion(equals(1)));
});

test('throws the first error to complete', () {
completer1.completeError("error 1");
completer2.completeError("error 2");
completer3.completeError("error 3");

expect(operation.value, throwsA("error 1"));
});

test('cancels any completers that haven\'t completed', () async {
completer1.complete(1);
await expectLater(operation.value, completion(equals(1)));
expect(canceled1, isFalse);
expect(canceled2, isTrue);
expect(canceled3, isTrue);
});

test('cancels all completers when the operation is completed', () async {
await operation.cancel();

expect(canceled1, isTrue);
expect(canceled2, isTrue);
expect(canceled3, isTrue);
});
});
}