diff --git a/rxjava-core/src/main/java/rx/operators/OperatorRetry.java b/rxjava-core/src/main/java/rx/operators/OperatorRetry.java index 31057ccc49..ca5e7da892 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorRetry.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorRetry.java @@ -37,9 +37,11 @@ import rx.Observable.Operator; import rx.Scheduler.Inner; import rx.Subscriber; +import rx.functions.Action0; import rx.functions.Action1; import rx.schedulers.Schedulers; import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; public class OperatorRetry implements Operator> { @@ -82,7 +84,9 @@ public void call(final Inner inner) { final Action1 _self = this; attempts.incrementAndGet(); - Subscriber subscriber = new Subscriber(child) { + // new subscription each time so if it unsubscribes itself it does not prevent retries + // by unsubscribing the child subscription + Subscriber subscriber = new Subscriber() { @Override public void onCompleted() { diff --git a/rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java b/rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java index 97ea46adf5..645895817b 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java @@ -15,23 +15,30 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.mockito.InOrder; import rx.Observable; -import rx.Observable.OnSubscribeFunc; +import rx.Observable.OnSubscribe; import rx.Observer; import rx.Subscriber; import rx.Subscription; +import rx.functions.Action0; import rx.functions.Action1; +import rx.observers.TestSubscriber; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; @@ -112,7 +119,7 @@ public void testInfiniteRetry() { inOrder.verifyNoMoreInteractions(); } - public static class FuncWithErrors implements Observable.OnSubscribeFunc { + public static class FuncWithErrors implements Observable.OnSubscribe { private final int numFailures; private final AtomicInteger count = new AtomicInteger(0); @@ -122,7 +129,7 @@ public static class FuncWithErrors implements Observable.OnSubscribeFunc } @Override - public Subscription onSubscribe(Observer o) { + public void call(Subscriber o) { o.onNext("beginningEveryTime"); if (count.incrementAndGet() <= numFailures) { o.onError(new RuntimeException("forced failure: " + count.get())); @@ -130,7 +137,6 @@ public Subscription onSubscribe(Observer o) { o.onNext("onSuccessOnly"); o.onCompleted(); } - return Subscriptions.empty(); } } @@ -150,101 +156,14 @@ public void call(Integer n) { assertEquals(1, count.get()); } - public static class SlowFuncAlwaysFails implements Observable.OnSubscribe { - - final AtomicInteger nextSeq=new AtomicInteger(); - final AtomicInteger activeSubs=new AtomicInteger(); - final AtomicInteger concurrentSubs=new AtomicInteger(); - - public void call(final Subscriber s) - { - final int seq=nextSeq.incrementAndGet(); - - int cur=activeSubs.incrementAndGet(); - // Track concurrent subscriptions - concurrentSubs.set(Math.max(cur,concurrentSubs.get())); - - // Use async error - new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // ignore - } - s.onError(new RuntimeException("Subscriber #"+seq+" fails")); - } - }).start(); - - // Track unsubscribes - s.add(new Subscription() - { - private boolean active=true; - - public void unsubscribe() - { - if (active) { - activeSubs.decrementAndGet(); - active=false; - } - } - - public boolean isUnsubscribed() - { - return !active; - } - }); - } - } - - @Test - public void testUnsubscribeAfterError() { - - final CountDownLatch check=new CountDownLatch(1); - final SlowFuncAlwaysFails sf=new SlowFuncAlwaysFails(); - - Observable - .create(sf) - .retry(4) - .subscribe( - new Action1() - { - @Override - public void call(String v) - { - fail("Should never happen"); - } - }, - new Action1() - { - public void call(Throwable throwable) - { - check.countDown(); - } - } - ); - - try - { - check.await(1, TimeUnit.SECONDS); - } catch (InterruptedException e) - { - fail("interrupted"); - } - - assertEquals("5 Subscribers created", 5, sf.nextSeq.get()); - assertEquals("1 Active Subscriber", 1, sf.concurrentSubs.get()); - } - @Test - public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed() throws InterruptedException { + public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubscribed() throws InterruptedException { final AtomicInteger subsCount = new AtomicInteger(0); - OnSubscribeFunc onSubscribe = new OnSubscribeFunc() { + OnSubscribe onSubscribe = new OnSubscribe() { @Override - public Subscription onSubscribe(Observer observer) { + public void call(Subscriber s) { subsCount.incrementAndGet(); - return new Subscription() { + s.add(new Subscription() { boolean unsubscribed = false; @Override @@ -257,7 +176,7 @@ public void unsubscribe() { public boolean isUnsubscribed() { return unsubscribed; } - }; + }); } }; Observable stream = Observable.create(onSubscribe); @@ -269,4 +188,172 @@ public boolean isUnsubscribed() { streamWithRetry.subscribe(); assertEquals(1, subsCount.get()); } + + @Test + public void testSourceObservableCallsUnsubscribe() throws InterruptedException { + final AtomicInteger subsCount = new AtomicInteger(0); + + final TestSubscriber ts = new TestSubscriber(); + + OnSubscribe onSubscribe = new OnSubscribe() { + @Override + public void call(Subscriber s) { + // if isUnsubscribed is true that means we have a bug such as https://github.com/Netflix/RxJava/issues/1024 + if (!s.isUnsubscribed()) { + subsCount.incrementAndGet(); + s.onError(new RuntimeException("failed")); + // it unsubscribes the child directly + // this simulates various error/completion scenarios that could occur + // or just a source that proactively triggers cleanup + s.unsubscribe(); + } + } + }; + + Observable.create(onSubscribe).retry(3).subscribe(ts); + assertEquals(4, subsCount.get()); // 1 + 3 retries + } + + class SlowObservable implements Observable.OnSubscribe { + + private AtomicInteger efforts = new AtomicInteger(0); + private AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0); + private AtomicInteger nextBeforeFailure; + + private final int emitDelay; + + public SlowObservable(int emitDelay, int countNext) { + this.emitDelay = emitDelay; + this.nextBeforeFailure = new AtomicInteger(countNext); + } + + public void call(final Subscriber subscriber) { + final AtomicBoolean terminate = new AtomicBoolean(false); + efforts.getAndIncrement(); + active.getAndIncrement(); + maxActive.set(Math.max(active.get(), maxActive.get())); + final Thread thread = new Thread() { + @Override + public void run() { + long nr = 0; + try { + while (!terminate.get()) { + Thread.sleep(emitDelay); + if (nextBeforeFailure.getAndDecrement() > 0) { + subscriber.onNext(nr++); + } + else { + subscriber.onError(new RuntimeException("expected-failed")); + } + } + } + catch (InterruptedException t) { + } + } + }; + thread.start(); + subscriber.add(Subscriptions.create(new Action0() { + @Override + public void call() { + terminate.set(true); + active.decrementAndGet(); + } + })); + } + } + + /** Observer for listener on seperate thread */ + class AsyncObserver implements Observer { + + protected CountDownLatch latch = new CountDownLatch(1); + + protected Observer target; + + /** Wrap existing Observer */ + public AsyncObserver(Observer target) { + this.target = target; + } + + /** Wait */ + public void await() { + try { + latch.await(); + } catch (InterruptedException e) { + fail("Test interrupted"); + } + } + + // Observer implementation + + @Override + public void onCompleted() { + target.onCompleted(); + latch.countDown(); + } + + @Override + public void onError(Throwable t) { + target.onError(t); + latch.countDown(); + } + + @Override + public void onNext(T v) { + target.onNext(v); + } + } + + @Test(timeout = 1000) + public void testUnsubscribeAfterError() { + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + + // Observable that always fails after 100ms + SlowObservable so = new SlowObservable(100, 0); + Observable o = Observable + .create(so) + .retry(5); + + AsyncObserver async = new AsyncObserver(observer); + + o.subscribe(async); + + async.await(); + + InOrder inOrder = inOrder(observer); + // Should fail once + inOrder.verify(observer, times(1)).onError(any(Throwable.class)); + inOrder.verify(observer, never()).onCompleted(); + + assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get()); + assertEquals("Only 1 active subscription", 1, so.maxActive.get()); + } + + @Test(timeout = 1000) + public void testTimeoutWithRetry() { + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + + // Observable that sends every 100ms (timeout fails instead) + SlowObservable so = new SlowObservable(100, 10); + Observable o = Observable + .create(so) + .timeout(80, TimeUnit.MILLISECONDS) + .retry(5); + + AsyncObserver async = new AsyncObserver(observer); + + o.subscribe(async); + + async.await(); + + InOrder inOrder = inOrder(observer); + // Should fail once + inOrder.verify(observer, times(1)).onError(any(Throwable.class)); + inOrder.verify(observer, never()).onCompleted(); + + assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get()); + } }