From 38a113794f7c4a4ec64b1eea15f9868d4de868f4 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 5 Feb 2014 10:04:29 +0100 Subject: [PATCH] Operator Repeat and other operator fixes --- rxjava-core/src/main/java/rx/Observable.java | 39 +++++- .../java/rx/operators/OperationObserveOn.java | 129 ------------------ .../java/rx/operators/OperationRepeat.java | 80 ----------- .../java/rx/operators/OperatorGroupBy.java | 12 +- .../java/rx/operators/OperatorObserveOn.java | 67 +++++++++ .../java/rx/operators/OperatorRepeat.java | 103 ++++++++++++++ .../main/java/rx/operators/QueueDrain.java | 106 ++++++++++++++ ...OnTest.java => OperatorObserveOnTest.java} | 5 +- ...epeatTest.java => OperatorRepeatTest.java} | 55 +++++++- .../java/rx/operators/OperatorTakeTest.java | 15 +- 10 files changed, 386 insertions(+), 225 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationObserveOn.java delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationRepeat.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorRepeat.java create mode 100644 rxjava-core/src/main/java/rx/operators/QueueDrain.java rename rxjava-core/src/test/java/rx/operators/{OperationObserveOnTest.java => OperatorObserveOnTest.java} (98%) rename rxjava-core/src/test/java/rx/operators/{OperationRepeatTest.java => OperatorRepeatTest.java} (52%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5337bf3123..bfa6ab2d31 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -63,13 +63,11 @@ import rx.operators.OperationMergeDelayError; import rx.operators.OperationMinMax; import rx.operators.OperationMulticast; -import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallelMerge; -import rx.operators.OperationRepeat; import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; @@ -103,7 +101,9 @@ import rx.operators.OperatorGroupBy; import rx.operators.OperatorMap; import rx.operators.OperatorMerge; +import rx.operators.OperatorObserveOn; import rx.operators.OperatorParallel; +import rx.operators.OperatorRepeat; import rx.operators.OperatorTake; import rx.operators.OperatorTimestamp; import rx.operators.OperatorToObservableList; @@ -5172,7 +5172,7 @@ public final ConnectableObservable multicast(SubjectRxJava Wiki: observeOn() */ public final Observable observeOn(Scheduler scheduler) { - return create(OperationObserveOn.observeOn(this, scheduler)); + return lift(new OperatorObserveOn(scheduler)); } /** @@ -5549,7 +5549,7 @@ public final Observable reduce(R initialValue, Func2 acc * @see MSDN: Observable.Repeat */ public final Observable repeat() { - return this.repeat(Schedulers.currentThread()); + return from(this).lift(new OperatorRepeat(-1)); } /** @@ -5566,9 +5566,38 @@ public final Observable repeat() { * @see MSDN: Observable.Repeat */ public final Observable repeat(Scheduler scheduler) { - return create(OperationRepeat.repeat(this, scheduler)); + return repeat().observeOn(scheduler); } + /** + * Returns an Observable that repeats the sequence of items emitted by the source + * Observable at most count times. + * @param count the number of times the source Observable items are repeated, + * a count of 0 will yield an empty sequence. + * @return an Observable that repeats the sequence of items emitted by the source + * Observable at most count times. + */ + public final Observable repeat(long count) { + if (count < 0) { + throw new IllegalArgumentException("count >= 0 expected"); + } + return from(this).lift(new OperatorRepeat(count)); + } + + /** + * Returns an Observable that repeats the sequence of items emitted by the source + * Observable at most count times on a particular scheduler. + * @param count the number of times the source Observable items are repeated, + * a count of 0 will yield an empty sequence. + * @param scheduler + * the scheduler to emit the items on + * @return an Observable that repeats the sequence of items emitted by the source + * Observable at most count times on a particular scheduler. + */ + public final Observable repeat(long count, Scheduler scheduler) { + return repeat(count).observeOn(scheduler); + } + /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable that will replay all of its items and notifications to any future {@link Observer}. diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java deleted file mode 100644 index c3addd9933..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; - -import rx.Notification; -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Inner; -import rx.Subscription; -import rx.schedulers.ImmediateScheduler; -import rx.schedulers.TrampolineScheduler; -import rx.subscriptions.CompositeSubscription; -import rx.util.functions.Action1; - -/** - * Asynchronously notify Observers on the specified Scheduler. - *

- * - */ -public class OperationObserveOn { - - public static OnSubscribeFunc observeOn(Observable source, Scheduler scheduler) { - return new ObserveOn(source, scheduler); - } - - private static class ObserveOn implements OnSubscribeFunc { - private final Observable source; - private final Scheduler scheduler; - - public ObserveOn(Observable source, Scheduler scheduler) { - this.source = source; - this.scheduler = scheduler; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - if (scheduler instanceof ImmediateScheduler) { - // do nothing if we request ImmediateScheduler so we don't invoke overhead - return source.subscribe(observer); - } else if (scheduler instanceof TrampolineScheduler) { - // do nothing if we request CurrentThreadScheduler so we don't invoke overhead - return source.subscribe(observer); - } else { - return new Observation(observer).init(); - } - } - - /** Observe through individual queue per observer. */ - private class Observation { - final Observer observer; - final CompositeSubscription compositeSubscription = new CompositeSubscription(); - final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - final AtomicLong counter = new AtomicLong(0); - private volatile Scheduler.Inner recursiveScheduler; - - public Observation(Observer observer) { - this.observer = observer; - } - - public Subscription init() { - compositeSubscription.add(source.materialize().subscribe(new SourceObserver())); - return compositeSubscription; - } - - private class SourceObserver implements Action1> { - - @Override - public void call(Notification e) { - queue.offer(e); - if (counter.getAndIncrement() == 0) { - if (recursiveScheduler == null) { - // compositeSubscription for the outer scheduler, recursive for inner - compositeSubscription.add(scheduler.schedule(new Action1() { - - @Override - public void call(Inner inner) { - // record innerScheduler so 'processQueue' can use it for all subsequent executions - recursiveScheduler = inner; - // once we have the innerScheduler we can start doing real work - processQueue(); - } - - })); - } else { - processQueue(); - } - } - } - - void processQueue() { - recursiveScheduler.schedule(new Action1() { - @Override - public void call(Inner inner) { - Notification not = queue.poll(); - if (not != null) { - not.accept(observer); - } - - // decrement count and if we still have work to do - // recursively schedule ourselves to process again - if (counter.decrementAndGet() > 0) { - inner.schedule(this); - } - } - }); - } - } - } - } - -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/OperationRepeat.java b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java deleted file mode 100644 index c84cc781f2..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationRepeat.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rx.operators; - -import rx.Observable; -import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Inner; -import rx.Subscription; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.MultipleAssignmentSubscription; -import rx.util.functions.Action0; -import rx.util.functions.Action1; - -public class OperationRepeat implements Observable.OnSubscribeFunc { - - private final Observable source; - private final Scheduler scheduler; - - public static Observable.OnSubscribeFunc repeat(Observable source, Scheduler scheduler) { - return new OperationRepeat(source, scheduler); - } - - private OperationRepeat(Observable source, Scheduler scheduler) { - this.source = source; - this.scheduler = scheduler; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - final CompositeSubscription compositeSubscription = new CompositeSubscription(); - final MultipleAssignmentSubscription innerSubscription = new MultipleAssignmentSubscription(); - compositeSubscription.add(innerSubscription); - compositeSubscription.add(scheduler.schedule(new Action1() { - - @Override - public void call(Inner inner) { - inner.schedule(new Action1() { - @Override - public void call(final Inner inner) { - final Action1 _self = this; - innerSubscription.set(source.subscribe(new Observer() { - - @Override - public void onCompleted() { - inner.schedule(_self); - } - - @Override - public void onError(Throwable error) { - observer.onError(error); - } - - @Override - public void onNext(T value) { - observer.onNext(value); - } - })); - } - }); - } - - })); - return compositeSubscription; - } -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java index 6226461876..9b39c98aa5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java @@ -49,7 +49,7 @@ public Subscriber call(final Subscriber(new CompositeSubscription()) { private final Map> groups = new HashMap>(); private final AtomicInteger completionCounter = new AtomicInteger(0); - + private boolean completeOnce; @Override public void onCompleted() { // if we receive onCompleted from our parent we onComplete children @@ -59,7 +59,10 @@ public void onCompleted() { if (completionCounter.get() == 0) { // special case if no children are running (such as an empty sequence, or just getting the groups and not subscribing) - childObserver.onCompleted(); + if (!completeOnce) { + completeOnce = true; + childObserver.onCompleted(); + } } } @@ -135,7 +138,10 @@ private void completeInner() { for (PublishSubject ps : groups.values()) { ps.onCompleted(); } - childObserver.onCompleted(); + if (!completeOnce) { + completeOnce = true; + childObserver.onCompleted(); + } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java new file mode 100644 index 0000000000..bfbf52905b --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -0,0 +1,67 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Notification; +import rx.Scheduler; +import rx.Subscriber; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Action0; + +/** + * Move the observation of events to another thread via Scheduler. + * @param the item type + */ +public class OperatorObserveOn implements Operator { + final Scheduler scheduler; + public OperatorObserveOn(Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public Subscriber call(final Subscriber t1) { + final QueueDrain qd = new QueueDrain(t1); + final CompositeSubscription csub = new CompositeSubscription(); + t1.add(csub); + return new Subscriber(t1) { + /** Dispatch the notification value. */ + void run(final Notification nt) { + qd.enqueue(new Action0() { + @Override + public void call() { + nt.accept(t1); + } + }); + qd.tryDrainAsync(scheduler, csub); + } + @Override + public void onNext(final T args) { + run(Notification.createOnNext(args)); + } + + @Override + public void onError(final Throwable e) { + run(Notification.createOnError(e)); + } + + @Override + public void onCompleted() { + run(Notification.createOnCompleted()); + } + }; + } + +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/OperatorRepeat.java b/rxjava-core/src/main/java/rx/operators/OperatorRepeat.java new file mode 100644 index 0000000000..095629139b --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorRepeat.java @@ -0,0 +1,103 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Subscriber; +import rx.util.functions.Action0; + +/** + * Repeats the source observable limited or unlimited times. + * @param the element type of the repeated observable + */ +public class OperatorRepeat implements Operator> { + /** The repeat count, -1 means indefinitely. */ + final long count; + /** + * Constructor with repeat count. + * @param count the repeat count, -1 indicates indefinitely + */ + public OperatorRepeat(long count) { + this.count = count; + } + @Override + public Subscriber> call(final Subscriber t1) { + return new Subscriber>(t1) { + /** Queue to avoid reentrancy and asynchronous anomalies. */ + final QueueDrain qd = new QueueDrain(t1); + /** Repeat count. */ + long idx = count - 1; + @Override + public void onNext(final Observable t) { + if (count == 0) { + t1.onCompleted(); + return; + } + qd.enqueue(new Action0() { + + @Override + public void call() { + final Subscriber s2 = new Subscriber(t1) { + + @Override + public void onNext(T v) { + t1.onNext(v); + } + + @Override + public void onError(Throwable e) { + t1.onError(e); + } + + @Override + public void onCompleted() { + if (!isUnsubscribed()) { + if (count < 0) { + innerNext(t); + } else + if (idx > 0) { + idx--; + innerNext(t); + } else { + t1.onCompleted(); + } + } + } + + }; + t.subscribe(s2); + } + + }); + qd.tryDrain(); + } + /** Trigger resubscription from inner. */ + void innerNext(Observable iargs) { + onNext(iargs); + } + @Override + public void onError(Throwable e) { + t1.onError(e); + } + + @Override + public void onCompleted() { + // from(this) will send this onCompleted, which can be safely ignored. + } + }; + } + +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/QueueDrain.java b/rxjava-core/src/main/java/rx/operators/QueueDrain.java new file mode 100644 index 0000000000..7e85c237e0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/QueueDrain.java @@ -0,0 +1,106 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Action queue ensuring that only a single drain caller succeeds at a time. + * This class can be used to execute work without the issues of reentrancy and + * concurrency. + */ +public final class QueueDrain implements Runnable, Action0 { + /** The number of work items. */ + private final AtomicInteger wip = new AtomicInteger(); + /** The action queue. */ + private final BlockingQueue queue = new LinkedBlockingQueue(); + /** The subscription to stop the queue processing. */ + private final Subscription k; + /** + * Constructor which takes a cancellation token. + * @param k the cancellation token (aka subscription). + */ + public QueueDrain(Subscription k) { + this.k = k; + } + /** + * Enqueue an action. + * To execute any queued action, call {@link #tryDrain()} or + * submit this instance to a {@code Scheduler.schedule()} method. + * @param action the action to enqueue, not null + */ + public void enqueue(Action0 action) { + if (!k.isUnsubscribed()) { + queue.add(action); + } + } + /** + * Try draining the queue and executing the actions in it. + */ + public void tryDrain() { + if (wip.incrementAndGet() > 1 || k.isUnsubscribed()) { + return; + } + do { + queue.poll().call(); + } while (wip.decrementAndGet() > 0 && !k.isUnsubscribed()); + } + /** + * Try draining the queue on the given scheduler. + * The method ensures that only one thread is actively draining the + * queue on the given scheduler. + * @param scheduler the scheduler where the draining should happen + * @param cs the composite subscription to track the schedule + */ + public void tryDrainAsync(Scheduler scheduler, final CompositeSubscription cs) { + if (cs.isUnsubscribed() || wip.incrementAndGet() > 1) { + return; + } + // add tracking subscription only if schedule is run to avoid overfilling cs + final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + cs.add(mas); + mas.set(scheduler.schedule(new Action1() { + @Override + public void call(Scheduler.Inner o) { + if (!cs.isUnsubscribed()) { + do { + queue.poll().call(); + } while (wip.decrementAndGet() > 0 && !cs.isUnsubscribed()); + } + cs.remove(mas); + } + })); + } + @Override + public void run() { + // to help the draining of the queue on a ThreadPool/Scheduler + tryDrain(); + } + + @Override + public void call() { + tryDrain(); + } + +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java similarity index 98% rename from rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java index 0e3a892599..f180658722 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java @@ -18,7 +18,6 @@ import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.OperationObserveOn.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,7 +36,7 @@ import rx.util.functions.Action1; import rx.util.functions.Func1; -public class OperationObserveOnTest { +public class OperatorObserveOnTest { /** * This is testing a no-op path since it uses Schedulers.immediate() which will not do scheduling. @@ -46,7 +45,7 @@ public class OperationObserveOnTest { @SuppressWarnings("unchecked") public void testObserveOn() { Observer observer = mock(Observer.class); - Observable.create(observeOn(Observable.from(1, 2, 3), Schedulers.immediate())).subscribe(observer); + Observable.from(1, 2, 3).observeOn(Schedulers.immediate()).subscribe(observer); verify(observer, times(1)).onNext(1); verify(observer, times(1)).onNext(2); diff --git a/rxjava-core/src/test/java/rx/operators/OperationRepeatTest.java b/rxjava-core/src/test/java/rx/operators/OperatorRepeatTest.java similarity index 52% rename from rxjava-core/src/test/java/rx/operators/OperationRepeatTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorRepeatTest.java index 0153c24cf1..c67b372f62 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationRepeatTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorRepeatTest.java @@ -27,10 +27,12 @@ import rx.Subscription; import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; +import static org.mockito.Mockito.*; +import rx.operators.OperationReduceTest.CustomException; -public class OperationRepeatTest { +public class OperatorRepeatTest { - @Test + @Test(timeout = 2000) public void testRepetition() { int NUM = 10; final AtomicInteger count = new AtomicInteger(); @@ -47,16 +49,61 @@ public Subscription onSubscribe(Observer o) { assertEquals(NUM, value); } - @Test + @Test(timeout = 2000) public void testRepeatTake() { Observable xs = Observable.from(1, 2); Object[] ys = xs.repeat(Schedulers.newThread()).take(4).toList().toBlockingObservable().last().toArray(); assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys); } - @Test + @Test(timeout = 20000) public void testNoStackOverFlow() { Observable.from(1).repeat(Schedulers.newThread()).take(100000).toBlockingObservable().last(); } + @Test(timeout = 2000) + public void testRepeatAndTake() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.from(1).repeat().take(10).subscribe(o); + + verify(o, times(10)).onNext(1); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test(timeout = 2000) + public void testRepeatLimited() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.from(1).repeat(10).subscribe(o); + + verify(o, times(10)).onNext(1); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test(timeout = 2000) + public void testRepeatError() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.error(new CustomException()).repeat(10).subscribe(o); + + verify(o).onError(any(CustomException.class)); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + + } + @Test(timeout = 2000) + public void testRepeatZero() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + Observable.from(1).repeat(0).subscribe(o); + + verify(o).onCompleted(); + verify(o, never()).onNext(any()); + verify(o, never()).onError(any(Throwable.class)); + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java index 5331bfa90a..d76201763e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java @@ -16,7 +16,6 @@ package rx.operators; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.Arrays; @@ -31,6 +30,7 @@ import rx.Observer; import rx.Subscriber; import rx.Subscription; +import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; import rx.util.functions.Func1; @@ -260,4 +260,17 @@ public void call(Subscriber op) { } }); + + @Test(timeout = 2000) + public void testTakeObserveOn() { + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + INFINITE_OBSERVABLE.observeOn(Schedulers.newThread()).take(1).subscribe(o); + + verify(o).onNext(1L); + verify(o, never()).onNext(2L); + verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + } }