From d6014ec7ba5b4132b3ce8be91ed47f379d843453 Mon Sep 17 00:00:00 2001 From: Anton Rutkevich Date: Sun, 21 Aug 2016 15:34:43 +0300 Subject: [PATCH] 1.x: Fix multiple values produced by throttleFirst with TestScheduler When throttleFirst was operating on a TestScheduler, it delivered all items passed to it untill TestScheduler's time would change to a non-zero value. --- .../operators/OperatorThrottleFirst.java | 4 +- .../operators/OperatorThrottleFirstTest.java | 42 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorThrottleFirst.java b/src/main/java/rx/internal/operators/OperatorThrottleFirst.java index b7a368578b..d41f3262eb 100644 --- a/src/main/java/rx/internal/operators/OperatorThrottleFirst.java +++ b/src/main/java/rx/internal/operators/OperatorThrottleFirst.java @@ -38,7 +38,7 @@ public OperatorThrottleFirst(long windowDuration, TimeUnit unit, Scheduler sched public Subscriber call(final Subscriber subscriber) { return new Subscriber(subscriber) { - private long lastOnNext; + private long lastOnNext = -1; @Override public void onStart() { @@ -48,7 +48,7 @@ public void onStart() { @Override public void onNext(T v) { long now = scheduler.now(); - if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) { + if (lastOnNext == -1 || now - lastOnNext >= timeInMilliseconds) { lastOnNext = now; subscriber.onNext(v); } diff --git a/src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java b/src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java index e2f7374753..7bab85cd1d 100644 --- a/src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java +++ b/src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java @@ -19,6 +19,8 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import java.util.concurrent.TimeUnit; @@ -172,4 +174,44 @@ public void timed() { ts.assertNoErrors(); ts.assertCompleted(); } + + @Test + public void throttleWithoutAdvancingTimeOfTestScheduler() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + // send events without calling advanceTimeBy/To + o.onNext(1); // deliver + o.onNext(2); // skip + o.onNext(3); // skip + o.onCompleted(); + + verify(observer).onNext(1); + verify(observer).onCompleted(); + verifyNoMoreInteractions(observer); + } + + @Test + public void throttleWithTestSchedulerTimeOfZero() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + s.advanceTimeBy(0, TimeUnit.MILLISECONDS); + + // send events while TestScheduler's time is 0 + o.onNext(1); // deliver + o.onNext(2); // skip + o.onNext(3); // skip + o.onCompleted(); + + verify(observer).onNext(1); + verify(observer).onCompleted(); + verifyNoMoreInteractions(observer); + } }