Skip to content

Commit

Permalink
Merge pull request #1555 from jbripley/rxscala-retrywhen
Browse files Browse the repository at this point in the history
RxScala: Add retryWhen/repeatWhen methods
  • Loading branch information
benjchristensen committed Aug 8, 2014
2 parents a7953e6 + 22f2062 commit 0fe6e01
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 0 deletions.
24 changes: 24 additions & 0 deletions language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,30 @@ class RxScalaDemo extends JUnitSuite {
}.subscribe(s => println(s), e => e.printStackTrace())
}

@Test def retryWhenExample(): Unit = {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onError(new RuntimeException("always fails"))
}).retryWhen(attempts => {
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
println("delay retry by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS))
})
}).toBlocking.foreach(s => println(s))
}

@Test def repeatWhenExample(): Unit = {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onCompleted()
}).repeatWhen(attempts => {
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
println("delay repeat by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS)).materialize
})
}, NewThreadScheduler()).toBlocking.foreach(s => println(s))
}

@Test def liftExample1(): Unit = {
// Add "No. " in front of each item
val o = List(1, 2, 3).toObservable.lift {
Expand Down
149 changes: 149 additions & 0 deletions language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -3175,6 +3175,94 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.retry(f))
}

/**
* Returns an Observable that emits the same values as the source observable with the exception of an
* {@code onError}. An {@code onError} notification from the source will result in the emission of a
* {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code retry} will call
* {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
* resubscribe to the source Observable.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
*
* Example:
*
* This retries 3 times, each time incrementing the number of seconds it waits.
*
* <pre> {@code
* Observable[String]({ subscriber =>
* println("subscribing")
* subscriber.onError(new RuntimeException("always fails"))
* }).retryWhen(attempts => {
* attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
* println("delay retry by " + i + " second(s)")
* Observable.timer(Duration(i, TimeUnit.SECONDS))
* })
* }).toBlocking.foreach(s => println(s))
* } </pre>
*
* Output is:
*
* <pre> {@code
* subscribing
* delay retry by 1 second(s)
* subscribing
* delay retry by 2 second(s)
* subscribing
* delay retry by 3 second(s)
* subscribing
* } </pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
* retry
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f))
}

/**
* Returns an Observable that emits the same values as the source observable with the exception of an {@code onError}.
* An onError will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
* func. If the observable returned {@code onCompletes} or {@code onErrors} then retry will call {@code onCompleted}
* or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
* <p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
* retry
* @param scheduler the Scheduler on which to subscribe to the source Observable
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
}

/**
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
* <p>
Expand Down Expand Up @@ -3237,6 +3325,67 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.repeat(count, scheduler))
}

/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
* a {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code repeatWhen} will
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
* resubscribe to the source Observable, on a particular Scheduler.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @param scheduler the Scheduler to emit the items on
* @return the source Observable modified with repeat logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
}

/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
* a {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code repeatWhen} will
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
* resubscribe to the source observable.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @return the source Observable modified with repeat logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f))
}

/**
* Converts an Observable into a [[BlockingObservable]] (an Observable with blocking operators).
*
Expand Down

0 comments on commit 0fe6e01

Please sign in to comment.