(
subscriptions);
subscriptions.clear();
reference.set(subscriptions);
-
- for (final Subscription subscription : copy) {
- subscription.unsubscribe();
- }
+
+ unsubscribeAll(copy);
break;
}
} while (true);
}
-
+ /**
+ * Unsubscribe from the collection of subscriptions.
+ *
+ * Exceptions thrown by any of the {@code unsubscribe()} methods are
+ * collected into a {@link CompositeException} and thrown once
+ * all unsubscriptions have been attempted.
+ * @param subs the collection of subscriptions
+ */
+ private void unsubscribeAll(Collection subs) {
+ final Collection es = new ArrayList();
+ for (final Subscription s : subs) {
+ try {
+ s.unsubscribe();
+ } catch (final Throwable e) {
+ es.add(e);
+ }
+ }
+ if (!es.isEmpty()) {
+ throw new CompositeException(
+ "Failed to unsubscribe to 1 or more subscriptions.", es);
+ }
+ }
@Override
public void unsubscribe() {
do {
final Set subscriptions = reference.get();
- if (subscriptions == UNSUBSCRIBED_STATE) {
+ if (subscriptions == UNSUBSCRIBED_SENTINEL) {
break;
}
-
- if (subscriptions == MUTATE_STATE) {
+
+ if (subscriptions == MUTATE_SENTINEL) {
continue;
}
-
- if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_STATE)) {
- final Collection es = new ArrayList();
- for (final Subscription s : subscriptions) {
- try {
- s.unsubscribe();
- } catch (final Throwable e) {
- es.add(e);
- }
- }
- if (es.isEmpty()) {
- break;
- }
- throw new CompositeException(
- "Failed to unsubscribe to 1 or more subscriptions.", es);
+
+ if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) {
+ unsubscribeAll(subscriptions);
+ break;
}
} while (true);
}
diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java
index 74ed285f77..8fed35fbbf 100644
--- a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java
+++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java
@@ -27,34 +27,41 @@
* @see Rx.Net equivalent MultipleAssignmentDisposable
*/
public class MultipleAssignmentSubscription implements Subscription {
-
- private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
- private AtomicReference subscription = new AtomicReference();
-
+ private AtomicReference reference = new AtomicReference();
+ /** Sentinel for the unsubscribed state. */
+ private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() {
+ @Override
+ public void unsubscribe() {
+ }
+ };
public boolean isUnsubscribed() {
- return unsubscribed.get();
+ return reference.get() == UNSUBSCRIBED_SENTINEL;
}
@Override
- public synchronized void unsubscribe() {
- unsubscribed.set(true);
- Subscription s = getSubscription();
+ public void unsubscribe() {
+ Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL);
if (s != null) {
s.unsubscribe();
}
-
}
- public synchronized void setSubscription(Subscription s) {
- if (unsubscribed.get()) {
- s.unsubscribe();
- } else {
- subscription.set(s);
- }
+ public void setSubscription(Subscription s) {
+ do {
+ Subscription r = reference.get();
+ if (r == UNSUBSCRIBED_SENTINEL) {
+ s.unsubscribe();
+ return;
+ }
+ if (reference.compareAndSet(r, s)) {
+ break;
+ }
+ } while (true);
}
public Subscription getSubscription() {
- return subscription.get();
+ Subscription s = reference.get();
+ return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty();
}
}
diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java
index bea8c82f1e..a4747caa9b 100644
--- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java
+++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java
@@ -16,6 +16,8 @@
package rx.subscriptions;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import rx.Subscription;
/**
@@ -25,10 +27,24 @@
* @see MSDN RefCountDisposable
*/
public class RefCountSubscription implements Subscription {
- private final Object guard = new Object();
- private Subscription main;
- private boolean done;
- private int count;
+ /** The state for the atomic operations. */
+ private enum State {
+ ACTIVE,
+ MUTATING,
+ UNSUBSCRIBED
+ }
+ /** The reference to the actual subscription. */
+ private volatile Subscription main;
+ /** The current state. */
+ private final AtomicReference state = new AtomicReference();
+ /** Counts the number of sub-subscriptions. */
+ private final AtomicInteger count = new AtomicInteger();
+ /** Indicate the request to unsubscribe from the main. */
+ private final AtomicBoolean mainDone = new AtomicBoolean();
+ /**
+ * Create a RefCountSubscription by wrapping the given non-null Subscription.
+ * @param s
+ */
public RefCountSubscription(Subscription s) {
if (s == null) {
throw new IllegalArgumentException("s");
@@ -39,54 +55,76 @@ public RefCountSubscription(Subscription s) {
* Returns a new sub-subscription.
*/
public Subscription getSubscription() {
- synchronized (guard) {
- if (main == null) {
+ do {
+ State s = state.get();
+ if (s == State.UNSUBSCRIBED) {
return Subscriptions.empty();
- } else {
- count++;
+ }
+ if (s == State.MUTATING) {
+ continue;
+ }
+ if (state.compareAndSet(s, State.MUTATING)) {
+ count.incrementAndGet();
+ state.set(State.ACTIVE);
return new InnerSubscription();
}
- }
+ } while(true);
}
/**
* Check if this subscription is already unsubscribed.
*/
public boolean isUnsubscribed() {
- synchronized (guard) {
- return main == null;
- }
+ return state.get() == State.UNSUBSCRIBED;
}
@Override
public void unsubscribe() {
- Subscription s = null;
- synchronized (guard) {
- if (main != null && !done) {
- done = true;
- if (count == 0) {
- s = main;
- main = null;
+ do {
+ State s = state.get();
+ if (s == State.UNSUBSCRIBED) {
+ return;
+ }
+ if (s == State.MUTATING) {
+ continue;
+ }
+ if (state.compareAndSet(s, State.MUTATING)) {
+ if (mainDone.compareAndSet(false, true) && count.get() == 0) {
+ terminate();
+ return;
}
+ state.set(State.ACTIVE);
+ break;
}
- }
- if (s != null) {
- s.unsubscribe();
- }
+ } while (true);
+ }
+ /**
+ * Terminate this subscription by unsubscribing from main and setting the
+ * state to UNSUBSCRIBED.
+ */
+ private void terminate() {
+ state.set(State.UNSUBSCRIBED);
+ Subscription r = main;
+ main = null;
+ r.unsubscribe();
}
/** Remove an inner subscription. */
void innerDone() {
- Subscription s = null;
- synchronized (guard) {
- if (main != null) {
- count--;
- if (done && count == 0) {
- s = main;
- main = null;
+ do {
+ State s = state.get();
+ if (s == State.UNSUBSCRIBED) {
+ return;
+ }
+ if (s == State.MUTATING) {
+ continue;
+ }
+ if (state.compareAndSet(s, State.MUTATING)) {
+ if (count.decrementAndGet() == 0 && mainDone.get()) {
+ terminate();
+ return;
}
+ state.set(State.ACTIVE);
+ break;
}
- }
- if (s != null) {
- s.unsubscribe();
- }
+ } while (true);
}
/** The individual sub-subscriptions. */
class InnerSubscription implements Subscription {
diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
index 9aaa3e9a59..e26a258acc 100644
--- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
+++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
@@ -29,22 +29,27 @@
*/
public class SerialSubscription implements Subscription {
private final AtomicReference reference = new AtomicReference(empty());
-
- private static final Subscription UNSUBSCRIBED = new Subscription() {
+ /** Sentinel for the unsubscribed state. */
+ private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() {
@Override
public void unsubscribe() {
}
};
-
+ public boolean isUnsubscribed() {
+ return reference.get() == UNSUBSCRIBED_SENTINEL;
+ }
@Override
public void unsubscribe() {
- setSubscription(UNSUBSCRIBED);
+ Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL);
+ if (s != null) {
+ s.unsubscribe();
+ }
}
public void setSubscription(final Subscription subscription) {
do {
final Subscription current = reference.get();
- if (current == UNSUBSCRIBED) {
+ if (current == UNSUBSCRIBED_SENTINEL) {
subscription.unsubscribe();
break;
}
@@ -57,6 +62,6 @@ public void setSubscription(final Subscription subscription) {
public Subscription getSubscription() {
final Subscription subscription = reference.get();
- return subscription == UNSUBSCRIBED ? null : subscription;
+ return subscription == UNSUBSCRIBED_SENTINEL ? Subscriptions.empty() : subscription;
}
}
diff --git a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java
index ae79bc62a1..c960db2ea4 100644
--- a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java
+++ b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java
@@ -32,7 +32,7 @@ public final class SingleAssignmentSubscription implements Subscription {
/** Holds the current resource. */
private final AtomicReference current = new AtomicReference();
/** Sentinel for the unsubscribed state. */
- private static final Subscription SENTINEL = new Subscription() {
+ private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() {
@Override
public void unsubscribe() {
}
@@ -42,7 +42,7 @@ public void unsubscribe() {
*/
public Subscription get() {
Subscription s = current.get();
- if (s == SENTINEL) {
+ if (s == UNSUBSCRIBED_SENTINEL) {
return Subscriptions.empty();
}
return s;
@@ -57,7 +57,7 @@ public void set(Subscription s) {
if (current.compareAndSet(null, s)) {
return;
}
- if (current.get() != SENTINEL) {
+ if (current.get() != UNSUBSCRIBED_SENTINEL) {
throw new IllegalStateException("Subscription already set");
}
if (s != null) {
@@ -66,7 +66,7 @@ public void set(Subscription s) {
}
@Override
public void unsubscribe() {
- Subscription old = current.getAndSet(SENTINEL);
+ Subscription old = current.getAndSet(UNSUBSCRIBED_SENTINEL);
if (old != null) {
old.unsubscribe();
}
@@ -75,7 +75,7 @@ public void unsubscribe() {
* Test if this subscription is already unsubscribed.
*/
public boolean isUnsubscribed() {
- return current.get() == SENTINEL;
+ return current.get() == UNSUBSCRIBED_SENTINEL;
}
}
diff --git a/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java
new file mode 100644
index 0000000000..9fbed89153
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subscriptions;
+
+import junit.framework.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import rx.Subscription;
+import static rx.subscriptions.Subscriptions.create;
+import rx.util.functions.Action0;
+
+public class MultipleAssignmentSubscriptionTest {
+ Action0 unsubscribe;
+ Subscription s;
+ @Before
+ public void before() {
+ unsubscribe = mock(Action0.class);
+ s = create(unsubscribe);
+ }
+ @Test
+ public void testNoUnsubscribeWhenReplaced() {
+ MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
+
+ mas.setSubscription(s);
+ mas.setSubscription(null);
+ mas.unsubscribe();
+
+ verify(unsubscribe, never()).call();
+
+ }
+ @Test
+ public void testUnsubscribeWhenParentUnsubscribes() {
+ MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
+ mas.setSubscription(s);
+ mas.unsubscribe();
+ mas.unsubscribe();
+
+ verify(unsubscribe, times(1)).call();
+
+ Assert.assertEquals(true, mas.isUnsubscribed());
+ }
+ @Test
+ public void testUnsubscribedDoesntLeakSentinel() {
+ MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
+
+ mas.setSubscription(s);
+ mas.unsubscribe();
+
+ Assert.assertEquals(true, mas.getSubscription() == Subscriptions.empty());
+ }
+}
\ No newline at end of file
diff --git a/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java
new file mode 100644
index 0000000000..d11899f903
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subscriptions;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+import static org.mockito.Mockito.*;
+import rx.Subscription;
+import static rx.subscriptions.Subscriptions.create;
+import rx.util.functions.Action0;
+
+public class RefCountSubscriptionTest {
+ Action0 main;
+ RefCountSubscription rcs;
+ @Before
+ public void before() {
+ main = mock(Action0.class);
+ rcs = new RefCountSubscription(create(main));
+ }
+ @Test
+ public void testImmediateUnsubscribe() {
+ InOrder inOrder = inOrder(main);
+
+ rcs.unsubscribe();
+
+ inOrder.verify(main, times(1)).call();
+
+ rcs.unsubscribe();
+
+ inOrder.verifyNoMoreInteractions();
+ }
+ @Test
+ public void testRCSUnsubscribeBeforeClient() {
+ InOrder inOrder = inOrder(main);
+
+ Subscription s = rcs.getSubscription();
+
+ rcs.unsubscribe();
+
+ inOrder.verify(main, never()).call();
+
+ s.unsubscribe();
+
+ inOrder.verify(main, times(1)).call();
+
+ rcs.unsubscribe();
+ s.unsubscribe();
+
+ inOrder.verifyNoMoreInteractions();
+
+ }
+ @Test
+ public void testMultipleClientsUnsubscribeFirst() {
+ InOrder inOrder = inOrder(main);
+
+ Subscription s1 = rcs.getSubscription();
+ Subscription s2 = rcs.getSubscription();
+
+ s1.unsubscribe();
+ inOrder.verify(main, never()).call();
+ s2.unsubscribe();
+ inOrder.verify(main, never()).call();
+
+ rcs.unsubscribe();
+ inOrder.verify(main, times(1)).call();
+
+ s1.unsubscribe();
+ s2.unsubscribe();
+ rcs.unsubscribe();
+
+ inOrder.verifyNoMoreInteractions();
+ }
+ @Test
+ public void testMultipleClientsMainUnsubscribeFirst() {
+ InOrder inOrder = inOrder(main);
+
+ Subscription s1 = rcs.getSubscription();
+ Subscription s2 = rcs.getSubscription();
+
+ rcs.unsubscribe();
+ inOrder.verify(main, never()).call();
+ s1.unsubscribe();
+ inOrder.verify(main, never()).call();
+ s2.unsubscribe();
+
+ inOrder.verify(main, times(1)).call();
+
+ s1.unsubscribe();
+ s2.unsubscribe();
+ rcs.unsubscribe();
+
+ inOrder.verifyNoMoreInteractions();
+ }
+}
diff --git a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java
index 1569d9c7df..4afadb6f9c 100644
--- a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java
+++ b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java
@@ -49,11 +49,11 @@ public void unsubscribingWithoutUnderlyingDoesNothing() {
}
@Test
- public void getSubscriptionShouldReturnSubscriptionAfterUnsubscribe() {
+ public void getSubscriptionShouldReturnEmptySubscriptionAfterUnsubscribe() {
final Subscription underlying = mock(Subscription.class);
serialSubscription.setSubscription(underlying);
serialSubscription.unsubscribe();
- assertEquals(null, serialSubscription.getSubscription());
+ assertEquals(Subscriptions.empty(), serialSubscription.getSubscription());
}
@Test