Skip to content

Commit

Permalink
Merge pull request #1950 from zsxwing/unsubscribed
Browse files Browse the repository at this point in the history
Add "Subscriptions.unsubscribed" to fix the 'isUnsubscribed' issue
  • Loading branch information
benjchristensen committed Dec 13, 2014
2 parents 162e042 + 35f2807 commit ba85468
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 26 deletions.
4 changes: 2 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7321,7 +7321,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
}

Expand Down Expand Up @@ -7410,7 +7410,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Subscription call(
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
timeoutSubscriber.onError(t);
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
return o.unsafeSubscribe(new Subscriber<U>() {

Expand All @@ -72,7 +72,7 @@ public void onNext(U t) {

});
} else {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
}
}, new TimeoutStub<T>() {
Expand All @@ -87,7 +87,7 @@ public Subscription call(
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
timeoutSubscriber.onError(t);
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
return o.unsafeSubscribe(new Subscriber<V>() {

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Subscription schedule(final Action0 action) {
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public Subscription schedule(Action0 action) {
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}

ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public Subscription schedule(Action0 action) {
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}

ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public ExecutorSchedulerWorker(Executor executor) {
@Override
public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
ExecutorAction ea = new ExecutorAction(action, tasks);
tasks.add(ea);
Expand Down Expand Up @@ -106,7 +106,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
return schedule(action);
}
if (isUnsubscribed()) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
ScheduledExecutorService service;
if (executor instanceof ScheduledExecutorService) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/ImmediateScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
@Override
public Subscription schedule(Action0 action) {
action.call();
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/schedulers/TrampolineScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {

private Subscription enqueue(Action0 action, long execTime) {
if (innerSubscription.isUnsubscribed()) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this));
queue.add(timedAction);
Expand All @@ -81,7 +81,7 @@ private Subscription enqueue(Action0 action, long execTime) {
polled.action.call();
}
} while (wip.decrementAndGet() > 0);
return Subscriptions.empty();
return Subscriptions.unsubscribed();
} else {
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
return Subscriptions.create(new Action0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@
* if unsubscribed.
*/
public final class MultipleAssignmentSubscription implements Subscription {
/** The shared empty state. */
static final State EMPTY_STATE = new State(false, Subscriptions.empty());
volatile State state = EMPTY_STATE;

volatile State state = new State(false, Subscriptions.empty());
static final AtomicReferenceFieldUpdater<MultipleAssignmentSubscription, State> STATE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(MultipleAssignmentSubscription.class, State.class, "state");

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/subscriptions/RefCountSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Subscription get() {
do {
oldState = state;
if (oldState.isUnsubscribed) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
} else {
newState = oldState.addChild();
}
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/rx/subscriptions/SerialSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
* the previous underlying subscription to be unsubscribed.
*/
public final class SerialSubscription implements Subscription {
static final State EMPTY_STATE = new State(false, Subscriptions.empty());
volatile State state = EMPTY_STATE;
volatile State state = new State(false, Subscriptions.empty());
static final AtomicReferenceFieldUpdater<SerialSubscription, State> STATE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(SerialSubscription.class, State.class, "state");

Expand Down
41 changes: 34 additions & 7 deletions src/main/java/rx/subscriptions/Subscriptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Subscription;
import rx.annotations.Experimental;
import rx.functions.Action0;
import rx.functions.Actions;

Expand All @@ -30,12 +31,38 @@ private Subscriptions() {
throw new IllegalStateException("No instances!");
}
/**
* Returns a {@link Subscription} that does nothing.
*
* @return a {@link Subscription} that does nothing
* Returns a {@link Subscription} that <code>unsubscribe</code> does nothing except changing
* <code>isUnsubscribed</code> to true. It's stateful and <code>isUnsubscribed</code>
* indicates if <code>unsubscribe</code> is called, which is different from {@link #unsubscribed()}.
*
* <pre><code>
* Subscription empty = Subscriptions.empty();
* System.out.println(empty.isUnsubscribed()); // false
* empty.unsubscribe();
* System.out.println(empty.isUnsubscribed()); // true
* </code></pre>
*
* @return a {@link Subscription} that <code>unsubscribe</code> does nothing except changing
* <code>isUnsubscribed</code> to true.
*/
public static Subscription empty() {
return EMPTY;
return BooleanSubscription.create();
}

/**
* Returns a {@link Subscription} that <code>unsubscribe</code> does nothing but is already unsubscribed.
* Its <code>isUnsubscribed</code> always return true, which is different from {@link #empty()}.
*
* <pre><code>
* Subscription unsubscribed = Subscriptions.unsubscribed();
* System.out.println(unsubscribed.isUnsubscribed()); // true
* </code></pre>
*
* @return a {@link Subscription} that <code>unsubscribe</code> does nothing but is already unsubscribed.
*/
@Experimental
public static Subscription unsubscribed() {
return UNSUBSCRIBED;
}

/**
Expand Down Expand Up @@ -94,16 +121,16 @@ public static CompositeSubscription from(Subscription... subscriptions) {
/**
* A {@link Subscription} that does nothing when its unsubscribe method is called.
*/
private static final Empty EMPTY = new Empty();
private static final Unsubscribed UNSUBSCRIBED = new Unsubscribed();
/** Naming classes helps with debugging. */
private static final class Empty implements Subscription {
private static final class Unsubscribed implements Subscription {
@Override
public void unsubscribe() {
}

@Override
public boolean isUnsubscribed() {
return false;
return true;
}
}
}
16 changes: 16 additions & 0 deletions src/test/java/rx/subscriptions/SubscriptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package rx.subscriptions;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -35,4 +37,18 @@ public void testUnsubscribeOnlyOnce() {
subscription.unsubscribe();
verify(unsubscribe, times(1)).call();
}

@Test
public void testEmpty() {
Subscription empty = Subscriptions.empty();
assertFalse(empty.isUnsubscribed());
empty.unsubscribe();
assertTrue(empty.isUnsubscribed());
}

@Test
public void testUnsubscribed() {
Subscription unsubscribed = Subscriptions.unsubscribed();
assertTrue(unsubscribed.isUnsubscribed());
}
}

0 comments on commit ba85468

Please sign in to comment.