Skip to content

Commit

Permalink
Merge pull request #1720 from abersnaze/dematerize-reWhen
Browse files Browse the repository at this point in the history
Change repeatWhen and retryWhen signatures.
  • Loading branch information
benjchristensen committed Oct 4, 2014
2 parents edbdf6c + bb8be53 commit 67df049
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 37 deletions.
60 changes: 52 additions & 8 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5801,8 +5801,19 @@ public final Observable<T> repeat(final long count, Scheduler scheduler) {
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
*/
public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return OnSubscribeRedo.repeat(this, notificationHandler, scheduler);
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Void>() {
@Override
public Void call(Notification<?> notification) {
return null;
}
}));
}
};
return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler, scheduler);
}

/**
Expand All @@ -5825,8 +5836,19 @@ public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notific
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
*/
public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
return OnSubscribeRedo.repeat(this, notificationHandler);
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Void>() {
@Override
public Void call(Notification<?> notification) {
return null;
}
}));
}
};
return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler);
}

/**
Expand Down Expand Up @@ -6541,8 +6563,19 @@ public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
*/
public final Observable<T> retryWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
return OnSubscribeRedo.<T> retry(this, notificationHandler);
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Throwable>() {
@Override
public Throwable call(Notification<?> notification) {
return notification.getThrowable();
}
}));
}
};
return OnSubscribeRedo.<T> retry(this, dematerializedNotificationHandler);
}

/**
Expand All @@ -6566,8 +6599,19 @@ public final Observable<T> retryWhen(Func1<? super Observable<? extends Notifica
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
*/
public final Observable<T> retryWhen(Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
return OnSubscribeRedo.<T> retry(this, notificationHandler, scheduler);
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Throwable>() {
@Override
public Throwable call(Notification<?> notification) {
return notification.getThrowable();
}
}));
}
};
return OnSubscribeRedo.<T> retry(this, dematerializedNotificationHandler, scheduler);
}

/**
Expand Down
57 changes: 28 additions & 29 deletions src/test/java/rx/internal/operators/OperatorRetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,12 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Notification;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
Expand All @@ -49,6 +43,11 @@
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class OperatorRetryTest {

@Test
Expand All @@ -73,15 +72,15 @@ public void call(Subscriber<? super String> t1) {

});
TestSubscriber<String> ts = new TestSubscriber<String>(consumer);
producer.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
producer.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {

@Override
public Observable<?> call(Observable<? extends Notification<?>> attempts) {
public Observable<?> call(Observable<? extends Throwable> attempts) {
// Worker w = Schedulers.computation().createWorker();
return attempts
.map(new Func1<Notification<?>, Tuple>() {
.map(new Func1<Throwable, Tuple>() {
@Override
public Tuple call(Notification<?> n) {
public Tuple call(Throwable n) {
return new Tuple(new Long(1), n);
}})
.scan(new Func2<Tuple, Tuple, Tuple>(){
Expand All @@ -94,7 +93,7 @@ public Tuple call(Tuple t, Tuple n) {
public Observable<Long> call(Tuple t) {
System.out.println("Retry # "+t.count);
return t.count > 20 ?
Observable.<Long>error(t.n.getThrowable()) :
Observable.<Long>error(t.n) :
Observable.timer(t.count *1L, TimeUnit.MILLISECONDS);
}});
}
Expand All @@ -112,9 +111,9 @@ public Observable<Long> call(Tuple t) {

public static class Tuple {
Long count;
Notification<?> n;
Throwable n;

Tuple(Long c, Notification<?> n) {
Tuple(Long c, Throwable n) {
count = c;
this.n = n;
}
Expand Down Expand Up @@ -147,15 +146,15 @@ public void testSchedulingNotificationHandler() {
int NUM_RETRIES = 2;
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
TestSubscriber<String> subscriber = new TestSubscriber<String>(observer);
origin.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>>() {
origin.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> t1) {
return t1.observeOn(Schedulers.computation()).map(new Func1<Notification<?>, Notification<?>>() {
public Observable<?> call(Observable<? extends Throwable> t1) {
return t1.observeOn(Schedulers.computation()).map(new Func1<Throwable, Void>() {
@Override
public Notification<?> call(Notification<?> t1) {
return Notification.createOnNext(null);
public Void call(Throwable t1) {
return null;
}
}).startWith(Notification.createOnNext(null));
}).startWith((Void) null);
}
}).subscribe(subscriber);

Expand All @@ -178,16 +177,16 @@ public void testOnNextFromNotificationHandler() {
Observer<String> observer = mock(Observer.class);
int NUM_RETRIES = 2;
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
origin.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>>() {
origin.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> t1) {
return t1.map(new Func1<Notification<?>, Notification<?>>() {
public Observable<?> call(Observable<? extends Throwable> t1) {
return t1.map(new Func1<Throwable, Void>() {

@Override
public Notification<?> call(Notification<?> t1) {
return Notification.createOnNext(null);
public Void call(Throwable t1) {
return null;
}
}).startWith(Notification.createOnNext(null));
}).startWith((Void) null);
}
}).subscribe(observer);

Expand All @@ -209,9 +208,9 @@ public void testOnCompletedFromNotificationHandler() {
Observer<String> observer = mock(Observer.class);
Observable<String> origin = Observable.create(new FuncWithErrors(1));
TestSubscriber<String> subscriber = new TestSubscriber<String>(observer);
origin.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>>() {
origin.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> t1) {
public Observable<?> call(Observable<? extends Throwable> t1) {
return Observable.empty();
}
}).subscribe(subscriber);
Expand All @@ -229,9 +228,9 @@ public void testOnErrorFromNotificationHandler() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Observable<String> origin = Observable.create(new FuncWithErrors(2));
origin.retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>>() {
origin.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> t1) {
public Observable<?> call(Observable<? extends Throwable> t1) {
return Observable.error(new RuntimeException());
}
}).subscribe(observer);
Expand Down

0 comments on commit 67df049

Please sign in to comment.