From d1282eec387fb140b3695ac016f828a7e181a95c Mon Sep 17 00:00:00 2001 From: George Campbell Date: Fri, 3 Oct 2014 14:48:04 -0700 Subject: [PATCH 1/2] Add a shim to make it so the public api for repeatWhen and retryWhen don't expose Notification --- src/main/java/rx/Observable.java | 60 +++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 88335ef03e..20ef13bdbd 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5801,8 +5801,19 @@ public final Observable repeat(final long count, Scheduler scheduler) { * @see RxJava Wiki: repeatWhen() * @see MSDN: Observable.Repeat */ - public final Observable repeatWhen(Func1>, ? extends Observable> notificationHandler, Scheduler scheduler) { - return OnSubscribeRedo.repeat(this, notificationHandler, scheduler); + public final Observable repeatWhen(final Func1, ? extends Observable> notificationHandler, Scheduler scheduler) { + Func1>, ? extends Observable> dematerializedNotificationHandler = new Func1>, Observable>() { + @Override + public Observable call(Observable> notifications) { + return notificationHandler.call(notifications.map(new Func1, Void>() { + @Override + public Void call(Notification notification) { + return null; + } + })); + } + }; + return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler, scheduler); } /** @@ -5825,8 +5836,19 @@ public final Observable repeatWhen(Func1RxJava Wiki: repeatWhen() * @see MSDN: Observable.Repeat */ - public final Observable repeatWhen(Func1>, ? extends Observable> notificationHandler) { - return OnSubscribeRedo.repeat(this, notificationHandler); + public final Observable repeatWhen(final Func1, ? extends Observable> notificationHandler) { + Func1>, ? extends Observable> dematerializedNotificationHandler = new Func1>, Observable>() { + @Override + public Observable call(Observable> notifications) { + return notificationHandler.call(notifications.map(new Func1, Void>() { + @Override + public Void call(Notification notification) { + return null; + } + })); + } + }; + return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler); } /** @@ -6541,8 +6563,19 @@ public final Observable retry(Func2 predicate) { * @return the source Observable modified with retry logic * @see RxJava Wiki: retryWhen() */ - public final Observable retryWhen(Func1>, ? extends Observable> notificationHandler) { - return OnSubscribeRedo. retry(this, notificationHandler); + public final Observable retryWhen(final Func1, ? extends Observable> notificationHandler) { + Func1>, ? extends Observable> dematerializedNotificationHandler = new Func1>, Observable>() { + @Override + public Observable call(Observable> notifications) { + return notificationHandler.call(notifications.map(new Func1, Throwable>() { + @Override + public Throwable call(Notification notification) { + return notification.getThrowable(); + } + })); + } + }; + return OnSubscribeRedo. retry(this, dematerializedNotificationHandler); } /** @@ -6566,8 +6599,19 @@ public final Observable retryWhen(Func1RxJava Wiki: retryWhen() */ - public final Observable retryWhen(Func1>, ? extends Observable> notificationHandler, Scheduler scheduler) { - return OnSubscribeRedo. retry(this, notificationHandler, scheduler); + public final Observable retryWhen(final Func1, ? extends Observable> notificationHandler, Scheduler scheduler) { + Func1>, ? extends Observable> dematerializedNotificationHandler = new Func1>, Observable>() { + @Override + public Observable call(Observable> notifications) { + return notificationHandler.call(notifications.map(new Func1, Throwable>() { + @Override + public Throwable call(Notification notification) { + return notification.getThrowable(); + } + })); + } + }; + return OnSubscribeRedo. retry(this, dematerializedNotificationHandler, scheduler); } /** From bb8be53f1058db3d94b3dcad6fcc75047e347e00 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Fri, 3 Oct 2014 15:01:45 -0700 Subject: [PATCH 2/2] forgot to commit the changes to the tests. --- .../internal/operators/OperatorRetryTest.java | 57 +++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/src/test/java/rx/internal/operators/OperatorRetryTest.java b/src/test/java/rx/internal/operators/OperatorRetryTest.java index dbfa504d45..716a0bfd8a 100644 --- a/src/test/java/rx/internal/operators/OperatorRetryTest.java +++ b/src/test/java/rx/internal/operators/OperatorRetryTest.java @@ -24,18 +24,12 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; import rx.Observable; import rx.Observable.OnSubscribe; -import rx.Notification; import rx.Observer; import rx.Subscriber; import rx.Subscription; @@ -49,6 +43,11 @@ import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + public class OperatorRetryTest { @Test @@ -73,15 +72,15 @@ public void call(Subscriber t1) { }); TestSubscriber ts = new TestSubscriber(consumer); - producer.retryWhen(new Func1>, Observable>() { + producer.retryWhen(new Func1, Observable>() { @Override - public Observable call(Observable> attempts) { + public Observable call(Observable attempts) { // Worker w = Schedulers.computation().createWorker(); return attempts - .map(new Func1, Tuple>() { + .map(new Func1() { @Override - public Tuple call(Notification n) { + public Tuple call(Throwable n) { return new Tuple(new Long(1), n); }}) .scan(new Func2(){ @@ -94,7 +93,7 @@ public Tuple call(Tuple t, Tuple n) { public Observable call(Tuple t) { System.out.println("Retry # "+t.count); return t.count > 20 ? - Observable.error(t.n.getThrowable()) : + Observable.error(t.n) : Observable.timer(t.count *1L, TimeUnit.MILLISECONDS); }}); } @@ -112,9 +111,9 @@ public Observable call(Tuple t) { public static class Tuple { Long count; - Notification n; + Throwable n; - Tuple(Long c, Notification n) { + Tuple(Long c, Throwable n) { count = c; this.n = n; } @@ -147,15 +146,15 @@ public void testSchedulingNotificationHandler() { int NUM_RETRIES = 2; Observable origin = Observable.create(new FuncWithErrors(NUM_RETRIES)); TestSubscriber subscriber = new TestSubscriber(observer); - origin.retryWhen(new Func1>, Observable>>() { + origin.retryWhen(new Func1, Observable>() { @Override - public Observable> call(Observable> t1) { - return t1.observeOn(Schedulers.computation()).map(new Func1, Notification>() { + public Observable call(Observable t1) { + return t1.observeOn(Schedulers.computation()).map(new Func1() { @Override - public Notification call(Notification t1) { - return Notification.createOnNext(null); + public Void call(Throwable t1) { + return null; } - }).startWith(Notification.createOnNext(null)); + }).startWith((Void) null); } }).subscribe(subscriber); @@ -178,16 +177,16 @@ public void testOnNextFromNotificationHandler() { Observer observer = mock(Observer.class); int NUM_RETRIES = 2; Observable origin = Observable.create(new FuncWithErrors(NUM_RETRIES)); - origin.retryWhen(new Func1>, Observable>>() { + origin.retryWhen(new Func1, Observable>() { @Override - public Observable> call(Observable> t1) { - return t1.map(new Func1, Notification>() { + public Observable call(Observable t1) { + return t1.map(new Func1() { @Override - public Notification call(Notification t1) { - return Notification.createOnNext(null); + public Void call(Throwable t1) { + return null; } - }).startWith(Notification.createOnNext(null)); + }).startWith((Void) null); } }).subscribe(observer); @@ -209,9 +208,9 @@ public void testOnCompletedFromNotificationHandler() { Observer observer = mock(Observer.class); Observable origin = Observable.create(new FuncWithErrors(1)); TestSubscriber subscriber = new TestSubscriber(observer); - origin.retryWhen(new Func1>, Observable>>() { + origin.retryWhen(new Func1, Observable>() { @Override - public Observable> call(Observable> t1) { + public Observable call(Observable t1) { return Observable.empty(); } }).subscribe(subscriber); @@ -229,9 +228,9 @@ public void testOnErrorFromNotificationHandler() { @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); Observable origin = Observable.create(new FuncWithErrors(2)); - origin.retryWhen(new Func1>, Observable>>() { + origin.retryWhen(new Func1, Observable>() { @Override - public Observable> call(Observable> t1) { + public Observable call(Observable t1) { return Observable.error(new RuntimeException()); } }).subscribe(observer);