Blocking zip operator #871

Closed
wants to merge 3 commits into from

Conversation

akarnokd
Member

Proposed solution to #867

  • Added overloads to zip many and zip 2 only. If my approach is favorable, other overloads can be added.
    • Javadoc might need some rephrasing.
  • Contains a fix for #868 (Zip Bug UnitTest: Never Completes When Zero Observables (Obsolete)), but no test.
  • Overloads take a bufferSize > 0 indicating the queue size; bufferSize <= 0 behaves as the original zip.
    • Due to the internals of zip, a bufferSize of 0 does not work (i.e., everyone blocks on add and no one can call tick). For that, zip would need different internals with CyclicBarrier and other magic. Should I work on this case as well?
  • Source completion is blocking as well right now; for example, an empty producer with a slow partner will complete only if the slow partner fires its first event. Should it terminate as soon as possible?

Update:

  • Added 0-length buffering mode, bufferSize < 0 now indicates unlimited buffering.
  • Changed buffered mode to finish as soon as possible
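
The bufferSize convention after this update can be pictured with standard java.util.concurrent queues. This is only a sketch under assumptions: the helper `ZipBuffers.forBufferSize` is hypothetical and is not taken from the pull request, whose actual internals may differ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical helper for illustration; not code from the pull request.
final class ZipBuffers {
    private ZipBuffers() { }

    /** Picks a per-source queue that mirrors the bufferSize convention. */
    static <T> BlockingQueue<T> forBufferSize(int bufferSize) {
        if (bufferSize < 0) {
            // Effectively unlimited buffering: put() practically never blocks.
            return new LinkedBlockingQueue<T>();
        }
        if (bufferSize == 0) {
            // 0-length buffering: a producer hands off directly to a waiting consumer.
            return new SynchronousQueue<T>();
        }
        // Bounded buffering: put() blocks the producer once the queue is full.
        return new ArrayBlockingQueue<T>(bufferSize);
    }
}
```

With a bounded queue, `offer` on a full buffer returns false and `put` blocks, which is the blocking behavior the overloads rely on.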

@cloudbees-pull-request-builder

RxJava-pull-requests #799 SUCCESS
This pull request looks good

@headinthebox
Contributor

Source completion is blocking as well right now; for example,
an empty producer with a slow partner will complete only if the slow
partner fires its first event. Should it terminate as soon as possible?

Definitively empty[T]().zip(never[S](), (s,t)=>(s,t)) = empty[(S,T)]
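
The identity above can be illustrated with a small push-based state machine. This is a single-threaded sketch under assumptions: `EagerZip` and its method names are hypothetical and only model the "terminate as soon as possible" rule, not the actual RxJava zip.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical two-source zip state machine; not thread-safe, illustration only.
final class EagerZip {
    private final Queue<Integer> left = new ArrayDeque<Integer>();
    private final Queue<String> right = new ArrayDeque<String>();
    private boolean leftDone;
    private boolean rightDone;
    private boolean completed;
    final List<String> output = new ArrayList<String>();

    void onLeft(int value)    { if (!completed) { left.add(value); drain(); } }
    void onRight(String v)    { if (!completed) { right.add(v); drain(); } }
    void onLeftCompleted()    { leftDone = true; drain(); }
    void onRightCompleted()   { rightDone = true; drain(); }
    boolean isCompleted()     { return completed; }

    private void drain() {
        // Emit a pair whenever both sides have a queued value.
        while (!left.isEmpty() && !right.isEmpty()) {
            output.add(left.poll() + ":" + right.poll());
        }
        // A finished source with nothing queued can never form another pair,
        // so the zip completes without waiting for the other source.
        if ((leftDone && left.isEmpty()) || (rightDone && right.isEmpty())) {
            completed = true;
        }
    }
}
```

Completing the left side before it emits anything marks the zip as completed even if the right side never produces a value, which corresponds to the empty-zip-never case.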

@akarnokd
Member Author

Thought so, started working on the question points anyway.

@tnine
Contributor

tnine commented Feb 13, 2014

Hey guys,
IMHO, throwing an IllegalArgumentException when Observable.zip(empty, zip) is invoked seems like a valid solution. I'm not sure that zip should allow zipping nothing. By definition, there's nothing to observe. I just think it should validate its input; I only discovered this problem due to a bug in our code.

@cloudbees-pull-request-builder

RxJava-pull-requests #800 FAILURE
Looks like there's a problem with this pull request

@akarnokd
Member Author

signArchives failure again.

@akarnokd akarnokd closed this Feb 13, 2014
@akarnokd
Member Author

Let's try again.

@akarnokd akarnokd reopened this Feb 13, 2014
@cloudbees-pull-request-builder

RxJava-pull-requests #801 SUCCESS
This pull request looks good

@kirkshoop
Member

I am not sure where the main discussion is happening, so I am placing this response here.

I looked at this because I am experimenting with rxcpp improvements and have been applying some of the choices made in RxJava. I was interested to see how this would map to C++ as well.

My understanding is that pause is called from onnext when the queue is filled to bufferSize. pause will block the stack that called onnext until resume is called.

I saw some discussion about using SubscribeOn to prevent deadlocks. I was doing thought experiments of my own and wondered how that would be applied to Hot Observables?

It seems like many Hot Observables have stacks rooted in threads that are dedicated to that source (UI thread, Sensor thread, IO thread) and that blocking those threads has consequences for the system. A UI thread would hang the app when blocked momentarily, or deadlock if two UI events are being zipped. A Sensor thread might drop or queue readings for all the sensors being monitored, or deadlock if two sensor readings are being zipped.

Using SubscribeOn will not fix this and using ObserveOn only moves the infinite queue from zip to ObserveOn.

I would suggest that a Hot Observable needs to be unsubscribed when the buffer is full. This sounds like ConnectableObservable is the surface that supports backpressure for HotObservables. I further think that ColdObservables can be implemented using ConnectableObservable as well.

speculation..

I also think that the buffering method, the depth, and the backpressure algorithm should be unique to each source Observable. Thus, I am wondering if back-pressure can instead be implemented as overloads of a BackPressure operator that returns a ConnectableObservable, so that all the operators that combine multiple observables have backpressure-capable overloads that take only ConnectableObservable. Zip from N ConnectableObservables would have a fixed buffer depth of, for example, 1, and would thus start by calling Connect() on each source and then unsubscribe the connection in the first onnext call from each source. When the onnext call is received from the last source, the output onnext is called and then Connect() is called on each source again, and so on.

two examples of potential backpressure operators.

A BlockingBackPressure operator would take Cold observables. It would block the source when it is disconnected and would take the Scheduler to use in SubscribeOn. No buffer needed.

A LossyBackPressure operator would take hot observables and unsubscribe when the buffer is full and re-subscribe when the buffer has room.
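
The difference between the two proposed policies can be sketched with a bounded queue. This is a simplified model under assumptions: `BoundedSourceBuffer` is a hypothetical name, and dropping a value on a full buffer stands in for the events a hot source would emit while the operator is unsubscribed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the two policies; not an RxJava or rxcpp API.
final class BoundedSourceBuffer {
    private final BlockingQueue<Integer> buffer;
    private int dropped;

    BoundedSourceBuffer(int capacity) {
        buffer = new ArrayBlockingQueue<Integer>(capacity);
    }

    /** Lossy policy (hot sources): a value arriving while the buffer is full is lost. */
    boolean offerLossy(int value) {
        boolean accepted = buffer.offer(value);
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }

    /** Blocking policy (cold sources): the producer thread waits for room. */
    void putBlocking(int value) throws InterruptedException {
        buffer.put(value);
    }

    /** Consumer side: removes the oldest value, or returns null when empty. */
    Integer poll() {
        return buffer.poll();
    }

    int droppedCount() {
        return dropped;
    }
}
```

The blocking variant only makes sense when the producer runs on a thread that is allowed to wait, which is why the comment pairs it with a Scheduler for SubscribeOn.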

Critiques and explanations welcome.

Thanks,
Kirk

@akarnokd
Member Author

akarnokd commented Mar 2, 2014

I would suggest that a Hot Observable needs to be unsubscribed when the buffer is full. This sounds like ConnectableObservable is the surface that supports backpressure for HotObservables. I further think that ColdObservables can be implemented using ConnectableObservable as well.

This doesn't work. If you unsubscribe from range() then resubscribe later, range starts over again. If you unsubscribe from PublishSubject, you lose events. Ben reported about a new approach in #802 but no details.
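
The two failure modes can be shown with plain Java stand-ins. This is a sketch under assumptions: `HotColdDemo`, `subscribeToRange` and `HotSubject` are hypothetical names that only mimic the behavior of range() and PublishSubject, not their implementations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical stand-ins for a cold range and a hot subject; illustration only.
final class HotColdDemo {
    private HotColdDemo() { }

    /**
     * Cold source: every subscription replays the range from its start.
     * The limit mimics a subscriber that unsubscribes after `limit` values.
     */
    static List<Integer> subscribeToRange(int start, int count, int limit) {
        List<Integer> seen = new ArrayList<>();
        for (int i = start; i < start + count && seen.size() < limit; i++) {
            seen.add(i);
        }
        return seen;
    }

    /** Hot source: values are forwarded only to the current subscriber, if any. */
    static final class HotSubject {
        private IntConsumer subscriber; // null while nobody is subscribed

        void subscribe(IntConsumer s) { subscriber = s; }
        void unsubscribe()            { subscriber = null; }

        void onNext(int value) {
            if (subscriber != null) {
                subscriber.accept(value);
            }
        }
    }
}
```

Resubscribing to the cold range yields the same leading values again, while a value pushed into the hot subject during the unsubscribed gap never reaches the subscriber.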

@kirkshoop
Member

Of course, but some implementations of ConnectableObservable can block while others do not. You might not have read my whole comment. Near the end you will find:

two examples of potential backpressure operators.

A BlockingBackPressure operator would take Cold observables. It would block the source when it is disconnected and would take the Scheduler to use in SubscribeOn. No buffer needed.

A LossyBackPressure operator would take hot observables and unsubscribe when the buffer is full and re-subscribe when the buffer has room.

@abersnaze
Contributor

This seems like as good as any other place to try out the proposal. Here are the basics.

Add three methods to Subscription: void pause(), boolean isPaused() and void resumeWith(Action0 onResume). pause/isPaused behave the same as unsubscribe/isUnsubscribed but are non-destructive (not reading vs. closing the socket). When the source first detects that a Subscriber isPaused, it is obliged to call resumeWith(Action0) with a function that closes over the current state of the stream and then return from the OnSubscribe function as soon as possible. The action passed should resume the calling of onNext/onCompleted. onError does not get paused. I have an experimental 'pause' branch on my fork with an implementation of zip with an internal buffer size of 1 that can apply back pressure. I've also implemented a few new operators on Observable that let users enforce the desired pause behavior on a stream: whilePausedBlock(), whilePausedDrop(), whilePausedBuffer() and whilePausedUnsubscribe().

All of this will be coming in a day or two, once I cherry-pick the changes from the prototype.
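
The source-side obligation described above (detect the pause, register a continuation that closes over the stream state, return promptly) might look roughly like this. This is a single-threaded sketch under assumptions: `OneSlotSubscriber` and `ResumableRange` are hypothetical names, Runnable stands in for Action0, and the subscriber pausing itself after each value models a consumer with an internal buffer size of 1. It is not the code from the 'pause' branch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Subscriber with the proposed pause support.
final class OneSlotSubscriber {
    final List<Integer> received = new ArrayList<>();
    private boolean paused;
    private Runnable onResume;

    void pause()                     { paused = true; }
    boolean isPaused()               { return paused; }
    void resumeWith(Runnable action) { onResume = action; }

    /** Called by the consumer once it is ready for more data. */
    void resume() {
        paused = false;
        Runnable action = onResume;
        onResume = null;
        if (action != null) {
            action.run();
        }
    }

    void onNext(int value) {
        received.add(value);
        pause(); // one-slot buffer: ask the source to stop after every value
    }
}

// Hypothetical cold source that honors the pause protocol.
final class ResumableRange {
    private ResumableRange() { }

    /** Emits the values in [from, to), handing back a continuation when paused. */
    static void emit(final int from, final int to, final OneSlotSubscriber s) {
        for (int i = from; i < to; i++) {
            if (s.isPaused()) {
                final int next = i; // the continuation closes over the current position
                s.resumeWith(() -> emit(next, to, s));
                return;             // leave the subscribe function as soon as possible
            }
            s.onNext(i);
        }
    }
}
```

Each call to `resume()` lets the range deliver exactly one more value before it parks itself again, without blocking any thread in between.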

@kirkshoop
Member

Cool, can't wait to see the code :)

I like the four operators, they make sense. I feel like one operation is missing from subscriber. How does Zip resume the subscribers?

Additionally, one of the ways in which pause/isPaused are not like unsubscribe/isUnsubscribed is that pause will flip back and forth over time while unsubscribe is one-shot. This seems to introduce a few additional conditions that would need to be addressed:

I am interested in the full semantics of a paused Subscriber.

  • What happens when resumeWith is called twice during the same pause?
  • What happens when pause is called while paused?
  • What is the policy for multithread access to pause, resumeWith and resume?

This appears to force backpressure support in every source. This has tradeoffs between adoption and uniformity. Is there some pattern here that would allow opt in?

Thanks,
Kirk

@abersnaze
Contributor

Zip resumes the inner subscribers internally when the other subscriber has caught up: it clears the internal state that makes isPaused return true and then invokes the actions that were passed to resumeWith while it was paused.

All actions passed in resumeWith are invoked when the subscriber is ready for more data.

The second pause is treated the same as a second unsubscribe, probably ignored. Also actions passed in to resumeWith while not paused are immediately invoked.
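
The three rules stated in this reply (all queued actions run on resume, a second pause is ignored, resumeWith while not paused runs immediately) can be sketched as a small state holder. This is an illustration under assumptions: `PauseState` is a hypothetical name, Runnable stands in for Action0, and the locking shown is one possible policy, not the logic in CompositeSubscription.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pause bookkeeping; illustration only.
final class PauseState {
    private final List<Runnable> pending = new ArrayList<>();
    private boolean paused;

    /** Idempotent: pausing while already paused changes nothing. */
    synchronized void pause() {
        paused = true;
    }

    synchronized boolean isPaused() {
        return paused;
    }

    /** Queues the action while paused; otherwise invokes it immediately. */
    void resumeWith(Runnable action) {
        synchronized (this) {
            if (paused) {
                pending.add(action);
                return;
            }
        }
        action.run(); // run outside the lock to avoid holding it during callbacks
    }

    /** Clears the paused flag and invokes every action queued during the pause. */
    void resume() {
        List<Runnable> toRun;
        synchronized (this) {
            paused = false;
            toRun = new ArrayList<>(pending);
            pending.clear();
        }
        for (Runnable action : toRun) {
            action.run();
        }
    }
}
```

Running the queued actions outside the lock is a design choice of this sketch; it avoids re-entrancy deadlocks if an action pauses or resumes the same state.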

There is a lot of logic in the CompositeSubscription for handling all of this. These changes make implementing Subscription directly very tricky. I'm inclined to try to make Subscription a final class and not an interface. That makes me wonder if there isn't something to the relationship between Subject = Observer + Observable and Subscriber = Observer + Subscription.

@kirkshoop
Member

I like the idea that Subscriber is a concrete type, like Subject, and is a composition of Observer and Subscription.

Thanks for the explanation.

However, given those properties, why would pause be on the interface? Zip has to use private access to the Subscriber in order to resume, why not pause? Is there any case where a pause would be invoked outside Zip?

@kirkshoop
Member

I am also thinking that it would be a good idea to pass the scheduler that should be used to schedule the action to resumeWith. The Action supplied could have a call to schedule, but I think it is important to be explicit: it allows the scheduler to be known and optionally overridden.

Going with the Subject, Subscriber theme, perhaps resumeWith actually takes a Resumer that is a Scheduler and Action. Resumer may be a functor or have a named resume function that schedules the action.
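
One possible shape for such a Resumer is sketched below. This is only an assumption about what the idea could look like: `java.util.concurrent.Executor` stands in for an Rx Scheduler, Runnable stands in for the Action, and the method `withScheduler` is a hypothetical way to express the optional override.

```java
import java.util.concurrent.Executor;

// Hypothetical pairing of a scheduler and an action; illustration only.
final class Resumer {
    private final Executor scheduler;
    private final Runnable action;

    Resumer(Executor scheduler, Runnable action) {
        this.scheduler = scheduler;
        this.action = action;
    }

    /** Schedules the resume action on the scheduler carried by this Resumer. */
    void resume() {
        scheduler.execute(action);
    }

    /** Returns a copy that runs the same action on a different scheduler. */
    Resumer withScheduler(Executor override) {
        return new Resumer(override, action);
    }
}
```

Because the scheduler is a visible field of the pair rather than something hidden inside the action, the caller of resumeWith can inspect or replace it.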

@zsxwing
Member

zsxwing commented Mar 4, 2014

Add three methods to Subscription: void pause(), boolean isPaused() and void resumeWith(Action0 onResume).

@abersnaze, I'm curious how you handle resumeWith with subscribeOn. Looking forward to your PR.

@benjchristensen
Member

After experimenting with these approaches on observeOn, subscribeOn, and zip we can't pursue this route. Any form of blocking becomes a liability in systems using event loops and breaks the mental model of what people expect from Rx.

For example, even merge as currently implemented (and following the Rx.Net pattern) is a problem since it synchronizes multiple streams into one. This means it can block threads which can be event loops.

I'll post another issue soon to pull the conversation together.

@akarnokd akarnokd deleted the ZipBackpressure1 branch May 6, 2014 13:38
8 participants