From c030321bcffbebc654b414160a4cd425031f150f Mon Sep 17 00:00:00 2001 From: hura Date: Sat, 4 Jan 2014 11:53:49 -0500 Subject: [PATCH] Added `Observable.timeout` wrappers to scala adapter Added the four timeout methods on Observable in the Scala adaptor. Note for the java/scala type interop: http://stackoverflow.com/q/20912151 --- .../main/scala/rx/lang/scala/Observable.scala | 83 ++++++++++++++++++- 1 file changed, 79 insertions(+), 4 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index a39754e593..560cea3a18 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1600,6 +1600,81 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler)) } + /** + * Applies a timeout policy for each item emitted by the Observable, using + * the specified scheduler to run timeout timers. If the next item isn't + * observed within the specified timeout duration starting from its + * predecessor, observers are notified of a `TimeoutException`. + *

+ * + * + * @param timeout maximum duration between items before a timeout occurs + * @return the source Observable modified to notify observers of a + * `TimeoutException` in case of a timeout + */ + def timeout(timeout: Duration): Observable[T] = { + toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit)) + } + + /** + * Applies a timeout policy for each item emitted by the Observable, using + * the specified scheduler to run timeout timers. If the next item isn't + * observed within the specified timeout duration starting from its + * predecessor, a specified fallback Observable produces future items and + * notifications from that point on. + *

+ * + * + * @param timeout maximum duration between items before a timeout occurs + * @param other fallback Observable to use in case of a timeout + * @return the source Observable modified to switch to the fallback + * Observable in case of a timeout + */ + def timeout[U >: T](timeout: Duration, other: Observable[U]): Observable[U] = { + val otherJava: rx.Observable[_ <: U] = other.asJavaObservable + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava)) + } + + /** + * Applies a timeout policy for each item emitted by the Observable, using + * the specified scheduler to run timeout timers. If the next item isn't + * observed within the specified timeout duration starting from its + * predecessor, the observer is notified of a `TimeoutException`. + *

+ * + * + * @param timeout maximum duration between items before a timeout occurs + * @param scheduler Scheduler to run the timeout timers on + * @return the source Observable modified to notify observers of a + * `TimeoutException` in case of a timeout + */ + def timeout(timeout: Duration, scheduler: Scheduler): Observable[T] = { + toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit, scheduler.asJavaScheduler)) + } + + /** + * Applies a timeout policy for each item emitted by the Observable, using + * the specified scheduler to run timeout timers. If the next item isn't + * observed within the specified timeout duration starting from its + * predecessor, a specified fallback Observable sequence produces future + * items and notifications from that point on. + *

+ * + * + * @param timeout maximum duration between items before a timeout occurs + * @param other Observable to use as the fallback in case of a timeout + * @param scheduler Scheduler to run the timeout timers on + * @return the source Observable modified so that it will switch to the + * fallback Observable in case of a timeout + */ + def timeout[U >: T](timeout: Duration, other: Observable[U], scheduler: Scheduler): Observable[U] = { + val otherJava: rx.Observable[_ <: U] = other.asJavaObservable + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava, scheduler.asJavaScheduler)) + } + + /** * Returns an Observable that sums up the elements of this Observable. * @@ -1894,10 +1969,10 @@ trait Observable[+T] } /** - * Invokes an action if the source Observable calls onError. + * Invokes an action if the source Observable calls `onError`. * * @param onError the action to invoke if the source Observable calls - * onError + * `onError` * @return the source Observable with the side-effecting behavior applied */ def doOnError(onError: Throwable => Unit): Observable[T] = { @@ -1905,10 +1980,10 @@ trait Observable[+T] } /** - * Invokes an action when the source Observable calls onCompleted. + * Invokes an action when the source Observable calls `onCompleted`. * * @param onCompleted the action to invoke when the source Observable calls - * onCompleted + * `onCompleted` * @return the source Observable with the side-effecting behavior applied */ def doOnCompleted(onCompleted: () => Unit): Observable[T] = {