-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implemented the AsyncOnSubscribe #3203
Conversation
What's the purpose of this class? |
This is the asynchronous complement to the SyncOnSubscribe as described in #3003. |
f699993
to
3ad5036
Compare
cddd743
to
78c38fc
Compare
The remaining issue with this implementation is that the inner observables are subscribed to an unbounded buffer. This could be improved to a back pressure supporting |
f54e46c
to
a68a219
Compare
l still don't understand this class. Looks very similar to a SyncOnSubscribe that emits Observables and flatMaps it for you. |
I understand your concern but I'm not sure about your comment. This operator does not call flatMap. This issue comes from use cases at Netflix with large services and streaming work. I think it's pretty clear to see from the unit tests that this gives a user direct access to the request and allows fulfillment by any observable (which may or may not be asynchronous). |
This is very similar to the work that you tried with the AbstractProducer. It allows batching and eager subscription. |
This is supposed to help with creating observables from asynchronous data sources in a non-blocking way. Consider the example below... public static void main(String[] args) {
Observable.<Integer>create((Subscriber<? super Integer> s) -> {
s.setProducer((long requested) -> {
getData(requested)
.observe()
.subscribe(s::onNext, s::onError, s::onCompleted);
});
});
}
public static HystrixCommand<Integer> getData(long requested) {
return null;
}
public static interface HystrixCommand<T> {
public Observable<T> observe();
} The use case here is that for each request we want to issue an asynchronous service request and when that request is fulfilled then we want to start onNexting the data for that request. The problem with this naive implementation is that it's very easy and certainly possible to have overlapping requests. You could implement the cas addToRequestAndGet and make sure that only one thread at a time is fulfilling the request at a time but you still have the possibility of onNext events interleaving or overlapping. |
a68a219
to
70eea84
Compare
it doesn't help with truly hot sources but it'll work great for progressive async sources. 👍 |
I have reviewed this work with @stealthcode as he's been working on it and am quite interested in where this goes. It can greatly help us solve the most difficult part of using RxJava: creating correct sources that support backpressure. I'll open an issue to summarize the needs for 1.x and 2.x so we can all get aligned on this topic. I'm +1 on experimenting with this and eventually getting |
Actually, there is already an issue: #3003 |
Implemented the AsyncOnSubscribe.
|
||
@Override | ||
protected S generateState() { | ||
return generator == null ? null : generator.call(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great, if this is to be migrated to 2.x, that user-supplied functions are called in try-catch. Perhaps not here but in call().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's merged into 1.x. Feel free to open a pull request to 2.x if you have time or to 1.x if you feel its necessary.
if (onNextCalled) | ||
r = poll(); | ||
if (hasTerminated || isUnsubscribed()) { | ||
parent.onUnsubscribe(state); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If nextIteration
has terminated, the queue may still hold requested values so it might be worth considering calling clear()
before quitting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. This would affect resubscriptions.
} else { | ||
state.subscriber = s; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this class could get used by multiple concurrent subscribers (?) but if it does then I would expect to see an atomic compareAndSet here rather than just !=null.
This includes the first functioning unit tests. This still has a few iterations to go but opening PR for review.