From 39d49fcf3c7c1716c6d0d0c048598aa131899a1d Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 12 Dec 2013 23:23:38 -0800 Subject: [PATCH] ObserveOn Fixes - fix subscription leak (Composite+MultipleAssignment instead of just Composite) - add unit tests --- .../java/rx/operators/OperationObserveOn.java | 78 ++++++++------- .../src/test/java/rx/ErrorHandlingTests.java | 98 +++++++++++++++++++ .../rx/operators/OperationObserveOnTest.java | 64 ++++++------ 3 files changed, 174 insertions(+), 66 deletions(-) create mode 100644 rxjava-core/src/test/java/rx/ErrorHandlingTests.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index fed1cf4bb0..2f569ea75f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -27,7 +27,7 @@ import rx.schedulers.CurrentThreadScheduler; import rx.schedulers.ImmediateScheduler; import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func2; @@ -64,62 +64,66 @@ public Subscription onSubscribe(final Observer observer) { return new Observation(observer).init(); } } + /** Observe through individual queue per observer. */ - private class Observation implements Action1> { + private class Observation { final Observer observer; - final CompositeSubscription s; - final ConcurrentLinkedQueue> queue; - final AtomicInteger counter; + final CompositeSubscription compositeSubscription = new CompositeSubscription(); + final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription(); + final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); + final AtomicInteger counter = new AtomicInteger(0); private volatile Scheduler recursiveScheduler; + public Observation(Observer observer) { this.observer = observer; - this.queue = new ConcurrentLinkedQueue>(); - this.counter = new AtomicInteger(0); - this.s = new CompositeSubscription(); } + public Subscription init() { - s.add(source.materialize().subscribe(this)); - return s; + compositeSubscription.add(source.materialize().subscribe(new SourceObserver())); + return compositeSubscription; } - @Override - public void call(Notification e) { - queue.offer(e); - if (counter.getAndIncrement() == 0) { - if (recursiveScheduler == null) { - s.add(scheduler.schedule(null, new Func2() { + private class SourceObserver implements Action1> { + + @Override + public void call(Notification e) { + queue.offer(e); + if (counter.getAndIncrement() == 0) { + if (recursiveScheduler == null) { + // compositeSubscription for the outer scheduler, recursive for inner + compositeSubscription.add(scheduler.schedule(null, new Func2() { @Override public Subscription call(Scheduler innerScheduler, T state) { // record innerScheduler so 'processQueue' can use it for all subsequent executions recursiveScheduler = innerScheduler; - + // once we have the innerScheduler we can start doing real work processQueue(); - - return Subscriptions.empty(); + return recursiveSubscription; } })); - } else { - processQueue(); + } else { + processQueue(); + } } } - } - void processQueue() { - s.add(recursiveScheduler.schedule(new Action1() { - @Override - public void call(Action0 self) { - Notification not = queue.poll(); - if (not != null) { - not.accept(observer); - } - // decrement count and if we still have work to do - // recursively schedule ourselves to process again - if (counter.decrementAndGet() > 0) { - self.call(); - } + void processQueue() { + recursiveSubscription.setSubscription(recursiveScheduler.schedule(new Action1() { + @Override + public void call(Action0 self) { + Notification not = queue.poll(); + if (not != null) { + not.accept(observer); + } - } - })); + // decrement count and if we still have work to do + // recursively schedule ourselves to process again + if (counter.decrementAndGet() > 0) { + self.call(); + } + } + })); + } } } } diff --git a/rxjava-core/src/test/java/rx/ErrorHandlingTests.java b/rxjava-core/src/test/java/rx/ErrorHandlingTests.java new file mode 100644 index 0000000000..baaddd0af3 --- /dev/null +++ b/rxjava-core/src/test/java/rx/ErrorHandlingTests.java @@ -0,0 +1,98 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx; + +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.schedulers.Schedulers; + +public class ErrorHandlingTests { + + /** + * Test that an error from a user provided Observer.onNext is handled and emitted to the onError + */ + @Test + public void testOnNextError() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference caughtError = new AtomicReference(); + Observable o = Observable.interval(50, TimeUnit.MILLISECONDS); + Observer observer = new Observer() { + + @Override + public void onCompleted() { + System.out.println("completed"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println("error: " + e); + caughtError.set(e); + latch.countDown(); + } + + @Override + public void onNext(Long args) { + throw new RuntimeException("forced failure"); + } + }; + o.subscribe(observer); + + latch.await(2000, TimeUnit.MILLISECONDS); + assertNotNull(caughtError.get()); + } + + /** + * Test that an error from a user provided Observer.onNext is handled and emitted to the onError + * even when done across thread boundaries with observeOn + */ + @Test + public void testOnNextErrorAcrossThread() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference caughtError = new AtomicReference(); + Observable o = Observable.interval(50, TimeUnit.MILLISECONDS); + Observer observer = new Observer() { + + @Override + public void onCompleted() { + System.out.println("completed"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println("error: " + e); + caughtError.set(e); + latch.countDown(); + } + + @Override + public void onNext(Long args) { + throw new RuntimeException("forced failure"); + } + }; + o.observeOn(Schedulers.newThread()).subscribe(observer); + + latch.await(2000, TimeUnit.MILLISECONDS); + assertNotNull(caughtError.get()); + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java index 69fa6b1718..554b583247 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java @@ -32,6 +32,7 @@ import rx.Observer; import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; +import rx.util.functions.Action0; import rx.util.functions.Action1; public class OperationObserveOnTest { @@ -88,55 +89,59 @@ public Void answer(InvocationOnMock invocation) throws Throwable { @Test @SuppressWarnings("unchecked") public void testThreadName() throws InterruptedException { + System.out.println("Main Thread: " + Thread.currentThread().getName()); Observable obs = Observable.from("one", null, "two", "three", "four"); Observer observer = mock(Observer.class); - - InOrder inOrder = inOrder(observer); + final String parentThreadName = Thread.currentThread().getName(); final CountDownLatch completedLatch = new CountDownLatch(1); - doAnswer(new Answer() { - - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - completedLatch.countDown(); - return null; - } - }).when(observer).onCompleted(); - - doAnswer(new Answer() { + // assert subscribe is on main thread + obs = obs.doOnEach(new Action1() { @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - completedLatch.countDown(); - - return null; + public void call(String s) { + String threadName = Thread.currentThread().getName(); + System.out.println("Source ThreadName: " + threadName + " Expected => " + parentThreadName); + assertEquals(parentThreadName, threadName); } - }).when(observer).onError(any(Exception.class)); + }); + + // assert observe is on new thread obs.observeOn(Schedulers.newThread()).doOnEach(new Action1() { @Override public void call(String t1) { String threadName = Thread.currentThread().getName(); boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler"); - System.out.println("ThreadName: " + threadName + " Correct => " + correctThreadName); + System.out.println("ObserveOn ThreadName: " + threadName + " Correct => " + correctThreadName); assertTrue(correctThreadName); } + }).finallyDo(new Action0() { + + @Override + public void call() { + completedLatch.countDown(); + + } }).subscribe(observer); if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) { fail("timed out waiting"); } - inOrder.verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(5)).onNext(any(String.class)); + verify(observer, times(1)).onCompleted(); } + @Test public void observeOnTheSameSchedulerTwice() { TestScheduler scheduler = new TestScheduler(); - + Observable o = Observable.from(1, 2, 3); Observable o2 = o.observeOn(scheduler); @@ -144,15 +149,15 @@ public void observeOnTheSameSchedulerTwice() { Observer observer1 = mock(Observer.class); @SuppressWarnings("unchecked") Observer observer2 = mock(Observer.class); - + InOrder inOrder1 = inOrder(observer1); InOrder inOrder2 = inOrder(observer2); - + o2.subscribe(observer1); o2.subscribe(observer2); - + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); - + inOrder1.verify(observer1, times(1)).onNext(1); inOrder1.verify(observer1, times(1)).onNext(2); inOrder1.verify(observer1, times(1)).onNext(3); @@ -168,11 +173,12 @@ public void observeOnTheSameSchedulerTwice() { inOrder2.verifyNoMoreInteractions(); } + @Test public void observeSameOnMultipleSchedulers() { TestScheduler scheduler1 = new TestScheduler(); TestScheduler scheduler2 = new TestScheduler(); - + Observable o = Observable.from(1, 2, 3); Observable o1 = o.observeOn(scheduler1); Observable o2 = o.observeOn(scheduler2); @@ -181,16 +187,16 @@ public void observeSameOnMultipleSchedulers() { Observer observer1 = mock(Observer.class); @SuppressWarnings("unchecked") Observer observer2 = mock(Observer.class); - + InOrder inOrder1 = inOrder(observer1); InOrder inOrder2 = inOrder(observer2); - + o1.subscribe(observer1); o2.subscribe(observer2); - + scheduler1.advanceTimeBy(1, TimeUnit.SECONDS); scheduler2.advanceTimeBy(1, TimeUnit.SECONDS); - + inOrder1.verify(observer1, times(1)).onNext(1); inOrder1.verify(observer1, times(1)).onNext(2); inOrder1.verify(observer1, times(1)).onNext(3);