Skip to content

Commit

Permalink
Subscriptions utility class and rx.subscriptions package
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Mar 12, 2013
1 parent 4e2b666 commit d8d305e
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 67 deletions.
64 changes: 5 additions & 59 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import rx.operators.OperatorToIterator;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
import rx.util.Range;
Expand Down Expand Up @@ -529,20 +530,13 @@ public NeverObservable() {

@Override
public Subscription call(Observer<T> t1) {
return new NoOpObservableSubscription();
return Subscriptions.empty();
}

}, true);
}
}

/**
* A {@link Subscription} that does nothing when its unsubscribe method is called.
*/
private static class NoOpObservableSubscription implements Subscription {
public void unsubscribe() {
}
}

/**
* an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes.
Expand All @@ -565,7 +559,7 @@ public ThrowObservable(final Exception exception) {
@Override
public Subscription call(Observer<T> observer) {
observer.onError(exception);
return new NoOpObservableSubscription();
return Subscriptions.empty();
}

}, true);
Expand Down Expand Up @@ -1226,54 +1220,6 @@ public static <T> Observable<T> never() {
return new NeverObservable<T>();
}

/**
* A {@link Subscription} that does nothing.
*
* //TODO should this be moved to a Subscriptions utility class?
*
* @return
*/
public static Subscription noOpSubscription() {
return new NoOpObservableSubscription();
}

/**
* A {@link Subscription} implemented via a Func
*
* //TODO should this be moved to a Subscriptions utility class?
*
* @return
*/
public static Subscription createSubscription(final Action0 unsubscribe) {
return new Subscription() {

@Override
public void unsubscribe() {
unsubscribe.call();
}

};
}

/**
* A {@link Subscription} implemented via an anonymous function (such as closures from other languages).
*
* //TODO should this be moved to a Subscriptions utility class?
*
* @return
*/
public static Subscription createSubscription(final Object unsubscribe) {
final FuncN<?> f = Functions.from(unsubscribe);
return new Subscription() {

@Override
public void unsubscribe() {
f.call();
}

};
}

/**
* Instruct an Observable to pass control to another Observable (the return value of a function)
* rather than calling <code>onError</code> if it encounters an error.
Expand Down Expand Up @@ -3172,7 +3118,7 @@ public Subscription call(Observer<String> Observer) {
Observer.onNext("two");
Observer.onNext("three");
Observer.onCompleted();
return Observable.noOpSubscription();
return Subscriptions.empty();
}

});
Expand Down Expand Up @@ -3257,7 +3203,7 @@ public void testToIterableWithException() {
public Subscription call(Observer<String> observer) {
observer.onNext("one");
observer.onError(new TestException());
return Observable.noOpSubscription();
return Subscriptions.empty();
}
});

Expand Down
7 changes: 7 additions & 0 deletions rxjava-core/src/main/java/rx/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
*/
package rx;

import rx.subscriptions.Subscriptions;

/**
* Subscription returns from {@link Observable#subscribe(Observer)} to allow unsubscribing.
* <p>
* See utilities in {@link Subscriptions} and implementations in the {@link rx.subscriptions} package.
*/
public interface Subscription {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Func3;
Expand Down Expand Up @@ -798,7 +799,7 @@ private static class TestObservable extends Observable<String> {
public Subscription subscribe(Observer<String> Observer) {
// just store the variable where it can be accessed so we can manually trigger it
this.Observer = Observer;
return Observable.noOpSubscription();
return Subscriptions.empty();
}

}
Expand Down
3 changes: 2 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationNext.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.Exceptions;
import rx.util.functions.Func1;

Expand Down Expand Up @@ -333,7 +334,7 @@ public void run() {
}
}
}).start();
return Observable.noOpSubscription();
return Subscriptions.empty();
}

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.CompositeException;
import rx.util.functions.Func1;
Expand Down Expand Up @@ -114,7 +115,7 @@ public void testResumeNextWithSynchronousExecution() {
public Subscription call(Observer<String> observer) {
observer.onNext("one");
observer.onError(new Exception("injected failure"));
return Observable.noOpSubscription();
return Subscriptions.empty();
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

public class OperationToObservableFuture {
Expand Down Expand Up @@ -45,7 +45,7 @@ public Subscription call(Observer<T> observer) {

// the get() has already completed so there is no point in
// giving the user a way to cancel.
return Observable.noOpSubscription();
return Subscriptions.empty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
Expand Down Expand Up @@ -53,7 +54,7 @@ public Subscription call(Observer<T> observer) {
}
observer.onCompleted();

return Observable.noOpSubscription();
return Subscriptions.empty();
}
}

Expand Down
3 changes: 2 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.SynchronizedObserver;
import rx.util.functions.Func1;
Expand Down Expand Up @@ -805,7 +806,7 @@ private static class TestObservable extends Observable<String> {
public Subscription subscribe(Observer<String> Observer) {
// just store the variable where it can be accessed so we can manually trigger it
this.Observer = Observer;
return Observable.noOpSubscription();
return Subscriptions.empty();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.Exceptions;
import rx.util.functions.Func1;

Expand Down Expand Up @@ -116,7 +117,7 @@ public void testToIteratorWithException() {
public Subscription call(Observer<String> observer) {
observer.onNext("one");
observer.onError(new TestException());
return Observable.noOpSubscription();
return Subscriptions.empty();
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package rx.subscriptions;

import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.Subscription;

/**
* Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed.
*
* @see Rx.Net equivalent BooleanDisposable at http://msdn.microsoft.com/en-us/library/system.reactive.disposables.booleandisposable(v=vs.103).aspx
*/
public class BooleanSubscription implements Subscription {

private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

public boolean isUnsubscribed() {
return unsubscribed.get();
}

@Override
public void unsubscribe() {
unsubscribed.set(false);
}

}
58 changes: 58 additions & 0 deletions rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package rx.subscriptions;

import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.FuncN;
import rx.util.functions.Functions;

public class Subscriptions {
/**
* A {@link Subscription} that does nothing.
*
* @return {@link Subscription}
*/
public static Subscription empty() {
return new EmptySubscription();
}

/**
* A {@link Subscription} implemented via a Func
*
* @return {@link Subscription}
*/
public static Subscription createSubscription(final Action0 unsubscribe) {
return new Subscription() {

@Override
public void unsubscribe() {
unsubscribe.call();
}

};
}

/**
* A {@link Subscription} implemented via an anonymous function (such as closures from other languages).
*
* @return {@link Subscription}
*/
public static Subscription createSubscription(final Object unsubscribe) {
final FuncN<?> f = Functions.from(unsubscribe);
return new Subscription() {

@Override
public void unsubscribe() {
f.call();
}

};
}

/**
* A {@link Subscription} that does nothing when its unsubscribe method is called.
*/
private static class EmptySubscription implements Subscription {
public void unsubscribe() {
}
}
}

0 comments on commit d8d305e

Please sign in to comment.