From 34c563858e028d3fe7a2d3cac5a0585fac61f3a9 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 11 Jan 2014 11:57:16 +0100 Subject: [PATCH] Timeout with selector overloads --- rxjava-core/src/main/java/rx/Observable.java | 80 ++++++ .../java/rx/operators/OperationTimeout.java | 158 ++++++++++++ .../rx/operators/OperationTimeoutTest.java | 236 ++++++++++++++++++ 3 files changed, 474 insertions(+) create mode 100644 rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac2fb7b5d..6319767d4d 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -6691,6 +6691,86 @@ public Observable timeout(long timeout, TimeUnit timeUnit, Observable + * The arrival of the first source item is not timed out. + * @param the timeout value type (ignored) + * @param timeoutSelector function that returns an observable for each source item + * which determines the timeout window for the subsequent source item + * @return an observable which completes if a source item doesn't arrive after the + * previous one in the time window specified by the per-item observable. + */ + public Observable timeout(Func1> timeoutSelector) { + return timeout(timeoutSelector, Observable.empty()); + } + + /** + * Create an observable which switches to the other Observable if a source + * item doesn't arrive after the + * previous one in the time window specified by the per-item observable. + *

+ * The arrival of the first source item is not timed out. + * @param the timeout value type (ignored) + * @param timeoutSelector function that returns an observable for each source item + * which determines the timeout window for the subsequent source item + * @param other the other observable to switch to if the source times out + * @return an observable which switches to the other Observable if a source + * item doesn't arrive after the + * previous one in the time window specified by the per-item observable + */ + public Observable timeout(Func1> timeoutSelector, Observable other) { + if (other == null) { + throw new NullPointerException("other"); + } + return create(OperationTimeout.timeoutSelector(this, null, timeoutSelector, other)); + } + + /** + * Create an Observable which completes if either the first item or any subsequent item + * doesn't arrive within the time window specified by the timeout selectors' Observable. + * @param the first timeout value type (ignored) + * @param the subsequent timeout value type (ignored) + * @param firstTimeoutSelector function that returns an observable which determines + * the timeout window for the first source item + * @param timeoutSelector function that returns an observable for each source item + * which determines the timeout window for the subsequent source item + * @return an Observable which completes if either the first item or any subsequent item + * doesn't arrive within the time window specified by the timeout selectors' Observable. + */ + public Observable timeout(Func0> firstTimeoutSelector, Func1> timeoutSelector) { + if (firstTimeoutSelector == null) { + throw new NullPointerException("firstTimeoutSelector"); + } + return timeout(firstTimeoutSelector, timeoutSelector, Observable.empty()); + } + + /** + * Create an Observable which switches to another Observable + * if either the first item or any subsequent item + * doesn't arrive within the time window specified by the timeout selectors' Observable. + * @param the first timeout value type (ignored) + * @param the subsequent timeout value type (ignored) + * @param firstTimeoutSelector function that returns an observable which determines + * the timeout window for the first source item + * @param timeoutSelector function that returns an observable for each source item + * which determines the timeout window for the subsequent source item + * @param other the other observable to switch to if the source times out + * @return an Observable which switches to another Observable + * if either the first item or any subsequent item + * doesn't arrive within the time window specified by the timeout selectors' Observable + */ + public Observable timeout(Func0> firstTimeoutSelector, Func1> timeoutSelector, Observable other) { + if (firstTimeoutSelector == null) { + throw new NullPointerException("firstTimeoutSelector"); + } + if (other == null) { + throw new NullPointerException("other"); + } + return create(OperationTimeout.timeoutSelector(this, firstTimeoutSelector, timeoutSelector, other)); + } + /** * Records the time interval between consecutive items emitted by an * Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimeout.java b/rxjava-core/src/main/java/rx/operators/OperationTimeout.java index 8f4c5b4b0e..f93bfd2eec 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTimeout.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTimeout.java @@ -28,8 +28,10 @@ import rx.schedulers.Schedulers; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Func0; +import rx.util.functions.Func1; /** * Applies a timeout policy for each element in the observable sequence, using @@ -154,4 +156,160 @@ public void onCompleted() { return composite; } } + + /** Timeout using a per-item observable sequence. */ + public static OnSubscribeFunc timeoutSelector(Observable source, Func0> firstValueTimeout, Func1> valueTimeout, Observable other) { + return new TimeoutSelector(source, firstValueTimeout, valueTimeout, other); + } + + /** Timeout using a per-item observable sequence. */ + private static final class TimeoutSelector implements OnSubscribeFunc { + final Observable source; + final Func0> firstValueTimeout; + final Func1> valueTimeout; + final Observable other; + + public TimeoutSelector(Observable source, Func0> firstValueTimeout, Func1> valueTimeout, Observable other) { + this.source = source; + this.firstValueTimeout = firstValueTimeout; + this.valueTimeout = valueTimeout; + this.other = other; + } + + @Override + public Subscription onSubscribe(Observer t1) { + CompositeSubscription csub = new CompositeSubscription(); + + SourceObserver so = new SourceObserver(t1, valueTimeout, other, csub); + if (firstValueTimeout != null) { + Observable o; + try { + o = firstValueTimeout.call(); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + + csub.add(o.subscribe(new TimeoutObserver(so))); + } + csub.add(source.subscribe(so)); + return csub; + } + + /** Observe the source. */ + private static final class SourceObserver implements Observer, TimeoutCallback { + final Observer observer; + final Func1> valueTimeout; + final Observable other; + final CompositeSubscription cancel; + final Object guard; + boolean done; + final SerialSubscription tsub; + final TimeoutObserver to; + + public SourceObserver(Observer observer, Func1> valueTimeout, Observable other, CompositeSubscription cancel) { + this.observer = observer; + this.valueTimeout = valueTimeout; + this.other = other; + this.cancel = cancel; + this.guard = new Object(); + this.tsub = new SerialSubscription(); + this.cancel.add(tsub); + this.to = new TimeoutObserver(this); + } + + @Override + public void onNext(T args) { + tsub.set(Subscriptions.empty()); + + synchronized (guard) { + if (done) { + return; + } + observer.onNext(args); + } + + Observable o; + try { + o = valueTimeout.call(args); + } catch (Throwable t) { + onError(t); + return; + } + + SerialSubscription osub = new SerialSubscription(); + tsub.set(osub); + + osub.set(o.subscribe(to)); + } + @Override + public void onError(Throwable e) { + synchronized (guard) { + if (done) { + return; + } + done = true; + observer.onError(e); + } + cancel.unsubscribe(); + } + + @Override + public void onCompleted() { + synchronized (guard) { + if (done) { + return; + } + done = true; + observer.onCompleted(); + } + cancel.unsubscribe(); + } + @Override + public void timeout() { + if (other != null) { + synchronized (guard) { + if (done) { + return; + } + done = true; + } + cancel.clear(); + cancel.add(other.subscribe(observer)); + } else { + onCompleted(); + } + } + } + + /** The timeout callback. */ + private interface TimeoutCallback { + void timeout(); + void onError(Throwable t); + } + + /** Observe the timeout. */ + private static final class TimeoutObserver implements Observer { + final TimeoutCallback parent; + + public TimeoutObserver(TimeoutCallback parent) { + this.parent = parent; + } + + @Override + public void onNext(V args) { + parent.timeout(); + } + + @Override + public void onError(Throwable e) { + parent.onError(e); + } + + @Override + public void onCompleted() { + parent.timeout(); + } + } + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java b/rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java new file mode 100644 index 0000000000..833d2060ec --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationTimeoutTest.java @@ -0,0 +1,236 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Arrays; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.*; +import rx.Observable; +import rx.Observer; +import rx.subjects.PublishSubject; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +public class OperationTimeoutTest { + @Test + public void testTimeoutSelectorNormal1() { + PublishSubject source = PublishSubject.create(); + final PublishSubject timeout = PublishSubject.create(); + + Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return timeout; + } + }; + + Func0> firstTimeoutFunc = new Func0>() { + @Override + public Observable call() { + return timeout; + } + }; + + Observable other = Observable.from(Arrays.asList(100)); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.timeout(firstTimeoutFunc, timeoutFunc, other).subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + timeout.onNext(1); + + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onNext(3); + inOrder.verify(o).onNext(100); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + } + + @Test + public void testTimeoutSelectorTimeoutFirst() { + PublishSubject source = PublishSubject.create(); + final PublishSubject timeout = PublishSubject.create(); + + Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return timeout; + } + }; + + Func0> firstTimeoutFunc = new Func0>() { + @Override + public Observable call() { + return timeout; + } + }; + + Observable other = Observable.from(Arrays.asList(100)); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.timeout(firstTimeoutFunc, timeoutFunc, other).subscribe(o); + + timeout.onNext(1); + + inOrder.verify(o).onNext(100); + inOrder.verify(o).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + } + + @Test + public void testTimeoutSelectorFirstThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject timeout = PublishSubject.create(); + + Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return timeout; + } + }; + + Func0> firstTimeoutFunc = new Func0>() { + @Override + public Observable call() { + throw new OperationReduceTest.CustomException(); + } + }; + + Observable other = Observable.from(Arrays.asList(100)); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + source.timeout(firstTimeoutFunc, timeoutFunc, other).subscribe(o); + + verify(o).onError(any(OperationReduceTest.CustomException.class)); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + + } + @Test + public void testTimeoutSelectorSubsequentThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject timeout = PublishSubject.create(); + + Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + throw new OperationReduceTest.CustomException(); + } + }; + + Func0> firstTimeoutFunc = new Func0>() { + @Override + public Observable call() { + return timeout; + } + }; + + Observable other = Observable.from(Arrays.asList(100)); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.timeout(firstTimeoutFunc, timeoutFunc, other).subscribe(o); + + source.onNext(1); + + inOrder.verify(o).onNext(1); + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + verify(o, never()).onCompleted(); + + } + + @Test + public void testTimeoutSelectorFirstObservableThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject timeout = PublishSubject.create(); + + Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return timeout; + } + }; + + Func0> firstTimeoutFunc = new Func0>() { + @Override + public Observable call() { + return Observable.error(new OperationReduceTest.CustomException()); + } + }; + + Observable other = Observable.from(Arrays.asList(100)); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + source.timeout(firstTimeoutFunc, timeoutFunc, other).subscribe(o); + + verify(o).onError(any(OperationReduceTest.CustomException.class)); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + + } + @Test + public void testTimeoutSelectorSubsequentObservableThrows() { + PublishSubject source = PublishSubject.create(); + final PublishSubject timeout = PublishSubject.create(); + + Func1> timeoutFunc = new Func1>() { + @Override + public Observable call(Integer t1) { + return Observable.error(new OperationReduceTest.CustomException()); + } + }; + + Func0> firstTimeoutFunc = new Func0>() { + @Override + public Observable call() { + return timeout; + } + }; + + Observable other = Observable.from(Arrays.asList(100)); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + InOrder inOrder = inOrder(o); + + source.timeout(firstTimeoutFunc, timeoutFunc, other).subscribe(o); + + source.onNext(1); + + inOrder.verify(o).onNext(1); + inOrder.verify(o).onError(any(OperationReduceTest.CustomException.class)); + verify(o, never()).onCompleted(); + + } +}