Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues that GroupBy and Sample doesn't call 'unsubscribe' and also NPE when the key is null in GroupBy #1959

Closed
wants to merge 8 commits into from
36 changes: 30 additions & 6 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import rx.Observer;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
Expand Down Expand Up @@ -75,6 +77,7 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
final Func1<? super T, ? extends K> keySelector;
final Func1<? super T, ? extends R> elementSelector;
final Subscriber<? super GroupedObservable<K, R>> child;
final Subscription parentSubscription = this;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a self member variable which also points to this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.


public GroupBySubscriber(
Func1<? super T, ? extends K> keySelector,
Expand All @@ -84,6 +87,17 @@ public GroupBySubscriber(
this.keySelector = keySelector;
this.elementSelector = elementSelector;
this.child = child;
child.add(Subscriptions.create(new Action0() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a good change. It was already correctly not emitting groups if it was unsubscribed, so this is only applicable to a scenario where there no groups have been emitted, such as a stream with no data, correct?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or I suppose a stream where all groups have been unsubscribed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found a race condition. I used a lock to fix it... Any suggestion for a lock-free approach?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also use isEmpty instead of size() == 0. For ConcurrentXXX, size is usually more expensive than isEmpty

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a atomic wip counter I guess. But then you need a CAS loop to check if wip == 0 and do nothing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akarnokd could you take a look at my latest commit? Is it exactly what you mean?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. I'd do

volatile int wip = 1;
...
@Override
public void call() {
    if (WIP.decrementAndGet(this) == 0) {
        self.unsubscribe();
    }
}

... createNewGroup()
for (;;) {
    int w = wip;
    if (w <= 0) {
        return;
    }
    if (WIP.compareAndSwap(this, w, w + 1)) {
      groups.putIfAbsent(key, groupState);
      ...
      break;
    }
}
...completeInner() {
...
if (WIP.decrementAndGet(this) == 0) {
    unsubscribe();
}   

In words, start out wip as 1 because there is the main subscription. Each group created ++wip conditionally: if wip = 0 then an unsubscription happened and there was no active group and thus don't create a new group. Once each group terminates or gets unsubscribed, --wip and if it reaches zero, upstream is unsubscribed. This of course assuming completeInner is idempotent per group.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Brilliant. Done.


@Override
public void call() {
// if no group we unsubscribe up otherwise wait until group ends
if (groups.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not certain what the correct thing to do here. If there are multiple groups in flight and the outer observable is unsubscribed, do we want to unsubscribe the open groups, enqueue onCompleted elements on them but let them run, or something else? Window has this anomaly as well where the unsubscribed outer may never deliver an onCompleted on the inner windows and thus they stall.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should have the same behavior for groupby and window, so I followed the discussion in #1546.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is crazy complicated logic. This operator was re-written 3 times I think to get all of that right so I'm trusting that the unit tests are making sure we're still correct.

We can not unsubscribe the inner groups when the outer receives an unsubscribe, they must all run but now we just don't emit any new groups.

The reason is that something like a take(2) on the outer is saying "take 2 groups", it's not saying "take 2 items from whatever groups they come from". So "take 2 groups" and then let the data flow through those groups. It's then up to those groups to finish or unsubscribe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the code makes more sense now with that requirement. Might worth checking Window for this as well.

parentSubscription.unsubscribe();
}
}

}));
}

private static class GroupState<K, T> {
Expand All @@ -102,7 +116,7 @@ public Observer<T> getObserver() {

}

private final ConcurrentHashMap<K, GroupState<K, T>> groups = new ConcurrentHashMap<K, GroupState<K, T>>();
private final ConcurrentHashMap<Object, GroupState<K, T>> groups = new ConcurrentHashMap<Object, GroupState<K, T>>();

private static final NotificationLite<Object> nl = NotificationLite.instance();

Expand Down Expand Up @@ -166,10 +180,18 @@ void requestFromGroupedObservable(long n, GroupState<K, T> group) {
}
}

private Object groupedKey(K key) {
return key == null ? NULL_KEY : key;
}

private K getKey(Object groupedKey) {
return groupedKey == NULL_KEY ? null : (K) groupedKey;
}

@Override
public void onNext(T t) {
try {
final K key = keySelector.call(t);
final Object key = groupedKey(keySelector.call(t));
GroupState<K, T> group = groups.get(key);
if (group == null) {
// this group doesn't exist
Expand All @@ -185,10 +207,10 @@ public void onNext(T t) {
}
}

private GroupState<K, T> createNewGroup(final K key) {
private GroupState<K, T> createNewGroup(final Object key) {
final GroupState<K, T> groupState = new GroupState<K, T>();

GroupedObservable<K, R> go = GroupedObservable.create(key, new OnSubscribe<R>() {
GroupedObservable<K, R> go = GroupedObservable.create(getKey(key), new OnSubscribe<R>() {

@Override
public void call(final Subscriber<? super R> o) {
Expand Down Expand Up @@ -252,7 +274,7 @@ public void onNext(T t) {
return groupState;
}

private void cleanupGroup(K key) {
private void cleanupGroup(Object key) {
GroupState<K, T> removed;
removed = groups.remove(key);
if (removed != null) {
Expand Down Expand Up @@ -342,8 +364,9 @@ private void completeInner() {
if (child.isUnsubscribed()) {
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
unsubscribe();
} else {
child.onCompleted();
}
child.onCompleted();
}
}
}
Expand All @@ -357,4 +380,5 @@ public Object call(Object t) {
}
};

private static final Object NULL_KEY = new Object();
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static final class SamplerSubscriber<T> extends Subscriber<T> implements Action0
static final AtomicReferenceFieldUpdater<SamplerSubscriber, Object> VALUE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(SamplerSubscriber.class, Object.class, "value");
public SamplerSubscriber(Subscriber<? super T> subscriber) {
super(subscriber);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will unsubscribe the downstream. I suggest instead having child.add(sampler); after L52.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't had a chance to try this code yet, but I want to make sure this doesn't break the backpressure functionality where this operator requests Long.MAX_VALUE up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will unsubscribe the downstream. I suggest instead having child.add(sampler); after L52.

Could you elaborate? I think unsubscribe the downstream would be OK for this operator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't had a chance to try this code yet, but I want to make sure this doesn't break the backpressure functionality where this operator requests Long.MAX_VALUE up.

sample with time does not support backpressure as it uses time to control data flow, right?

this.subscriber = subscriber;
}

Expand Down
52 changes: 51 additions & 1 deletion src/test/java/rx/internal/operators/OperatorGroupByTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -46,6 +47,7 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -1357,4 +1359,52 @@ public Observable<Integer> call(GroupedObservable<Integer, Integer> t) {

};

}
@Test
public void testGroupByUnsubscribe() {
final Subscription s = mock(Subscription.class);
Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(s);
}
}
);
o.groupBy(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer integer) {
return null;
}
}).subscribe().unsubscribe();
verify(s).unsubscribe();
}

@Test
public void testGroupWithNullKey() {
final String[] key = new String[]{"uninitialized"};
final List<String> values = new ArrayList<String>();
Observable.just("a", "b", "c").groupBy(new Func1<String, String>() {

@Override
public String call(String value) {
return null;
}
}).subscribe(new Action1<GroupedObservable<String, String>>() {

@Override
public void call(GroupedObservable<String, String> groupedObservable) {
key[0] = groupedObservable.getKey();
groupedObservable.subscribe(new Action1<String>() {

@Override
public void call(String s) {
values.add(s);
}
});
}
});
assertEquals(null, key[0]);
assertEquals(Arrays.asList("a", "b", "c"), values);
}
}
21 changes: 17 additions & 4 deletions src/test/java/rx/internal/operators/OperatorSampleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@
import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

public class OperatorSampleTest {
private TestScheduler scheduler;
Expand Down Expand Up @@ -271,4 +269,19 @@ public void sampleWithSamplerThrows() {
inOrder.verify(observer2, times(1)).onError(any(RuntimeException.class));
verify(observer, never()).onCompleted();
}

@Test
public void testSampleUnsubscribe() {
final Subscription s = mock(Subscription.class);
Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(s);
}
}
);
o.throttleLast(1, TimeUnit.MILLISECONDS).subscribe().unsubscribe();
verify(s).unsubscribe();
}
}