Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Observable.timeout wrappers to scala adapter #720

Merged
merged 1 commit into from
Jan 9, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,81 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
}

/**
* Applies a timeout policy for each item emitted by the Observable, using
* the specified scheduler to run timeout timers. If the next item isn't
* observed within the specified timeout duration starting from its
* predecessor, observers are notified of a `TimeoutException`.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1.png">
*
* @param timeout maximum duration between items before a timeout occurs
* @return the source Observable modified to notify observers of a
* `TimeoutException` in case of a timeout
*/
def timeout(timeout: Duration): Observable[T] = {
toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit))
}

/**
* Applies a timeout policy for each item emitted by the Observable, using
* the specified scheduler to run timeout timers. If the next item isn't
* observed within the specified timeout duration starting from its
* predecessor, a specified fallback Observable produces future items and
* notifications from that point on.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2.png">
*
* @param timeout maximum duration between items before a timeout occurs
* @param other fallback Observable to use in case of a timeout
* @return the source Observable modified to switch to the fallback
* Observable in case of a timeout
*/
def timeout[U >: T](timeout: Duration, other: Observable[U]): Observable[U] = {
val otherJava: rx.Observable[_ <: U] = other.asJavaObservable
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava))
}

/**
* Applies a timeout policy for each item emitted by the Observable, using
* the specified scheduler to run timeout timers. If the next item isn't
* observed within the specified timeout duration starting from its
* predecessor, the observer is notified of a `TimeoutException`.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1s.png">
*
* @param timeout maximum duration between items before a timeout occurs
* @param scheduler Scheduler to run the timeout timers on
* @return the source Observable modified to notify observers of a
* `TimeoutException` in case of a timeout
*/
def timeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit, scheduler.asJavaScheduler))
}

/**
* Applies a timeout policy for each item emitted by the Observable, using
* the specified scheduler to run timeout timers. If the next item isn't
* observed within the specified timeout duration starting from its
* predecessor, a specified fallback Observable sequence produces future
* items and notifications from that point on.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2s.png">
*
* @param timeout maximum duration between items before a timeout occurs
* @param other Observable to use as the fallback in case of a timeout
* @param scheduler Scheduler to run the timeout timers on
* @return the source Observable modified so that it will switch to the
* fallback Observable in case of a timeout
*/
def timeout[U >: T](timeout: Duration, other: Observable[U], scheduler: Scheduler): Observable[U] = {
val otherJava: rx.Observable[_ <: U] = other.asJavaObservable
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava, scheduler.asJavaScheduler))
}


/**
* Returns an Observable that sums up the elements of this Observable.
*
Expand Down Expand Up @@ -1894,21 +1969,21 @@ trait Observable[+T]
}

/**
* Invokes an action if the source Observable calls <code>onError</code>.
* Invokes an action if the source Observable calls `onError`.
*
* @param onError the action to invoke if the source Observable calls
* <code>onError</code>
* `onError`
* @return the source Observable with the side-effecting behavior applied
*/
def doOnError(onError: Throwable => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnError(onError))
}

/**
* Invokes an action when the source Observable calls <code>onCompleted</code>.
* Invokes an action when the source Observable calls `onCompleted`.
*
* @param onCompleted the action to invoke when the source Observable calls
* <code>onCompleted</code>
* `onCompleted`
* @return the source Observable with the side-effecting behavior applied
*/
def doOnCompleted(onCompleted: () => Unit): Observable[T] = {
Expand Down