diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index dd415d887c2..c19331c8ec0 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -1,18 +1,18 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.subscriptions; import static java.util.Arrays.asList; @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -30,50 +31,60 @@ /** * Subscription that represents a group of Subscriptions that are unsubscribed * together. - * + * * @see Rx.Net * equivalent CompositeDisposable */ public class CompositeSubscription implements Subscription { - private static final Set MUTATE_STATE = unmodifiableSet(new HashSet()); - private static final Set UNSUBSCRIBED_STATE = unmodifiableSet(new HashSet()); - + /** Sentinel to indicate a thread is modifying the subscription set. */ + private static final Set MUTATE_SENTINEL = unmodifiableSet(Collections.emptySet()); + /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/ + private static final Set UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.emptySet()); + /** The reference to the set of subscriptions. */ private final AtomicReference> reference = new AtomicReference>(); - + public CompositeSubscription(final Subscription... subscriptions) { reference.set(new HashSet(asList(subscriptions))); } - + public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_STATE; + return reference.get() == UNSUBSCRIBED_SENTINEL; } - + public void add(final Subscription s) { do { final Set existing = reference.get(); - if (existing == UNSUBSCRIBED_STATE) { + if (existing == UNSUBSCRIBED_SENTINEL) { s.unsubscribe(); break; } - - if (reference.compareAndSet(existing, MUTATE_STATE)) { + + if (existing == MUTATE_SENTINEL) { + continue; + } + + if (reference.compareAndSet(existing, MUTATE_SENTINEL)) { existing.add(s); reference.set(existing); break; } } while (true); } - + public void remove(final Subscription s) { do { final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_STATE) { + if (subscriptions == UNSUBSCRIBED_SENTINEL) { s.unsubscribe(); break; } - - if (reference.compareAndSet(subscriptions, MUTATE_STATE)) { + + if (subscriptions == MUTATE_SENTINEL) { + continue; + } + + if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { // also unsubscribe from it: // http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx subscriptions.remove(s); @@ -83,54 +94,66 @@ public void remove(final Subscription s) { } } while (true); } - + public void clear() { do { final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_STATE) { + if (subscriptions == UNSUBSCRIBED_SENTINEL) { break; } - - if (reference.compareAndSet(subscriptions, MUTATE_STATE)) { + + if (subscriptions == MUTATE_SENTINEL) { + continue; + } + + if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { final Set copy = new HashSet( subscriptions); subscriptions.clear(); reference.set(subscriptions); - - for (final Subscription subscription : copy) { - subscription.unsubscribe(); - } + + unsubscribeAll(copy); break; } } while (true); } - + /** + * Unsubscribe from the collection of subscriptions. + *

+ * Exceptions thrown by any of the {@code unsubscribe()} methods are + * collected into a {@link CompositeException} and thrown once + * all unsubscriptions have been attempted. + * @param subs the collection of subscriptions + */ + private void unsubscribeAll(Collection subs) { + final Collection es = new ArrayList(); + for (final Subscription s : subs) { + try { + s.unsubscribe(); + } catch (final Throwable e) { + es.add(e); + } + } + if (!es.isEmpty()) { + throw new CompositeException( + "Failed to unsubscribe to 1 or more subscriptions.", es); + } + } @Override public void unsubscribe() { do { final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_STATE) { + if (subscriptions == UNSUBSCRIBED_SENTINEL) { break; } - - if (subscriptions == MUTATE_STATE) { + + if (subscriptions == MUTATE_SENTINEL) { continue; } - - if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_STATE)) { - final Collection es = new ArrayList(); - for (final Subscription s : subscriptions) { - try { - s.unsubscribe(); - } catch (final Throwable e) { - es.add(e); - } - } - if (es.isEmpty()) { - break; - } - throw new CompositeException( - "Failed to unsubscribe to 1 or more subscriptions.", es); + + if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) { + unsubscribeAll(subscriptions); + break; } } while (true); } diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 74ed285f773..8fed35fbbfc 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -27,34 +27,41 @@ * @see Rx.Net equivalent MultipleAssignmentDisposable */ public class MultipleAssignmentSubscription implements Subscription { - - private final AtomicBoolean unsubscribed = new AtomicBoolean(false); - private AtomicReference subscription = new AtomicReference(); - + private AtomicReference reference = new AtomicReference(); + /** Sentinel for the unsubscribed state. */ + private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { + @Override + public void unsubscribe() { + } + }; public boolean isUnsubscribed() { - return unsubscribed.get(); + return reference.get() == UNSUBSCRIBED_SENTINEL; } @Override - public synchronized void unsubscribe() { - unsubscribed.set(true); - Subscription s = getSubscription(); + public void unsubscribe() { + Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); if (s != null) { s.unsubscribe(); } - } - public synchronized void setSubscription(Subscription s) { - if (unsubscribed.get()) { - s.unsubscribe(); - } else { - subscription.set(s); - } + public void setSubscription(Subscription s) { + do { + Subscription r = reference.get(); + if (r == UNSUBSCRIBED_SENTINEL) { + s.unsubscribe(); + return; + } + if (reference.compareAndSet(r, s)) { + break; + } + } while (true); } public Subscription getSubscription() { - return subscription.get(); + Subscription s = reference.get(); + return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty(); } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java index bea8c82f1ee..a4747caa9b7 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -16,6 +16,8 @@ package rx.subscriptions; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; /** @@ -25,10 +27,24 @@ * @see MSDN RefCountDisposable */ public class RefCountSubscription implements Subscription { - private final Object guard = new Object(); - private Subscription main; - private boolean done; - private int count; + /** The state for the atomic operations. */ + private enum State { + ACTIVE, + MUTATING, + UNSUBSCRIBED + } + /** The reference to the actual subscription. */ + private volatile Subscription main; + /** The current state. */ + private final AtomicReference state = new AtomicReference(); + /** Counts the number of sub-subscriptions. */ + private final AtomicInteger count = new AtomicInteger(); + /** Indicate the request to unsubscribe from the main. */ + private final AtomicBoolean mainDone = new AtomicBoolean(); + /** + * Create a RefCountSubscription by wrapping the given non-null Subscription. + * @param s + */ public RefCountSubscription(Subscription s) { if (s == null) { throw new IllegalArgumentException("s"); @@ -39,54 +55,76 @@ public RefCountSubscription(Subscription s) { * Returns a new sub-subscription. */ public Subscription getSubscription() { - synchronized (guard) { - if (main == null) { + do { + State s = state.get(); + if (s == State.UNSUBSCRIBED) { return Subscriptions.empty(); - } else { - count++; + } + if (s == State.MUTATING) { + continue; + } + if (state.compareAndSet(s, State.MUTATING)) { + count.incrementAndGet(); + state.set(State.ACTIVE); return new InnerSubscription(); } - } + } while(true); } /** * Check if this subscription is already unsubscribed. */ public boolean isUnsubscribed() { - synchronized (guard) { - return main == null; - } + return state.get() == State.UNSUBSCRIBED; } @Override public void unsubscribe() { - Subscription s = null; - synchronized (guard) { - if (main != null && !done) { - done = true; - if (count == 0) { - s = main; - main = null; + do { + State s = state.get(); + if (s == State.UNSUBSCRIBED) { + return; + } + if (s == State.MUTATING) { + continue; + } + if (state.compareAndSet(s, State.MUTATING)) { + if (mainDone.compareAndSet(false, true) && count.get() == 0) { + terminate(); + return; } + state.set(State.ACTIVE); + break; } - } - if (s != null) { - s.unsubscribe(); - } + } while (true); + } + /** + * Terminate this subscription by unsubscribing from main and setting the + * state to UNSUBSCRIBED. + */ + private void terminate() { + state.set(State.UNSUBSCRIBED); + Subscription r = main; + main = null; + r.unsubscribe(); } /** Remove an inner subscription. */ void innerDone() { - Subscription s = null; - synchronized (guard) { - if (main != null) { - count--; - if (done && count == 0) { - s = main; - main = null; + do { + State s = state.get(); + if (s == State.UNSUBSCRIBED) { + return; + } + if (s == State.MUTATING) { + continue; + } + if (state.compareAndSet(s, State.MUTATING)) { + if (count.decrementAndGet() == 0 && mainDone.get()) { + terminate(); + return; } + state.set(State.ACTIVE); + break; } - } - if (s != null) { - s.unsubscribe(); - } + } while (true); } /** The individual sub-subscriptions. */ class InnerSubscription implements Subscription { diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java index 9aaa3e9a59b..e26a258acc5 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java @@ -29,22 +29,27 @@ */ public class SerialSubscription implements Subscription { private final AtomicReference reference = new AtomicReference(empty()); - - private static final Subscription UNSUBSCRIBED = new Subscription() { + /** Sentinel for the unsubscribed state. */ + private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { @Override public void unsubscribe() { } }; - + public boolean isUnsubscribed() { + return reference.get() == UNSUBSCRIBED_SENTINEL; + } @Override public void unsubscribe() { - setSubscription(UNSUBSCRIBED); + Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); + if (s != null) { + s.unsubscribe(); + } } public void setSubscription(final Subscription subscription) { do { final Subscription current = reference.get(); - if (current == UNSUBSCRIBED) { + if (current == UNSUBSCRIBED_SENTINEL) { subscription.unsubscribe(); break; } @@ -57,6 +62,6 @@ public void setSubscription(final Subscription subscription) { public Subscription getSubscription() { final Subscription subscription = reference.get(); - return subscription == UNSUBSCRIBED ? null : subscription; + return subscription == UNSUBSCRIBED_SENTINEL ? Subscriptions.empty() : subscription; } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java index ae79bc62a1d..c960db2ea40 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java @@ -32,7 +32,7 @@ public final class SingleAssignmentSubscription implements Subscription { /** Holds the current resource. */ private final AtomicReference current = new AtomicReference(); /** Sentinel for the unsubscribed state. */ - private static final Subscription SENTINEL = new Subscription() { + private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { @Override public void unsubscribe() { } @@ -42,7 +42,7 @@ public void unsubscribe() { */ public Subscription get() { Subscription s = current.get(); - if (s == SENTINEL) { + if (s == UNSUBSCRIBED_SENTINEL) { return Subscriptions.empty(); } return s; @@ -57,7 +57,7 @@ public void set(Subscription s) { if (current.compareAndSet(null, s)) { return; } - if (current.get() != SENTINEL) { + if (current.get() != UNSUBSCRIBED_SENTINEL) { throw new IllegalStateException("Subscription already set"); } if (s != null) { @@ -66,7 +66,7 @@ public void set(Subscription s) { } @Override public void unsubscribe() { - Subscription old = current.getAndSet(SENTINEL); + Subscription old = current.getAndSet(UNSUBSCRIBED_SENTINEL); if (old != null) { old.unsubscribe(); } @@ -75,7 +75,7 @@ public void unsubscribe() { * Test if this subscription is already unsubscribed. */ public boolean isUnsubscribed() { - return current.get() == SENTINEL; + return current.get() == UNSUBSCRIBED_SENTINEL; } } diff --git a/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java new file mode 100644 index 00000000000..9fbed891534 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java @@ -0,0 +1,68 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import rx.Subscription; +import static rx.subscriptions.Subscriptions.create; +import rx.util.functions.Action0; + +public class MultipleAssignmentSubscriptionTest { + Action0 unsubscribe; + Subscription s; + @Before + public void before() { + unsubscribe = mock(Action0.class); + s = create(unsubscribe); + } + @Test + public void testNoUnsubscribeWhenReplaced() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + + mas.setSubscription(s); + mas.setSubscription(null); + mas.unsubscribe(); + + verify(unsubscribe, never()).call(); + + } + @Test + public void testUnsubscribeWhenParentUnsubscribes() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + mas.setSubscription(s); + mas.unsubscribe(); + mas.unsubscribe(); + + verify(unsubscribe, times(1)).call(); + + Assert.assertEquals(true, mas.isUnsubscribed()); + } + @Test + public void testUnsubscribedDoesntLeakSentinel() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + + mas.setSubscription(s); + mas.unsubscribe(); + + Assert.assertEquals(true, mas.getSubscription() == Subscriptions.empty()); + } +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java new file mode 100644 index 00000000000..d11899f9030 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java @@ -0,0 +1,108 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Subscription; +import static rx.subscriptions.Subscriptions.create; +import rx.util.functions.Action0; + +public class RefCountSubscriptionTest { + Action0 main; + RefCountSubscription rcs; + @Before + public void before() { + main = mock(Action0.class); + rcs = new RefCountSubscription(create(main)); + } + @Test + public void testImmediateUnsubscribe() { + InOrder inOrder = inOrder(main); + + rcs.unsubscribe(); + + inOrder.verify(main, times(1)).call(); + + rcs.unsubscribe(); + + inOrder.verifyNoMoreInteractions(); + } + @Test + public void testRCSUnsubscribeBeforeClient() { + InOrder inOrder = inOrder(main); + + Subscription s = rcs.getSubscription(); + + rcs.unsubscribe(); + + inOrder.verify(main, never()).call(); + + s.unsubscribe(); + + inOrder.verify(main, times(1)).call(); + + rcs.unsubscribe(); + s.unsubscribe(); + + inOrder.verifyNoMoreInteractions(); + + } + @Test + public void testMultipleClientsUnsubscribeFirst() { + InOrder inOrder = inOrder(main); + + Subscription s1 = rcs.getSubscription(); + Subscription s2 = rcs.getSubscription(); + + s1.unsubscribe(); + inOrder.verify(main, never()).call(); + s2.unsubscribe(); + inOrder.verify(main, never()).call(); + + rcs.unsubscribe(); + inOrder.verify(main, times(1)).call(); + + s1.unsubscribe(); + s2.unsubscribe(); + rcs.unsubscribe(); + + inOrder.verifyNoMoreInteractions(); + } + @Test + public void testMultipleClientsMainUnsubscribeFirst() { + InOrder inOrder = inOrder(main); + + Subscription s1 = rcs.getSubscription(); + Subscription s2 = rcs.getSubscription(); + + rcs.unsubscribe(); + inOrder.verify(main, never()).call(); + s1.unsubscribe(); + inOrder.verify(main, never()).call(); + s2.unsubscribe(); + + inOrder.verify(main, times(1)).call(); + + s1.unsubscribe(); + s2.unsubscribe(); + rcs.unsubscribe(); + + inOrder.verifyNoMoreInteractions(); + } +} diff --git a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java index 1569d9c7dff..4afadb6f9c3 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java +++ b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java @@ -49,11 +49,11 @@ public void unsubscribingWithoutUnderlyingDoesNothing() { } @Test - public void getSubscriptionShouldReturnSubscriptionAfterUnsubscribe() { + public void getSubscriptionShouldReturnEmptySubscriptionAfterUnsubscribe() { final Subscription underlying = mock(Subscription.class); serialSubscription.setSubscription(underlying); serialSubscription.unsubscribe(); - assertEquals(null, serialSubscription.getSubscription()); + assertEquals(Subscriptions.empty(), serialSubscription.getSubscription()); } @Test