diff --git a/src/main/java/rx/internal/operators/OperatorThrottleFirst.java b/src/main/java/rx/internal/operators/OperatorThrottleFirst.java index b7a368578b..d41f3262eb 100644 --- a/src/main/java/rx/internal/operators/OperatorThrottleFirst.java +++ b/src/main/java/rx/internal/operators/OperatorThrottleFirst.java @@ -38,7 +38,7 @@ public OperatorThrottleFirst(long windowDuration, TimeUnit unit, Scheduler sched public Subscriber call(final Subscriber subscriber) { return new Subscriber(subscriber) { - private long lastOnNext; + private long lastOnNext = -1; @Override public void onStart() { @@ -48,7 +48,7 @@ public void onStart() { @Override public void onNext(T v) { long now = scheduler.now(); - if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) { + if (lastOnNext == -1 || now - lastOnNext >= timeInMilliseconds) { lastOnNext = now; subscriber.onNext(v); } diff --git a/src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java b/src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java index e2f7374753..7bab85cd1d 100644 --- a/src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java +++ b/src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java @@ -19,6 +19,8 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import java.util.concurrent.TimeUnit; @@ -172,4 +174,44 @@ public void timed() { ts.assertNoErrors(); ts.assertCompleted(); } + + @Test + public void throttleWithoutAdvancingTimeOfTestScheduler() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + // send events without calling advanceTimeBy/To + o.onNext(1); // deliver + o.onNext(2); // skip + o.onNext(3); // skip + o.onCompleted(); + + verify(observer).onNext(1); + verify(observer).onCompleted(); + verifyNoMoreInteractions(observer); + } + + @Test + public void throttleWithTestSchedulerTimeOfZero() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + s.advanceTimeBy(0, TimeUnit.MILLISECONDS); + + // send events while TestScheduler's time is 0 + o.onNext(1); // deliver + o.onNext(2); // skip + o.onNext(3); // skip + o.onCompleted(); + + verify(observer).onNext(1); + verify(observer).onCompleted(); + verifyNoMoreInteractions(observer); + } }