diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9482a5f09f6..7c12d21db35 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4463,7 +4463,22 @@ public Observable sample(long period, TimeUnit unit) { public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { return create(OperationSample.sample(this, period, unit, scheduler)); } - + + /** + * Return an Observable that emits the results of sampling the items + * emitted by this Observable when the sampler + * Observable produces an item or completes. + * + * @param sampler the Observable to use for sampling this + * + * @return an Observable that emits the results of sampling the items + * emitted by this Observable when the sampler + * Observable produces an item or completes. + */ + public Observable sample(Observable sampler) { + return create(new OperationSample.SampleWithObservable(this, sampler)); + } + /** * Returns an Observable that applies a function of your choosing to the * first item emitted by a source Observable, then feeds the result of that diff --git a/rxjava-core/src/main/java/rx/operators/OperationSample.java b/rxjava-core/src/main/java/rx/operators/OperationSample.java index 89da00e5fbc..f5f8f96e3f6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSample.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSample.java @@ -25,6 +25,8 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SerialSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; @@ -115,4 +117,91 @@ public void call() { }); } } + /** + * Sample with the help of another observable. + * @see MSDN: Observable.Sample + */ + public static class SampleWithObservable implements OnSubscribeFunc { + final Observable source; + final Observable sampler; + public SampleWithObservable(Observable source, Observable sampler) { + this.source = source; + this.sampler = sampler; + } + @Override + public Subscription onSubscribe(Observer t1) { + return new ResultManager(t1).init(); + } + /** Observe source values. */ + class ResultManager implements Observer { + final Observer observer; + final CompositeSubscription cancel; + T value; + boolean valueTaken = true; + boolean done; + final Object guard; + public ResultManager(Observer observer) { + this.observer = observer; + cancel = new CompositeSubscription(); + guard = new Object(); + } + public Subscription init() { + cancel.add(source.subscribe(this)); + cancel.add(sampler.subscribe(new Sampler())); + + return cancel; + } + @Override + public void onNext(T args) { + synchronized (guard) { + valueTaken = false; + value = args; + } + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + if (!done) { + done = true; + observer.onError(e); + cancel.unsubscribe(); + } + } + } + + @Override + public void onCompleted() { + synchronized (guard) { + if (!done) { + done = true; + observer.onCompleted(); + cancel.unsubscribe(); + } + } + } + /** Take the latest value, but only once. */ + class Sampler implements Observer { + @Override + public void onNext(U args) { + synchronized (guard) { + if (!valueTaken && !done) { + valueTaken = true; + observer.onNext(value); + } + } + } + + @Override + public void onError(Throwable e) { + ResultManager.this.onError(e); + } + + @Override + public void onCompleted() { + ResultManager.this.onCompleted(); + } + } + } + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java b/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java index d868697f9ff..18316889c13 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSampleTest.java @@ -15,7 +15,6 @@ */ package rx.operators; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; @@ -28,12 +27,14 @@ import rx.Observer; import rx.Subscription; import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; public class OperationSampleTest { private TestScheduler scheduler; private Observer observer; + private Observer observer2; @Before @SuppressWarnings("unchecked") @@ -41,6 +42,7 @@ public class OperationSampleTest { public void before() { scheduler = new TestScheduler(); observer = mock(Observer.class); + observer2 = mock(Observer.class); } @Test @@ -105,4 +107,159 @@ public void call() { verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } + @Test + public void sampleWithSamplerNormal() { + PublishSubject source = PublishSubject.create(); + PublishSubject sampler = PublishSubject.create(); + + Observable m = source.sample(sampler); + m.subscribe(observer2); + + source.onNext(1); + source.onNext(2); + sampler.onNext(1); + source.onNext(3); + source.onNext(4); + sampler.onNext(2); + source.onCompleted(); + sampler.onNext(3); + + + InOrder inOrder = inOrder(observer2); + inOrder.verify(observer2, never()).onNext(1); + inOrder.verify(observer2, times(1)).onNext(2); + inOrder.verify(observer2, never()).onNext(3); + inOrder.verify(observer2, times(1)).onNext(4); + inOrder.verify(observer2, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void sampleWithSamplerNoDuplicates() { + PublishSubject source = PublishSubject.create(); + PublishSubject sampler = PublishSubject.create(); + + Observable m = source.sample(sampler); + m.subscribe(observer2); + + source.onNext(1); + source.onNext(2); + sampler.onNext(1); + sampler.onNext(1); + + source.onNext(3); + source.onNext(4); + sampler.onNext(2); + sampler.onNext(2); + + source.onCompleted(); + sampler.onNext(3); + + + InOrder inOrder = inOrder(observer2); + inOrder.verify(observer2, never()).onNext(1); + inOrder.verify(observer2, times(1)).onNext(2); + inOrder.verify(observer2, never()).onNext(3); + inOrder.verify(observer2, times(1)).onNext(4); + inOrder.verify(observer2, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void sampleWithSamplerTerminatingEarly() { + PublishSubject source = PublishSubject.create(); + PublishSubject sampler = PublishSubject.create(); + + Observable m = source.sample(sampler); + m.subscribe(observer2); + + source.onNext(1); + source.onNext(2); + sampler.onNext(1); + sampler.onCompleted(); + + source.onNext(3); + source.onNext(4); + + + + InOrder inOrder = inOrder(observer2); + inOrder.verify(observer2, never()).onNext(1); + inOrder.verify(observer2, times(1)).onNext(2); + inOrder.verify(observer2, times(1)).onCompleted(); + inOrder.verify(observer2, never()).onNext(any()); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void sampleWithSamplerEmitAndTerminate() { + PublishSubject source = PublishSubject.create(); + PublishSubject sampler = PublishSubject.create(); + + Observable m = source.sample(sampler); + m.subscribe(observer2); + + source.onNext(1); + source.onNext(2); + sampler.onNext(1); + source.onNext(3); + source.onCompleted(); + sampler.onNext(2); + sampler.onCompleted(); + + InOrder inOrder = inOrder(observer2); + inOrder.verify(observer2, never()).onNext(1); + inOrder.verify(observer2, times(1)).onNext(2); + inOrder.verify(observer2, never()).onNext(3); + inOrder.verify(observer2, times(1)).onCompleted(); + inOrder.verify(observer2, never()).onNext(any()); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void sampleWithSamplerEmptySource() { + PublishSubject source = PublishSubject.create(); + PublishSubject sampler = PublishSubject.create(); + + Observable m = source.sample(sampler); + m.subscribe(observer2); + + source.onCompleted(); + sampler.onNext(1); + + InOrder inOrder = inOrder(observer2); + inOrder.verify(observer2, times(1)).onCompleted(); + verify(observer2, never()).onNext(any()); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void sampleWithSamplerSourceThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject sampler = PublishSubject.create(); + + Observable m = source.sample(sampler); + m.subscribe(observer2); + + source.onNext(1); + source.onError(new RuntimeException("Forced failure!")); + sampler.onNext(1); + + InOrder inOrder = inOrder(observer2); + inOrder.verify(observer2, times(1)).onError(any(Throwable.class)); + verify(observer2, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void sampleWithSamplerThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject sampler = PublishSubject.create(); + + Observable m = source.sample(sampler); + m.subscribe(observer2); + + source.onNext(1); + sampler.onNext(1); + sampler.onError(new RuntimeException("Forced failure!")); + + InOrder inOrder = inOrder(observer2); + inOrder.verify(observer2, times(1)).onNext(1); + inOrder.verify(observer2, times(1)).onError(any(RuntimeException.class)); + verify(observer, never()).onCompleted(); + } }