Skip to content

Commit

Permalink
refactor(subject): Subject.stream now returns a read-only Stream (#…
Browse files Browse the repository at this point in the history
…699)

 - Streams returned by Subjects are read-only streams, ie. they don't support adding events (Previous, `Subject.stream` is identical to Subject).
 - Change return type of `ReplaySubject<T>.stream` to `ReplayStream<T>`
  • Loading branch information
hoc081098 authored Nov 16, 2022
1 parent 1a8fcef commit e1f6cd9
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 6 deletions.
61 changes: 60 additions & 1 deletion lib/src/subjects/behavior_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
_wrapper.setError(error, stackTrace);

@override
ValueStream<T> get stream => this;
ValueStream<T> get stream => _BehaviorSubjectStream(this);

@override
bool get hasValue => isNotEmpty(_wrapper.value);
Expand Down Expand Up @@ -191,3 +191,62 @@ class _Wrapper<T> {
isValue = false;
}
}

class _BehaviorSubjectStream<T> extends Stream<T> implements ValueStream<T> {
final BehaviorSubject<T> _subject;

_BehaviorSubjectStream(this._subject);

@override
bool get isBroadcast => true;

// Override == and hashCode so that new streams returned by the same
// subject are considered equal.
// The subject returns a new stream each time it's queried,
// but doesn't have to cache the result.

@override
int get hashCode => _subject.hashCode ^ 0x35323532;

@override
bool operator ==(Object other) {
if (identical(this, other)) return true;
return other is _BehaviorSubjectStream &&
identical(other._subject, _subject);
}

@override
StreamSubscription<T> listen(
void Function(T event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
_subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);

@override
Object get error => _subject.error;

@override
Object? get errorOrNull => _subject.errorOrNull;

@override
bool get hasError => _subject.hasError;

@override
bool get hasValue => _subject.hasValue;

@override
StackTrace? get stackTrace => _subject.stackTrace;

@override
T get value => _subject.value;

@override
T? get valueOrNull => _subject.valueOrNull;
}
53 changes: 51 additions & 2 deletions lib/src/subjects/replay_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,12 @@ class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {

@override
List<StackTrace?> get stackTraces => _queue
.where((event) => event.errorAndStackTrace != null)
.map((event) => event.errorAndStackTrace!.stackTrace)
.mapNotNull((event) => event.errorAndStackTrace)
.map((errorAndStackTrace) => errorAndStackTrace.stackTrace)
.toList(growable: false);

@override
ReplayStream<T> get stream => _ReplaySubjectStream(this);
}

class _Event<T> {
Expand All @@ -153,3 +156,49 @@ class _Event<T> {
factory _Event.error(ErrorAndStackTrace e) =>
_Event._(errorAndStackTrace: e, data: EMPTY);
}

class _ReplaySubjectStream<T> extends Stream<T> implements ReplayStream<T> {
final ReplaySubject<T> _subject;

_ReplaySubjectStream(this._subject);

@override
bool get isBroadcast => true;

@override
List<T> get values => _subject.values;

@override
List<Object> get errors => _subject.errors;

@override
List<StackTrace?> get stackTraces => _subject.stackTraces;

// Override == and hashCode so that new streams returned by the same
// subject are considered equal.
// The subject returns a new stream each time it's queried,
// but doesn't have to cache the result.

@override
int get hashCode => _subject.hashCode ^ 0x35323532;

@override
bool operator ==(Object other) {
if (identical(this, other)) return true;
return other is _ReplaySubjectStream && identical(other._subject, _subject);
}

@override
StreamSubscription<T> listen(
void Function(T event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
_subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}
44 changes: 43 additions & 1 deletion lib/src/subjects/subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ abstract class Subject<T> extends StreamView<T> implements StreamController<T> {

/// Constructs a [Subject] which wraps the provided [controller].
/// This constructor is applicable only for classes that extend [Subject].
///
/// To guarantee the contract of a [Subject], the [controller] must be
/// a broadcast [StreamController] and the [stream] must also be a broadcast [Stream].
Subject(StreamController<T> controller, Stream<T> stream)
: _controller = controller,
assert(stream.isBroadcast, 'Subject requires a broadcast stream'),
super(stream);

@override
Expand All @@ -33,7 +37,7 @@ abstract class Subject<T> extends StreamView<T> implements StreamController<T> {
}

@override
Stream<T> get stream => this;
Stream<T> get stream => _SubjectStream(this);

@override
ControllerCallback get onPause =>
Expand Down Expand Up @@ -166,6 +170,44 @@ abstract class Subject<T> extends StreamView<T> implements StreamController<T> {
}
}

class _SubjectStream<T> extends Stream<T> {
final Subject<T> _subject;

_SubjectStream(this._subject);

@override
bool get isBroadcast => true;

// Override == and hashCode so that new streams returned by the same
// subject are considered equal.
// The subject returns a new stream each time it's queried,
// but doesn't have to cache the result.

@override
int get hashCode => _subject.hashCode ^ 0x35323532;

@override
bool operator ==(Object other) {
if (identical(this, other)) return true;
return other is _SubjectStream && identical(other._subject, _subject);
}

@override
StreamSubscription<T> listen(
void Function(T event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
_subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError,
);
}

/// A class that exposes only the [StreamSink] interface of an object.
class _StreamSinkWrapper<T> implements StreamSink<T> {
final StreamController<T> _target;

Expand Down
31 changes: 31 additions & 0 deletions test/subject/behavior_subject_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1220,5 +1220,36 @@ void main() {
}
});
});

test('stream returns a read-only stream', () async {
final subject = BehaviorSubject<int>()..add(1);

// streams returned by BehaviorSubject are read-only stream,
// ie. they don't support adding events.
expect(subject.stream, isNot(isA<BehaviorSubject<int>>()));
expect(subject.stream, isNot(isA<Sink<int>>()));

expect(
subject.stream,
isA<ValueStream<int>>().having(
(v) => v.value,
'BehaviorSubject.stream.value',
1,
),
);

// BehaviorSubject.stream is a broadcast stream
{
final stream = subject.stream;
expect(stream.isBroadcast, isTrue);
await expectLater(stream, emitsInOrder(<Object>[1]));
await expectLater(stream, emitsInOrder(<Object>[1]));
}

// streams returned by the same subject are considered equal,
// but not identical
expect(identical(subject.stream, subject.stream), isFalse);
expect(subject.stream == subject.stream, isTrue);
});
});
}
28 changes: 26 additions & 2 deletions test/subject/publish_subject_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import 'package:test/test.dart';

