Bug: shared Observables not actually repeatable/retryable #678
Comments
Weird, I thought I had tests around that from the beginning. It's probably my most recent changes there that broke the behavior. I was using |
We have this test:
it('should retry just fine', function () {
  var e1 = cold('---a--b--c--d--e--#');
  var expected = '---a--b--c--d--e-----a--b--c--d--e--#';
  expectObservable(e1.share().retry(1)).toBe(expected);
});
But notice how there is just one subscriber in this case. |
Also, this test doesn't have multiple subscribers: https://github.com/ReactiveX/RxJS/blob/master/spec/operators/multicast-spec.js#L88 |
Hmm... it looks like It doesn't seem like a problem with ConnectableObservable, my suspicion lies with the Subscribers for |
It looks like it resubscribes whenever it completes: https://github.com/ReactiveX/RxJS/blob/e9b13d0178a7ebad2cccd217fc0249449804831d/spec/operators/multicast-spec.js#L102 (also, pro-tip: Hit Y before you copy a link in GitHub to get the SHA in the URL so it doesn't change over time.) |
Ok, I'm narrowing down the problem. RetryWhen works because it usually retries asynchronously some time later. Retry is synchronous, and a race condition happens with RefCounting.
UPDATE: I changed this message to include more details.
var source = Rx.Observable.create(observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.error(new Error('BOOM'));
}).share();
// Subscriber A
source.retry(1).subscribe(
x => console.log(x),
e => console.error(e.message),
() => console.log('|'));
// Subscriber B
source.retry(1).subscribe(
x => console.log(' '+x),
e => console.error(' '+e.message),
() => console.log(' |'));
As you can see, the refCount counter never goes to zero, which means the cold source is not resubscribed. Working on a solution... |
I tried a solution where the RefCountSubscriber resets the counter when it encounters an error, but I still have problems with retry's synchronous behavior kicking in too soon (because Subject errors are propagated depth-first to observers, not breadth-first):
Conclusion: only subscriber B actually performs a retry; subscriber A doesn't, because it gets interrupted by "subject1.observer[subscriberB].error()". A sloppy solution for this would be to change |
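To make the depth-first point concrete, here is a minimal sketch of a toy subject. It is not the RxJS Subject implementation; `ToySubject`, `ToyObserver`, and `deliveryLog` are hypothetical names invented for this illustration, and the "retry" is modeled simply as a synchronous resubscription from inside the error callback.

```typescript
// Toy model only: hypothetical names, not the RxJS Subject implementation.
type ToyObserver = { name: string; error: (e: Error) => void };

const deliveryLog: string[] = [];

class ToySubject {
  private observers: ToyObserver[] = [];

  subscribe(o: ToyObserver): void {
    this.observers.push(o);
    deliveryLog.push(`${o.name} subscribed`);
  }

  error(e: Error): void {
    // Notify a snapshot of the observers in order. Each callback runs to
    // completion, including anything it triggers synchronously, before the
    // next observer is notified. That is depth-first delivery.
    for (const o of this.observers.slice()) {
      deliveryLog.push(`${o.name} notified`);
      o.error(e);
    }
  }
}

const toySubject = new ToySubject();

// Observer A reacts to the error by synchronously "retrying", modeled here
// as subscribing a fresh observer to the same subject.
toySubject.subscribe({
  name: "A",
  error: () => toySubject.subscribe({ name: "A-retry", error: () => {} }),
});
toySubject.subscribe({ name: "B", error: () => {} });

toySubject.error(new Error("BOOM"));
console.log(deliveryLog.join("\n"));
// A subscribed
// B subscribed
// A notified
// A-retry subscribed
// B notified
```

In this toy, A's resubscription happens while B has not yet seen the error, so any bookkeeping shared between A and B (such as a reference count) is observed in a half-updated state by the retry.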
Update to this issue: I've been exploring how RxJava solves this problem. (I verified that RxJava gives the correct expected output for the same code snippet in this issue.) My current guess is that it has everything to do with refCount and ConnectableObservable; we might need to treat synchronous Observables as a special case. E.g.
public final class OnSubscribeRefCount<T> implements OnSubscribe<T> {
@Override
public void call(final Subscriber<? super T> subscriber) {
lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {
final AtomicBoolean writeLocked = new AtomicBoolean(true);
try {
// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the Action1 passed
// to source.connect above being called
if (writeLocked.get()) {
// Action1 passed to source.connect was not called
lock.unlock();
}
}
} else {
try {
// ready to subscribe to source so do it
doSubscribe(subscriber, baseSubscription);
} finally {
// release the read lock
lock.unlock();
}
}
}
} |
Also, I confirmed that the RefCountSubscriber has to reset the counter when it encounters an error, because RxJava does it too; see
public final class OnSubscribeRefCount<T> implements OnSubscribe<T> {
void doSubscribe(final Subscriber<? super T> subscriber, final CompositeSubscription currentBase) {
// handle unsubscribing from the base subscription
subscriber.add(disconnect(currentBase));
source.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onError(Throwable e) {
cleanup();
subscriber.onError(e);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onCompleted() {
cleanup();
subscriber.onCompleted();
}
void cleanup() {
// on error or completion we need to unsubscribe the base subscription
// and set the subscriptionCount to 0
lock.lock();
try {
if (baseSubscription == currentBase) {
baseSubscription.unsubscribe();
baseSubscription = new CompositeSubscription();
subscriptionCount.set(0);
}
} finally {
lock.unlock();
}
}
});
}
} |
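The effect of resetting the count in `cleanup()` can be sketched with a toy model. This is an illustration under simplifying assumptions (a purely synchronous source, no unsubscription logic), not RxJS or RxJava code; `ToySink`, `ToySource`, `coldSource`, `refCounted`, `retryOnce`, and `collect` are all hypothetical names.

```typescript
// Toy model only: hypothetical names, not RxJS or RxJava API.
type ToySink = { next: (v: number) => void; error: (e: Error) => void };
type ToySource = (sink: ToySink) => void;

// Synchronous cold source: emits 1, 2, 3 and then errors on every subscription.
const coldSource: ToySource = (sink) => {
  [1, 2, 3].forEach((v) => sink.next(v));
  sink.error(new Error("BOOM"));
};

// Shares one connection to `source`. It connects only when the count goes
// from 0 to 1. If `resetOnError` is true, the count and the sink list are
// reset when the source errors, so the next subscriber reconnects.
function refCounted(source: ToySource, resetOnError: boolean): ToySource {
  let count = 0;
  let sinks: ToySink[] = [];
  return (sink) => {
    sinks.push(sink);
    if (++count === 1) {
      source({
        next: (v) => sinks.slice().forEach((s) => s.next(v)),
        error: (e) => {
          const current = sinks.slice();
          if (resetOnError) {
            count = 0;
            sinks = [];
          }
          current.forEach((s) => s.error(e));
        },
      });
    }
  };
}

// Resubscribes synchronously, once, when the source errors.
function retryOnce(source: ToySource): ToySource {
  return (sink) => {
    let retried = false;
    const attempt = (): void =>
      source({
        next: (v) => sink.next(v),
        error: (e) => {
          if (!retried) {
            retried = true;
            attempt();
          } else {
            sink.error(e);
          }
        },
      });
    attempt();
  };
}

// Subscribes and records everything the subscriber receives.
function collect(source: ToySource): string[] {
  const out: string[] = [];
  source({ next: (v) => out.push(String(v)), error: (e) => out.push(e.message) });
  return out;
}

const withoutReset = collect(retryOnce(refCounted(coldSource, false)));
const sharedWithReset = retryOnce(refCounted(coldSource, true));
const subscriberA = collect(sharedWithReset);
const subscriberB = collect(sharedWithReset);

console.log(withoutReset.join(" ")); // 1 2 3
console.log(subscriberA.join(" "));  // 1 2 3 1 2 3 BOOM
console.log(subscriberB.join(" "));  // 1 2 3 1 2 3 BOOM
```

Without the reset, the retry's resubscription only bumps the count from 1 to 2, so the toy never reconnects to the cold source and the retry receives nothing. With the reset, each synchronous retry finds the count at 0 and triggers a fresh connection.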
Another mind dump: I noticed RxJava does the following, and RxJS doesn't, but should. Seems very obvious the refCount producer should emit synchronously to the first subscriber everything it has, then error, then retry, and only after that, start the same for the second subscriber. In other words, RxJS subscribers are interleaved but should execute serially when the source is entirely synchronous.
|
Yes, I was wondering about this myself. Let me know if you'd like me to take a look at this as well. |
I solved that one already, using this:
/**
* Instructs the ConnectableObservable to begin emitting the items from its
* underlying source to its Subscribers.
*
* @param onSubscribe a function that receives the connection subscription
* before the subscription to source happens, allowing the caller to
* synchronously disconnect a synchronous source.
*/
_callbackConnect(onSubscribe: (subscription: Subscription<T>) => void): void {
let subscription = this.subscription;
if (subscription && !subscription.isUnsubscribed) {
onSubscribe(subscription);
return;
}
this.subscription = subscription = new Subscription();
onSubscribe(subscription);
subscription.add(this.source.subscribe(this._getSubject()));
subscription.add(new ConnectableSubscription(this));
}
As per RxJava. |
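Why the callback form matters for a synchronous source can be shown with a small sketch. This is a toy model and not the ConnectableObservable code above; `ToySubscription`, `syncSource`, `plainConnect`, and `callbackConnect` are hypothetical names, and the source is assumed to check the subscription before each emission.

```typescript
// Toy model only: hypothetical names, not the RxJS implementation.
class ToySubscription {
  closed = false;
  unsubscribe(): void {
    this.closed = true;
  }
}

type Emit = (v: number) => void;

// Synchronous source: emits 1..5 in a loop, stopping once `sub` is closed.
function syncSource(emit: Emit, sub: ToySubscription): void {
  for (let v = 1; v <= 5 && !sub.closed; v++) {
    emit(v);
  }
}

// Returns the subscription, but only after the source has already run.
function plainConnect(emit: Emit): ToySubscription {
  const sub = new ToySubscription();
  syncSource(emit, sub);
  return sub;
}

// Hands the subscription to the caller before subscribing to the source.
function callbackConnect(emit: Emit, onSubscribe: (sub: ToySubscription) => void): void {
  const sub = new ToySubscription();
  onSubscribe(sub);
  syncSource(emit, sub);
}

// Both consumers try to stop after the second value.
const plainSeen: number[] = [];
let plainHandle: ToySubscription | undefined;
plainHandle = plainConnect((v) => {
  plainSeen.push(v);
  // plainHandle is still undefined while the source is emitting.
  if (v === 2 && plainHandle) plainHandle.unsubscribe();
});

const callbackSeen: number[] = [];
let callbackHandle: ToySubscription | undefined;
callbackConnect(
  (v) => {
    callbackSeen.push(v);
    if (v === 2 && callbackHandle) callbackHandle.unsubscribe();
  },
  (sub) => {
    callbackHandle = sub;
  },
);

console.log(plainSeen);    // [1, 2, 3, 4, 5]
console.log(callbackSeen); // [1, 2]
```

With the plain form, the handle arrives after all five values have been delivered, so the attempt to stop has no effect. With the callback form, the consumer already holds the handle when the first value arrives and can disconnect mid-stream.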
@staltz and @trxcllnt ... we have a bit of unfinished business with the es7 observable spec that relates to this, I think. We need to add a |
…and refCounting Fix ConnectableObservable, its connect() method, and the RefCountObservable to support synchronous retry/repeat in the presence of multiple subscribers, and to support retry/repeat in other asynchronous scenarios. This commit is a major rework of ConnectableObservable. Resolves bug ReactiveX#678.
@staltz, once #726 lands, we may be able to leverage the |
…antics When the ConnectableObservable with refCount always shares the same instance of the underlying subject (such as in publish, publishReplay, publishBehavior), the subscription to the connectable observable should NOT incur additional subscriptions to the underlying cold source. See how tests for publish/publishBehavior/publishReplay were updated to assert that only one subscription to the underlying cold source happens, not multiple, because as soon as the multicasting subject raises an error, this error impedes subsequent subscriptions to the cold source from happening. Related to ReactiveX#678.
…and refCounting When the ConnectableObservable with refCount always shares the same instance of the underlying subject (such as in publish, publishReplay, publishBehavior), the subscription to the connectable observable should NOT incur additional subscriptions to the underlying cold source. See how tests for publish/publishBehavior/publishReplay were updated to assert that only one subscription to the underlying cold source happens, not multiple, because as soon as the multicasting subject raises an error, this error impedes subsequent subscriptions to the cold source from happening. Fix ConnectableObservable, its connect() method, and the RefCountObservable to support synchronous retry/repeat in the presence of multiple subscribers, and to support retry/repeat in other asynchronous scenarios. Resolves bug ReactiveX#678.
@staltz are you satisfied that this can be closed now? |
Definitely can be closed. |
Runs as
while we expected
I'm investigating some solutions in ConnectableObservable to be able to support @Blesh's use cases for retryable/repeatable Observables, but it seems like this is a bug.