Skip to content

Commit

Permalink
Change OnSubscribeFunc.call to OnSubscribeFunc.onSubscribe
Browse files Browse the repository at this point in the history
Avoid name collions for `call` method with Func* interfaces to simplify interop with Clojure etc.
  • Loading branch information
benjchristensen committed Sep 4, 2013
1 parent a1ad9c4 commit 3585570
Show file tree
Hide file tree
Showing 69 changed files with 166 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public GroovyOnSubscribeFuncWrapper(Closure<Subscription> closure) {
}

@Override
public Subscription call(Observer<? super T> observer) {
public Subscription onSubscribe(Observer<? super T> observer) {
return closure.call(observer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def class ObservableTests {

def class AsyncObservable implements OnSubscribeFunc {

public Subscription call(final Observer<Integer> observer) {
public Subscription onSubscribe(final Observer<Integer> observer) {
new Thread(new Runnable() {
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object RxImplicits {

implicit def onSubscribeFunc[A](f: (Observer[_ >: A]) => Subscription): OnSubscribeFunc[A] =
new OnSubscribeFunc[A] {
override def call(a: Observer[_ >: A]) = f(a)
override def onSubscribe(a: Observer[_ >: A]) = f(a)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public enum AbstractButtonSource { ; // no instances
public static Observable<ActionEvent> fromActionOf(final AbstractButton button) {
return Observable.create(new OnSubscribeFunc<ActionEvent>() {
@Override
public Subscription call(final Observer<? super ActionEvent> observer) {
public Subscription onSubscribe(final Observer<? super ActionEvent> observer) {
final ActionListener listener = new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public enum ComponentEventSource { ; // no instances
public static Observable<ComponentEvent> fromComponentEventsOf(final Component component) {
return Observable.create(new OnSubscribeFunc<ComponentEvent>() {
@Override
public Subscription call(final Observer<? super ComponentEvent> observer) {
public Subscription onSubscribe(final Observer<? super ComponentEvent> observer) {
final ComponentListener listener = new ComponentListener() {
@Override
public void componentHidden(ComponentEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public enum KeyEventSource { ; // no instances
public static Observable<KeyEvent> fromKeyEventsOf(final Component component) {
return Observable.create(new OnSubscribeFunc<KeyEvent>() {
@Override
public Subscription call(final Observer<? super KeyEvent> observer) {
public Subscription onSubscribe(final Observer<? super KeyEvent> observer) {
final KeyListener listener = new KeyListener() {
@Override
public void keyPressed(KeyEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public enum MouseEventSource { ; // no instances
public static Observable<MouseEvent> fromMouseEventsOf(final Component component) {
return Observable.create(new OnSubscribeFunc<MouseEvent>() {
@Override
public Subscription call(final Observer<? super MouseEvent> observer) {
public Subscription onSubscribe(final Observer<? super MouseEvent> observer) {
final MouseListener listener = new MouseListener() {
@Override
public void mouseClicked(MouseEvent event) {
Expand Down Expand Up @@ -81,7 +81,7 @@ public void call() {
public static Observable<MouseEvent> fromMouseMotionEventsOf(final Component component) {
return Observable.create(new OnSubscribeFunc<MouseEvent>() {
@Override
public Subscription call(final Observer<? super MouseEvent> observer) {
public Subscription onSubscribe(final Observer<? super MouseEvent> observer) {
final MouseMotionListener listener = new MouseMotionListener() {
@Override
public void mouseDragged(MouseEvent event) {
Expand Down
14 changes: 7 additions & 7 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ public class Observable<T> {
*
* @param <T>
*/
public static interface OnSubscribeFunc<T> extends Function<T> {
public static interface OnSubscribeFunc<T> extends Function {

public Subscription call(Observer<? super T> t1);
public Subscription onSubscribe(Observer<? super T> t1);

}

Expand All @@ -133,7 +133,7 @@ public static interface OnSubscribeFunc<T> extends Function<T> {
* specifically have a need for inheritance.
*
* @param onSubscribe
* {@link Func1} to be executed when {@link #subscribe(Observer)} is called.
* {@link OnSubscribeFunc} to be executed when {@link #subscribe(Observer)} is called.
*/
protected Observable(OnSubscribeFunc<T> onSubscribe) {
this.onSubscribe = onSubscribe;
Expand Down Expand Up @@ -187,7 +187,7 @@ public Subscription subscribe(Observer<? super T> observer) {
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
if (isInternalImplementation(observer)) {
Subscription s = onSubscribeFunction.call(observer);
Subscription s = onSubscribeFunction.onSubscribe(observer);
if (s == null) {
// this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
// we want to gracefully handle it the same as AtomicObservableSubscription does
Expand All @@ -197,7 +197,7 @@ public Subscription subscribe(Observer<? super T> observer) {
}
} else {
SafeObservableSubscription subscription = new SafeObservableSubscription();
subscription.wrap(onSubscribeFunction.call(new SafeObserver<T>(subscription, observer)));
subscription.wrap(onSubscribeFunction.onSubscribe(new SafeObserver<T>(subscription, observer)));
return hook.onSubscribeReturn(this, subscription);
}
} catch (OnErrorNotImplementedException e) {
Expand Down Expand Up @@ -416,7 +416,7 @@ public NeverObservable() {
super(new OnSubscribeFunc<T>() {

@Override
public Subscription call(Observer<? super T> t1) {
public Subscription onSubscribe(Observer<? super T> t1) {
return Subscriptions.empty();
}

Expand All @@ -443,7 +443,7 @@ public ThrowObservable(final Throwable exception) {
* @return a reference to the subscription
*/
@Override
public Subscription call(Observer<? super T> observer) {
public Subscription onSubscribe(Observer<? super T> observer) {
observer.onError(exception);
return Subscriptions.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
return new BlockingObservable<T>(new OnSubscribeFunc<T>() {

@Override
public Subscription call(Observer<? super T> observer) {
public Subscription onSubscribe(Observer<? super T> observer) {
return o.subscribe(observer);
}
});
Expand Down Expand Up @@ -787,7 +787,7 @@ public void testToIterableWithException() {
BlockingObservable<String> obs = BlockingObservable.from(create(new OnSubscribeFunc<String>() {

@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
observer.onNext("one");
observer.onError(new TestException());
return Subscriptions.empty();
Expand All @@ -810,7 +810,7 @@ public void testForEachWithError() {
BlockingObservable.from(Observable.create(new OnSubscribeFunc<String>() {

@Override
public Subscription call(final Observer<? super String> observer) {
public Subscription onSubscribe(final Observer<? super String> observer) {
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {

Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/operators/OperationAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private AllObservable(Observable<? extends T> sequence, Func1<? super T, Boolean


@Override
public Subscription call(final Observer<? super Boolean> observer) {
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
return subscription.wrap(sequence.subscribe(new AllObserver(observer)));

}
Expand Down
34 changes: 17 additions & 17 deletions rxjava-core/src/main/java/rx/operators/OperationBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public final class OperationBuffer {
public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<? extends T> source, final Func0<? extends Observable<? extends BufferClosing>> bufferClosingSelector) {
return new OnSubscribeFunc<List<T>>() {
@Override
public Subscription call(final Observer<? super List<T>> observer) {
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
NonOverlappingBuffers<T> buffers = new NonOverlappingBuffers<T>(observer);
BufferCreator<T> creator = new ObservableBasedSingleBufferCreator<T>(buffers, bufferClosingSelector);
return source.subscribe(new BufferObserver<T>(buffers, observer, creator));
Expand Down Expand Up @@ -111,7 +111,7 @@ public Subscription call(final Observer<? super List<T>> observer) {
public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<? extends T> source, final Observable<? extends BufferOpening> bufferOpenings, final Func1<? super BufferOpening, ? extends Observable<? extends BufferClosing>> bufferClosingSelector) {
return new OnSubscribeFunc<List<T>>() {
@Override
public Subscription call(final Observer<? super List<T>> observer) {
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
OverlappingBuffers<T> buffers = new OverlappingBuffers<T>(observer);
BufferCreator<T> creator = new ObservableBasedMultiBufferCreator<T>(buffers, bufferOpenings, bufferClosingSelector);
return source.subscribe(new BufferObserver<T>(buffers, observer, creator));
Expand Down Expand Up @@ -166,7 +166,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(Observable<? extends T> source
public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<? extends T> source, final int count, final int skip) {
return new OnSubscribeFunc<List<T>>() {
@Override
public Subscription call(final Observer<? super List<T>> observer) {
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
Buffers<T> buffers = new SizeBasedBuffers<T>(observer, count);
BufferCreator<T> creator = new SkippingBufferCreator<T>(buffers, skip);
return source.subscribe(new BufferObserver<T>(buffers, observer, creator));
Expand Down Expand Up @@ -221,7 +221,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(Observable<? extends T> source
public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<? extends T> source, final long timespan, final TimeUnit unit, final Scheduler scheduler) {
return new OnSubscribeFunc<List<T>>() {
@Override
public Subscription call(final Observer<? super List<T>> observer) {
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
NonOverlappingBuffers<T> buffers = new NonOverlappingBuffers<T>(observer);
BufferCreator<T> creator = new TimeBasedBufferCreator<T>(buffers, timespan, unit, scheduler);
return source.subscribe(new BufferObserver<T>(buffers, observer, creator));
Expand Down Expand Up @@ -282,7 +282,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(Observable<? extends T> source
public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<? extends T> source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) {
return new OnSubscribeFunc<List<T>>() {
@Override
public Subscription call(final Observer<? super List<T>> observer) {
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
Buffers<T> buffers = new TimeAndSizeBasedBuffers<T>(observer, count, timespan, unit, scheduler);
BufferCreator<T> creator = new SingleBufferCreator<T>(buffers);
return source.subscribe(new BufferObserver<T>(buffers, observer, creator));
Expand Down Expand Up @@ -343,7 +343,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(Observable<? extends T> source
public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<? extends T> source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) {
return new OnSubscribeFunc<List<T>>() {
@Override
public Subscription call(final Observer<? super List<T>> observer) {
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
OverlappingBuffers<T> buffers = new TimeBasedBuffers<T>(observer, timespan, unit, scheduler);
BufferCreator<T> creator = new TimeBasedBufferCreator<T>(buffers, timeshift, unit, scheduler);
return source.subscribe(new BufferObserver<T>(buffers, observer, creator));
Expand Down Expand Up @@ -892,7 +892,7 @@ public void before() {
public void testComplete() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
observer.onCompleted();
return Subscriptions.empty();
}
Expand All @@ -910,7 +910,7 @@ public Subscription call(Observer<? super String> observer) {
public void testSkipAndCountOverlappingBuffers() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
observer.onNext("one");
observer.onNext("two");
observer.onNext("three");
Expand All @@ -936,7 +936,7 @@ public Subscription call(Observer<? super String> observer) {
public void testSkipAndCountGaplessBuffers() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
observer.onNext("one");
observer.onNext("two");
observer.onNext("three");
Expand All @@ -962,7 +962,7 @@ public Subscription call(Observer<? super String> observer) {
public void testSkipAndCountBuffersWithGaps() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
observer.onNext("one");
observer.onNext("two");
observer.onNext("three");
Expand All @@ -988,7 +988,7 @@ public Subscription call(Observer<? super String> observer) {
public void testTimedAndCount() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
push(observer, "one", 10);
push(observer, "two", 90);
push(observer, "three", 110);
Expand Down Expand Up @@ -1020,7 +1020,7 @@ public Subscription call(Observer<? super String> observer) {
public void testTimed() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
push(observer, "one", 98);
push(observer, "two", 99);
push(observer, "three", 100);
Expand Down Expand Up @@ -1049,7 +1049,7 @@ public Subscription call(Observer<? super String> observer) {
public void testObservableBasedOpenerAndCloser() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
push(observer, "one", 10);
push(observer, "two", 60);
push(observer, "three", 110);
Expand All @@ -1062,7 +1062,7 @@ public Subscription call(Observer<? super String> observer) {

Observable<BufferOpening> openings = Observable.create(new OnSubscribeFunc<BufferOpening>() {
@Override
public Subscription call(Observer<? super BufferOpening> observer) {
public Subscription onSubscribe(Observer<? super BufferOpening> observer) {
push(observer, BufferOpenings.create(), 50);
push(observer, BufferOpenings.create(), 200);
complete(observer, 250);
Expand All @@ -1075,7 +1075,7 @@ public Subscription call(Observer<? super BufferOpening> observer) {
public Observable<BufferClosing> call(BufferOpening opening) {
return Observable.create(new OnSubscribeFunc<BufferClosing>() {
@Override
public Subscription call(Observer<? super BufferClosing> observer) {
public Subscription onSubscribe(Observer<? super BufferClosing> observer) {
push(observer, BufferClosings.create(), 100);
complete(observer, 101);
return Subscriptions.empty();
Expand All @@ -1100,7 +1100,7 @@ public Subscription call(Observer<? super BufferClosing> observer) {
public void testObservableBasedCloser() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription call(Observer<? super String> observer) {
public Subscription onSubscribe(Observer<? super String> observer) {
push(observer, "one", 10);
push(observer, "two", 60);
push(observer, "three", 110);
Expand All @@ -1116,7 +1116,7 @@ public Subscription call(Observer<? super String> observer) {
public Observable<BufferClosing> call() {
return Observable.create(new OnSubscribeFunc<BufferClosing>() {
@Override
public Subscription call(Observer<? super BufferClosing> observer) {
public Subscription onSubscribe(Observer<? super BufferClosing> observer) {
push(observer, BufferClosings.create(), 100);
complete(observer, 101);
return Subscriptions.empty();
Expand Down
Loading

0 comments on commit 3585570

Please sign in to comment.