diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 7c5a9bb993..78d49425c9 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -5381,16 +5381,45 @@ public Observable skip(int num) { } /** - * If the source Observable completes after emitting a single item, return - * an Observable that emits that item. If the source Observable emits more - * than one item or no items, throw an IllegalArgumentException. - *

- * + * Create an Observable that skips values before the given time ellapses. + * + * @param time + * the length of the time window + * @param unit + * the time unit + * @return an Observable that skips values before the given time ellapses + */ + public Observable skip(long time, TimeUnit unit) { + return skip(time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Create an Observable that skips values before the given time + * elapses while waiting on the given scheduler. + * + * @param time + * the length of the time window + * @param unit + * the time unit + * @param scheduler + * the scheduler where the timed wait happens + * @return an Observable that skips values before the given time + * elapses while waiting on the given scheduler + */ + public Observable skip(long time, TimeUnit unit, Scheduler scheduler) { + return create(new OperationSkip.SkipTimed(this, time, unit, scheduler)); + } + + /** + * If the Observable completes after emitting a single item, return an + * Observable containing that item. If it emits more than one item or no + * item, throw an IllegalArgumentException. * * @return an Observable that emits the single item emitted by the source * Observable that matches the predicate - * @throws IllegalArgumentException if the source emits more than one item - * or no items + * @throws IllegalArgumentException + * if the source emits more than one item + * or no items * @see RxJava Wiki: single() * @see MSDN: Observable.singleAsync() */ @@ -5575,6 +5604,31 @@ public Observable take(final int num) { return create(OperationTake.take(this, num)); } + /** + * Create an Observable that takes the emitted values of the source + * Observable before the time runs out. + * @param time the length of the time window + * @param unit the time unit + * @return an Observable that takes the emitted values of the source + * Observable before the time runs out. + */ + public Observable take(long time, TimeUnit unit) { + return take(time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Create an Observable that takes the emitted values of the source + * Observable before the time runs out, waiting on the given scheduler. + * @param time the length of the time window + * @param unit the time unit + * @param scheduler the scheduler used for time source + * @return an Observable that takes the emitted values of the source + * Observable before the time runs out, waiting on the given scheduler. + */ + public Observable take(long time, TimeUnit unit, Scheduler scheduler) { + return create(new OperationTake.TakeTimed(this, time, unit, scheduler)); + } + /** * Returns an Observable that emits items emitted by the source Observable * so long as a specified condition is true. @@ -5917,6 +5971,31 @@ public Observable skipLast(int count) { return create(OperationSkipLast.skipLast(this, count)); } + /** + * Create an observable which skips values emitted in a time window + * before the source completes. + * @param time the length of the time window + * @param unit the time unit + * @return an observable which skips values emitted in a time window + * before the source completes + */ + public Observable skipLast(long time, TimeUnit unit) { + return skipLast(time, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Create an observable which skips values emitted in a time window + * before the source completes by using the given scheduler as time source. + * @param time the length of the time window + * @param unit the time unit + * @param scheduler the scheduler used for time source + * @return an observable which skips values emitted in a time window + * before the source completes by using the given scheduler as time source + */ + public Observable skipLast(long time, TimeUnit unit, Scheduler scheduler) { + return create(new OperationSkipLast.SkipLastTimed(this, time, unit, scheduler)); + } + /** * Returns an Observable that emits a single item, a list composed of all * the items emitted by the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkip.java b/rxjava-core/src/main/java/rx/operators/OperationSkip.java index 4dc3359815..0b33fb0af9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkip.java @@ -15,12 +15,17 @@ */ package rx.operators; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Action0; /** * Returns an Observable that skips the first num items emitted by the source @@ -107,4 +112,86 @@ public void onNext(T args) { } } + + /** + * Skip the items after subscription for the given duration. + * @param the value type + */ + public static final class SkipTimed implements OnSubscribeFunc { + final Observable source; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + + public SkipTimed(Observable source, long time, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription onSubscribe(Observer t1) { + + SafeObservableSubscription timer = new SafeObservableSubscription(); + SafeObservableSubscription data = new SafeObservableSubscription(); + + CompositeSubscription csub = new CompositeSubscription(timer, data); + + SourceObserver so = new SourceObserver(t1, csub); + data.wrap(source.subscribe(so)); + if (!data.isUnsubscribed()) { + timer.wrap(scheduler.schedule(so, time, unit)); + } + + return csub; + } + /** + * Observes the source and relays its values once gate turns into true. + * @param the observed value type + */ + private static final class SourceObserver implements Observer, Action0 { + final AtomicBoolean gate; + final Observer observer; + final Subscription cancel; + + public SourceObserver(Observer observer, + Subscription cancel) { + this.gate = new AtomicBoolean(); + this.observer = observer; + this.cancel = cancel; + } + + @Override + public void onNext(T args) { + if (gate.get()) { + observer.onNext(args); + } + } + + @Override + public void onError(Throwable e) { + try { + observer.onError(e); + } finally { + cancel.unsubscribe(); + } + } + + @Override + public void onCompleted() { + try { + observer.onCompleted(); + } finally { + cancel.unsubscribe(); + } + } + + @Override + public void call() { + gate.set(true); + } + + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java index f3cb462e55..a25186a4d1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java @@ -15,14 +15,20 @@ */ package rx.operators; +import java.util.ArrayList; +import java.util.Collections; import java.util.Deque; import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Scheduler; import rx.Subscription; +import rx.util.Timestamped; /** * Bypasses a specified number of elements at the end of an observable sequence. @@ -123,4 +129,73 @@ public void onNext(T value) { })); } } + + /** + * Skip delivering values in the time window before the values. + * @param the result value type + */ + public static final class SkipLastTimed implements OnSubscribeFunc { + final Observable source; + final long timeInMillis; + final Scheduler scheduler; + + public SkipLastTimed(Observable source, long time, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.timeInMillis = unit.toMillis(time); + this.scheduler = scheduler; + } + + @Override + public Subscription onSubscribe(Observer t1) { + return source.subscribe(new SourceObserver(t1, timeInMillis, scheduler)); + } + /** Observes the source. */ + private static final class SourceObserver implements Observer { + final Observer observer; + final long timeInMillis; + final Scheduler scheduler; + List> buffer = new ArrayList>(); + + public SourceObserver(Observer observer, + long timeInMillis, Scheduler scheduler) { + this.observer = observer; + this.timeInMillis = timeInMillis; + this.scheduler = scheduler; + } + + @Override + public void onNext(T args) { + buffer.add(new Timestamped(scheduler.now(), args)); + } + + @Override + public void onError(Throwable e) { + buffer = Collections.emptyList(); + observer.onError(e); + } + + @Override + public void onCompleted() { + long limit = scheduler.now() - timeInMillis; + try { + for (Timestamped v : buffer) { + if (v.getTimestampMillis() < limit) { + try { + observer.onNext(v.getValue()); + } catch (Throwable t) { + observer.onError(t); + return; + } + } else { + observer.onCompleted(); + break; + } + } + } finally { + buffer = Collections.emptyList(); + } + } + + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 877ba4d5f3..797eb47d43 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,13 +15,17 @@ */ package rx.operators; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; /** * Returns an Observable that emits the first num items emitted by the source @@ -161,4 +165,123 @@ public void onNext(T args) { } } + + /** + * Takes values from the source until a timer fires. + * @param the result value type + */ + public static final class TakeTimed implements OnSubscribeFunc { + final Observable source; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + + public TakeTimed(Observable source, long time, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.time = time; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription onSubscribe(Observer t1) { + + SafeObservableSubscription timer = new SafeObservableSubscription(); + SafeObservableSubscription data = new SafeObservableSubscription(); + + CompositeSubscription csub = new CompositeSubscription(timer, data); + + SourceObserver so = new SourceObserver(t1, csub); + data.wrap(source.subscribe(so)); + if (!data.isUnsubscribed()) { + timer.wrap(scheduler.schedule(so, time, unit)); + } + + return csub; + } + /** + * Observes the source and relays its values until gate turns into false. + * @param the observed value type + */ + private static final class SourceObserver implements Observer, Action0 { + final Observer observer; + final Subscription cancel; + final AtomicInteger state = new AtomicInteger(); + static final int ACTIVE = 0; + static final int NEXT = 1; + static final int DONE = 2; + + public SourceObserver(Observer observer, + Subscription cancel) { + this.observer = observer; + this.cancel = cancel; + } + + @Override + public void onNext(T args) { + do { + int s = state.get(); + if (s == DONE) { + return; + } + if (state.compareAndSet(s, NEXT)) { + try { + observer.onNext(args); + } finally { + state.set(ACTIVE); + return; + } + } + } while (true); + } + + @Override + public void onError(Throwable e) { + do { + int s = state.get(); + if (s == DONE) { + return; + } else + if (s == NEXT) { + continue; + } else + if (state.compareAndSet(s, DONE)) { + try { + observer.onError(e); + } finally { + cancel.unsubscribe(); + } + return; + } + } while (true); + } + + @Override + public void onCompleted() { + do { + int s = state.get(); + if (s == DONE) { + return; + } else + if (s == NEXT) { + continue; + } else + if (state.compareAndSet(s, DONE)) { + try { + observer.onCompleted(); + } finally { + cancel.unsubscribe(); + } + return; + } + } while (true); + } + + @Override + public void call() { + onCompleted(); + } + + } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index bdea8a0a25..e0054704b6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -156,9 +156,9 @@ public TakeLastTimed(Observable source, int count, long time, TimeU @Override public Subscription onSubscribe(Observer t1) { - SafeObservableSubscription s = new SafeObservableSubscription(); - source.subscribe(new TakeLastTimedObserver(t1, s, count, ageMillis, scheduler)); - return s; + SafeObservableSubscription sas = new SafeObservableSubscription(); + sas.wrap(source.subscribe(new TakeLastTimedObserver(t1, sas, count, ageMillis, scheduler))); + return sas; } } /** Observes source values and keeps the most recent items. */ diff --git a/rxjava-core/src/test/java/rx/operators/OperationSkipLastTest.java b/rxjava-core/src/test/java/rx/operators/OperationSkipLastTest.java index 9391424e07..27e1b1e126 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSkipLastTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSkipLastTest.java @@ -15,7 +15,7 @@ */ package rx.operators; -import static org.mockito.Matchers.*; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.*; import static rx.operators.OperationSkipLast.*; @@ -24,6 +24,9 @@ import rx.Observable; import rx.Observer; +import rx.concurrency.TestScheduler; +import rx.operators.OperationSkipTest.CustomException; +import rx.subjects.PublishSubject; public class OperationSkipLastTest { @@ -111,4 +114,96 @@ public void testSkipLastWithNegativeCount() { any(IndexOutOfBoundsException.class)); verify(aObserver, never()).onCompleted(); } + + @Test + public void testSkipLastTimed() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.skipLast(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + source.onNext(4); + source.onNext(5); + source.onNext(6); + + scheduler.advanceTimeBy(950, TimeUnit.MILLISECONDS); + source.onCompleted(); + + InOrder inOrder = inOrder(o); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onNext(3); + inOrder.verify(o, never()).onNext(4); + inOrder.verify(o, never()).onNext(5); + inOrder.verify(o, never()).onNext(6); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testSkipLastTimedErrorBeforeTime() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.skipLast(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + source.onError(new OperationSkipTest.CustomException()); + + scheduler.advanceTimeBy(1050, TimeUnit.MILLISECONDS); + + verify(o).onError(any(CustomException.class)); + + verify(o, never()).onCompleted(); + verify(o, never()).onNext(any()); + } + + @Test + public void testSkipLastTimedCompleteBeforeTime() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.skipLast(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS); + + source.onCompleted(); + + + InOrder inOrder = inOrder(o); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onNext(any()); + verify(o, never()).onError(any(Throwable.class)); + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java b/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java index 16bc76820f..44f08d2402 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java @@ -15,14 +15,17 @@ */ package rx.operators; -import static org.mockito.Matchers.*; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.*; import static rx.operators.OperationSkip.*; import org.junit.Test; +import org.mockito.InOrder; import rx.Observable; import rx.Observer; +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; public class OperationSkipTest { @@ -55,4 +58,131 @@ public void testSkip2() { verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } + + @Test + public void testSkipTimed() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.skip(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onNext(4); + source.onNext(5); + source.onNext(6); + + source.onCompleted(); + + InOrder inOrder = inOrder(o); + + inOrder.verify(o, never()).onNext(1); + inOrder.verify(o, never()).onNext(2); + inOrder.verify(o, never()).onNext(3); + inOrder.verify(o).onNext(4); + inOrder.verify(o).onNext(5); + inOrder.verify(o).onNext(6); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onError(any(Throwable.class)); + } + @Test + public void testSkipTimedFinishBeforeTime() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.skip(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + source.onCompleted(); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(o); + + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onError(any(Throwable.class)); + } + static class CustomException extends RuntimeException { } + @Test + public void testSkipTimedErrorBeforeTime() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.skip(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + source.onError(new CustomException()); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(o); + + inOrder.verify(o).onError(any(CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onNext(any()); + verify(o, never()).onCompleted(); + } + @Test + public void testSkipTimedErrorAfterTime() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.skip(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onNext(4); + source.onNext(5); + source.onNext(6); + + source.onError(new CustomException()); + + InOrder inOrder = inOrder(o); + + inOrder.verify(o, never()).onNext(1); + inOrder.verify(o, never()).onNext(2); + inOrder.verify(o, never()).onNext(3); + inOrder.verify(o).onNext(4); + inOrder.verify(o).onNext(5); + inOrder.verify(o).onNext(6); + inOrder.verify(o).onError(any(CustomException.class)); + inOrder.verifyNoMoreInteractions(); + verify(o, never()).onCompleted(); + + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationTakeTest.java b/rxjava-core/src/test/java/rx/operators/OperationTakeTest.java index 773154eda0..988ffa062e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationTakeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationTakeTest.java @@ -15,8 +15,8 @@ */ package rx.operators; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import static rx.operators.OperationTake.*; @@ -28,6 +28,9 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.operators.OperationSkipTest.CustomException; +import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.functions.Func1; @@ -224,4 +227,99 @@ public void run() { return s; } } + + @Test + public void testTakeTimed() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.take(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onNext(4); + + InOrder inOrder = inOrder(o); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onNext(3); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onNext(4); + verify(o, never()).onError(any(Throwable.class)); + } + + @Test + public void testTakeTimedErrorBeforeTime() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.take(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + source.onError(new CustomException()); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onNext(4); + + InOrder inOrder = inOrder(o); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onNext(3); + inOrder.verify(o).onError(any(CustomException.class)); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onCompleted(); + verify(o, never()).onNext(4); + } + + @Test + public void testTakeTimedErrorAfterTime() { + TestScheduler scheduler = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + Observable result = source.take(1, TimeUnit.SECONDS, scheduler); + + Observer o = mock(Observer.class); + + result.subscribe(o); + + source.onNext(1); + source.onNext(2); + source.onNext(3); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + source.onNext(4); + source.onError(new CustomException()); + + InOrder inOrder = inOrder(o); + inOrder.verify(o).onNext(1); + inOrder.verify(o).onNext(2); + inOrder.verify(o).onNext(3); + inOrder.verify(o).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + verify(o, never()).onNext(4); + verify(o, never()).onError(any(CustomException.class)); + } }