Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ImmediateScheduler optimization for toObservableIterable #727

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
Expand All @@ -37,16 +38,20 @@
public final class OperationToObservableIterable<T> {

public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
return new ToObservableIterable<T>(list, scheduler);
if (scheduler instanceof ImmediateScheduler) {
return new ToObservableIterable<T>(list);
} else {
return new ToObservableIterableScheduled<T>(list, scheduler);
}
}

public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
return toObservableIterable(list, Schedulers.immediate());
return new ToObservableIterable<T>(list);
}

private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
private static class ToObservableIterableScheduled<T> implements OnSubscribeFunc<T> {

public ToObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
public ToObservableIterableScheduled(Iterable<? extends T> list, Scheduler scheduler) {
this.iterable = list;
this.scheduler = scheduler;
}
Expand Down Expand Up @@ -74,4 +79,25 @@ public void call(Action0 self) {
});
}
}

private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {

public ToObservableIterable(Iterable<? extends T> list) {
this.iterable = list;
}

final Iterable<? extends T> iterable;

public Subscription onSubscribe(final Observer<? super T> observer) {
try {
for (T t : iterable) {
observer.onNext(t);
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return Subscriptions.empty();
}
}
}
133 changes: 0 additions & 133 deletions rxjava-core/src/test/java/rx/ObserveOnTests.java

This file was deleted.

104 changes: 104 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;
Expand All @@ -34,6 +35,7 @@
import rx.schedulers.TestScheduler;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class OperationObserveOnTest {

Expand Down Expand Up @@ -210,6 +212,108 @@ public void observeSameOnMultipleSchedulers() {
inOrder2.verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
inOrder2.verifyNoMoreInteractions();
}

/**
* Confirm that running on a NewThreadScheduler uses the same thread for the entire stream
*/
@Test
public void testObserveOnWithNewThreadScheduler() {
final AtomicInteger count = new AtomicInteger();
final int _multiple = 99;

Observable.range(1, 100000).map(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer t1) {
return t1 * _multiple;
}

}).observeOn(Schedulers.newThread())
.toBlockingObservable().forEach(new Action1<Integer>() {

@Override
public void call(Integer t1) {
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler"));
}

});
}

/**
* Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered.
*/
@Test
public void testObserveOnWithThreadPoolScheduler() {
final AtomicInteger count = new AtomicInteger();
final int _multiple = 99;

Observable.range(1, 100000).map(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer t1) {
return t1 * _multiple;
}

}).observeOn(Schedulers.computation())
.toBlockingObservable().forEach(new Action1<Integer>() {

@Override
public void call(Integer t1) {
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
}

});
}

/**
* Attempts to confirm that when pauses exist between events, the ScheduledObserver
* does not lose or reorder any events since the scheduler will not block, but will
* be re-scheduled when it receives new events after each pause.
*
*
* This is non-deterministic in proving success, but if it ever fails (non-deterministically)
* it is a sign of potential issues as thread-races and scheduling should not affect output.
*/
@Test
public void testObserveOnOrderingConcurrency() {
final AtomicInteger count = new AtomicInteger();
final int _multiple = 99;

Observable.range(1, 10000).map(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer t1) {
if (randomIntFrom0to100() > 98) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return t1 * _multiple;
}

}).observeOn(Schedulers.computation())
.toBlockingObservable().forEach(new Action1<Integer>() {

@Override
public void call(Integer t1) {
assertEquals(count.incrementAndGet() * _multiple, t1.intValue());
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
}

});
}

private static int randomIntFrom0to100() {
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
long x = System.nanoTime();
x ^= (x << 21);
x ^= (x >>> 35);
x ^= (x << 4);
return Math.abs((int) x % 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import rx.Observable;
import rx.Observer;
import rx.schedulers.Schedulers;

public class OperationToObservableIterableTest {

Expand All @@ -42,4 +43,18 @@ public void testIterable() {
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testIterableScheduled() {
Observable<String> observable = Observable.create(toObservableIterable(Arrays.<String> asList("one", "two", "three"), Schedulers.currentThread()));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, times(1)).onNext("three");
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,31 @@ public String call(String s) {
assertTrue(strings.contains("names=>b-2"));
}

/**
* The order of execution is nondeterministic.
* @throws InterruptedException
*/
@SuppressWarnings("rawtypes")
@Test
public final void testSequenceOfActions() throws InterruptedException {
final Scheduler scheduler = getScheduler();

final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(2);
final Action0 first = mock(Action0.class);
final Action0 second = mock(Action0.class);

// make it wait until after the second is called
// make it wait until both the first and second are called
doAnswer(new Answer() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
try {
return invocation.getMock();
} finally {
latch.countDown();
}
}
}).when(first).call();
doAnswer(new Answer() {

@Override
Expand Down
Loading