-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix issues that GroupBy and Sample doesn't call 'unsubscribe' and also NPE when the key is null in GroupBy #1959
Conversation
Find two more bugs:
|
Fixed them in this PR. |
@@ -75,6 +77,7 @@ public OperatorGroupBy( | |||
final Func1<? super T, ? extends K> keySelector; | |||
final Func1<? super T, ? extends R> elementSelector; | |||
final Subscriber<? super GroupedObservable<K, R>> child; | |||
final Subscription parentSubscription = this; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a self
member variable which also points to this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point.
@@ -68,6 +68,7 @@ public OperatorSampleWithTime(long time, TimeUnit unit, Scheduler scheduler) { | |||
static final AtomicReferenceFieldUpdater<SamplerSubscriber, Object> VALUE_UPDATER | |||
= AtomicReferenceFieldUpdater.newUpdater(SamplerSubscriber.class, Object.class, "value"); | |||
public SamplerSubscriber(Subscriber<? super T> subscriber) { | |||
super(subscriber); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will unsubscribe the downstream. I suggest instead having child.add(sampler);
after L52.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't had a chance to try this code yet, but I want to make sure this doesn't break the backpressure functionality where this operator requests Long.MAX_VALUE
up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will unsubscribe the downstream. I suggest instead having child.add(sampler); after L52.
Could you elaborate? I think unsubscribe the downstream
would be OK for this operator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't had a chance to try this code yet, but I want to make sure this doesn't break the backpressure functionality where this operator requests Long.MAX_VALUE up.
sample with time does not support backpressure as it uses time to control data flow, right?
// if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer | ||
if (groups.size() == 0 && (terminated == 1 || child.isUnsubscribed())) { | ||
if (groups.isEmpty() && (terminated == 1 || child.isUnsubscribed())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel some redundancy here. Maybe it is worth reviewing the other counters and state indicators.
This PR contains a lot. I will split it to 3 PRs. |
Fix #1958