Fix issues that GroupBy and Sample doesn't call 'unsubscribe' and also NPE when the key is null in GroupBy #1959

Closed
wants to merge 8 commits into from
72 changes: 59 additions & 13 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -34,6 +35,7 @@
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
@@ -76,6 +78,10 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
final Func1<? super T, ? extends R> elementSelector;
final Subscriber<? super GroupedObservable<K, R>> child;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wipForUnsubscribe");
volatile int wipForUnsubscribe = 1;

public GroupBySubscriber(
Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends R> elementSelector,
@@ -84,6 +90,16 @@ public GroupBySubscriber(
this.keySelector = keySelector;
this.elementSelector = elementSelector;
this.child = child;
child.add(Subscriptions.create(new Action0() {
Member

Looks like a good change. It was already correctly not emitting groups once unsubscribed, so this only applies to a scenario where no groups have been emitted, such as a stream with no data, correct?

Member

Or I suppose a stream where all groups have been unsubscribed.

Member Author

Found a race condition. I used a lock to fix it... Any suggestion for a lock-free approach?

Member Author

Also switched to isEmpty() instead of size() == 0. For the ConcurrentXXX collections, size() is usually more expensive than isEmpty().

Member

With an atomic wip counter, I guess. But then you need a CAS loop that checks whether wip == 0 and does nothing in that case.

Member Author

@akarnokd could you take a look at my latest commit? Is it exactly what you mean?

Member

Not really. I'd do

volatile int wip = 1;
...
@Override
public void call() {
    if (WIP.decrementAndGet(this) == 0) {
        self.unsubscribe();
    }
}

... createNewGroup()
for (;;) {
    int w = wip;
    if (w <= 0) {
        return;
    }
    if (WIP.compareAndSet(this, w, w + 1)) {
        groups.putIfAbsent(key, groupState);
        ...
        break;
    }
}
...completeInner() {
...
if (WIP.decrementAndGet(this) == 0) {
    unsubscribe();
}   

In words: wip starts at 1 because of the main subscription. Each new group increments wip conditionally: if wip is already 0, the main subscription was unsubscribed while no group was active, so no new group is created. Whenever a group terminates or is unsubscribed, wip is decremented, and when it reaches zero the upstream is unsubscribed. This of course assumes that completeInner is idempotent per group, i.e. it decrements wip only once for each group.
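The counting scheme described in this comment can be sketched in isolation. This is a minimal single-class model, assuming an AtomicInteger in place of the AtomicIntegerFieldUpdater used in the patch. WipGate, tryOpenGroup, and release are hypothetical names, and a boolean flag stands in for the real unsubscribe() call.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the counter logic; not the actual OperatorGroupBy code. */
final class WipGate {
    // Starts at 1: the main (outer) subscription counts as one unit of work.
    private final AtomicInteger wip = new AtomicInteger(1);
    private volatile boolean upstreamUnsubscribed;

    /** Called before registering a new group; returns false once the count has reached zero. */
    boolean tryOpenGroup() {
        for (;;) {
            int w = wip.get();
            if (w <= 0) {
                return false; // outer unsubscribed while no group was active
            }
            if (wip.compareAndSet(w, w + 1)) {
                return true;
            }
        }
    }

    /** Called once when the outer subscriber unsubscribes and once per finished group. */
    void release() {
        if (wip.decrementAndGet() == 0) {
            upstreamUnsubscribed = true; // stands in for unsubscribe()
        }
    }

    boolean isUpstreamUnsubscribed() {
        return upstreamUnsubscribed;
    }

    public static void main(String[] args) {
        WipGate gate = new WipGate();
        System.out.println(gate.tryOpenGroup());           // true
        gate.release();                                     // outer unsubscribes, one group still active
        System.out.println(gate.isUpstreamUnsubscribed()); // false
        gate.release();                                     // the group finishes
        System.out.println(gate.isUpstreamUnsubscribed()); // true
        System.out.println(gate.tryOpenGroup());           // false
    }
}
```

The model only stays correct if release() runs exactly once per opened group, which is the idempotence assumption stated in the comment.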

Member Author

Brilliant. Done.


@Override
public void call() {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(self) == 0) {
self.unsubscribe();
}
}

}));
}

private static class GroupState<K, T> {
@@ -102,7 +118,7 @@ public Observer<T> getObserver() {

}

private final ConcurrentHashMap<K, GroupState<K, T>> groups = new ConcurrentHashMap<K, GroupState<K, T>>();
private final ConcurrentHashMap<Object, GroupState<K, T>> groups = new ConcurrentHashMap<Object, GroupState<K, T>>();

private static final NotificationLite<Object> nl = NotificationLite.instance();

@@ -138,7 +154,7 @@ public void onCompleted() {
}

// special case (no groups emitted ... or all unsubscribed)
if (groups.size() == 0) {
if (groups.isEmpty()) {
// we must track 'completionEmitted' separately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) {
child.onCompleted();
@@ -150,8 +166,13 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {
// we immediately tear everything down if we receive an error
child.onError(e);
try {
// we immediately tear everything down if we receive an error
child.onError(e);
} finally {
// We have not chained the subscribers, so need to call it explicitly.
unsubscribe();
}
}
}

@@ -166,10 +187,18 @@ void requestFromGroupedObservable(long n, GroupState<K, T> group) {
}
}

private Object groupedKey(K key) {
return key == null ? NULL_KEY : key;
}

private K getKey(Object groupedKey) {
return groupedKey == NULL_KEY ? null : (K) groupedKey;
}

@Override
public void onNext(T t) {
try {
final K key = keySelector.call(t);
final Object key = groupedKey(keySelector.call(t));
GroupState<K, T> group = groups.get(key);
if (group == null) {
// this group doesn't exist
@@ -179,16 +208,18 @@ public void onNext(T t) {
}
group = createNewGroup(key);
}
emitItem(group, nl.next(t));
if (group != null) {
emitItem(group, nl.next(t));
}
} catch (Throwable e) {
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}

private GroupState<K, T> createNewGroup(final K key) {
private GroupState<K, T> createNewGroup(final Object key) {
final GroupState<K, T> groupState = new GroupState<K, T>();

GroupedObservable<K, R> go = GroupedObservable.create(key, new OnSubscribe<R>() {
GroupedObservable<K, R> go = GroupedObservable.create(getKey(key), new OnSubscribe<R>() {

@Override
public void call(final Subscriber<? super R> o) {
@@ -242,7 +273,17 @@ public void onNext(T t) {
}
});

GroupState<K, T> putIfAbsent = groups.putIfAbsent(key, groupState);
GroupState<K, T> putIfAbsent;
for (;;) {
int wip = wipForUnsubscribe;
if (wip <= 0) {
return null;
}
if (WIP_FOR_UNSUBSCRIBE_UPDATER.compareAndSet(this, wip, wip + 1)) {
putIfAbsent = groups.putIfAbsent(key, groupState);
break;
}
}
if (putIfAbsent != null) {
// this shouldn't happen (because we receive onNext sequentially) and would mean we have a bug
throw new IllegalStateException("Group already existed while creating a new one");
Member

Oops, I forgot about this. It shouldn't happen if the source is serialized, but if it does, the upstream can never be unsubscribed: wip was already incremented, so it looks as if there is an active group even though no GroupState was registered for it. To defend against this, we need the usual if (wip.decrementAndGet() == 0) { unsubscribe(); } here.
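The defensive decrement suggested here might look roughly like the sketch below. It is an illustration under stated assumptions, not the code that was merged: GroupRegistry, register, and release are hypothetical names, String keys and plain Object states replace the real generic types, and a boolean flag stands in for unsubscribe().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the registration step in createNewGroup; names are illustrative. */
final class GroupRegistry {
    final AtomicInteger wip = new AtomicInteger(1); // 1 = the main subscription
    final ConcurrentHashMap<String, Object> groups = new ConcurrentHashMap<String, Object>();
    volatile boolean unsubscribed;

    /** Returns the new group state, or null if the operator was already shut down. */
    Object register(String key) {
        Object state = new Object();
        for (;;) {
            int w = wip.get();
            if (w <= 0) {
                return null;
            }
            if (wip.compareAndSet(w, w + 1)) {
                break;
            }
        }
        if (groups.putIfAbsent(key, state) != null) {
            // Roll back the increment so the phantom group cannot keep the upstream alive.
            release();
            throw new IllegalStateException("Group already existed while creating a new one");
        }
        return state;
    }

    void release() {
        if (wip.decrementAndGet() == 0) {
            unsubscribed = true; // stands in for unsubscribe()
        }
    }
}
```

Without the rollback, the duplicate registration would leave wip one higher than the number of live groups, and the count could never return to zero.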

Member Author

I also found that onError didn't call unsubscribe. As for this case: the exception thrown here is routed to onError by the catch block in onNext, so calling unsubscribe in onError should make it safe.

Member

The onError handling seems simplistic. When an onError comes down, I'd expect the whole contraption to be shut down and both the main subscriber and the groups to be notified of the error; that kind of termination already happens in onCompleted. In addition, since we use BufferUntilSubscriber, its client may throw from onNext while replaying (no idea where that exception goes) or in the direct-mode phase (where it bubbles back to emitItem), and again nothing is torn down. If the main sequence was observed through unsafeSubscribe, we can't know whether the downstream will eventually unsubscribe upwards. And if a group's onNext throws, does everything need to be torn down, or just that specific group, as with the group's unsubscribe()?

@@ -252,11 +293,11 @@ public void onNext(T t) {
return groupState;
}

private void cleanupGroup(K key) {
private void cleanupGroup(Object key) {
GroupState<K, T> removed;
removed = groups.remove(key);
if (removed != null) {
if (removed.buffer.size() > 0) {
if (!removed.buffer.isEmpty()) {
BUFFERED_COUNT.addAndGet(self, -removed.buffer.size());
}
completeInner();
@@ -334,16 +375,20 @@ private void drainIfPossible(GroupState<K, T> groupState) {
}

private void completeInner() {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this) == 0) {
unsubscribe();
}
// if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
if (groups.size() == 0 && (terminated == 1 || child.isUnsubscribed())) {
if (groups.isEmpty() && (terminated == 1 || child.isUnsubscribed())) {
Member

I feel some redundancy here. Maybe it is worth reviewing the other counters and state indicators.

// completionEmitted ensures we only emit onCompleted once
if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) {

if (child.isUnsubscribed()) {
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
unsubscribe();
} else {
child.onCompleted();
}
child.onCompleted();
}
}
}
@@ -357,4 +402,5 @@ public Object call(Object t) {
}
};

private static final Object NULL_KEY = new Object();
}
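The null-key handling in this file (groupedKey, getKey, and the NULL_KEY sentinel) works around the fact that ConcurrentHashMap rejects null keys with a NullPointerException. A minimal standalone sketch of the same sentinel technique follows; NullTolerantMap and its methods are hypothetical names, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical wrapper illustrating the sentinel technique; not part of RxJava. */
final class NullTolerantMap<K, V> {
    // Unique marker object stored in place of a null key.
    private static final Object NULL_KEY = new Object();
    private final ConcurrentHashMap<Object, V> map = new ConcurrentHashMap<Object, V>();

    private static Object mask(Object key) {
        return key == null ? NULL_KEY : key;
    }

    @SuppressWarnings("unchecked")
    private K unmask(Object key) {
        return key == NULL_KEY ? null : (K) key;
    }

    V putIfAbsent(K key, V value) {
        return map.putIfAbsent(mask(key), value);
    }

    V get(K key) {
        return map.get(mask(key));
    }

    /** Returns the keys as the caller supplied them, with the sentinel mapped back to null. */
    List<K> keys() {
        List<K> out = new ArrayList<K>();
        for (Object k : map.keySet()) {
            out.add(unmask(k));
        }
        return out;
    }

    public static void main(String[] args) {
        NullTolerantMap<String, Integer> m = new NullTolerantMap<String, Integer>();
        m.putIfAbsent(null, 1);
        m.putIfAbsent("a", 2);
        System.out.println(m.get(null));             // 1
        System.out.println(m.keys().contains(null)); // true
    }
}
```

Comparing against the sentinel by identity (==) keeps a user-supplied key from ever being mistaken for the marker.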
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ static final class SamplerSubscriber<T> extends Subscriber<T> implements Action0
static final AtomicReferenceFieldUpdater<SamplerSubscriber, Object> VALUE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(SamplerSubscriber.class, Object.class, "value");
public SamplerSubscriber(Subscriber<? super T> subscriber) {
super(subscriber);
Member

This will unsubscribe the downstream. I suggest instead having child.add(sampler); after L52.

Member

I haven't had a chance to try this code yet, but I want to make sure this doesn't break the backpressure functionality where this operator requests Long.MAX_VALUE up.

Member Author

> This will unsubscribe the downstream. I suggest instead having child.add(sampler); after L52.

Could you elaborate? I think unsubscribing the downstream would be OK for this operator.

Member Author

> I haven't had a chance to try this code yet, but I want to make sure this doesn't break the backpressure functionality where this operator requests Long.MAX_VALUE up.

Sample with time does not support backpressure, since it uses time to control the data flow, right?
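The difference between super(subscriber) and child.add(sampler) discussed in this thread can be modeled roughly as follows. The sketch assumes that constructing a Subscriber with super(subscriber) shares one subscription list with the downstream, whereas child.add(sampler) links the two in one direction only. ModelSubscriber and its State class are hypothetical stand-ins, not RxJava types, and the model is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of a subscriber's subscription list; not RxJava's classes. */
final class ModelSubscriber {
    /** State that may be shared between two subscribers. */
    static final class State {
        boolean unsubscribed;
        final List<ModelSubscriber> resources = new ArrayList<ModelSubscriber>();
    }

    private final State state;

    /** Independent subscriber with its own state. */
    ModelSubscriber() {
        this.state = new State();
    }

    /** Models super(subscriber): both subscribers share one state. */
    ModelSubscriber(ModelSubscriber shareWith) {
        this.state = shareWith.state;
    }

    /** Models child.add(resource): the resource is released when this subscriber unsubscribes. */
    void add(ModelSubscriber resource) {
        if (state.unsubscribed) {
            resource.unsubscribe();
        } else {
            state.resources.add(resource);
        }
    }

    void unsubscribe() {
        if (state.unsubscribed) {
            return;
        }
        state.unsubscribed = true;
        for (ModelSubscriber r : state.resources) {
            r.unsubscribe();
        }
    }

    boolean isUnsubscribed() {
        return state.unsubscribed;
    }

    public static void main(String[] args) {
        // Shared state: unsubscribing the operator also unsubscribes the downstream.
        ModelSubscriber down1 = new ModelSubscriber();
        ModelSubscriber op1 = new ModelSubscriber(down1);
        op1.unsubscribe();
        System.out.println(down1.isUnsubscribed()); // true

        // One-way link: unsubscribing the operator leaves the downstream untouched.
        ModelSubscriber down2 = new ModelSubscriber();
        ModelSubscriber op2 = new ModelSubscriber();
        down2.add(op2);
        op2.unsubscribe();
        System.out.println(down2.isUnsubscribed()); // false

        // The link still propagates unsubscription from the downstream to the operator.
        ModelSubscriber down3 = new ModelSubscriber();
        ModelSubscriber op3 = new ModelSubscriber();
        down3.add(op3);
        down3.unsubscribe();
        System.out.println(op3.isUnsubscribed()); // true
    }
}
```

Which behavior is appropriate for sample is exactly the open question in the thread; the model only shows what each wiring implies.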

this.subscriber = subscriber;
}

52 changes: 51 additions & 1 deletion src/test/java/rx/internal/operators/OperatorGroupByTest.java
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -46,6 +47,7 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Action1;
@@ -1357,4 +1359,52 @@ public Observable<Integer> call(GroupedObservable<Integer, Integer> t) {

};

}
@Test
public void testGroupByUnsubscribe() {
final Subscription s = mock(Subscription.class);
Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(s);
}
}
);
o.groupBy(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer integer) {
return null;
}
}).subscribe().unsubscribe();
verify(s).unsubscribe();
}

@Test
public void testGroupWithNullKey() {
final String[] key = new String[]{"uninitialized"};
final List<String> values = new ArrayList<String>();
Observable.just("a", "b", "c").groupBy(new Func1<String, String>() {

@Override
public String call(String value) {
return null;
}
}).subscribe(new Action1<GroupedObservable<String, String>>() {

@Override
public void call(GroupedObservable<String, String> groupedObservable) {
key[0] = groupedObservable.getKey();
groupedObservable.subscribe(new Action1<String>() {

@Override
public void call(String s) {
values.add(s);
}
});
}
});
assertEquals(null, key[0]);
assertEquals(Arrays.asList("a", "b", "c"), values);
}
}
21 changes: 17 additions & 4 deletions src/test/java/rx/internal/operators/OperatorSampleTest.java
Original file line number Diff line number Diff line change
@@ -28,14 +28,12 @@
import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

public class OperatorSampleTest {
private TestScheduler scheduler;
@@ -271,4 +269,19 @@ public void sampleWithSamplerThrows() {
inOrder.verify(observer2, times(1)).onError(any(RuntimeException.class));
verify(observer, never()).onCompleted();
}

@Test
public void testSampleUnsubscribe() {
final Subscription s = mock(Subscription.class);
Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(s);
}
}
);
o.throttleLast(1, TimeUnit.MILLISECONDS).subscribe().unsubscribe();
verify(s).unsubscribe();
}
}