Skip to content

Commit

Permalink
Merge pull request ReactiveX#434 from johnhmarks/Timeout
Browse files Browse the repository at this point in the history
Implemented SerialSubscription and Timeout operator
  • Loading branch information
benjchristensen committed Oct 22, 2013
2 parents 2dc5240 + f3a62cf commit df02670
Show file tree
Hide file tree
Showing 5 changed files with 418 additions and 5 deletions.
33 changes: 28 additions & 5 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationTimeout;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1855,8 +1856,6 @@ public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? e
* its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
* {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
*
* @param <T>
* the type of item emitted by the source Observable
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its {@link Observer}s
*/
Expand All @@ -1876,8 +1875,6 @@ public Observable<T> synchronize() {
*
* @param lock
* The lock object to synchronize each observer call on
* @param <T>
* the type of item emitted by the source Observable
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its {@link Observer}s
*/
Expand Down Expand Up @@ -3194,7 +3191,7 @@ public Observable<Boolean> exists(Func1<? super T, Boolean> predicate) {
/**
* Determines whether an observable sequence contains a specified element.
*
* @param value
* @param element
* The element to search in the sequence.
* @return an Observable that emits if the element is in the source sequence.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228965(v=vs.103).aspx">MSDN: Observable.Contains</a>
Expand Down Expand Up @@ -4507,6 +4504,32 @@ public Observable<T> ignoreElements() {
return filter(alwaysFalse());
}

/**
* Returns either the observable sequence or an TimeoutException if timeout elapses.
* @param timeout
* The timeout duration
* @param timeUnit
* The time unit of the timeout
* @param scheduler
* The scheduler to run the timeout timers on.
* @return The source sequence with a TimeoutException in case of a timeout.
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
}

/**
* Returns either the observable sequence or an TimeoutException if timeout elapses.
* @param timeout
* The timeout duration
* @param timeUnit
* The time unit of the timeout
* @return The source sequence with a TimeoutException in case of a timeout.
*/
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
}

/**
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
* <p>
Expand Down
128 changes: 128 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationTimeout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public final class OperationTimeout {
public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return new Timeout<T>(source, timeout, timeUnit, scheduler);
}

private static class Timeout<T> implements Observable.OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final long timeout;
private final TimeUnit timeUnit;
private final Scheduler scheduler;

private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
this.source = source;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.scheduler = scheduler;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final AtomicBoolean terminated = new AtomicBoolean(false);
final AtomicLong actual = new AtomicLong(0L); // Required to handle race between onNext and timeout
final SerialSubscription serial = new SerialSubscription();
final Object gate = new Object();
CompositeSubscription composite = new CompositeSubscription();
final Func0<Subscription> schedule = new Func0<Subscription>() {
@Override
public Subscription call() {
final long expected = actual.get();
return scheduler.schedule(new Action0() {
@Override
public void call() {
boolean timeoutWins = false;
synchronized (gate) {
if (expected == actual.get() && !terminated.getAndSet(true)) {
timeoutWins = true;
}
}
if (timeoutWins) {
observer.onError(new TimeoutException());
}

}
}, timeout, timeUnit);
}
};
SafeObservableSubscription subscription = new SafeObservableSubscription();
composite.add(subscription.wrap(source.subscribe(new Observer<T>() {
@Override
public void onNext(T value) {
boolean onNextWins = false;
synchronized (gate) {
if (!terminated.get()) {
actual.incrementAndGet();
onNextWins = true;
}
}
if (onNextWins) {
serial.setSubscription(schedule.call());
observer.onNext(value);
}
}

@Override
public void onError(Throwable error) {
boolean onErrorWins = false;
synchronized (gate) {
if (!terminated.getAndSet(true)) {
onErrorWins = true;
}
}
if (onErrorWins) {
serial.unsubscribe();
observer.onError(error);
}
}

@Override
public void onCompleted() {
boolean onCompletedWins = false;
synchronized (gate) {
if (!terminated.getAndSet(true)) {
onCompletedWins = true;
}
}
if (onCompletedWins) {
serial.unsubscribe();
observer.onCompleted();
}
}
})));
composite.add(serial);
serial.setSubscription(schedule.call());
return composite;
}
}
}
70 changes: 70 additions & 0 deletions rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subscriptions;

import rx.Subscription;

/**
* Represents a subscription whose underlying subscription can be swapped for another subscription
* which causes the previous underlying subscription to be unsubscribed.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
*/
public class SerialSubscription implements Subscription {
private boolean unsubscribed;
private Subscription subscription;
private final Object gate = new Object();

@Override
public void unsubscribe() {
Subscription toUnsubscribe = null;
synchronized (gate) {
if (!unsubscribed) {
if (subscription != null) {
toUnsubscribe = subscription;
subscription = null;
}
unsubscribed = true;
}
}
if (toUnsubscribe != null) {
toUnsubscribe.unsubscribe();
}
}

public Subscription getSubscription() {
synchronized (gate) {
return subscription;
}
}

public void setSubscription(Subscription subscription) {
Subscription toUnsubscribe = null;
synchronized (gate) {
if (!unsubscribed) {
if (this.subscription != null) {
toUnsubscribe = this.subscription;
}
this.subscription = subscription;
} else {
toUnsubscribe = subscription;
}
}
if (toUnsubscribe != null) {
toUnsubscribe.unsubscribe();
}
}
}
117 changes: 117 additions & 0 deletions rxjava-core/src/test/java/rx/TimeoutTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import rx.concurrency.TestScheduler;
import rx.subjects.PublishSubject;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public class TimeoutTests {
private PublishSubject<String> underlyingSubject;
private TestScheduler testScheduler;
private Observable<String> withTimeout;
private static final long TIMEOUT = 3;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);

underlyingSubject = PublishSubject.create();
testScheduler = new TestScheduler();
withTimeout = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, testScheduler);
}

@Test
public void shouldNotTimeoutIfOnNextWithinTimeout() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
verify(observer).onNext("One");
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
verify(observer, never()).onError(any(Throwable.class));
subscription.unsubscribe();
}

@Test
public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("Two");
verify(observer).onNext("Two");
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
verify(observer, never()).onError(any(Throwable.class));
subscription.unsubscribe();
}

@Test
public void shouldTimeoutIfOnNextNotWithinTimeout() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
verify(observer).onError(any(TimeoutException.class));
subscription.unsubscribe();
}

@Test
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onNext("One");
verify(observer).onNext("One");
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
verify(observer).onError(any(TimeoutException.class));
subscription.unsubscribe();
}

@Test
public void shouldCompleteIfUnderlyingComletes() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onCompleted();
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
verify(observer).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
subscription.unsubscribe();
}

@Test
public void shouldErrorIfUnderlyingErrors() {
Observer<String> observer = mock(Observer.class);
Subscription subscription = withTimeout.subscribe(observer);
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
underlyingSubject.onError(new UnsupportedOperationException());
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
verify(observer).onError(any(UnsupportedOperationException.class));
subscription.unsubscribe();
}
}
Loading

0 comments on commit df02670

Please sign in to comment.