-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unexpected behavior of publish() and connect() #2596
Comments
I forgot to mention I'm using rxjava-1.0.4 |
Behaves the same with 1.0.5. What happens in case 1 is that there are practically two clients to pub1: your subscriber and the second publish(). Because publish() does backpressure, it can only request more from the source range if all of its child subscribers have consumed the values so far and have requested more: pub2 buffered the initial burst and stopped responding and your subscriber consumed all 128 elements and is ready for more, but can't get through. This is related to case 2 where you experience some replay: due to backpressure, publish() will buffer up to 128 elements regardless if there is anyone to observe it or not (instead what you'd expect from multicasting through a PublishSubject). /cc @benjchristensen In order to fix these, publish() without clients should drop events, but has the drawback that in case like the code above, sources may burst through all of its values before subscribers have any chance of subscribing to it. |
In case 1, it would be very surprising to some observer A that another observer B which A may not even know would affect its observation of the source. Would there be any better way of handling backpressure of publish()? I have been suffering from this for days suspecting a deadlock situation in my code... |
Just found this after posting on the mailing list, since we were running into the same issue with Publish behaving like Replay: https://groups.google.com/forum/#!topic/rxjava/tTly_OZyaIs I find this very confusing, as it seems to violate the behavioral contract? Unless I misunderstood the contract as more being one of minimum guarantees and "maybe replays" versus "always replays". What this means in practice is that unless your sequences have more than 128 items (which is an implementation detail), Publish and Replay behave the same. I wonder if this is in line with the operators on .NET, which don't support backpressure AFAIK? |
I think I know what the issue is. It works fine when a subscriber is subscribed, but if it is just connected but no subscribers then it will indeed buffer, which is wrong, as it thinks that "zero subscribers" means it is backpressured. Here are some examples of it working correctly when connect is immediately followed by a subscribe: TestScheduler scheduler = Schedulers.test();
ConnectableObservable<Long> p = Observable.interval(1, TimeUnit.SECONDS, scheduler).publish();
p.forEach(i -> System.out.println("A -> " + i));
p.connect();
scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
p.forEach(i -> System.out.println("B -> " + i));
scheduler.advanceTimeBy(2, TimeUnit.SECONDS); This will emit:
Note that B only receives 2 and 3, not 0 and 1. Here is one with a slow consumer that joins first, then a fast one later: private static void slowConsumers() {
TestScheduler scheduler = Schedulers.test();
ConnectableObservable<Long> p = Observable.interval(1, TimeUnit.SECONDS, scheduler).publish();
p.observeOn(Schedulers.io()).map(i -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return i;
}).forEach(i -> System.out.println("A -> " + i));
p.connect();
scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
p.forEach(i -> System.out.println("B -> " + i));
scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
B still only gets the 2. Where it goes wrong is at this line of code: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorPublish.java#L357 It sees having zero subscribers as being "backpressured", but that is not valid in this case. It should just drop the data if there are no subscribers. |
@benjchristensen Another related problem: Observable<Integer> source = Observable.range(0, 1000);
Observable<Integer> shared = source.share();
ConnectableObservable<Integer> published = shared.publish();
published.subscribe(new PrintObserver<Integer>());
published.connect(); I expected to see 0 to 999, but again it gives me only 0 to 127. |
@FinalVersion the share + publish example works with master for me. |
@akarnokd Thanks. It works since 1.0.5. I didn't update in time. |
There were fixes merged relating to this issue. Could you check if 1.0.8 works for you and if so, close the issue? |
Case 2 is good now. However Case 1 still gives the same result. I suppose that pub2 has no subscriber and should not put backpressure on pub1. |
The |
All the cases are correct now in my test. |
Thanks for the confirmation. |
I have experienced some unexpected behaviors of publish() and connect() in my project.
Hopefully it's not my misunderstanding since I'm a newbie to Rx.
I will use an Observer that simply print onNext, onError, onCompleted.
The simplified scenarios are as follows:
I expected the output to be 'onNext: 0' up to 'onNext: 999', then 'onCompleted', but it stops at onNext: 127 in my test.
But if I uncomment
//pub2.subscribe();
, namely have something subscribe to pub2, it will work as expected. Or else if I do notpub2.connect()
, it will also work.The code may looks silly, but it may not if I put some operator which does expensive computations before publishing as pub2. BTW, what I want is to do some intermediate computations only once if pub2 is subscribed by many subscribers or piped to different downstream operators. If there're better ways to do that, please kindly let me know.
publish()
behaves likereplay()
I expected that after
pub.connect()
, source will start emitting, and whenpub
is subscribed after 2.5 sec, 'onNext: 0' and 'onNext: 1' will be missed by this subscriber. However, I got them all together at 2.5 sec, and from 3rd second, I got the following onNext: 2 and so on. This looks like areplay()
behavior to me.The text was updated successfully, but these errors were encountered: