From 70eea84c646644ef374784e0284c6cd113fa28c1 Mon Sep 17 00:00:00 2001 From: Aaron Tull Date: Wed, 26 Aug 2015 17:48:11 -0700 Subject: [PATCH] Implemented the AsyncOnSubscribe --- .../java/rx/observables/AsyncOnSubscribe.java | 510 ++++++++++++++++++ .../rx/observables/AsyncOnSubscribeTest.java | 408 ++++++++++++++ 2 files changed, 918 insertions(+) create mode 100644 src/main/java/rx/observables/AsyncOnSubscribe.java create mode 100644 src/test/java/rx/observables/AsyncOnSubscribeTest.java diff --git a/src/main/java/rx/observables/AsyncOnSubscribe.java b/src/main/java/rx/observables/AsyncOnSubscribe.java new file mode 100644 index 0000000000..d4a12b0245 --- /dev/null +++ b/src/main/java/rx/observables/AsyncOnSubscribe.java @@ -0,0 +1,510 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.observables; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.Producer; +import rx.Subscriber; +import rx.Subscription; +import rx.annotations.Experimental; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action2; +import rx.functions.Action3; +import rx.functions.Func0; +import rx.functions.Func3; +import rx.internal.operators.BufferUntilSubscriber; +import rx.observers.SerializedObserver; +import rx.observers.Subscribers; +import rx.plugins.RxJavaPlugins; +import rx.subscriptions.BooleanSubscription; +; +/** + * A utility class to create {@code OnSubscribe} functions that respond correctly to back + * pressure requests from subscribers. This is an improvement over + * {@link rx.Observable#create(OnSubscribe) Observable.create(OnSubscribe)} which does not provide + * any means of managing back pressure requests out-of-the-box. This variant of an OnSubscribe + * function allows for the asynchronous processing of requests. + * + * @param + * the type of the user-define state used in {@link #generateState() generateState(S)} , + * {@link #next(Object, Long, Subscriber) next(S, Long, Subscriber)}, and + * {@link #onUnsubscribe(Object) onUnsubscribe(S)}. + * @param + * the type of {@code Subscribers} that will be compatible with {@code this}. + */ +@Experimental +public abstract class AsyncOnSubscribe implements OnSubscribe { + + /** + * Executed once when subscribed to by a subscriber (via {@link OnSubscribe#call(Subscriber)}) + * to produce a state value. This value is passed into {@link #next(Object, long, Observer) + * next(S state, Observer observer)} on the first iteration. Subsequent iterations of + * {@code next} will receive the state returned by the previous invocation of {@code next}. + * + * @return the initial state value + */ + protected abstract S generateState(); + + /** + * Called to produce data to the downstream subscribers. To emit data to a downstream subscriber + * call {@code observer.onNext(t)}. To signal an error condition call + * {@code observer.onError(throwable)} or throw an Exception. To signal the end of a data stream + * call {@code observer.onCompleted()}. Implementations of this method must follow the following + * rules. + * + *
    + *
  • Must not call {@code observer.onNext(t)} more than 1 time per invocation.
  • + *
  • Must not call {@code observer.onNext(t)} concurrently.
  • + *
+ * + * The value returned from an invocation of this method will be passed in as the {@code state} + * argument of the next invocation of this method. + * + * @param state + * the state value (from {@link #generateState()} on the first invocation or the + * previous invocation of this method. + * @param requested + * the amount of data requested. An observable emitted to the observer should not + * exceed this amount. + * @param observer + * the observer of data emitted by + * @return the next iteration's state value + */ + protected abstract S next(S state, long requested, Observer> observer); + + /** + * Clean up behavior that is executed after the downstream subscriber's subscription is + * unsubscribed. This method will be invoked exactly once. + * + * @param state + * the last state value returned from {@code next(S, Long, Observer)} or + * {@code generateState()} at the time when a terminal event is emitted from + * {@link #next(Object, long, Observer)} or unsubscribing. + */ + protected void onUnsubscribe(S state) { + + } + + /** + * Generates a synchronous {@link AsyncOnSubscribe} that calls the provided {@code next} + * function to generate data to downstream subscribers. + * + * @param generator + * generates the initial state value (see {@link #generateState()}) + * @param next + * produces data to the downstream subscriber (see + * {@link #next(Object, long, Observer) next(S, long, Observer)}) + * @return an OnSubscribe that emits data in a protocol compatible with back-pressure. + */ + @Experimental + public static OnSubscribe createSingleState(Func0 generator, + final Action3>> next) { + Func3>, S> nextFunc = + new Func3>, S>() { + @Override + public S call(S state, Long requested, Observer> subscriber) { + next.call(state, requested, subscriber); + return state; + }}; + return new AsyncOnSubscribeImpl(generator, nextFunc); + } + + /** + * Generates a synchronous {@link AsyncOnSubscribe} that calls the provided {@code next} + * function to generate data to downstream subscribers. + * + * This overload creates a AsyncOnSubscribe without an explicit clean up step. + * + * @param generator + * generates the initial state value (see {@link #generateState()}) + * @param next + * produces data to the downstream subscriber (see + * {@link #next(Object, long, Observer) next(S, long, Observer)}) + * @param onUnsubscribe + * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createSingleState(Func0 generator, + final Action3>> next, + final Action1 onUnsubscribe) { + Func3>, S> nextFunc = + new Func3>, S>() { + @Override + public S call(S state, Long requested, Observer> subscriber) { + next.call(state, requested, subscriber); + return state; + }}; + return new AsyncOnSubscribeImpl(generator, nextFunc, onUnsubscribe); + } + + /** + * Generates a synchronous {@link AsyncOnSubscribe} that calls the provided {@code next} + * function to generate data to downstream subscribers. + * + * @param generator + * generates the initial state value (see {@link #generateState()}) + * @param next + * produces data to the downstream subscriber (see + * {@link #next(Object, long, Observer) next(S, long, Observer)}) + * @param onUnsubscribe + * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createStateful(Func0 generator, + Func3>, ? extends S> next, + Action1 onUnsubscribe) { + return new AsyncOnSubscribeImpl(generator, next, onUnsubscribe); + } + + /** + * Generates a synchronous {@link AsyncOnSubscribe} that calls the provided {@code next} + * function to generate data to downstream subscribers. + * + * @param generator + * generates the initial state value (see {@link #generateState()}) + * @param next + * produces data to the downstream subscriber (see + * {@link #next(Object, long, Observer) next(S, long, Observer)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createStateful(Func0 generator, + Func3>, ? extends S> next) { + return new AsyncOnSubscribeImpl(generator, next); + } + + /** + * Generates a synchronous {@link AsyncOnSubscribe} that calls the provided {@code next} + * function to generate data to downstream subscribers. + * + * This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state + * value. This should be used when the {@code next} function closes over it's state. + * + * @param next + * produces data to the downstream subscriber (see + * {@link #next(Object, long, Observer) next(S, long, Observer)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createStateless(final Action2>> next) { + Func3>, Void> nextFunc = + new Func3>, Void>() { + @Override + public Void call(Void state, Long requested, Observer> subscriber) { + next.call(requested, subscriber); + return state; + }}; + return new AsyncOnSubscribeImpl(nextFunc); + } + + /** + * Generates a synchronous {@link AsyncOnSubscribe} that calls the provided {@code next} + * function to generate data to downstream subscribers. + * + * This overload creates a "state-less" AsyncOnSubscribe which does not have an explicit state + * value. This should be used when the {@code next} function closes over it's state. + * + * @param next + * produces data to the downstream subscriber (see + * {@link #next(Object, long, Observer) next(S, long, Observer)}) + * @param onUnsubscribe + * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) + * @return an OnSubscribe that emits data downstream in a protocol compatible with + * back-pressure. + */ + @Experimental + public static OnSubscribe createStateless(final Action2>> next, + final Action0 onUnsubscribe) { + Func3>, Void> nextFunc = + new Func3>, Void>() { + @Override + public Void call(Void state, Long requested, Observer> subscriber) { + next.call(requested, subscriber); + return null; + }}; + Action1 wrappedOnUnsubscribe = new Action1() { + @Override + public void call(Void t) { + onUnsubscribe.call(); + }}; + return new AsyncOnSubscribeImpl(nextFunc, wrappedOnUnsubscribe); + } + + /** + * An implementation of AsyncOnSubscribe that delegates + * {@link AsyncOnSubscribe#next(Object, long, Observer)}, + * {@link AsyncOnSubscribe#generateState()}, and {@link AsyncOnSubscribe#onUnsubscribe(Object)} + * to provided functions/closures. + * + * @param + * the type of the user-defined state + * @param + * the type of compatible Subscribers + */ + private static final class AsyncOnSubscribeImpl extends AsyncOnSubscribe { + private final Func0 generator; + private final Func3>, ? extends S> next; + private final Action1 onUnsubscribe; + + private AsyncOnSubscribeImpl(Func0 generator, Func3>, ? extends S> next, Action1 onUnsubscribe) { + this.generator = generator; + this.next = next; + this.onUnsubscribe = onUnsubscribe; + } + + public AsyncOnSubscribeImpl(Func0 generator, Func3>, ? extends S> next) { + this(generator, next, null); + } + + public AsyncOnSubscribeImpl(Func3>, S> next, Action1 onUnsubscribe) { + this(null, next, onUnsubscribe); + } + + public AsyncOnSubscribeImpl(Func3>, S> nextFunc) { + this(null, nextFunc, null); + } + + @Override + protected S generateState() { + return generator == null ? null : generator.call(); + } + + @Override + protected S next(S state, long requested, Observer> observer) { + return next.call(state, requested, observer); + } + + @Override + protected void onUnsubscribe(S state) { + if (onUnsubscribe != null) + onUnsubscribe.call(state); + } + } + + @Override + public final void call(Subscriber actualSubscriber) { + S state = generateState(); + UnicastSubject> subject = UnicastSubject.> create(); + AsyncOuterSubscriber outerSubscriberProducer = new AsyncOuterSubscriber(this, state, subject); + actualSubscriber.add(outerSubscriberProducer); + Observable.concat(subject).unsafeSubscribe(Subscribers.wrap(actualSubscriber)); + actualSubscriber.setProducer(outerSubscriberProducer); + } + + private static class AsyncOuterSubscriber extends ConcurrentLinkedQueueimplements Producer, Subscription, Observer> { + /** */ + private static final long serialVersionUID = -7884904861928856832L; + + private volatile int isUnsubscribed; + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater IS_UNSUBSCRIBED = AtomicIntegerFieldUpdater.newUpdater(AsyncOuterSubscriber.class, "isUnsubscribed"); + + private final AsyncOnSubscribe parent; + private final SerializedObserver> serializedSubscriber; + private final Set subscriptions = new HashSet(); + + private boolean hasTerminated = false; + private boolean onNextCalled = false; + + private S state; + + private final UnicastSubject> merger; + + public AsyncOuterSubscriber(AsyncOnSubscribe parent, S initialState, UnicastSubject> merger) { + this.parent = parent; + this.serializedSubscriber = new SerializedObserver>(this); + this.state = initialState; + this.merger = merger; + } + + @Override + public void unsubscribe() { + if (IS_UNSUBSCRIBED.compareAndSet(this, 0, 1)) { + // it's safe to process terminal behavior + if (isEmpty()) { + parent.onUnsubscribe(state); + } + for (Subscription s : subscriptions) { + if (!s.isUnsubscribed()) { + s.unsubscribe(); + } + } + } + } + + @Override + public boolean isUnsubscribed() { + return isUnsubscribed != 0; + } + + public void nextIteration(long requestCount) { + state = parent.next(state, requestCount, serializedSubscriber); + } + + @Override + public void request(long n) { + int size = 0; + Long r; + synchronized (this) { + size = size(); + add(n); + r = n; + } + if (size == 0) { + do { + // check if unsubscribed before doing any work + if (isUnsubscribed()) { + unsubscribe(); + return; + } + // otherwise try one iteration for a request of `numRequested` elements + try { + onNextCalled = false; + nextIteration(r); + if (onNextCalled) + r = poll(); + if (hasTerminated || isUnsubscribed()) { + parent.onUnsubscribe(state); + } + } catch (Throwable ex) { + handleThrownError(parent, state, ex); + return; + } + } while (r != null && !hasTerminated); + } + } + + private void handleThrownError(final AsyncOnSubscribe p, S st, Throwable ex) { + if (hasTerminated) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); + } else { + hasTerminated = true; + merger.onError(ex); + unsubscribe(); + } + } + + @Override + public void onCompleted() { + if (hasTerminated) { + throw new IllegalStateException("Terminal event already emitted."); + } + hasTerminated = true; + merger.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (hasTerminated) { + throw new IllegalStateException("Terminal event already emitted."); + } + hasTerminated = true; + merger.onError(e); + } + + // This exists simply to check if the subscription has already been + // terminated before getting access to the subscription + private static Subscription SUBSCRIPTION_SENTINEL = new BooleanSubscription(); + + @Override + public void onNext(final Observable t) { + if (onNextCalled) { + throw new IllegalStateException("onNext called multiple times!"); + } + onNextCalled = true; + if (hasTerminated) + return; + subscribeBufferToObservable(t); + } + + private void subscribeBufferToObservable(final Observable t) { + BufferUntilSubscriber buffer = BufferUntilSubscriber. create(); + final AtomicReference holder = new AtomicReference(null); + Subscription innerSubscription = t + .doOnTerminate(new Action0() { + @Override + public void call() { + if (!holder.compareAndSet(null, SUBSCRIPTION_SENTINEL)) { + Subscription h = holder.get(); + subscriptions.remove(h); + } + }}) + .subscribe(buffer); + + if (holder.compareAndSet(null, innerSubscription)) { + subscriptions.add(innerSubscription); + } + merger.onNext(buffer); + } + } + + private static final class UnicastSubject extends Observableimplements Observer { + public static UnicastSubject create() { + return new UnicastSubject(new State()); + } + + private State state; + + protected UnicastSubject(final State state) { + super(new OnSubscribe() { + @Override + public void call(Subscriber s) { + if (state.subscriber != null) { + s.onError(new IllegalStateException("There can be only one subscriber")); + } else { + state.subscriber = s; + } + } + }); + this.state = state; + } + + @Override + public void onCompleted() { + state.subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + state.subscriber.onError(e); + } + + @Override + public void onNext(T t) { + state.subscriber.onNext(t); + } + + private static class State { + private Subscriber subscriber; + } + } +} diff --git a/src/test/java/rx/observables/AsyncOnSubscribeTest.java b/src/test/java/rx/observables/AsyncOnSubscribeTest.java new file mode 100644 index 0000000000..92537dc455 --- /dev/null +++ b/src/test/java/rx/observables/AsyncOnSubscribeTest.java @@ -0,0 +1,408 @@ +package rx.observables; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.Subscription; +import rx.exceptions.TestException; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action2; +import rx.functions.Func0; +import rx.functions.Func3; +import rx.internal.util.RxRingBuffer; +import rx.observers.TestSubscriber; +import rx.schedulers.TestScheduler; + +@RunWith(MockitoJUnitRunner.class) +public class AsyncOnSubscribeTest { + + @Mock + public Observer o; + + TestSubscriber subscriber; + + @Before + public void setup() { + subscriber = new TestSubscriber(o); + } + + @Test + public void testSerializesConcurrentObservables() throws InterruptedException { + final TestScheduler scheduler = new TestScheduler(); + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + if (state == 1) { + Observable o1 = Observable.just(1, 2, 3, 4) + .delay(100, TimeUnit.MILLISECONDS, scheduler); + observer.onNext(o1); + } + else if (state == 2) { + Observable o = Observable.just(5, 6, 7, 8); + observer.onNext(o); + } + else + observer.onCompleted(); + return state + 1; + }}); + // initial request emits [[1, 2, 3, 4]] on delay + Observable.create(os).subscribe(subscriber); + // next request emits [[5, 6, 7, 8]] firing immediately + subscriber.requestMore(2); + // triggers delayed observable + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + // final request completes + subscriber.requestMore(3); + subscriber.awaitTerminalEventAndUnsubscribeOnTimeout(100, TimeUnit.MILLISECONDS); + subscriber.assertNoErrors(); + subscriber.assertReceivedOnNext(Arrays.asList(new Integer[] {1, 2, 3, 4, 5, 6, 7, 8})); + subscriber.assertCompleted(); + } + + @Test + public void testSubscribedByBufferingOperator() { + final TestScheduler scheduler = new TestScheduler(); + OnSubscribe os = AsyncOnSubscribe.createStateless( + new Action2>>(){ + @Override + public void call(Long requested, Observer> observer) { + observer.onNext(Observable.range(1, requested.intValue())); + }}); + Observable.create(os).observeOn(scheduler).subscribe(subscriber); + scheduler.advanceTimeBy(10, TimeUnit.DAYS); + subscriber.assertNoErrors(); + subscriber.assertValueCount(RxRingBuffer.SIZE); + subscriber.assertNotCompleted(); + } + + @Test + public void testOnUnsubscribeHasCorrectState() throws InterruptedException { + final AtomicInteger lastState = new AtomicInteger(-1); + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + if (state < 3) { + observer.onNext(Observable.just(state)); + } + else + observer.onCompleted(); + return state + 1; + }}, + new Action1() { + @Override + public void call(Integer t) { + lastState.set(t); + }}); + Observable.create(os).subscribe(subscriber); // [[1]], state = 1 + subscriber.requestMore(2); // [[1]], state = 2 + subscriber.requestMore(3); // onComplete, state = 3 + subscriber.assertNoErrors(); + subscriber.assertReceivedOnNext(Arrays.asList(new Integer[] {1, 2})); + subscriber.assertCompleted(); + assertEquals("Final state when unsubscribing is not correct", 4, lastState.get()); + } + + @Test + public void testOnCompleteOuter() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateless(new Action2>>(){ + @Override + public void call(Long requested, Observer> observer) { + observer.onCompleted(); + }}); + Observable.create(os).subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertCompleted(); + subscriber.assertNoValues(); + } + + @Test + public void testTryOnNextTwice() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateless(new Action2>>(){ + @Override + public void call(Long requested, Observer> observer) { + observer.onNext(Observable.just(1)); + observer.onNext(Observable.just(2)); + } + }); + Observable.create(os).subscribe(subscriber); + subscriber.assertError(IllegalStateException.class); + subscriber.assertNotCompleted(); + subscriber.assertReceivedOnNext(Arrays.asList(new Integer[] {1})); + } + + @Test + public void testThrowException() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateless( + new Action2>>(){ + @Override + public void call(Long requested, Observer> observer) { + throw new TestException(); + }}); + Observable.create(os).subscribe(subscriber); + subscriber.assertError(TestException.class); + subscriber.assertNotCompleted(); + subscriber.assertNoValues(); + } + + @Test + public void testThrowExceptionAfterTerminal() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + observer.onCompleted(); + throw new TestException(); + }}); + Observable.create(os).subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertCompleted(); + subscriber.assertNoValues(); + } + + @Test + public void testOnNextAfterCompleted() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + observer.onCompleted(); + observer.onNext(Observable.just(1)); + return 1; + }}); + Observable.create(os).subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertCompleted(); + subscriber.assertNoValues(); + } + + @Test + public void testOnNextAfterError() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + observer.onError(new TestException()); + observer.onNext(Observable.just(1)); + return 1; + }}); + Observable.create(os).subscribe(subscriber); + subscriber.assertError(TestException.class); + subscriber.assertNotCompleted(); + subscriber.assertNoValues(); + } + + @Test + public void testEmittingEmptyObservable() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + observer.onNext(Observable.empty()); + observer.onCompleted(); + return state; + }}); + Observable.create(os).subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertCompleted(); + subscriber.assertNoValues(); + } + + @Test + public void testOnErrorOuter() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + observer.onError(new TestException()); + return state; + } + }); + Observable.create(os).subscribe(subscriber); + subscriber.assertError(TestException.class); + subscriber.assertNotCompleted(); + subscriber.assertNoValues(); + } + + @Test + public void testOnCompleteFollowedByOnErrorOuter() throws InterruptedException { + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + observer.onCompleted(); + observer.onError(new TestException()); + return state; + } + }); + Observable.create(os).subscribe(subscriber); + subscriber.assertCompleted(); + subscriber.assertNoErrors(); + subscriber.assertNoValues(); + } + + @Test + public void testUnsubscribesFromAllSelfTerminatedObservables() throws InterruptedException { + final AtomicInteger l1 = new AtomicInteger(); + final AtomicInteger l2 = new AtomicInteger(); + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + Observable o1; + switch (state) { + case 1: + o1 = Observable.just(1) + .doOnUnsubscribe(new Action0(){ + @Override + public void call() { + l1.incrementAndGet(); + }}); + break; + case 2: + o1 = Observable.just(2) + .doOnUnsubscribe(new Action0(){ + @Override + public void call() { + l2.incrementAndGet(); + }}); + break; + default: + observer.onCompleted(); + return null; + } + observer.onNext(o1); + return state + 1; + }}); + Observable.create(os).subscribe(subscriber); // [[1]] + subscriber.requestMore(2); // [[2]] + subscriber.requestMore(2); // onCompleted + subscriber.awaitTerminalEventAndUnsubscribeOnTimeout(100, TimeUnit.MILLISECONDS); + assertEquals("did not unsub from first observable after terminal", 1, l1.get()); + assertEquals("did not unsub from second observable after terminal", 1, l2.get()); + List onNextEvents = subscriber.getOnNextEvents(); + assertEquals(2, onNextEvents.size()); + assertEquals(new Integer(1), onNextEvents.get(0)); + assertEquals(new Integer(2), onNextEvents.get(1)); + subscriber.assertNoErrors(); + subscriber.assertCompleted(); + } + + @Test + public void testUnsubscribesFromAllNonTerminatedObservables() throws InterruptedException { + final AtomicInteger l1 = new AtomicInteger(); + final AtomicInteger l2 = new AtomicInteger(); + final TestScheduler scheduler = new TestScheduler(); + final AtomicReference sub = new AtomicReference(); + OnSubscribe os = AsyncOnSubscribe.createStateful(new Func0(){ + @Override + public Integer call() { + return 1; + }}, + new Func3>, Integer>(){ + @Override + public Integer call(Integer state, Long requested, Observer> observer) { + switch (state) { + case 1: + observer.onNext(Observable.just(1) + .subscribeOn(scheduler) + .doOnUnsubscribe(new Action0(){ + @Override + public void call() { + l1.incrementAndGet(); + }})); + break; + case 2: + observer.onNext(Observable.never() + .subscribeOn(scheduler) + .doOnUnsubscribe(new Action0(){ + @Override + public void call() { + l2.incrementAndGet(); + }})); + break; + case 3: + sub.get().unsubscribe(); + } + return state + 1; + }}); + Subscription subscription = Observable.create(os) + .observeOn(scheduler) + .subscribe(subscriber); + sub.set(subscription); + subscriber.assertNoValues(); + scheduler.triggerActions(); + subscriber.assertValue(1); + subscriber.assertNotCompleted(); + subscriber.assertNoErrors(); + assertEquals("did not unsub from 1st observable after terminal", 1, l1.get()); + assertEquals("did not unsub from Observable.never() inner obs", 1, l2.get()); + } + + private static class Foo {} + private static class Bar extends Foo {} + + @Test + public void testGenerics() { + AsyncOnSubscribe.createStateless(new Action2>>(){ + @Override + public void call(Long state, Observer> observer) { + if (state == null) + observer.onNext(Observable.just(new Foo())); + else + observer.onNext(Observable.just(new Bar())); + }}); + } +}