diff --git a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java index 873d3d1a7f..ede55f0584 100644 --- a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java +++ b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java @@ -19,9 +19,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + import rx.Notification; import rx.Observable; -import rx.subscriptions.SingleAssignmentSubscription; +import rx.operators.SafeObservableSubscription; import rx.util.functions.Action1; /** @@ -33,14 +35,15 @@ public final class JoinObserver1 extends ObserverBase> implem private final Action1 onError; private final List activePlans; private final Queue> queue; - private final SingleAssignmentSubscription subscription; + private final SafeObservableSubscription subscription; private volatile boolean done; + private final AtomicBoolean subscribed = new AtomicBoolean(false); public JoinObserver1(Observable source, Action1 onError) { this.source = source; this.onError = onError; queue = new LinkedList>(); - subscription = new SingleAssignmentSubscription(); + subscription = new SafeObservableSubscription(); activePlans = new ArrayList(); } public Queue> queue() { @@ -51,8 +54,12 @@ public void addActivePlan(ActivePlan0 activePlan) { } @Override public void subscribe(Object gate) { - this.gate = gate; - subscription.set(source.materialize().subscribe(this)); + if (subscribed.compareAndSet(false, true)) { + this.gate = gate; + subscription.wrap(source.materialize().subscribe(this)); + } else { + throw new IllegalStateException("Can only be subscribed to once."); + } } @Override diff --git a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java index 7b18b35efc..d6dfedd549 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java @@ -19,23 +19,45 @@ import rx.Observable; import rx.Subscription; +import rx.util.functions.Action0; /** * Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed. * * @see Rx.Net equivalent BooleanDisposable */ -public class BooleanSubscription implements Subscription { +public final class BooleanSubscription implements Subscription { private final AtomicBoolean unsubscribed = new AtomicBoolean(false); + private final Action0 action; + + public BooleanSubscription() { + action = null; + } + + private BooleanSubscription(Action0 action) { + this.action = action; + } + + public static BooleanSubscription create() { + return new BooleanSubscription(); + } + + public static BooleanSubscription create(Action0 onUnsubscribe) { + return new BooleanSubscription(onUnsubscribe); + } public boolean isUnsubscribed() { return unsubscribed.get(); } @Override - public void unsubscribe() { - unsubscribed.set(true); + public final void unsubscribe() { + if (unsubscribed.compareAndSet(false, true)) { + if (action != null) { + action.call(); + } + } } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index c19331c8ec..b0cf7cac23 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -1,28 +1,24 @@ - /** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.subscriptions; -import static java.util.Arrays.asList; -import static java.util.Collections.unmodifiableSet; - import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; @@ -31,106 +27,118 @@ /** * Subscription that represents a group of Subscriptions that are unsubscribed * together. - * - * @see Rx.Net - * equivalent CompositeDisposable + * + * @see Rx.Net equivalent CompositeDisposable */ -public class CompositeSubscription implements Subscription { - /** Sentinel to indicate a thread is modifying the subscription set. */ - private static final Set MUTATE_SENTINEL = unmodifiableSet(Collections.emptySet()); - /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/ - private static final Set UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.emptySet()); - /** The reference to the set of subscriptions. */ - private final AtomicReference> reference = new AtomicReference>(); - +public final class CompositeSubscription implements Subscription { + + private final AtomicReference state = new AtomicReference(); + + private static final class State { + final boolean isUnsubscribed; + final List subscriptions; + + State(boolean u, List s) { + this.isUnsubscribed = u; + this.subscriptions = s; + } + + State unsubscribe() { + return new State(true, subscriptions); + } + + State add(Subscription s) { + List newSubscriptions = new ArrayList(); + newSubscriptions.addAll(subscriptions); + newSubscriptions.add(s); + return new State(isUnsubscribed, newSubscriptions); + } + + State remove(Subscription s) { + List newSubscriptions = new ArrayList(); + newSubscriptions.addAll(subscriptions); + newSubscriptions.remove(s); // only first occurrence + return new State(isUnsubscribed, newSubscriptions); + } + + State clear() { + return new State(isUnsubscribed, new ArrayList()); + } + } + public CompositeSubscription(final Subscription... subscriptions) { - reference.set(new HashSet(asList(subscriptions))); + state.set(new State(false, Arrays.asList(subscriptions))); } - + public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; + return state.get().isUnsubscribed; } - + public void add(final Subscription s) { + State oldState; + State newState; do { - final Set existing = reference.get(); - if (existing == UNSUBSCRIBED_SENTINEL) { + oldState = state.get(); + if (oldState.isUnsubscribed) { s.unsubscribe(); - break; - } - - if (existing == MUTATE_SENTINEL) { - continue; - } - - if (reference.compareAndSet(existing, MUTATE_SENTINEL)) { - existing.add(s); - reference.set(existing); - break; + return; + } else { + newState = oldState.add(s); } - } while (true); + } while (!state.compareAndSet(oldState, newState)); } - + public void remove(final Subscription s) { + State oldState; + State newState; do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - s.unsubscribe(); - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; - } - - if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { - // also unsubscribe from it: - // http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx - subscriptions.remove(s); - reference.set(subscriptions); - s.unsubscribe(); - break; + oldState = state.get(); + if (oldState.isUnsubscribed) { + return; + } else { + newState = oldState.remove(s); } - } while (true); + } while (!state.compareAndSet(oldState, newState)); + // if we removed successfully we then need to call unsubscribe on it + s.unsubscribe(); } - + public void clear() { + State oldState; + State newState; do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; + oldState = state.get(); + if (oldState.isUnsubscribed) { + return; + } else { + newState = oldState.clear(); } - - if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { - final Set copy = new HashSet( - subscriptions); - subscriptions.clear(); - reference.set(subscriptions); - - unsubscribeAll(copy); - break; + } while (!state.compareAndSet(oldState, newState)); + // if we cleared successfully we then need to call unsubscribe on all previous + unsubscribeFromAll(oldState.subscriptions); + } + + @Override + public void unsubscribe() { + State oldState; + State newState; + do { + oldState = state.get(); + if (oldState.isUnsubscribed) { + return; + } else { + newState = oldState.unsubscribe(); } - } while (true); + } while (!state.compareAndSet(oldState, newState)); + unsubscribeFromAll(oldState.subscriptions); } - /** - * Unsubscribe from the collection of subscriptions. - *

- * Exceptions thrown by any of the {@code unsubscribe()} methods are - * collected into a {@link CompositeException} and thrown once - * all unsubscriptions have been attempted. - * @param subs the collection of subscriptions - */ - private void unsubscribeAll(Collection subs) { + + private static void unsubscribeFromAll(Collection subscriptions) { final Collection es = new ArrayList(); - for (final Subscription s : subs) { + for (Subscription s : subscriptions) { try { s.unsubscribe(); - } catch (final Throwable e) { + } catch (Throwable e) { es.add(e); } } @@ -139,22 +147,4 @@ private void unsubscribeAll(Collection subs) { "Failed to unsubscribe to 1 or more subscriptions.", es); } } - @Override - public void unsubscribe() { - do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; - } - - if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) { - unsubscribeAll(subscriptions); - break; - } - } while (true); - } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 8fed35fbbf..f6a4da6c70 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,7 +15,6 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import rx.Observable; @@ -26,42 +25,77 @@ * * @see Rx.Net equivalent MultipleAssignmentDisposable */ -public class MultipleAssignmentSubscription implements Subscription { - private AtomicReference reference = new AtomicReference(); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { +public final class MultipleAssignmentSubscription implements Subscription { + + private final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); + + private static final class State { + final boolean isUnsubscribed; + final Subscription subscription; + + State(boolean u, Subscription s) { + this.isUnsubscribed = u; + this.subscription = s; } - }; + + State unsubscribe() { + return new State(true, subscription); + } + + State set(Subscription s) { + return new State(isUnsubscribed, s); + } + + } + public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; + return state.get().isUnsubscribed; } @Override public void unsubscribe() { - Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); - if (s != null) { - s.unsubscribe(); - } + State oldState; + State newState; + do { + oldState = state.get(); + if (oldState.isUnsubscribed) { + return; + } else { + newState = oldState.unsubscribe(); + } + } while (!state.compareAndSet(oldState, newState)); + oldState.subscription.unsubscribe(); } + @Deprecated public void setSubscription(Subscription s) { + set(s); + } + + public void set(Subscription s) { + if (s == null) { + throw new IllegalArgumentException("Subscription can not be null"); + } + State oldState; + State newState; do { - Subscription r = reference.get(); - if (r == UNSUBSCRIBED_SENTINEL) { + oldState = state.get(); + if (oldState.isUnsubscribed) { s.unsubscribe(); return; + } else { + newState = oldState.set(s); } - if (reference.compareAndSet(r, s)) { - break; - } - } while (true); + } while (!state.compareAndSet(oldState, newState)); } + @Deprecated public Subscription getSubscription() { - Subscription s = reference.get(); - return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty(); + return get(); + } + + public Subscription get() { + return state.get().subscription; } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java index a4747caa9b..cb310e315e 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -16,8 +16,8 @@ package rx.subscriptions; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; + import rx.Subscription; /** @@ -26,113 +26,109 @@ * * @see MSDN RefCountDisposable */ -public class RefCountSubscription implements Subscription { - /** The state for the atomic operations. */ - private enum State { - ACTIVE, - MUTATING, - UNSUBSCRIBED +public final class RefCountSubscription implements Subscription { + private final Subscription actual; + private final AtomicReference state = new AtomicReference(new State(false, 0)); + + private static final class State { + final boolean isUnsubscribed; + final int children; + + State(boolean u, int c) { + this.isUnsubscribed = u; + this.children = c; + } + + State addChild() { + return new State(isUnsubscribed, children + 1); + } + + State removeChild() { + return new State(isUnsubscribed, children - 1); + } + + State unsubscribe() { + return new State(true, children); + } + } - /** The reference to the actual subscription. */ - private volatile Subscription main; - /** The current state. */ - private final AtomicReference state = new AtomicReference(); - /** Counts the number of sub-subscriptions. */ - private final AtomicInteger count = new AtomicInteger(); - /** Indicate the request to unsubscribe from the main. */ - private final AtomicBoolean mainDone = new AtomicBoolean(); + /** * Create a RefCountSubscription by wrapping the given non-null Subscription. - * @param s + * + * @param s */ public RefCountSubscription(Subscription s) { if (s == null) { throw new IllegalArgumentException("s"); } - this.main = s; + this.actual = s; } + + @Deprecated + public Subscription getSubscription() { + return get(); + } + /** * Returns a new sub-subscription. */ - public Subscription getSubscription() { + public Subscription get() { + State oldState; + State newState; do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { + oldState = state.get(); + if (oldState.isUnsubscribed) { return Subscriptions.empty(); + } else { + newState = oldState.addChild(); } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - count.incrementAndGet(); - state.set(State.ACTIVE); - return new InnerSubscription(); - } - } while(true); + } while (!state.compareAndSet(oldState, newState)); + + return new InnerSubscription(); } + /** * Check if this subscription is already unsubscribed. */ public boolean isUnsubscribed() { - return state.get() == State.UNSUBSCRIBED; + return state.get().isUnsubscribed; } + @Override public void unsubscribe() { + State oldState; + State newState; do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { + oldState = state.get(); + if (oldState.isUnsubscribed) { return; } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - if (mainDone.compareAndSet(false, true) && count.get() == 0) { - terminate(); - return; - } - state.set(State.ACTIVE); - break; - } - } while (true); - } - /** - * Terminate this subscription by unsubscribing from main and setting the - * state to UNSUBSCRIBED. - */ - private void terminate() { - state.set(State.UNSUBSCRIBED); - Subscription r = main; - main = null; - r.unsubscribe(); + newState = oldState.unsubscribe(); + } while (!state.compareAndSet(oldState, newState)); + unsubscribeActualIfApplicable(newState); } - /** Remove an inner subscription. */ - void innerDone() { - do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { - return; - } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - if (count.decrementAndGet() == 0 && mainDone.get()) { - terminate(); - return; - } - state.set(State.ACTIVE); - break; - } - } while (true); + + private void unsubscribeActualIfApplicable(State state) { + if (state.isUnsubscribed && state.children == 0) { + actual.unsubscribe(); + } } + /** The individual sub-subscriptions. */ - class InnerSubscription implements Subscription { + private final class InnerSubscription implements Subscription { final AtomicBoolean innerDone = new AtomicBoolean(); + @Override public void unsubscribe() { if (innerDone.compareAndSet(false, true)) { - innerDone(); + State oldState; + State newState; + do { + oldState = state.get(); + newState = oldState.removeChild(); + } while (!state.compareAndSet(oldState, newState)); + unsubscribeActualIfApplicable(newState); } } }; diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java index e26a258acc..73a27ca62f 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java @@ -15,8 +15,6 @@ */ package rx.subscriptions; -import static rx.subscriptions.Subscriptions.empty; - import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; @@ -27,41 +25,78 @@ * * @see Rx.Net equivalent SerialDisposable */ -public class SerialSubscription implements Subscription { - private final AtomicReference reference = new AtomicReference(empty()); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { +public final class SerialSubscription implements Subscription { + + private final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); + + private static final class State { + final boolean isUnsubscribed; + final Subscription subscription; + + State(boolean u, Subscription s) { + this.isUnsubscribed = u; + this.subscription = s; + } + + State unsubscribe() { + return new State(true, subscription); } - }; + + State set(Subscription s) { + return new State(isUnsubscribed, s); + } + + } + public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; + return state.get().isUnsubscribed; } + @Override public void unsubscribe() { - Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); - if (s != null) { - s.unsubscribe(); - } + State oldState; + State newState; + do { + oldState = state.get(); + if (oldState.isUnsubscribed) { + return; + } else { + newState = oldState.unsubscribe(); + } + } while (!state.compareAndSet(oldState, newState)); + oldState.subscription.unsubscribe(); } - public void setSubscription(final Subscription subscription) { + @Deprecated + public void setSubscription(Subscription s) { + set(s); + } + + public void set(Subscription s) { + if (s == null) { + throw new IllegalArgumentException("Subscription can not be null"); + } + State oldState; + State newState; do { - final Subscription current = reference.get(); - if (current == UNSUBSCRIBED_SENTINEL) { - subscription.unsubscribe(); - break; + oldState = state.get(); + if (oldState.isUnsubscribed) { + s.unsubscribe(); + return; + } else { + newState = oldState.set(s); } - if (reference.compareAndSet(current, subscription)) { - current.unsubscribe(); - break; - } - } while (true); + } while (!state.compareAndSet(oldState, newState)); + oldState.subscription.unsubscribe(); } - + + @Deprecated public Subscription getSubscription() { - final Subscription subscription = reference.get(); - return subscription == UNSUBSCRIBED_SENTINEL ? Subscriptions.empty() : subscription; + return get(); } + + public Subscription get() { + return state.get().subscription; + } + } diff --git a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java deleted file mode 100644 index c960db2ea4..0000000000 --- a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rx.subscriptions; - -import java.util.concurrent.atomic.AtomicReference; -import rx.Subscription; - -/** - * A subscription that allows only a single resource to be assigned. - *

- * If this subscription is live, no other subscription may be set() and - * yields an {@link IllegalStateException}. - *

- * If the unsubscribe has been called, setting a new subscription will - * unsubscribe it immediately. - */ -public final class SingleAssignmentSubscription implements Subscription { - /** Holds the current resource. */ - private final AtomicReference current = new AtomicReference(); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { - } - }; - /** - * Returns the current subscription or null if not yet set. - */ - public Subscription get() { - Subscription s = current.get(); - if (s == UNSUBSCRIBED_SENTINEL) { - return Subscriptions.empty(); - } - return s; - } - /** - * Sets a new subscription if not already set. - * @param s the new subscription - * @throws IllegalStateException if this subscription is live and contains - * another subscription. - */ - public void set(Subscription s) { - if (current.compareAndSet(null, s)) { - return; - } - if (current.get() != UNSUBSCRIBED_SENTINEL) { - throw new IllegalStateException("Subscription already set"); - } - if (s != null) { - s.unsubscribe(); - } - } - @Override - public void unsubscribe() { - Subscription old = current.getAndSet(UNSUBSCRIBED_SENTINEL); - if (old != null) { - old.unsubscribe(); - } - } - /** - * Test if this subscription is already unsubscribed. - */ - public boolean isUnsubscribed() { - return current.get() == UNSUBSCRIBED_SENTINEL; - } - -} diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java index 61fd6b3f6c..c636184dad 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -24,7 +24,7 @@ /** * Helper methods and utilities for creating and working with {@link Subscription} objects */ -public class Subscriptions { +public final class Subscriptions { /** * A {@link Subscription} that does nothing. * diff --git a/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java index 9fbed89153..3f82df5891 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java +++ b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java @@ -15,54 +15,66 @@ */ package rx.subscriptions; +import static org.mockito.Mockito.*; +import static rx.subscriptions.Subscriptions.*; import junit.framework.Assert; + import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; + import rx.Subscription; -import static rx.subscriptions.Subscriptions.create; import rx.util.functions.Action0; public class MultipleAssignmentSubscriptionTest { + Action0 unsubscribe; Subscription s; + @Before public void before() { - unsubscribe = mock(Action0.class); - s = create(unsubscribe); + unsubscribe = mock(Action0.class); + s = create(unsubscribe); } + @Test public void testNoUnsubscribeWhenReplaced() { MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); - - mas.setSubscription(s); - mas.setSubscription(null); - mas.unsubscribe(); - + + mas.set(s); + mas.set(Subscriptions.empty()); + mas.unsubscribe(); + verify(unsubscribe, never()).call(); - } + @Test public void testUnsubscribeWhenParentUnsubscribes() { MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); - mas.setSubscription(s); + mas.set(s); mas.unsubscribe(); mas.unsubscribe(); - + verify(unsubscribe, times(1)).call(); - + Assert.assertEquals(true, mas.isUnsubscribed()); } + @Test - public void testUnsubscribedDoesntLeakSentinel() { + public void subscribingWhenUnsubscribedCausesImmediateUnsubscription() { MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + mas.unsubscribe(); + Subscription underlying = mock(Subscription.class); + mas.set(underlying); + verify(underlying).unsubscribe(); + } - mas.setSubscription(s); + @Test + public void testSubscriptionRemainsAfterUnsubscribe() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + + mas.set(s); mas.unsubscribe(); - - Assert.assertEquals(true, mas.getSubscription() == Subscriptions.empty()); + + Assert.assertEquals(true, mas.get() == s); } } \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java index d11899f903..4d4cec95e1 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java +++ b/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java @@ -47,7 +47,7 @@ public void testImmediateUnsubscribe() { public void testRCSUnsubscribeBeforeClient() { InOrder inOrder = inOrder(main); - Subscription s = rcs.getSubscription(); + Subscription s = rcs.get(); rcs.unsubscribe(); @@ -67,8 +67,8 @@ public void testRCSUnsubscribeBeforeClient() { public void testMultipleClientsUnsubscribeFirst() { InOrder inOrder = inOrder(main); - Subscription s1 = rcs.getSubscription(); - Subscription s2 = rcs.getSubscription(); + Subscription s1 = rcs.get(); + Subscription s2 = rcs.get(); s1.unsubscribe(); inOrder.verify(main, never()).call(); @@ -88,8 +88,8 @@ public void testMultipleClientsUnsubscribeFirst() { public void testMultipleClientsMainUnsubscribeFirst() { InOrder inOrder = inOrder(main); - Subscription s1 = rcs.getSubscription(); - Subscription s2 = rcs.getSubscription(); + Subscription s1 = rcs.get(); + Subscription s2 = rcs.get(); rcs.unsubscribe(); inOrder.verify(main, never()).call(); diff --git a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java index 4afadb6f9c..9459242549 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java +++ b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java @@ -49,28 +49,20 @@ public void unsubscribingWithoutUnderlyingDoesNothing() { } @Test - public void getSubscriptionShouldReturnEmptySubscriptionAfterUnsubscribe() { + public void getSubscriptionShouldReturnset() { final Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); - serialSubscription.unsubscribe(); - assertEquals(Subscriptions.empty(), serialSubscription.getSubscription()); - } - - @Test - public void getSubscriptionShouldReturnSetSubscription() { - final Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); - assertSame(underlying, serialSubscription.getSubscription()); + serialSubscription.set(underlying); + assertSame(underlying, serialSubscription.get()); final Subscription another = mock(Subscription.class); - serialSubscription.setSubscription(another); - assertSame(another, serialSubscription.getSubscription()); + serialSubscription.set(another); + assertSame(another, serialSubscription.get()); } @Test public void unsubscribingTwiceDoesUnsubscribeOnce() { Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); serialSubscription.unsubscribe(); verify(underlying).unsubscribe(); @@ -82,16 +74,16 @@ public void unsubscribingTwiceDoesUnsubscribeOnce() { @Test public void settingSameSubscriptionTwiceDoesUnsubscribeIt() { Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); verifyZeroInteractions(underlying); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); verify(underlying).unsubscribe(); } @Test public void unsubscribingWithSingleUnderlyingUnsubscribes() { Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); underlying.unsubscribe(); verify(underlying).unsubscribe(); } @@ -99,18 +91,18 @@ public void unsubscribingWithSingleUnderlyingUnsubscribes() { @Test public void replacingFirstUnderlyingCausesUnsubscription() { Subscription first = mock(Subscription.class); - serialSubscription.setSubscription(first); + serialSubscription.set(first); Subscription second = mock(Subscription.class); - serialSubscription.setSubscription(second); + serialSubscription.set(second); verify(first).unsubscribe(); } @Test public void whenUnsubscribingSecondUnderlyingUnsubscribed() { Subscription first = mock(Subscription.class); - serialSubscription.setSubscription(first); + serialSubscription.set(first); Subscription second = mock(Subscription.class); - serialSubscription.setSubscription(second); + serialSubscription.set(second); serialSubscription.unsubscribe(); verify(second).unsubscribe(); } @@ -119,7 +111,7 @@ public void whenUnsubscribingSecondUnderlyingUnsubscribed() { public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() { serialSubscription.unsubscribe(); Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); verify(underlying).unsubscribe(); } @@ -127,7 +119,7 @@ public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() { public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcurrently() throws InterruptedException { final Subscription firstSet = mock(Subscription.class); - serialSubscription.setSubscription(firstSet); + serialSubscription.set(firstSet); final CountDownLatch start = new CountDownLatch(1); @@ -155,7 +147,7 @@ public void run() { final Subscription underlying = mock(Subscription.class); start.countDown(); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); end.await(); verify(firstSet).unsubscribe(); verify(underlying).unsubscribe(); @@ -184,7 +176,7 @@ public void concurrentSetSubscriptionShouldNotInterleave() public void run() { try { start.await(); - serialSubscription.setSubscription(subscription); + serialSubscription.set(subscription); } catch (InterruptedException e) { fail(e.getMessage()); } finally {