import '../utils.dart';

typedef AsyncVoidCallBack = Future<void> Function();

void main() {
group('PublishSubject', () {
test('emits items to every subscriber', () async {
Expand Down Expand Up @@ -295,5 +293,31 @@ void main() {
expect(subject.isBroadcast, isTrue);
expect(stream.isBroadcast, isTrue);
});

test('stream returns a read-only stream', () async {
final subject = PublishSubject<int>();

// streams returned by PublishSubject are read-only stream,
// ie. they don't support adding events.
expect(subject.stream, isNot(isA<PublishSubject<int>>()));
expect(subject.stream, isNot(isA<Sink<int>>()));

// PublishSubject.stream is a broadcast stream
{
final stream = subject.stream;
expect(stream.isBroadcast, isTrue);

scheduleMicrotask(() => subject.add(1));
await expectLater(stream, emitsInOrder(<Object>[1]));

scheduleMicrotask(() => subject.add(1));
await expectLater(stream, emitsInOrder(<Object>[1]));
}

// streams returned by the same subject are considered equal,
// but not identical
expect(identical(subject.stream, subject.stream), isFalse);
expect(subject.stream == subject.stream, isTrue);
});
});
}
31 changes: 31 additions & 0 deletions test/subject/replay_subject_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -443,5 +443,36 @@ void main() {
expect(() => subject.addError(Exception()), throwsStateError);
expect(subject.values, [1]);
});

test('stream returns a read-only stream', () async {
final subject = ReplaySubject<int>()..add(1);

// streams returned by ReplaySubject are read-only stream,
// ie. they don't support adding events.
expect(subject.stream, isNot(isA<ReplaySubject<int>>()));
expect(subject.stream, isNot(isA<Sink<int>>()));

expect(
subject.stream,
isA<ReplayStream<int>>().having(
(v) => v.values,
'ReplaySubject.stream.values',
[1],
),
);

// ReplaySubject.stream is a broadcast stream
{
final stream = subject.stream;
expect(stream.isBroadcast, isTrue);
await expectLater(stream, emitsInOrder(<Object>[1]));
await expectLater(stream, emitsInOrder(<Object>[1]));
}

// streams returned by the same subject are considered equal,
// but not identical
expect(identical(subject.stream, subject.stream), isFalse);
expect(subject.stream == subject.stream, isTrue);
});
});
}

0 comments on commit e1f6cd9

Please sign in to comment.