Skip to content

Commit

Permalink
Merge pull request #793 from benjchristensen/subscriber-observer
Browse files Browse the repository at this point in the history
Observer + Subscriber
  • Loading branch information
benjchristensen committed Jan 31, 2014
2 parents ef12fe3 + 171ebc0 commit 025de00
Show file tree
Hide file tree
Showing 211 changed files with 1,848 additions and 1,385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import groovy.lang.Closure;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;

public class GroovyCreateWrapper<T> implements OnSubscribe<T> {
Expand All @@ -29,7 +29,7 @@ public GroovyCreateWrapper(Closure<Void> closure) {
}

@Override
public void call(Observer<? super T> op) {
public void call(Subscriber<? super T> op) {
Object o = closure.call(op);
/*
* If the new signature is being used, we will get NULL back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,21 @@
*/
package rx.lang.groovy

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;

import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
import static org.junit.Assert.*
import static org.mockito.Matchers.*
import static org.mockito.Mockito.*

import org.junit.Before
import org.junit.Test
import org.mockito.Mock
import org.mockito.MockitoAnnotations

import rx.Notification
import rx.Observable
import rx.Observer
import rx.Subscription
import rx.Observable.OnSubscribeFunc
import rx.subscriptions.Subscriptions

def class ObservableTests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package rx.lang.jruby;

import org.jruby.RubyProc;
import org.jruby.Ruby;
import org.jruby.RubyProc;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.javasupport.JavaUtil;

import rx.util.functions.Action;
import rx.util.functions.Action0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package rx.lang.jruby;

import org.jruby.RubyProc;
import org.jruby.Ruby;
import org.jruby.RubyProc;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.javasupport.JavaUtil;

import rx.util.functions.Func0;
import rx.util.functions.Func1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package rx.lang.scala
trait Subject[T] extends Observable[T] with Observer[T] {
private [scala] val asJavaSubject: rx.subjects.Subject[_ >: T, _<: T]

val asJavaObservable: rx.Observable[_ <: T] = asJavaSubject.toObservable()
val asJavaObservable: rx.Observable[_ <: T] = asJavaSubject

override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubject
override def onNext(value: T): Unit = { asJavaObserver.onNext(value)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
package rx.android.observables;

import rx.Observable;
import rx.Observer;
import rx.operators.OperationObserveFromAndroidComponent;

import android.app.Activity;
import android.app.Fragment;
import android.os.Build;
import android.support.v4.app.FragmentActivity;


public final class AndroidObservable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package rx.android.schedulers;

import rx.Scheduler;
import android.os.Handler;
import android.os.Looper;
import rx.Scheduler;

/**
* Schedulers that have Android specific functionality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
*/
package rx.android.schedulers;

import android.os.Handler;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Func2;

import java.util.concurrent.TimeUnit;
import android.os.Handler;

/**
* Schedules actions to run on an Android Handler thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@
import rx.Observer;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.subjects.PublishSubject;
import android.app.Activity;
import android.app.Fragment;
import android.os.Looper;
import android.util.Log;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class OperationObserveFromAndroidComponent {

public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.app.Fragment fragment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.android.observables;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -29,12 +29,9 @@
import rx.Observable;
import rx.Observer;
import rx.observers.TestObserver;
import rx.operators.OperationObserveFromAndroidComponent;
import android.app.Activity;
import android.app.Fragment;
import android.os.Build;
import android.support.v4.app.FragmentActivity;
import rx.android.observables.AndroidObservable;


@RunWith(RobolectricTestRunner.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;
import rx.operators.OperationObserveFromAndroidComponent;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
Expand Down Expand Up @@ -149,8 +150,8 @@ public void itForwardsOnErrorToTargetObserver() {
public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Throwable {
PublishSubject<Integer> source = PublishSubject.create();

final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source.toObservable(), mockFragment);
operator.onSubscribe(mockObserver);
final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source, mockFragment);
operator.onSubscribe(new TestSubscriber<Integer>(mockObserver));

source.onNext(1);
releaseComponentRef(operator);
Expand All @@ -167,8 +168,8 @@ public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Thr
public void itDropsOnErrorIfTargetComponentIsGone() throws Throwable {
PublishSubject<Integer> source = PublishSubject.create();

final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source.toObservable(), mockFragment);
operator.onSubscribe(mockObserver);
final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source, mockFragment);
operator.onSubscribe(new TestSubscriber<Integer>(mockObserver));

source.onNext(1);
releaseComponentRef(operator);
Expand Down Expand Up @@ -203,7 +204,7 @@ private void releaseComponentRef(Observable.OnSubscribeFunc<Integer> operator) t
@Test
public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
PublishSubject<Integer> source = PublishSubject.create();
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source.toObservable(), mockFragment).subscribe(new TestObserver<Integer>(mockObserver));
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver<Integer>(mockObserver));

source.onNext(1);

Expand All @@ -219,7 +220,7 @@ public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
@Test
public void itDoesNotForwardOnErrorIfFragmentIsDetached() {
PublishSubject<Integer> source = PublishSubject.create();
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source.toObservable(), mockFragment).subscribe(new TestObserver<Integer>(mockObserver));
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver<Integer>(mockObserver));

source.onNext(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected void onResponseReceived(HttpResponse response) throws HttpException, I

@Override
public Subscription onSubscribe(Observer<? super byte[]> observer) {
parentSubscription.add(contentSubject.toObservable().subscribe(observer));
parentSubscription.add(contentSubject.subscribe(observer));
return parentSubscription;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -605,7 +606,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -662,7 +663,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -721,7 +722,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -782,7 +783,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -845,7 +846,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -910,7 +911,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -977,7 +978,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -1046,7 +1047,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -1117,7 +1118,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -1190,7 +1191,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -1243,7 +1244,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observer;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
Expand All @@ -28,7 +29,7 @@
* a terminal state has been reached.
* @param <T> the observed value type
*/
abstract class LatchedObserver<T> extends Observer<T> {
abstract class LatchedObserver<T> implements Observer<T> {
/** The CountDownLatch to count-down on a terminal state. */
protected final CountDownLatch latch;
/** Contains the error. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package rx.util.async.operators;

import java.util.concurrent.Future;

import rx.Observable;
import rx.Scheduler;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

/**
* Defer the execution of a factory method which produces an observable sequence.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

import rx.Observable;
import rx.Subscription;
import rx.util.Exceptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.util.async.operators;

import java.util.concurrent.Callable;

import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
Expand Down
Loading

0 comments on commit 025de00

Please sign in to comment.