From 38f21dc7ac406ad483809197b1516a4b0c72c3c4 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 23 Dec 2013 11:16:42 -0800 Subject: [PATCH 1/8] BooleanSubscription: Add Action Support --- .../rx/subscriptions/BooleanSubscription.java | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java index 7b18b35efc..31c904e47f 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java @@ -19,6 +19,7 @@ import rx.Observable; import rx.Subscription; +import rx.util.functions.Action0; /** * Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed. @@ -28,14 +29,35 @@ public class BooleanSubscription implements Subscription { private final AtomicBoolean unsubscribed = new AtomicBoolean(false); + private final Action0 action; + + public BooleanSubscription() { + action = null; + } + + private BooleanSubscription(Action0 action) { + this.action = action; + } + + public static BooleanSubscription create() { + return new BooleanSubscription(); + } + + public static BooleanSubscription create(Action0 onUnsubscribe) { + return new BooleanSubscription(onUnsubscribe); + } public boolean isUnsubscribed() { return unsubscribed.get(); } @Override - public void unsubscribe() { - unsubscribed.set(true); + public final void unsubscribe() { + if (unsubscribed.compareAndSet(false, true)) { + if (action != null) { + action.call(); + } + } } } From 10b18d2b292c9dc3797d6d2112a61394d1023473 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 23 Dec 2013 11:17:09 -0800 Subject: [PATCH 2/8] Remove Unnecessary Subscription - be explicit for error case in JoinObserver --- .../src/main/java/rx/joins/JoinObserver1.java | 17 ++-- .../SingleAssignmentSubscription.java | 81 ------------------- 2 files changed, 12 insertions(+), 86 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java diff --git a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java index 873d3d1a7f..ede55f0584 100644 --- a/rxjava-core/src/main/java/rx/joins/JoinObserver1.java +++ b/rxjava-core/src/main/java/rx/joins/JoinObserver1.java @@ -19,9 +19,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + import rx.Notification; import rx.Observable; -import rx.subscriptions.SingleAssignmentSubscription; +import rx.operators.SafeObservableSubscription; import rx.util.functions.Action1; /** @@ -33,14 +35,15 @@ public final class JoinObserver1 extends ObserverBase> implem private final Action1 onError; private final List activePlans; private final Queue> queue; - private final SingleAssignmentSubscription subscription; + private final SafeObservableSubscription subscription; private volatile boolean done; + private final AtomicBoolean subscribed = new AtomicBoolean(false); public JoinObserver1(Observable source, Action1 onError) { this.source = source; this.onError = onError; queue = new LinkedList>(); - subscription = new SingleAssignmentSubscription(); + subscription = new SafeObservableSubscription(); activePlans = new ArrayList(); } public Queue> queue() { @@ -51,8 +54,12 @@ public void addActivePlan(ActivePlan0 activePlan) { } @Override public void subscribe(Object gate) { - this.gate = gate; - subscription.set(source.materialize().subscribe(this)); + if (subscribed.compareAndSet(false, true)) { + this.gate = gate; + subscription.wrap(source.materialize().subscribe(this)); + } else { + throw new IllegalStateException("Can only be subscribed to once."); + } } @Override diff --git a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java deleted file mode 100644 index c960db2ea4..0000000000 --- a/rxjava-core/src/main/java/rx/subscriptions/SingleAssignmentSubscription.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rx.subscriptions; - -import java.util.concurrent.atomic.AtomicReference; -import rx.Subscription; - -/** - * A subscription that allows only a single resource to be assigned. - *

- * If this subscription is live, no other subscription may be set() and - * yields an {@link IllegalStateException}. - *

- * If the unsubscribe has been called, setting a new subscription will - * unsubscribe it immediately. - */ -public final class SingleAssignmentSubscription implements Subscription { - /** Holds the current resource. */ - private final AtomicReference current = new AtomicReference(); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { - } - }; - /** - * Returns the current subscription or null if not yet set. - */ - public Subscription get() { - Subscription s = current.get(); - if (s == UNSUBSCRIBED_SENTINEL) { - return Subscriptions.empty(); - } - return s; - } - /** - * Sets a new subscription if not already set. - * @param s the new subscription - * @throws IllegalStateException if this subscription is live and contains - * another subscription. - */ - public void set(Subscription s) { - if (current.compareAndSet(null, s)) { - return; - } - if (current.get() != UNSUBSCRIBED_SENTINEL) { - throw new IllegalStateException("Subscription already set"); - } - if (s != null) { - s.unsubscribe(); - } - } - @Override - public void unsubscribe() { - Subscription old = current.getAndSet(UNSUBSCRIBED_SENTINEL); - if (old != null) { - old.unsubscribe(); - } - } - /** - * Test if this subscription is already unsubscribed. - */ - public boolean isUnsubscribed() { - return current.get() == UNSUBSCRIBED_SENTINEL; - } - -} From 831252ec1f1a9e4906a0696f7efb9c23db74c7dc Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 23 Dec 2013 11:17:20 -0800 Subject: [PATCH 3/8] Code Reformatting --- .../subscriptions/CompositeSubscription.java | 76 ++++++++++--------- .../MultipleAssignmentSubscription.java | 10 +-- .../subscriptions/RefCountSubscription.java | 16 +++- .../rx/subscriptions/SerialSubscription.java | 4 +- 4 files changed, 60 insertions(+), 46 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index c19331c8ec..ee715df7f7 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -1,18 +1,18 @@ - /** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.subscriptions; import static java.util.Arrays.asList; @@ -31,27 +31,25 @@ /** * Subscription that represents a group of Subscriptions that are unsubscribed * together. - * - * @see Rx.Net - * equivalent CompositeDisposable + * + * @see Rx.Net equivalent CompositeDisposable */ public class CompositeSubscription implements Subscription { /** Sentinel to indicate a thread is modifying the subscription set. */ - private static final Set MUTATE_SENTINEL = unmodifiableSet(Collections.emptySet()); - /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/ - private static final Set UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.emptySet()); + private static final Set MUTATE_SENTINEL = unmodifiableSet(Collections. emptySet()); + /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed. */ + private static final Set UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections. emptySet()); /** The reference to the set of subscriptions. */ private final AtomicReference> reference = new AtomicReference>(); - + public CompositeSubscription(final Subscription... subscriptions) { reference.set(new HashSet(asList(subscriptions))); } - + public boolean isUnsubscribed() { return reference.get() == UNSUBSCRIBED_SENTINEL; } - + public void add(final Subscription s) { do { final Set existing = reference.get(); @@ -59,11 +57,11 @@ public void add(final Subscription s) { s.unsubscribe(); break; } - + if (existing == MUTATE_SENTINEL) { continue; } - + if (reference.compareAndSet(existing, MUTATE_SENTINEL)) { existing.add(s); reference.set(existing); @@ -71,7 +69,7 @@ public void add(final Subscription s) { } } while (true); } - + public void remove(final Subscription s) { do { final Set subscriptions = reference.get(); @@ -79,11 +77,11 @@ public void remove(final Subscription s) { s.unsubscribe(); break; } - + if (subscriptions == MUTATE_SENTINEL) { continue; } - + if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { // also unsubscribe from it: // http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx @@ -94,36 +92,39 @@ public void remove(final Subscription s) { } } while (true); } - + public void clear() { do { final Set subscriptions = reference.get(); if (subscriptions == UNSUBSCRIBED_SENTINEL) { break; } - + if (subscriptions == MUTATE_SENTINEL) { continue; } - + if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { final Set copy = new HashSet( subscriptions); subscriptions.clear(); reference.set(subscriptions); - + unsubscribeAll(copy); break; } } while (true); } + /** * Unsubscribe from the collection of subscriptions. *

* Exceptions thrown by any of the {@code unsubscribe()} methods are * collected into a {@link CompositeException} and thrown once * all unsubscriptions have been attempted. - * @param subs the collection of subscriptions + * + * @param subs + * the collection of subscriptions */ private void unsubscribeAll(Collection subs) { final Collection es = new ArrayList(); @@ -139,6 +140,7 @@ private void unsubscribeAll(Collection subs) { "Failed to unsubscribe to 1 or more subscriptions.", es); } } + @Override public void unsubscribe() { do { @@ -146,11 +148,11 @@ public void unsubscribe() { if (subscriptions == UNSUBSCRIBED_SENTINEL) { break; } - + if (subscriptions == MUTATE_SENTINEL) { continue; } - + if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) { unsubscribeAll(subscriptions); break; diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 8fed35fbbf..9f734dcccf 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,7 +15,6 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import rx.Observable; @@ -34,6 +33,7 @@ public class MultipleAssignmentSubscription implements Subscription { public void unsubscribe() { } }; + public boolean isUnsubscribed() { return reference.get() == UNSUBSCRIBED_SENTINEL; } diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java index a4747caa9b..18aa16cc43 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -33,6 +33,7 @@ private enum State { MUTATING, UNSUBSCRIBED } + /** The reference to the actual subscription. */ private volatile Subscription main; /** The current state. */ @@ -41,9 +42,11 @@ private enum State { private final AtomicInteger count = new AtomicInteger(); /** Indicate the request to unsubscribe from the main. */ private final AtomicBoolean mainDone = new AtomicBoolean(); + /** * Create a RefCountSubscription by wrapping the given non-null Subscription. - * @param s + * + * @param s */ public RefCountSubscription(Subscription s) { if (s == null) { @@ -51,6 +54,7 @@ public RefCountSubscription(Subscription s) { } this.main = s; } + /** * Returns a new sub-subscription. */ @@ -68,14 +72,16 @@ public Subscription getSubscription() { state.set(State.ACTIVE); return new InnerSubscription(); } - } while(true); + } while (true); } + /** * Check if this subscription is already unsubscribed. */ public boolean isUnsubscribed() { return state.get() == State.UNSUBSCRIBED; } + @Override public void unsubscribe() { do { @@ -96,7 +102,8 @@ public void unsubscribe() { } } while (true); } - /** + + /** * Terminate this subscription by unsubscribing from main and setting the * state to UNSUBSCRIBED. */ @@ -106,6 +113,7 @@ private void terminate() { main = null; r.unsubscribe(); } + /** Remove an inner subscription. */ void innerDone() { do { @@ -126,9 +134,11 @@ void innerDone() { } } while (true); } + /** The individual sub-subscriptions. */ class InnerSubscription implements Subscription { final AtomicBoolean innerDone = new AtomicBoolean(); + @Override public void unsubscribe() { if (innerDone.compareAndSet(false, true)) { diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java index e26a258acc..a0d8478130 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java @@ -35,9 +35,11 @@ public class SerialSubscription implements Subscription { public void unsubscribe() { } }; + public boolean isUnsubscribed() { return reference.get() == UNSUBSCRIBED_SENTINEL; } + @Override public void unsubscribe() { Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); @@ -59,7 +61,7 @@ public void setSubscription(final Subscription subscription) { } } while (true); } - + public Subscription getSubscription() { final Subscription subscription = reference.get(); return subscription == UNSUBSCRIBED_SENTINEL ? Subscriptions.empty() : subscription; From 141ce98a07d79236a1493d682273075e62e9cf06 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 23 Dec 2013 11:41:42 -0800 Subject: [PATCH 4/8] Refactor RefCountSubscription - simplified logic - remove unnecessary busy spins and state changes --- .../subscriptions/RefCountSubscription.java | 129 ++++++++---------- 1 file changed, 56 insertions(+), 73 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java index 18aa16cc43..091b9dcb1e 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -16,8 +16,8 @@ package rx.subscriptions; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; + import rx.Subscription; /** @@ -27,21 +27,33 @@ * @see MSDN RefCountDisposable */ public class RefCountSubscription implements Subscription { - /** The state for the atomic operations. */ - private enum State { - ACTIVE, - MUTATING, - UNSUBSCRIBED - } - /** The reference to the actual subscription. */ - private volatile Subscription main; - /** The current state. */ - private final AtomicReference state = new AtomicReference(); - /** Counts the number of sub-subscriptions. */ - private final AtomicInteger count = new AtomicInteger(); - /** Indicate the request to unsubscribe from the main. */ - private final AtomicBoolean mainDone = new AtomicBoolean(); + private final Subscription actual; + /** Counts the number of subscriptions (1 parent + multiple children) */ + private final AtomicReference state = new AtomicReference(new State(false, 0)); + + private static final class State { + final boolean isUnsubscribed; + final int children; + + State(boolean u, int c) { + this.isUnsubscribed = u; + this.children = c; + } + + State addChild() { + return new State(isUnsubscribed, children + 1); + } + + State removeChild() { + return new State(isUnsubscribed, children - 1); + } + + State unsubscribe() { + return new State(true, children); + } + + } /** * Create a RefCountSubscription by wrapping the given non-null Subscription. @@ -52,87 +64,52 @@ public RefCountSubscription(Subscription s) { if (s == null) { throw new IllegalArgumentException("s"); } - this.main = s; + this.actual = s; } /** * Returns a new sub-subscription. */ public Subscription getSubscription() { + State current; + State newState; do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { + current = state.get(); + if (current.isUnsubscribed) { return Subscriptions.empty(); + } else { + newState = current.addChild(); } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - count.incrementAndGet(); - state.set(State.ACTIVE); - return new InnerSubscription(); - } - } while (true); + } while (!state.compareAndSet(current, newState)); + + return new InnerSubscription(); } /** * Check if this subscription is already unsubscribed. */ public boolean isUnsubscribed() { - return state.get() == State.UNSUBSCRIBED; + return state.get().isUnsubscribed; } @Override public void unsubscribe() { + State current; + State newState; do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { + current = state.get(); + if (current.isUnsubscribed) { return; } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - if (mainDone.compareAndSet(false, true) && count.get() == 0) { - terminate(); - return; - } - state.set(State.ACTIVE); - break; - } - } while (true); - } - - /** - * Terminate this subscription by unsubscribing from main and setting the - * state to UNSUBSCRIBED. - */ - private void terminate() { - state.set(State.UNSUBSCRIBED); - Subscription r = main; - main = null; - r.unsubscribe(); + newState = current.unsubscribe(); + } while (!state.compareAndSet(current, newState)); + unsubscribeActualIfApplicable(newState); } - /** Remove an inner subscription. */ - void innerDone() { - do { - State s = state.get(); - if (s == State.UNSUBSCRIBED) { - return; - } - if (s == State.MUTATING) { - continue; - } - if (state.compareAndSet(s, State.MUTATING)) { - if (count.decrementAndGet() == 0 && mainDone.get()) { - terminate(); - return; - } - state.set(State.ACTIVE); - break; - } - } while (true); + private void unsubscribeActualIfApplicable(State state) { + if (state.isUnsubscribed && state.children == 0) { + actual.unsubscribe(); + } } /** The individual sub-subscriptions. */ @@ -142,7 +119,13 @@ class InnerSubscription implements Subscription { @Override public void unsubscribe() { if (innerDone.compareAndSet(false, true)) { - innerDone(); + State current; + State newState; + do { + current = state.get(); + newState = current.removeChild(); + } while (!state.compareAndSet(current, newState)); + unsubscribeActualIfApplicable(newState); } } }; From cc87d1a8ac435c154c2d411e92d1dd4f545b8a25 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 23 Dec 2013 12:03:39 -0800 Subject: [PATCH 5/8] Refactor CompositeSubscription - simplified state machine - removed busy spin state --- .../subscriptions/CompositeSubscription.java | 178 +++++++++--------- 1 file changed, 84 insertions(+), 94 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index ee715df7f7..2b298c35b2 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -15,14 +15,10 @@ */ package rx.subscriptions; -import static java.util.Arrays.asList; -import static java.util.Collections.unmodifiableSet; - import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; @@ -35,103 +31,116 @@ * @see Rx.Net equivalent CompositeDisposable */ public class CompositeSubscription implements Subscription { - /** Sentinel to indicate a thread is modifying the subscription set. */ - private static final Set MUTATE_SENTINEL = unmodifiableSet(Collections. emptySet()); - /** Sentinel to indicate the entire CompositeSubscription has been unsubscribed. */ - private static final Set UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections. emptySet()); - /** The reference to the set of subscriptions. */ - private final AtomicReference> reference = new AtomicReference>(); + + private final AtomicReference state = new AtomicReference(); + + private static final class State { + final boolean isUnsubscribed; + final List subscriptions; + + State(boolean u, List s) { + this.isUnsubscribed = u; + this.subscriptions = s; + } + + State unsubscribe() { + return new State(true, subscriptions); + } + + State add(Subscription s) { + List newSubscriptions = new ArrayList(); + newSubscriptions.addAll(subscriptions); + newSubscriptions.add(s); + return new State(isUnsubscribed, newSubscriptions); + } + + State remove(Subscription s) { + List newSubscriptions = new ArrayList(); + newSubscriptions.addAll(subscriptions); + newSubscriptions.remove(s); // only first occurrence + return new State(isUnsubscribed, newSubscriptions); + } + + State clear() { + return new State(isUnsubscribed, new ArrayList()); + } + } public CompositeSubscription(final Subscription... subscriptions) { - reference.set(new HashSet(asList(subscriptions))); + state.set(new State(false, Arrays.asList(subscriptions))); } public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; + return state.get().isUnsubscribed; } public void add(final Subscription s) { + State current; + State newState; do { - final Set existing = reference.get(); - if (existing == UNSUBSCRIBED_SENTINEL) { + current = state.get(); + if (current.isUnsubscribed) { s.unsubscribe(); - break; - } - - if (existing == MUTATE_SENTINEL) { - continue; + return; + } else { + newState = current.add(s); } - - if (reference.compareAndSet(existing, MUTATE_SENTINEL)) { - existing.add(s); - reference.set(existing); - break; - } - } while (true); + } while (!state.compareAndSet(current, newState)); } public void remove(final Subscription s) { + State current; + State newState; do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - s.unsubscribe(); - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; + current = state.get(); + if (current.isUnsubscribed) { + return; + } else { + newState = current.remove(s); } - - if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { - // also unsubscribe from it: - // http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx - subscriptions.remove(s); - reference.set(subscriptions); - s.unsubscribe(); - break; - } - } while (true); + } while (!state.compareAndSet(current, newState)); + // if we removed successfully we then need to call unsubscribe on it + s.unsubscribe(); } public void clear() { + State current; + State newState; do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; + current = state.get(); + if (current.isUnsubscribed) { + return; + } else { + newState = current.clear(); } + } while (!state.compareAndSet(current, newState)); + // if we cleared successfully we then need to call unsubscribe on all previous + // current is now "previous" + unsubscribeFromAll(current.subscriptions); + } - if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) { - final Set copy = new HashSet( - subscriptions); - subscriptions.clear(); - reference.set(subscriptions); - - unsubscribeAll(copy); - break; + @Override + public void unsubscribe() { + State current; + State newState; + do { + current = state.get(); + if (current.isUnsubscribed) { + return; + } else { + newState = current.unsubscribe(); } - } while (true); + } while (!state.compareAndSet(current, newState)); + // current is now "previous" + unsubscribeFromAll(current.subscriptions); } - /** - * Unsubscribe from the collection of subscriptions. - *

- * Exceptions thrown by any of the {@code unsubscribe()} methods are - * collected into a {@link CompositeException} and thrown once - * all unsubscriptions have been attempted. - * - * @param subs - * the collection of subscriptions - */ - private void unsubscribeAll(Collection subs) { + private static void unsubscribeFromAll(Collection subscriptions) { final Collection es = new ArrayList(); - for (final Subscription s : subs) { + for (Subscription s : subscriptions) { try { s.unsubscribe(); - } catch (final Throwable e) { + } catch (Throwable e) { es.add(e); } } @@ -140,23 +149,4 @@ private void unsubscribeAll(Collection subs) { "Failed to unsubscribe to 1 or more subscriptions.", es); } } - - @Override - public void unsubscribe() { - do { - final Set subscriptions = reference.get(); - if (subscriptions == UNSUBSCRIBED_SENTINEL) { - break; - } - - if (subscriptions == MUTATE_SENTINEL) { - continue; - } - - if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) { - unsubscribeAll(subscriptions); - break; - } - } while (true); - } } From 74b9d557a77109bbb354cd42f7195a8278534f38 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 23 Dec 2013 12:30:12 -0800 Subject: [PATCH 6/8] Refactor MultipleAssignment - simplified state machine --- .../MultipleAssignmentSubscription.java | 74 ++++++++++++++----- 1 file changed, 54 insertions(+), 20 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 9f734dcccf..f6a4da6c70 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -25,43 +25,77 @@ * * @see Rx.Net equivalent MultipleAssignmentDisposable */ -public class MultipleAssignmentSubscription implements Subscription { - private AtomicReference reference = new AtomicReference(); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { +public final class MultipleAssignmentSubscription implements Subscription { + + private final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); + + private static final class State { + final boolean isUnsubscribed; + final Subscription subscription; + + State(boolean u, Subscription s) { + this.isUnsubscribed = u; + this.subscription = s; + } + + State unsubscribe() { + return new State(true, subscription); } - }; + + State set(Subscription s) { + return new State(isUnsubscribed, s); + } + + } public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; + return state.get().isUnsubscribed; } @Override public void unsubscribe() { - Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); - if (s != null) { - s.unsubscribe(); - } + State oldState; + State newState; + do { + oldState = state.get(); + if (oldState.isUnsubscribed) { + return; + } else { + newState = oldState.unsubscribe(); + } + } while (!state.compareAndSet(oldState, newState)); + oldState.subscription.unsubscribe(); } + @Deprecated public void setSubscription(Subscription s) { + set(s); + } + + public void set(Subscription s) { + if (s == null) { + throw new IllegalArgumentException("Subscription can not be null"); + } + State oldState; + State newState; do { - Subscription r = reference.get(); - if (r == UNSUBSCRIBED_SENTINEL) { + oldState = state.get(); + if (oldState.isUnsubscribed) { s.unsubscribe(); return; + } else { + newState = oldState.set(s); } - if (reference.compareAndSet(r, s)) { - break; - } - } while (true); + } while (!state.compareAndSet(oldState, newState)); } + @Deprecated public Subscription getSubscription() { - Subscription s = reference.get(); - return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty(); + return get(); + } + + public Subscription get() { + return state.get().subscription; } } From 3ebaa3db5aaafd938d5ef034edf6f07aab01dfa9 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 23 Dec 2013 12:30:54 -0800 Subject: [PATCH 7/8] Refactor SerialSubscription - simplified state machine --- .../rx/subscriptions/SerialSubscription.java | 85 +++++++++++++------ 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java index a0d8478130..73a27ca62f 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java @@ -15,8 +15,6 @@ */ package rx.subscriptions; -import static rx.subscriptions.Subscriptions.empty; - import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; @@ -27,43 +25,78 @@ * * @see Rx.Net equivalent SerialDisposable */ -public class SerialSubscription implements Subscription { - private final AtomicReference reference = new AtomicReference(empty()); - /** Sentinel for the unsubscribed state. */ - private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() { - @Override - public void unsubscribe() { +public final class SerialSubscription implements Subscription { + + private final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); + + private static final class State { + final boolean isUnsubscribed; + final Subscription subscription; + + State(boolean u, Subscription s) { + this.isUnsubscribed = u; + this.subscription = s; + } + + State unsubscribe() { + return new State(true, subscription); + } + + State set(Subscription s) { + return new State(isUnsubscribed, s); } - }; + + } public boolean isUnsubscribed() { - return reference.get() == UNSUBSCRIBED_SENTINEL; + return state.get().isUnsubscribed; } @Override public void unsubscribe() { - Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL); - if (s != null) { - s.unsubscribe(); - } + State oldState; + State newState; + do { + oldState = state.get(); + if (oldState.isUnsubscribed) { + return; + } else { + newState = oldState.unsubscribe(); + } + } while (!state.compareAndSet(oldState, newState)); + oldState.subscription.unsubscribe(); + } + + @Deprecated + public void setSubscription(Subscription s) { + set(s); } - public void setSubscription(final Subscription subscription) { + public void set(Subscription s) { + if (s == null) { + throw new IllegalArgumentException("Subscription can not be null"); + } + State oldState; + State newState; do { - final Subscription current = reference.get(); - if (current == UNSUBSCRIBED_SENTINEL) { - subscription.unsubscribe(); - break; - } - if (reference.compareAndSet(current, subscription)) { - current.unsubscribe(); - break; + oldState = state.get(); + if (oldState.isUnsubscribed) { + s.unsubscribe(); + return; + } else { + newState = oldState.set(s); } - } while (true); + } while (!state.compareAndSet(oldState, newState)); + oldState.subscription.unsubscribe(); } + @Deprecated public Subscription getSubscription() { - final Subscription subscription = reference.get(); - return subscription == UNSUBSCRIBED_SENTINEL ? Subscriptions.empty() : subscription; + return get(); } + + public Subscription get() { + return state.get().subscription; + } + } From 06642350d30362f9a1fe519f07873dd763309860 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 23 Dec 2013 12:32:07 -0800 Subject: [PATCH 8/8] Naming and Class Conventions - make concrete implementations final as extending them is dangerous (use composition and implement Subscription instead) - deprecated long get/setSubscription methods in favor of short verbs (add/get/set/clear/remove) - updated unit tests with changes --- .../rx/subscriptions/BooleanSubscription.java | 2 +- .../subscriptions/CompositeSubscription.java | 48 ++++++++--------- .../subscriptions/RefCountSubscription.java | 41 ++++++++------- .../java/rx/subscriptions/Subscriptions.java | 2 +- .../MultipleAssignmentSubscriptionTest.java | 52 ++++++++++++------- .../RefCountSubscriptionTest.java | 10 ++-- .../SerialSubscriptionTests.java | 42 ++++++--------- 7 files changed, 101 insertions(+), 96 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java index 31c904e47f..d6dfedd549 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java @@ -26,7 +26,7 @@ * * @see Rx.Net equivalent BooleanDisposable */ -public class BooleanSubscription implements Subscription { +public final class BooleanSubscription implements Subscription { private final AtomicBoolean unsubscribed = new AtomicBoolean(false); private final Action0 action; diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 2b298c35b2..b0cf7cac23 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -30,7 +30,7 @@ * * @see Rx.Net equivalent CompositeDisposable */ -public class CompositeSubscription implements Subscription { +public final class CompositeSubscription implements Subscription { private final AtomicReference state = new AtomicReference(); @@ -75,64 +75,62 @@ public boolean isUnsubscribed() { } public void add(final Subscription s) { - State current; + State oldState; State newState; do { - current = state.get(); - if (current.isUnsubscribed) { + oldState = state.get(); + if (oldState.isUnsubscribed) { s.unsubscribe(); return; } else { - newState = current.add(s); + newState = oldState.add(s); } - } while (!state.compareAndSet(current, newState)); + } while (!state.compareAndSet(oldState, newState)); } public void remove(final Subscription s) { - State current; + State oldState; State newState; do { - current = state.get(); - if (current.isUnsubscribed) { + oldState = state.get(); + if (oldState.isUnsubscribed) { return; } else { - newState = current.remove(s); + newState = oldState.remove(s); } - } while (!state.compareAndSet(current, newState)); + } while (!state.compareAndSet(oldState, newState)); // if we removed successfully we then need to call unsubscribe on it s.unsubscribe(); } public void clear() { - State current; + State oldState; State newState; do { - current = state.get(); - if (current.isUnsubscribed) { + oldState = state.get(); + if (oldState.isUnsubscribed) { return; } else { - newState = current.clear(); + newState = oldState.clear(); } - } while (!state.compareAndSet(current, newState)); + } while (!state.compareAndSet(oldState, newState)); // if we cleared successfully we then need to call unsubscribe on all previous - // current is now "previous" - unsubscribeFromAll(current.subscriptions); + unsubscribeFromAll(oldState.subscriptions); } @Override public void unsubscribe() { - State current; + State oldState; State newState; do { - current = state.get(); - if (current.isUnsubscribed) { + oldState = state.get(); + if (oldState.isUnsubscribed) { return; } else { - newState = current.unsubscribe(); + newState = oldState.unsubscribe(); } - } while (!state.compareAndSet(current, newState)); - // current is now "previous" - unsubscribeFromAll(current.subscriptions); + } while (!state.compareAndSet(oldState, newState)); + unsubscribeFromAll(oldState.subscriptions); } private static void unsubscribeFromAll(Collection subscriptions) { diff --git a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java index 091b9dcb1e..cb310e315e 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -26,10 +26,8 @@ * * @see MSDN RefCountDisposable */ -public class RefCountSubscription implements Subscription { - /** The reference to the actual subscription. */ +public final class RefCountSubscription implements Subscription { private final Subscription actual; - /** Counts the number of subscriptions (1 parent + multiple children) */ private final AtomicReference state = new AtomicReference(new State(false, 0)); private static final class State { @@ -67,20 +65,25 @@ public RefCountSubscription(Subscription s) { this.actual = s; } + @Deprecated + public Subscription getSubscription() { + return get(); + } + /** * Returns a new sub-subscription. */ - public Subscription getSubscription() { - State current; + public Subscription get() { + State oldState; State newState; do { - current = state.get(); - if (current.isUnsubscribed) { + oldState = state.get(); + if (oldState.isUnsubscribed) { return Subscriptions.empty(); } else { - newState = current.addChild(); + newState = oldState.addChild(); } - } while (!state.compareAndSet(current, newState)); + } while (!state.compareAndSet(oldState, newState)); return new InnerSubscription(); } @@ -94,15 +97,15 @@ public boolean isUnsubscribed() { @Override public void unsubscribe() { - State current; + State oldState; State newState; do { - current = state.get(); - if (current.isUnsubscribed) { + oldState = state.get(); + if (oldState.isUnsubscribed) { return; } - newState = current.unsubscribe(); - } while (!state.compareAndSet(current, newState)); + newState = oldState.unsubscribe(); + } while (!state.compareAndSet(oldState, newState)); unsubscribeActualIfApplicable(newState); } @@ -113,18 +116,18 @@ private void unsubscribeActualIfApplicable(State state) { } /** The individual sub-subscriptions. */ - class InnerSubscription implements Subscription { + private final class InnerSubscription implements Subscription { final AtomicBoolean innerDone = new AtomicBoolean(); @Override public void unsubscribe() { if (innerDone.compareAndSet(false, true)) { - State current; + State oldState; State newState; do { - current = state.get(); - newState = current.removeChild(); - } while (!state.compareAndSet(current, newState)); + oldState = state.get(); + newState = oldState.removeChild(); + } while (!state.compareAndSet(oldState, newState)); unsubscribeActualIfApplicable(newState); } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java index 61fd6b3f6c..c636184dad 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -24,7 +24,7 @@ /** * Helper methods and utilities for creating and working with {@link Subscription} objects */ -public class Subscriptions { +public final class Subscriptions { /** * A {@link Subscription} that does nothing. * diff --git a/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java index 9fbed89153..3f82df5891 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java +++ b/rxjava-core/src/test/java/rx/subscriptions/MultipleAssignmentSubscriptionTest.java @@ -15,54 +15,66 @@ */ package rx.subscriptions; +import static org.mockito.Mockito.*; +import static rx.subscriptions.Subscriptions.*; import junit.framework.Assert; + import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; + import rx.Subscription; -import static rx.subscriptions.Subscriptions.create; import rx.util.functions.Action0; public class MultipleAssignmentSubscriptionTest { + Action0 unsubscribe; Subscription s; + @Before public void before() { - unsubscribe = mock(Action0.class); - s = create(unsubscribe); + unsubscribe = mock(Action0.class); + s = create(unsubscribe); } + @Test public void testNoUnsubscribeWhenReplaced() { MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); - - mas.setSubscription(s); - mas.setSubscription(null); - mas.unsubscribe(); - + + mas.set(s); + mas.set(Subscriptions.empty()); + mas.unsubscribe(); + verify(unsubscribe, never()).call(); - } + @Test public void testUnsubscribeWhenParentUnsubscribes() { MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); - mas.setSubscription(s); + mas.set(s); mas.unsubscribe(); mas.unsubscribe(); - + verify(unsubscribe, times(1)).call(); - + Assert.assertEquals(true, mas.isUnsubscribed()); } + @Test - public void testUnsubscribedDoesntLeakSentinel() { + public void subscribingWhenUnsubscribedCausesImmediateUnsubscription() { MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + mas.unsubscribe(); + Subscription underlying = mock(Subscription.class); + mas.set(underlying); + verify(underlying).unsubscribe(); + } - mas.setSubscription(s); + @Test + public void testSubscriptionRemainsAfterUnsubscribe() { + MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + + mas.set(s); mas.unsubscribe(); - - Assert.assertEquals(true, mas.getSubscription() == Subscriptions.empty()); + + Assert.assertEquals(true, mas.get() == s); } } \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java index d11899f903..4d4cec95e1 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java +++ b/rxjava-core/src/test/java/rx/subscriptions/RefCountSubscriptionTest.java @@ -47,7 +47,7 @@ public void testImmediateUnsubscribe() { public void testRCSUnsubscribeBeforeClient() { InOrder inOrder = inOrder(main); - Subscription s = rcs.getSubscription(); + Subscription s = rcs.get(); rcs.unsubscribe(); @@ -67,8 +67,8 @@ public void testRCSUnsubscribeBeforeClient() { public void testMultipleClientsUnsubscribeFirst() { InOrder inOrder = inOrder(main); - Subscription s1 = rcs.getSubscription(); - Subscription s2 = rcs.getSubscription(); + Subscription s1 = rcs.get(); + Subscription s2 = rcs.get(); s1.unsubscribe(); inOrder.verify(main, never()).call(); @@ -88,8 +88,8 @@ public void testMultipleClientsUnsubscribeFirst() { public void testMultipleClientsMainUnsubscribeFirst() { InOrder inOrder = inOrder(main); - Subscription s1 = rcs.getSubscription(); - Subscription s2 = rcs.getSubscription(); + Subscription s1 = rcs.get(); + Subscription s2 = rcs.get(); rcs.unsubscribe(); inOrder.verify(main, never()).call(); diff --git a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java index 4afadb6f9c..9459242549 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java +++ b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java @@ -49,28 +49,20 @@ public void unsubscribingWithoutUnderlyingDoesNothing() { } @Test - public void getSubscriptionShouldReturnEmptySubscriptionAfterUnsubscribe() { + public void getSubscriptionShouldReturnset() { final Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); - serialSubscription.unsubscribe(); - assertEquals(Subscriptions.empty(), serialSubscription.getSubscription()); - } - - @Test - public void getSubscriptionShouldReturnSetSubscription() { - final Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); - assertSame(underlying, serialSubscription.getSubscription()); + serialSubscription.set(underlying); + assertSame(underlying, serialSubscription.get()); final Subscription another = mock(Subscription.class); - serialSubscription.setSubscription(another); - assertSame(another, serialSubscription.getSubscription()); + serialSubscription.set(another); + assertSame(another, serialSubscription.get()); } @Test public void unsubscribingTwiceDoesUnsubscribeOnce() { Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); serialSubscription.unsubscribe(); verify(underlying).unsubscribe(); @@ -82,16 +74,16 @@ public void unsubscribingTwiceDoesUnsubscribeOnce() { @Test public void settingSameSubscriptionTwiceDoesUnsubscribeIt() { Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); verifyZeroInteractions(underlying); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); verify(underlying).unsubscribe(); } @Test public void unsubscribingWithSingleUnderlyingUnsubscribes() { Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); underlying.unsubscribe(); verify(underlying).unsubscribe(); } @@ -99,18 +91,18 @@ public void unsubscribingWithSingleUnderlyingUnsubscribes() { @Test public void replacingFirstUnderlyingCausesUnsubscription() { Subscription first = mock(Subscription.class); - serialSubscription.setSubscription(first); + serialSubscription.set(first); Subscription second = mock(Subscription.class); - serialSubscription.setSubscription(second); + serialSubscription.set(second); verify(first).unsubscribe(); } @Test public void whenUnsubscribingSecondUnderlyingUnsubscribed() { Subscription first = mock(Subscription.class); - serialSubscription.setSubscription(first); + serialSubscription.set(first); Subscription second = mock(Subscription.class); - serialSubscription.setSubscription(second); + serialSubscription.set(second); serialSubscription.unsubscribe(); verify(second).unsubscribe(); } @@ -119,7 +111,7 @@ public void whenUnsubscribingSecondUnderlyingUnsubscribed() { public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() { serialSubscription.unsubscribe(); Subscription underlying = mock(Subscription.class); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); verify(underlying).unsubscribe(); } @@ -127,7 +119,7 @@ public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() { public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcurrently() throws InterruptedException { final Subscription firstSet = mock(Subscription.class); - serialSubscription.setSubscription(firstSet); + serialSubscription.set(firstSet); final CountDownLatch start = new CountDownLatch(1); @@ -155,7 +147,7 @@ public void run() { final Subscription underlying = mock(Subscription.class); start.countDown(); - serialSubscription.setSubscription(underlying); + serialSubscription.set(underlying); end.await(); verify(firstSet).unsubscribe(); verify(underlying).unsubscribe(); @@ -184,7 +176,7 @@ public void concurrentSetSubscriptionShouldNotInterleave() public void run() { try { start.await(); - serialSubscription.setSubscription(subscription); + serialSubscription.set(subscription); } catch (InterruptedException e) { fail(e.getMessage()); } finally {