From 4f68c5a21a1d792b966abfd93b8b705d82b8f857 Mon Sep 17 00:00:00 2001 From: Gerben van den Broeke Date: Wed, 5 Jun 2013 16:42:30 +0200 Subject: [PATCH 1/5] Add unit test for sub/unsub/sub bug. --- .../java/rx/operators/OperationGroupBy.java | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 270c2de96f..228a5165c7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -16,6 +16,7 @@ package rx.operators; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import java.util.Arrays; import java.util.Collection; @@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; +import org.mockito.InOrder; import rx.Observable; import rx.Observer; @@ -556,6 +558,78 @@ public String toString() { } } + /* + * Test subscribing to a group, unsubscribing from it again, and subscribing to a next group + */ + @Test + public void testSubscribeAndImmediatelyUnsubscribeFirstGroup() { + CounterSource source = new CounterSource(); + @SuppressWarnings("unchecked") + final Observer observer = mock(Observer.class); + + Func1 modulo2 = new Func1() { + @Override + public Integer call(Integer x) { + return x%2; + } + }; + + Subscription outerSubscription = source.groupBy(modulo2).subscribe(new Action1>() { + @Override + public void call(GroupedObservable group) { + Subscription innerSubscription = group.subscribe(observer); + if (group.getKey() == 0) { + // We immediately unsubscribe again from the even numbers + innerSubscription.unsubscribe(); + // We should still get the group of odd numbers + } + } + }); + try { + source.thread.join(); + } catch (InterruptedException ex) { + } + + InOrder o = inOrder(observer); + // With a different implementation that subscribes to the group concurrently, we might actually receive 0. + o.verify(observer, never()).onNext(0); + o.verify(observer).onNext(1); + o.verify(observer, never()).onNext(2); + o.verify(observer).onNext(3); + o.verify(observer, never()).onNext(4); + o.verify(observer).onNext(5); + o.verify(observer, never()).onNext(6); + o.verify(observer).onNext(7); + o.verify(observer, never()).onNext(8); + o.verify(observer).onNext(9); + } + + private class CounterSource extends Observable { + public Thread thread = null; + @Override + public Subscription subscribe(final Observer observer) { + thread = new Thread(new Runnable() { + @Override + public void run() { + int i = 0; + while (i < 10) { + observer.onNext(i++); + if (Thread.interrupted()) { + return; + } + } + } + }); + thread.start(); + return new Subscription() { + @Override + public void unsubscribe() { + thread.interrupt(); + } + }; + } + } + } } From 22cd3189b968d4d4c43ad898e0a3caa3fb8c91fd Mon Sep 17 00:00:00 2001 From: Gerben van den Broeke Date: Wed, 5 Jun 2013 19:48:35 +0200 Subject: [PATCH 2/5] Fix sub/unsub/sub bug. --- rxjava-core/src/main/java/rx/operators/OperationGroupBy.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 228a5165c7..827136fffc 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -137,6 +137,7 @@ public void unsubscribe() { if (numGroupSubscriptions.get() == 0) { // if we have no group subscriptions we will unsubscribe actualParentSubscription.unsubscribe(); + } else { // otherwise we mark to not send any more groups (waiting on existing groups to finish) unsubscribeRequested.set(true); } @@ -160,7 +161,7 @@ private void subscribeKey(K key) { */ private void unsubscribeKey(K key) { int c = numGroupSubscriptions.decrementAndGet(); - if (c == 0) { + if (c == 0 && unsubscribeRequested.get()) { actualParentSubscription.unsubscribe(); } } From 787ae6ae3e5be70e4f904d293b531d0300d1bc55 Mon Sep 17 00:00:00 2001 From: Gerben van den Broeke Date: Wed, 5 Jun 2013 21:09:57 +0200 Subject: [PATCH 3/5] Add unit test for subscribing twice to GroupBy --- .../java/rx/operators/OperationGroupBy.java | 72 ++++++++++++++++++- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 827136fffc..85f8140c1f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -19,7 +19,9 @@ import static org.mockito.Mockito.*; import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -587,7 +589,9 @@ public void call(GroupedObservable group) { } }); try { - source.thread.join(); + for (Thread t : source.threads) { + t.join(); + } } catch (InterruptedException ex) { } @@ -605,11 +609,72 @@ public void call(GroupedObservable group) { o.verify(observer).onNext(9); } + @Test + public void testSubscribeTwoTimesToGroupBy() { + CounterSource source = new CounterSource(); + @SuppressWarnings("unchecked") + final Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + final Observer observer2 = mock(Observer.class); + + Func1 modulo2 = new Func1() { + @Override + public Integer call(Integer x) { + return x % 2; + } + }; + + Observable> groups = source.groupBy(modulo2); + + Subscription outerSubscription1 = groups.subscribe(new Action1>() { + @Override + public void call(GroupedObservable group) { + group.subscribe(observer1); + } + }); + Subscription outerSubscription2 = groups.subscribe(new Action1>() { + @Override + public void call(GroupedObservable group) { + group.subscribe(observer2); + } + }); + try { + for (Thread t : source.threads) { + t.join(); + } + } catch (InterruptedException ex) { + } + + // Receival of values by observer1 and observer2 can be interleaved, so use inOrder on them separately + InOrder o1 = inOrder(observer1); + o1.verify(observer1).onNext(0); + o1.verify(observer1).onNext(1); + o1.verify(observer1).onNext(2); + o1.verify(observer1).onNext(3); + o1.verify(observer1).onNext(4); + o1.verify(observer1).onNext(5); + o1.verify(observer1).onNext(6); + o1.verify(observer1).onNext(7); + o1.verify(observer1).onNext(8); + o1.verify(observer1).onNext(9); + InOrder o2 = inOrder(observer2); + o2.verify(observer2).onNext(0); + o2.verify(observer2).onNext(1); + o2.verify(observer2).onNext(2); + o2.verify(observer2).onNext(3); + o2.verify(observer2).onNext(4); + o2.verify(observer2).onNext(5); + o2.verify(observer2).onNext(6); + o2.verify(observer2).onNext(7); + o2.verify(observer2).onNext(8); + o2.verify(observer2).onNext(9); + } + private class CounterSource extends Observable { - public Thread thread = null; + public List threads = new ArrayList(); @Override public Subscription subscribe(final Observer observer) { - thread = new Thread(new Runnable() { + final Thread thread = new Thread(new Runnable() { @Override public void run() { int i = 0; @@ -622,6 +687,7 @@ public void run() { } }); thread.start(); + threads.add(thread); return new Subscription() { @Override public void unsubscribe() { From fc89a79ccd679aaa8902ed247f06bfb6e120b96a Mon Sep 17 00:00:00 2001 From: Gerben van den Broeke Date: Wed, 5 Jun 2013 23:02:05 +0200 Subject: [PATCH 4/5] Fix bug to allow subscribing several times --- .../java/rx/operators/OperationGroupBy.java | 131 +++++++++--------- 1 file changed, 68 insertions(+), 63 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 85f8140c1f..61857fef9e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -68,10 +68,6 @@ public static Func1>, Subscription> grou private static class GroupBy implements Func1>, Subscription> { private final Observable> source; - private final ConcurrentHashMap> groupedObservables = new ConcurrentHashMap>(); - private final AtomicObservableSubscription actualParentSubscription = new AtomicObservableSubscription(); - private final AtomicInteger numGroupSubscriptions = new AtomicInteger(); - private final AtomicBoolean unsubscribeRequested = new AtomicBoolean(false); private GroupBy(Observable> source) { this.source = source; @@ -79,78 +75,89 @@ private GroupBy(Observable> source) { @Override public Subscription call(final Observer> observer) { - final GroupBy _this = this; - actualParentSubscription.wrap(source.subscribe(new Observer>() { - - @Override - public void onCompleted() { - // we need to propagate to all children I imagine ... we can't just leave all of those Observable/Observers hanging - for (GroupedSubject o : groupedObservables.values()) { - o.onCompleted(); - } - // now the parent - observer.onCompleted(); - } - - @Override - public void onError(Exception e) { - // we need to propagate to all children I imagine ... we can't just leave all of those Observable/Observers hanging - for (GroupedSubject o : groupedObservables.values()) { - o.onError(e); - } - // now the parent - observer.onError(e); - } - - @Override - public void onNext(KeyValue value) { - GroupedSubject gs = groupedObservables.get(value.key); - if (gs == null) { - if (unsubscribeRequested.get()) { - // unsubscribe has been requested so don't create new groups - // only send data to groups already created - return; - } - /* - * Technically the source should be single-threaded so we shouldn't need to do this but I am - * programming defensively as most operators are so this can work with a concurrent sequence - * if it ends up receiving one. - */ - GroupedSubject newGs = GroupedSubject. create(value.key, _this); - GroupedSubject existing = groupedObservables.putIfAbsent(value.key, newGs); - if (existing == null) { - // we won so use the one we created - gs = newGs; - // since we won the creation we emit this new GroupedObservable - observer.onNext(gs); - } else { - // another thread beat us so use the existing one - gs = existing; - } - } - gs.onNext(value.value); - } - })); + final AtomicObservableSubscription actualParentSubscription = new AtomicObservableSubscription(); + final GroupByObserver internalObserver = new GroupByObserver(observer, actualParentSubscription); + actualParentSubscription.wrap(source.subscribe(internalObserver)); return new Subscription() { @Override public void unsubscribe() { - if (numGroupSubscriptions.get() == 0) { + if (internalObserver.numGroupSubscriptions.get() == 0) { // if we have no group subscriptions we will unsubscribe actualParentSubscription.unsubscribe(); } else { // otherwise we mark to not send any more groups (waiting on existing groups to finish) - unsubscribeRequested.set(true); + internalObserver.unsubscribeRequested.set(true); } } }; } + } + + private static class GroupByObserver implements Observer> { + private final ConcurrentHashMap> groupedObservables = new ConcurrentHashMap>(); + private final AtomicInteger numGroupSubscriptions = new AtomicInteger(); + private final AtomicBoolean unsubscribeRequested = new AtomicBoolean(false); + private final Observer> observer; + private final Subscription actualParentSubscription; + + private GroupByObserver(Observer> observer, Subscription actualParentSubscription) { + this.observer = observer; + this.actualParentSubscription = actualParentSubscription; + } + + @Override + public void onCompleted() { + // we need to propagate to all children I imagine ... we can't just leave all of those Observable/Observers hanging + for (GroupedSubject o : groupedObservables.values()) { + o.onCompleted(); + } + // now the parent + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + // we need to propagate to all children I imagine ... we can't just leave all of those Observable/Observers hanging + for (GroupedSubject o : groupedObservables.values()) { + o.onError(e); + } + // now the parent + observer.onError(e); + } + + @Override + public void onNext(KeyValue value) { + GroupedSubject gs = groupedObservables.get(value.key); + if (gs == null) { + if (unsubscribeRequested.get()) { + // unsubscribe has been requested so don't create new groups + // only send data to groups already created + return; + } + /* + * Technically the source should be single-threaded so we shouldn't need to do this but I am + * programming defensively as most operators are so this can work with a concurrent sequence + * if it ends up receiving one. + */ + GroupedSubject newGs = GroupedSubject. create(value.key, this); + GroupedSubject existing = groupedObservables.putIfAbsent(value.key, newGs); + if (existing == null) { + // we won so use the one we created + gs = newGs; + // since we won the creation we emit this new GroupedObservable + observer.onNext(gs); + } else { + // another thread beat us so use the existing one + gs = existing; + } + } + gs.onNext(value.value); + } /** * Children notify of being subscribed to. - * - * @param key */ private void subscribeKey(K key) { numGroupSubscriptions.incrementAndGet(); @@ -158,8 +165,6 @@ private void subscribeKey(K key) { /** * Children notify of being unsubscribed from. - * - * @param key */ private void unsubscribeKey(K key) { int c = numGroupSubscriptions.decrementAndGet(); @@ -171,7 +176,7 @@ private void unsubscribeKey(K key) { private static class GroupedSubject extends GroupedObservable implements Observer { - static GroupedSubject create(final K key, final GroupBy parent) { + static GroupedSubject create(final K key, final GroupByObserver parent) { @SuppressWarnings("unchecked") final AtomicReference> subscribedObserver = new AtomicReference>(EMPTY_OBSERVER); From 42af69d1769f88a6d1f21d10b47112e09c743819 Mon Sep 17 00:00:00 2001 From: Gerben van den Broeke Date: Sun, 30 Jun 2013 16:16:33 +0200 Subject: [PATCH 5/5] Add a bit of documentation --- .../java/rx/operators/OperationGroupBy.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 61857fef9e..93a822699b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -95,6 +95,17 @@ public void unsubscribe() { } } + /** + * The internal observer that is spawned on a subscription to GroupBy. + * Whenever it receives a new KeyValue from the source, it passes the value to the correct GroupedObservable(Subject), + * which is determined by the key. If no GroupedObservable existed for this key, a new one is created first and passed + * to the primary observer. + * When an emitted GroupedObservable is subscribed to, it notifies this internal observer, which keeps count of the + * number of these subscriptions. When both the primary observer has unsubscribed (thus no groups are emitted anymore), + * and no groups have any observers left, the whole thing is considered done and the upstream subscription is terminated. + * Note that to function reliably, this implementation therefore requires that emitted GroupedObservables are subscribed + * to immediately, and not stored for later use (after the GroupBy has been unsubscribed from). + */ private static class GroupByObserver implements Observer> { private final ConcurrentHashMap> groupedObservables = new ConcurrentHashMap>(); private final AtomicInteger numGroupSubscriptions = new AtomicInteger(); @@ -137,10 +148,10 @@ public void onNext(KeyValue value) { return; } /* - * Technically the source should be single-threaded so we shouldn't need to do this but I am - * programming defensively as most operators are so this can work with a concurrent sequence - * if it ends up receiving one. - */ + * Technically the source should be single-threaded so we shouldn't need to do this but I am + * programming defensively as most operators are so this can work with a concurrent sequence + * if it ends up receiving one. + */ GroupedSubject newGs = GroupedSubject. create(value.key, this); GroupedSubject existing = groupedObservables.putIfAbsent(value.key, newGs); if (existing == null) { @@ -174,6 +185,9 @@ private void unsubscribeKey(K key) { } } + /** + * The GroupedObservables that are passed to the observer. + */ private static class GroupedSubject extends GroupedObservable implements Observer { static GroupedSubject create(final K key, final GroupByObserver parent) {