Parallel flowable support #155
Looking good so far! Thanks for doing this. Few comments around implementation and some minor nits
@@ -32,5 +33,6 @@
     ObservableConverter<T, ObservableSubscribeProxy<T>>,
     MaybeConverter<T, MaybeSubscribeProxy<T>>,
     SingleConverter<T, SingleSubscribeProxy<T>>,
-    CompletableConverter<CompletableSubscribeProxy> {
+    CompletableConverter<CompletableSubscribeProxy>,
+    ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>>{
nit: let's move this next to `FlowableConverter` in the declaration
import io.reactivex.parallel.ParallelFlowable;

class ParallelFlowableScoper<T> extends Scoper
    implements Function<ParallelFlowable<? extends T>, ParallelFlowableSubscribeProxy<T>> {
Since this is never going to be used with `to()`, let's make this implement the `ParallelFlowableConverter` directly and skip the indirection of the others. I mostly haven't migrated the others to this pattern yet to give users time to migrate, but they'll all be moved to this for 1.0. This one can start with it since it's new.
Alright, I'll make the change. You want me to open an issue for this and maybe we can work on it as a part of 0.7.0 release?
Don't worry about it for now. I'll migrate the rest en masse for the 1.0 release when the time comes
@Override public void subscribe(Subscriber<? super T>[] subscribers) {
  Subscriber<? super T>[] newSubscribers = new Subscriber[subscribers.length];
  for (int i = 0; i < subscribers.length; i++) {
Interesting... I'm not super familiar with `ParallelFlowable`, but are there any thread safety concerns here? Is this how other `subscribe()` calls are handled in its operators?
Yep, that's what is happening in the `subscribe()` method of its operators. It iterates over the passed-in array of subscribers, creates a new array of subscribers and passes it to the source `Flowable`.
One very important thing which I forgot to do was checking if the number of subscribers is equal to the parallelism of the flowable (the contract of `subscribe()` says that they should be equal). I'll push another commit tonight.
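For readers following along, the pattern described in this comment can be sketched in plain Java. This is only an illustration under assumptions: `RailWrappingSketch` and `wrapAll` are hypothetical names, `Consumer` stands in for the Reactive Streams `Subscriber`, and the sketch throws on a count mismatch for simplicity, which is not necessarily how RxJava's operators or this PR report it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java stand-in for the wrap-each-subscriber pattern; no RxJava types are used. */
final class RailWrappingSketch {

  /**
   * Hypothetical helper: checks that the subscriber count matches the
   * parallelism, then builds a new list holding one wrapper per subscriber.
   */
  static <T> List<Consumer<T>> wrapAll(int parallelism, List<Consumer<T>> subscribers) {
    if (subscribers.size() != parallelism) {
      // Assumption: throwing keeps the sketch short; a real operator may
      // signal the error to the subscribers instead.
      throw new IllegalArgumentException(
          "parallelism = " + parallelism + ", subscribers = " + subscribers.size());
    }
    List<Consumer<T>> wrapped = new ArrayList<>(subscribers.size());
    for (Consumer<T> downstream : subscribers) {
      // Each rail gets its own wrapper; this one only forwards items.
      wrapped.add(item -> downstream.accept(item));
    }
    return wrapped;
  }
}
```

The wrapped list is what would be handed to the upstream source in place of the original subscribers.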
private static final int DEFAULT_PARALLELISM = 2;

@Rule public RxErrorsRule rule = new RxErrorsRule();
nit: can be final
source.onNext(1);
source.onNext(2);
firstSubscriber.assertValue(1);
secondSubscriber.assertValue(2);
Can you explain this part a bit? Not quite sure I followed how this works (probably my lack of familiarity with `ParallelFlowable`).
So the `ParallelFlowable` emits the items to the subscribers in a round-robin fashion, and here we are emitting 1 and 2 to two subscribers. This means that the `firstSubscriber` will only receive 1 and the `secondSubscriber` will only receive 2 and nothing else.
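The round-robin behaviour described above can be sketched in plain Java. This is a simplified illustration, not RxJava's implementation: `RoundRobinSketch` and `dispatch` are hypothetical names, and the sketch ignores backpressure and threading entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of round-robin dispatch; not RxJava's implementation. */
final class RoundRobinSketch {

  /** Distributes items over `rails` buckets in order: item i goes to rail i % rails. */
  static <T> List<List<T>> dispatch(List<T> items, int rails) {
    List<List<T>> received = new ArrayList<>();
    for (int r = 0; r < rails; r++) {
      received.add(new ArrayList<>());
    }
    for (int i = 0; i < items.size(); i++) {
      received.get(i % rails).add(items.get(i));
    }
    return received;
  }
}
```

With two rails and the items 1 and 2, the first rail receives only 1 and the second only 2, which matches the assertions in the test under review.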
gotcha 👍
Summary: Added the missing check in subscribe method which checks if parallelism == subscribers.count(). Also replaced ParallelFlowableScoper with AutoDisposeParallelFlowableConverter.
Sidenote: it helps me with reviewing if you address each comment (or logical set of comments) with a single commit and then link its sha as a response to the comment.
Subscriber<? super T>[] newSubscribers = new Subscriber[subscribers.length];
for (int i = 0; i < subscribers.length; i++) {
  AutoDisposingSubscriberImpl<? super T> subscriber =
I kind of wonder if we do need a separate subscriber that manages all the subscribers here. @akarnokd penny for your thoughts?
Parallel rails are largely independent and act as their own Flowables. That's why many parallel operators just delegate to their sequential versions.
cool, in that case we can leave this as-is 👍
@VisheshVadhera do you have any other changes planned for this? (just checking)
@hzsweers No other changes as of now. We can land this if everything looks fine.
Awesome, thanks for this!!
Closes #142