Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make interval work with multiple subscribers #379

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 132 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationInterval.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

Expand All @@ -45,14 +46,20 @@ public final class OperationInterval {
* Creates an event each time interval.
*/
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit) {
return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
return interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
}

/**
* Creates an event each time interval.
*/
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) {
return new Interval(interval, unit, scheduler);
public static OnSubscribeFunc<Long> interval(final long interval, final TimeUnit unit, final Scheduler scheduler) {
// wrapped in order to work with multiple subscribers
return new OnSubscribeFunc<Long>() {
@Override
public Subscription onSubscribe(Observer<? super Long> observer) {
return new Interval(interval, unit, scheduler).onSubscribe(observer);
}
};
}

private static class Interval implements OnSubscribeFunc<Long> {
Expand Down Expand Up @@ -91,12 +98,14 @@ public void call() {
public static class UnitTest {
private TestScheduler scheduler;
private Observer<Long> observer;
private Observer<Long> observer2;

@Before
@SuppressWarnings("unchecked") // due to mocking
public void before() {
scheduler = new TestScheduler();
observer = mock(Observer.class);
observer2 = mock(Observer.class);
}

@Test
Expand All @@ -123,5 +132,125 @@ public void testInterval() {
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}

@Test
public void testWithMultipleSubscribersStartingAtSameTime() {
Observable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler));
Subscription sub1 = w.subscribe(observer);
Subscription sub2 = w.subscribe(observer2);

verify(observer, never()).onNext(anyLong());
verify(observer2, never()).onNext(anyLong());

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);

InOrder inOrder1 = inOrder(observer);
InOrder inOrder2 = inOrder(observer2);

inOrder1.verify(observer, times(1)).onNext(0L);
inOrder1.verify(observer, times(1)).onNext(1L);
inOrder1.verify(observer, never()).onNext(2L);
verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

inOrder2.verify(observer2, times(1)).onNext(0L);
inOrder2.verify(observer2, times(1)).onNext(1L);
inOrder2.verify(observer2, never()).onNext(2L);
verify(observer2, never()).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));

sub1.unsubscribe();
sub2.unsubscribe();
scheduler.advanceTimeTo(4, TimeUnit.SECONDS);

verify(observer, never()).onNext(2L);
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

verify(observer2, never()).onNext(2L);
verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
}

@Test
public void testWithMultipleStaggeredSubscribers() {
Observable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler));
Subscription sub1 = w.subscribe(observer);

verify(observer, never()).onNext(anyLong());

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
Subscription sub2 = w.subscribe(observer2);

InOrder inOrder1 = inOrder(observer);
inOrder1.verify(observer, times(1)).onNext(0L);
inOrder1.verify(observer, times(1)).onNext(1L);
inOrder1.verify(observer, never()).onNext(2L);

verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
verify(observer2, never()).onNext(anyLong());

scheduler.advanceTimeTo(4, TimeUnit.SECONDS);

inOrder1.verify(observer, times(1)).onNext(2L);
inOrder1.verify(observer, times(1)).onNext(3L);

InOrder inOrder2 = inOrder(observer2);
inOrder2.verify(observer2, times(1)).onNext(0L);
inOrder2.verify(observer2, times(1)).onNext(1L);

sub1.unsubscribe();
sub2.unsubscribe();

inOrder1.verify(observer, never()).onNext(anyLong());
inOrder1.verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

inOrder2.verify(observer2, never()).onNext(anyLong());
inOrder2.verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
}

@Test
public void testWithMultipleStaggeredSubscribersAndPublish() {
ConnectableObservable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)).publish();
Subscription sub1 = w.subscribe(observer);
w.connect();

verify(observer, never()).onNext(anyLong());

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
Subscription sub2 = w.subscribe(observer2);

InOrder inOrder1 = inOrder(observer);
inOrder1.verify(observer, times(1)).onNext(0L);
inOrder1.verify(observer, times(1)).onNext(1L);
inOrder1.verify(observer, never()).onNext(2L);

verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
verify(observer2, never()).onNext(anyLong());

scheduler.advanceTimeTo(4, TimeUnit.SECONDS);

inOrder1.verify(observer, times(1)).onNext(2L);
inOrder1.verify(observer, times(1)).onNext(3L);

InOrder inOrder2 = inOrder(observer2);
inOrder2.verify(observer2, times(1)).onNext(2L);
inOrder2.verify(observer2, times(1)).onNext(3L);

sub1.unsubscribe();
sub2.unsubscribe();

inOrder1.verify(observer, never()).onNext(anyLong());
inOrder1.verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));

inOrder2.verify(observer2, never()).onNext(anyLong());
inOrder2.verify(observer2, never()).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
}
}
}