Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable interface #744

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
(let [f-name (gensym "rc")]
`(let [~f-name ~f]
(reify
; If they want Func1, give them onSubscribe as well so Observable/create can be
; If they want Func1, give them subscribe as well so Observable/create can be
; used seemlessly with rx/fn.
~@(if (and (= prefix "rx.util.functions.Func")
(some #{1} arities))
`(rx.Observable$OnSubscribeFunc
(~'onSubscribe [~'this observer#]
`(rx.IObservable
(~'subscribe [~'this observer#]
(~f-name observer#))))

~@(mapcat (clojure.core/fn [n]
Expand All @@ -39,7 +39,7 @@

If the f has the wrong arity, an ArityException will be thrown at runtime.

This will also implement rx.Observable$OnSubscribeFunc.onSubscribe for use with
This will also implement rx.IObservable.subscribe for use with
Observable/create. In this case, the function must take an Observable as its single
argument and return a subscription object.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
(deftest test-fn*
(testing "implements Func0-9"
(let [f (rx/fn* vector)]
(is (instance? rx.Observable$OnSubscribeFunc f))
(is (instance? rx.IObservable f))
(is (instance? rx.util.functions.Func0 f))
(is (instance? rx.util.functions.Func1 f))
(is (instance? rx.util.functions.Func2 f))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
package rx.lang.groovy;

import groovy.lang.Closure;
import rx.Observable.OnSubscribeFunc;
import rx.IObservable;
import rx.Observer;
import rx.Subscription;

/**
* Concrete wrapper that accepts a {@link Closure} and produces a {@link OnSubscribeFunc}.
* Concrete wrapper that accepts a {@link Closure} and produces a {@link IObservable}.
*
* @param <T>
*/
public class GroovyOnSubscribeFuncWrapper<T> implements OnSubscribeFunc<T> {
public class GroovyOnSubscribeFuncWrapper<T> implements IObservable<T> {

private final Closure<Subscription> closure;

Expand All @@ -34,7 +34,7 @@ public GroovyOnSubscribeFuncWrapper(Closure<Subscription> closure) {
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
public Subscription subscribe(Observer<? super T> observer) {
return closure.call(observer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.codehaus.groovy.runtime.metaclass.MetaClassRegistryImpl;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.IObservable;
import rx.observables.BlockingObservable;
import rx.util.functions.Action;
import rx.util.functions.Function;
Expand Down Expand Up @@ -109,7 +109,7 @@ public Object invoke(Object object, Object[] arguments) {
if (o instanceof Closure) {
if (Action.class.isAssignableFrom(m.getParameterTypes()[i])) {
newArgs[i] = new GroovyActionWrapper((Closure) o);
} else if(OnSubscribeFunc.class.isAssignableFrom(m.getParameterTypes()[i])) {
} else if(IObservable.class.isAssignableFrom(m.getParameterTypes()[i])) {
newArgs[i] = new GroovyOnSubscribeFuncWrapper((Closure) o);
} else {
newArgs[i] = new GroovyFunctionWrapper((Closure) o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.mockito.MockitoAnnotations;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.IObservable;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
Expand Down Expand Up @@ -483,9 +483,9 @@ def class ObservableTests {
assertEquals(expected, actual);
}

def class AsyncObservable implements OnSubscribeFunc {
def class AsyncObservable implements IObservable {

public Subscription onSubscribe(final Observer<Integer> observer) {
public Subscription subscribe(final Observer<Integer> observer) {
new Thread(new Runnable() {
public void run() {
try {
Expand Down Expand Up @@ -525,14 +525,14 @@ def class ObservableTests {
public void received(Object o);
}

def class TestOnSubscribe implements OnSubscribeFunc<String> {
def class TestOnSubscribe implements IObservable<String> {
private final int count;

public TestOnSubscribe(int count) {
this.count = count;
}

public Subscription onSubscribe(Observer<String> observer) {
public Subscription subscribe(Observer<String> observer) {

observer.onNext("hello_" + count);
observer.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module Jruby
class Interop
WRAPPERS = {
Java::RxUtilFunctions::Action => Java::RxLangJruby::JRubyActionWrapper,
Java::Rx::Observable::OnSubscribeFunc => false
Java::Rx::IObservable => false
}

WRAPPERS.default = Java::RxLangJruby::JRubyFunctionWrapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
observable.subscribe(1)
end

it "doesn't wrap OnSubscribeFunc arguments" do
it "doesn't wrap IObservable arguments" do
proc = lambda {|observer| observer.onNext("hi")}
Java::Rx::Observable.__persistent__ = true
Java::Rx::Observable.should_not_receive(:create_without_wrapping)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.junit.Assert.*
import rx.Notification
import rx.Subscription
import kotlin.concurrent.thread
import rx.Observable.OnSubscribeFunc
import rx.IObservable
import rx.lang.kotlin.BasicKotlinTests.AsyncObservable

/**
Expand Down Expand Up @@ -344,8 +344,8 @@ public class BasicKotlinTests {

}

class AsyncObservable : OnSubscribeFunc<Int>{
override fun onSubscribe(t1: Observer<in Int>?): Subscription? {
class AsyncObservable : IObservable<Int>{
override fun subscribe(t1: Observer<in Int>?): Subscription? {
thread {
Thread.sleep(50)
t1!!.onNext(1)
Expand All @@ -357,8 +357,8 @@ public class BasicKotlinTests {
}
}

class TestOnSubscribe(val count: Int) : OnSubscribeFunc<String>{
override fun onSubscribe(t1: Observer<in String>?): Subscription? {
class TestOnSubscribe(val count: Int) : IObservable<String>{
override fun subscribe(t1: Observer<in String>?): Subscription? {
t1!!.onNext("hello_$count")
t1.onCompleted()
return Subscription { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ object ImplicitFunctionConversions {
}

implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
new rx.Observable.OnSubscribeFunc[T] {
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
new rx.IObservable[T] {
def subscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
f(obs)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package rx.lang.scala

import rx.util.functions.FuncN
import rx.Observable.OnSubscribeFunc
import rx.IObservable
import rx.lang.scala.observables.ConnectableObservable


Expand Down Expand Up @@ -2081,8 +2081,8 @@ object Observable {
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
*/
def create[T](func: Observer[T] => Subscription): Observable[T] = {
toScalaObservable[T](rx.Observable.create(new OnSubscribeFunc[T] {
def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = {
toScalaObservable[T](rx.Observable.create(new IObservable[T] {
def subscribe(t1: rx.Observer[_ >: T]): rx.Subscription = {
func(Observer(t1))
}
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class CompletenessTest extends JUnitSuite {
"averageDoubles(Observable[Double])" -> averageProblem,
"averageFloats(Observable[Float])" -> averageProblem,
"averageLongs(Observable[Long])" -> averageProblem,
"create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)",
"create(IObservable[T])" -> "apply(Observer[T] => Subscription)",
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source
return Observable.create(new OnSubscribeBase<T, Activity>(source, activity));
}

private static class OnSubscribeBase<T, AndroidComponent> implements Observable.OnSubscribeFunc<T> {
private static class OnSubscribeBase<T, AndroidComponent> implements IObservable<T> {

private static final String LOG_TAG = "AndroidObserver";

Expand All @@ -88,7 +88,7 @@ protected boolean isComponentValid(AndroidComponent component) {
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
public Subscription subscribe(Observer<? super T> observer) {
assertUiThread();
observerRef = observer;
final Subscription sourceSub = source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<T>() {
Expand Down Expand Up @@ -232,7 +232,7 @@ public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Thr
PublishSubject<Integer> source = PublishSubject.create();

final OnSubscribeFragment<Integer> operator = new OnSubscribeFragment<Integer>(source, mockFragment);
operator.onSubscribe(mockObserver);
operator.subscribe(mockObserver);

source.onNext(1);
releaseComponentRef(operator);
Expand All @@ -250,7 +250,7 @@ public void itDropsOnErrorIfTargetComponentIsGone() throws Throwable {
PublishSubject<Integer> source = PublishSubject.create();

final OnSubscribeFragment<Integer> operator = new OnSubscribeFragment<Integer>(source, mockFragment);
operator.onSubscribe(mockObserver);
operator.subscribe(mockObserver);

source.onNext(1);
releaseComponentRef(operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.IObservable;
import rx.Observer;
import rx.Subscription;
import rx.apache.http.consumers.ResponseConsumerDelegate;
Expand Down Expand Up @@ -78,22 +78,22 @@
*/
public class ObservableHttp<T> {

private final OnSubscribeFunc<T> onSubscribe;
private final IObservable<T> onSubscribe;

private ObservableHttp(OnSubscribeFunc<T> onSubscribe) {
private ObservableHttp(IObservable<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

private static <T> ObservableHttp<T> create(OnSubscribeFunc<T> onSubscribe) {
private static <T> ObservableHttp<T> create(IObservable<T> onSubscribe) {
return new ObservableHttp<T>(onSubscribe);
}

public Observable<T> toObservable() {
return Observable.create(new OnSubscribeFunc<T>() {
return Observable.create(new IObservable<T>() {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return onSubscribe.onSubscribe(observer);
public Subscription subscribe(Observer<? super T> observer) {
return onSubscribe.subscribe(observer);
}
});
}
Expand Down Expand Up @@ -135,10 +135,10 @@ public static ObservableHttp<ObservableHttpResponse> createGet(String uri, final
*/
public static ObservableHttp<ObservableHttpResponse> createRequest(final HttpAsyncRequestProducer requestProducer, final HttpAsyncClient client) {

return ObservableHttp.create(new OnSubscribeFunc<ObservableHttpResponse>() {
return ObservableHttp.create(new IObservable<ObservableHttpResponse>() {

@Override
public Subscription onSubscribe(final Observer<? super ObservableHttpResponse> observer) {
public Subscription subscribe(final Observer<? super ObservableHttpResponse> observer) {

final CompositeSubscription parentSubscription = new CompositeSubscription();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.http.protocol.HttpContext;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.IObservable;
import rx.Observer;
import rx.Subscription;
import rx.apache.http.ObservableHttpResponse;
Expand Down Expand Up @@ -68,10 +68,10 @@ public void _onEntityEnclosed(HttpEntity entity, ContentType contentType) throws
public HttpResponse _buildResult(HttpContext context) throws Exception {
final HttpResponse response = buildResult(context);

Observable<byte[]> contentObservable = Observable.create(new OnSubscribeFunc<byte[]>() {
Observable<byte[]> contentObservable = Observable.create(new IObservable<byte[]>() {

@Override
public Subscription onSubscribe(Observer<? super byte[]> o) {
public Subscription subscribe(Observer<? super byte[]> o) {
long length = response.getEntity().getContentLength();
if (length > Integer.MAX_VALUE) {
o.onError(new IllegalStateException("Content Length too large for a byte[] => " + length));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.http.protocol.HttpContext;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.IObservable;
import rx.Observer;
import rx.Subscription;
import rx.apache.http.ObservableHttpResponse;
Expand Down Expand Up @@ -81,10 +81,10 @@ protected void onByteReceived(ByteBuffer buf, IOControl ioctrl) throws IOExcepti
protected void onResponseReceived(HttpResponse response) throws HttpException, IOException {

// wrap the contentSubject so we can chain the Subscription between parent and child
Observable<byte[]> contentObservable = Observable.create(new OnSubscribeFunc<byte[]>() {
Observable<byte[]> contentObservable = Observable.create(new IObservable<byte[]>() {

@Override
public Subscription onSubscribe(Observer<? super byte[]> observer) {
public Subscription subscribe(Observer<? super byte[]> observer) {
parentSubscription.add(contentSubject.subscribe(observer));
return parentSubscription;
}
Expand Down
Loading