Skip to content

Commit

Permalink
1.x: Fix multiple values produced by throttleFirst with TestScheduler
Browse files Browse the repository at this point in the history
When throttleFirst was operating on a TestScheduler, it delivered all items passed to it untill TestScheduler's time would change to a non-zero value.
  • Loading branch information
tony-root committed Aug 21, 2016
1 parent 27c782d commit d6014ec
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public OperatorThrottleFirst(long windowDuration, TimeUnit unit, Scheduler sched
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {

private long lastOnNext;
private long lastOnNext = -1;

@Override
public void onStart() {
Expand All @@ -48,7 +48,7 @@ public void onStart() {
@Override
public void onNext(T v) {
long now = scheduler.now();
if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) {
if (lastOnNext == -1 || now - lastOnNext >= timeInMilliseconds) {
lastOnNext = now;
subscriber.onNext(v);
}
Expand Down
42 changes: 42 additions & 0 deletions src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -172,4 +174,44 @@ public void timed() {
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void throttleWithoutAdvancingTimeOfTestScheduler() {
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer);

// send events without calling advanceTimeBy/To
o.onNext(1); // deliver
o.onNext(2); // skip
o.onNext(3); // skip
o.onCompleted();

verify(observer).onNext(1);
verify(observer).onCompleted();
verifyNoMoreInteractions(observer);
}

@Test
public void throttleWithTestSchedulerTimeOfZero() {
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer);

s.advanceTimeBy(0, TimeUnit.MILLISECONDS);

// send events while TestScheduler's time is 0
o.onNext(1); // deliver
o.onNext(2); // skip
o.onNext(3); // skip
o.onCompleted();

verify(observer).onNext(1);
verify(observer).onCompleted();
verifyNoMoreInteractions(observer);
}
}

0 comments on commit d6014ec

Please sign in to comment.