diff --git a/src/test/java/io/reactivex/processors/AsyncProcessorTest.java b/src/test/java/io/reactivex/processors/AsyncProcessorTest.java index 847d363580..a3d17873e3 100644 --- a/src/test/java/io/reactivex/processors/AsyncProcessorTest.java +++ b/src/test/java/io/reactivex/processors/AsyncProcessorTest.java @@ -46,14 +46,14 @@ protected FlowableProcessor create() { @Test public void testNeverCompleted() { - AsyncProcessor subject = AsyncProcessor.create(); + AsyncProcessor processor = AsyncProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); verify(observer, Mockito.never()).onNext(anyString()); verify(observer, Mockito.never()).onError(testException); @@ -62,15 +62,15 @@ public void testNeverCompleted() { @Test public void testCompleted() { - AsyncProcessor subject = AsyncProcessor.create(); + AsyncProcessor processor = AsyncProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); + processor.onComplete(); verify(observer, times(1)).onNext("three"); verify(observer, Mockito.never()).onError(any(Throwable.class)); @@ -80,13 +80,13 @@ public void testCompleted() { @Test @Ignore("Null values not allowed") public void testNull() { - AsyncProcessor subject = AsyncProcessor.create(); + AsyncProcessor processor = AsyncProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext(null); - subject.onComplete(); + processor.onNext(null); + processor.onComplete(); verify(observer, times(1)).onNext(null); verify(observer, Mockito.never()).onError(any(Throwable.class)); @@ -95,16 +95,16 @@ public void testNull() { @Test public void testSubscribeAfterCompleted() { - AsyncProcessor subject = AsyncProcessor.create(); + AsyncProcessor processor = AsyncProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); + processor.onComplete(); - subject.subscribe(observer); + processor.subscribe(observer); verify(observer, times(1)).onNext("three"); verify(observer, Mockito.never()).onError(any(Throwable.class)); @@ -113,18 +113,18 @@ public void testSubscribeAfterCompleted() { @Test public void testSubscribeAfterError() { - AsyncProcessor subject = AsyncProcessor.create(); + AsyncProcessor processor = AsyncProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); RuntimeException re = new RuntimeException("failed"); - subject.onError(re); + processor.onError(re); - subject.subscribe(observer); + processor.subscribe(observer); verify(observer, times(1)).onError(re); verify(observer, Mockito.never()).onNext(any(String.class)); @@ -133,18 +133,18 @@ public void testSubscribeAfterError() { @Test public void testError() { - AsyncProcessor subject = AsyncProcessor.create(); + AsyncProcessor processor = AsyncProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); - subject.onError(testException); - subject.onNext("four"); - subject.onError(new Throwable()); - subject.onComplete(); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); + processor.onError(testException); + processor.onNext("four"); + processor.onError(new Throwable()); + processor.onComplete(); verify(observer, Mockito.never()).onNext(anyString()); verify(observer, times(1)).onError(testException); @@ -153,14 +153,14 @@ public void testError() { @Test public void testUnsubscribeBeforeCompleted() { - AsyncProcessor subject = AsyncProcessor.create(); + AsyncProcessor processor = AsyncProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); TestSubscriber ts = new TestSubscriber(observer); - subject.subscribe(ts); + processor.subscribe(ts); - subject.onNext("one"); - subject.onNext("two"); + processor.onNext("one"); + processor.onNext("two"); ts.dispose(); @@ -168,8 +168,8 @@ public void testUnsubscribeBeforeCompleted() { verify(observer, Mockito.never()).onError(any(Throwable.class)); verify(observer, Mockito.never()).onComplete(); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("three"); + processor.onComplete(); verify(observer, Mockito.never()).onNext(anyString()); verify(observer, Mockito.never()).onError(any(Throwable.class)); @@ -178,12 +178,12 @@ public void testUnsubscribeBeforeCompleted() { @Test public void testEmptySubjectCompleted() { - AsyncProcessor subject = AsyncProcessor.create(); + AsyncProcessor processor = AsyncProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onComplete(); + processor.onComplete(); InOrder inOrder = inOrder(observer); inOrder.verify(observer, never()).onNext(null); @@ -204,10 +204,10 @@ public void testSubscribeCompletionRaceCondition() { * With the synchronization code in place I can not get this to fail on my laptop. */ for (int i = 0; i < 50; i++) { - final AsyncProcessor subject = AsyncProcessor.create(); + final AsyncProcessor processor = AsyncProcessor.create(); final AtomicReference value1 = new AtomicReference(); - subject.subscribe(new Consumer() { + processor.subscribe(new Consumer() { @Override public void accept(String t1) { @@ -226,15 +226,15 @@ public void accept(String t1) { @Override public void run() { - subject.onNext("value"); - subject.onComplete(); + processor.onNext("value"); + processor.onComplete(); } }); - SubjectSubscriberThread t2 = new SubjectSubscriberThread(subject); - SubjectSubscriberThread t3 = new SubjectSubscriberThread(subject); - SubjectSubscriberThread t4 = new SubjectSubscriberThread(subject); - SubjectSubscriberThread t5 = new SubjectSubscriberThread(subject); + SubjectSubscriberThread t2 = new SubjectSubscriberThread(processor); + SubjectSubscriberThread t3 = new SubjectSubscriberThread(processor); + SubjectSubscriberThread t4 = new SubjectSubscriberThread(processor); + SubjectSubscriberThread t5 = new SubjectSubscriberThread(processor); t2.start(); t3.start(); @@ -262,18 +262,18 @@ public void run() { private static class SubjectSubscriberThread extends Thread { - private final AsyncProcessor subject; + private final AsyncProcessor processor; private final AtomicReference value = new AtomicReference(); - SubjectSubscriberThread(AsyncProcessor subject) { - this.subject = subject; + SubjectSubscriberThread(AsyncProcessor processor) { + this.processor = processor; } @Override public void run() { try { // a timeout exception will happen if we don't get a terminal state - String v = subject.timeout(2000, TimeUnit.MILLISECONDS).blockingSingle(); + String v = processor.timeout(2000, TimeUnit.MILLISECONDS).blockingSingle(); value.set(v); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java index 9cb392adba..1a58aaf004 100644 --- a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java +++ b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java @@ -50,14 +50,14 @@ protected FlowableProcessor create() { @Test public void testThatSubscriberReceivesDefaultValueAndSubsequentEvents() { - BehaviorProcessor subject = BehaviorProcessor.createDefault("default"); + BehaviorProcessor processor = BehaviorProcessor.createDefault("default"); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); verify(observer, times(1)).onNext("default"); verify(observer, times(1)).onNext("one"); @@ -69,15 +69,15 @@ public void testThatSubscriberReceivesDefaultValueAndSubsequentEvents() { @Test public void testThatSubscriberReceivesLatestAndThenSubsequentEvents() { - BehaviorProcessor subject = BehaviorProcessor.createDefault("default"); + BehaviorProcessor processor = BehaviorProcessor.createDefault("default"); - subject.onNext("one"); + processor.onNext("one"); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("two"); - subject.onNext("three"); + processor.onNext("two"); + processor.onNext("three"); verify(observer, Mockito.never()).onNext("default"); verify(observer, times(1)).onNext("one"); @@ -89,13 +89,13 @@ public void testThatSubscriberReceivesLatestAndThenSubsequentEvents() { @Test public void testSubscribeThenOnComplete() { - BehaviorProcessor subject = BehaviorProcessor.createDefault("default"); + BehaviorProcessor processor = BehaviorProcessor.createDefault("default"); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onComplete(); + processor.onNext("one"); + processor.onComplete(); verify(observer, times(1)).onNext("default"); verify(observer, times(1)).onNext("one"); @@ -105,12 +105,12 @@ public void testSubscribeThenOnComplete() { @Test public void testSubscribeToCompletedOnlyEmitsOnComplete() { - BehaviorProcessor subject = BehaviorProcessor.createDefault("default"); - subject.onNext("one"); - subject.onComplete(); + BehaviorProcessor processor = BehaviorProcessor.createDefault("default"); + processor.onNext("one"); + processor.onComplete(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); verify(observer, never()).onNext("default"); verify(observer, never()).onNext("one"); @@ -120,13 +120,13 @@ public void testSubscribeToCompletedOnlyEmitsOnComplete() { @Test public void testSubscribeToErrorOnlyEmitsOnError() { - BehaviorProcessor subject = BehaviorProcessor.createDefault("default"); - subject.onNext("one"); + BehaviorProcessor processor = BehaviorProcessor.createDefault("default"); + processor.onNext("one"); RuntimeException re = new RuntimeException("test error"); - subject.onError(re); + processor.onError(re); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); verify(observer, never()).onNext("default"); verify(observer, never()).onNext("one"); @@ -181,15 +181,15 @@ public void testCompletedStopsEmittingData() { @Test public void testCompletedAfterErrorIsNotSent() { - BehaviorProcessor subject = BehaviorProcessor.createDefault("default"); + BehaviorProcessor processor = BehaviorProcessor.createDefault("default"); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onError(testException); - subject.onNext("two"); - subject.onComplete(); + processor.onNext("one"); + processor.onError(testException); + processor.onNext("two"); + processor.onComplete(); verify(observer, times(1)).onNext("default"); verify(observer, times(1)).onNext("one"); @@ -200,15 +200,15 @@ public void testCompletedAfterErrorIsNotSent() { @Test public void testCompletedAfterErrorIsNotSent2() { - BehaviorProcessor subject = BehaviorProcessor.createDefault("default"); + BehaviorProcessor processor = BehaviorProcessor.createDefault("default"); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onError(testException); - subject.onNext("two"); - subject.onComplete(); + processor.onNext("one"); + processor.onError(testException); + processor.onNext("two"); + processor.onComplete(); verify(observer, times(1)).onNext("default"); verify(observer, times(1)).onNext("one"); @@ -217,7 +217,7 @@ public void testCompletedAfterErrorIsNotSent2() { verify(observer, never()).onComplete(); Subscriber o2 = TestHelper.mockSubscriber(); - subject.subscribe(o2); + processor.subscribe(o2); verify(o2, times(1)).onError(testException); verify(o2, never()).onNext(any()); verify(o2, never()).onComplete(); @@ -225,15 +225,15 @@ public void testCompletedAfterErrorIsNotSent2() { @Test public void testCompletedAfterErrorIsNotSent3() { - BehaviorProcessor subject = BehaviorProcessor.createDefault("default"); + BehaviorProcessor processor = BehaviorProcessor.createDefault("default"); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onComplete(); - subject.onNext("two"); - subject.onComplete(); + processor.onNext("one"); + processor.onComplete(); + processor.onNext("two"); + processor.onComplete(); verify(observer, times(1)).onNext("default"); verify(observer, times(1)).onNext("one"); @@ -242,7 +242,7 @@ public void testCompletedAfterErrorIsNotSent3() { verify(observer, never()).onNext("two"); Subscriber o2 = TestHelper.mockSubscriber(); - subject.subscribe(o2); + processor.subscribe(o2); verify(o2, times(1)).onComplete(); verify(o2, never()).onNext(any()); verify(observer, never()).onError(any(Throwable.class)); diff --git a/src/test/java/io/reactivex/processors/PublishProcessorTest.java b/src/test/java/io/reactivex/processors/PublishProcessorTest.java index 918650c8a1..42f43bb1a6 100644 --- a/src/test/java/io/reactivex/processors/PublishProcessorTest.java +++ b/src/test/java/io/reactivex/processors/PublishProcessorTest.java @@ -40,22 +40,22 @@ protected FlowableProcessor create() { @Test public void testCompleted() { - PublishProcessor subject = PublishProcessor.create(); + PublishProcessor processor = PublishProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); + processor.onComplete(); Subscriber anotherSubscriber = TestHelper.mockSubscriber(); - subject.subscribe(anotherSubscriber); + processor.subscribe(anotherSubscriber); - subject.onNext("four"); - subject.onComplete(); - subject.onError(new Throwable()); + processor.onNext("four"); + processor.onComplete(); + processor.onError(new Throwable()); assertCompletedSubscriber(observer); // todo bug? assertNeverSubscriber(anotherSubscriber); @@ -113,22 +113,22 @@ private void assertCompletedSubscriber(Subscriber observer) { @Test public void testError() { - PublishProcessor subject = PublishProcessor.create(); + PublishProcessor processor = PublishProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); - subject.onError(testException); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); + processor.onError(testException); Subscriber anotherSubscriber = TestHelper.mockSubscriber(); - subject.subscribe(anotherSubscriber); + processor.subscribe(anotherSubscriber); - subject.onNext("four"); - subject.onError(new Throwable()); - subject.onComplete(); + processor.onNext("four"); + processor.onError(new Throwable()); + processor.onComplete(); assertErrorSubscriber(observer); // todo bug? assertNeverSubscriber(anotherSubscriber); @@ -144,21 +144,21 @@ private void assertErrorSubscriber(Subscriber observer) { @Test public void testSubscribeMidSequence() { - PublishProcessor subject = PublishProcessor.create(); + PublishProcessor processor = PublishProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); + processor.onNext("one"); + processor.onNext("two"); assertObservedUntilTwo(observer); Subscriber anotherSubscriber = TestHelper.mockSubscriber(); - subject.subscribe(anotherSubscriber); + processor.subscribe(anotherSubscriber); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("three"); + processor.onComplete(); assertCompletedSubscriber(observer); assertCompletedStartingWithThreeSubscriber(anotherSubscriber); @@ -174,23 +174,23 @@ private void assertCompletedStartingWithThreeSubscriber(Subscriber obser @Test public void testUnsubscribeFirstSubscriber() { - PublishProcessor subject = PublishProcessor.create(); + PublishProcessor processor = PublishProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); TestSubscriber ts = new TestSubscriber(observer); - subject.subscribe(ts); + processor.subscribe(ts); - subject.onNext("one"); - subject.onNext("two"); + processor.onNext("one"); + processor.onNext("two"); ts.dispose(); assertObservedUntilTwo(observer); Subscriber anotherSubscriber = TestHelper.mockSubscriber(); - subject.subscribe(anotherSubscriber); + processor.subscribe(anotherSubscriber); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("three"); + processor.onComplete(); assertObservedUntilTwo(observer); assertCompletedStartingWithThreeSubscriber(anotherSubscriber); @@ -220,7 +220,7 @@ public void testNestedSubscribe() { public Flowable apply(final Integer v) { countParent.incrementAndGet(); - // then subscribe to subject again (it will not receive the previous value) + // then subscribe to processor again (it will not receive the previous value) return s.map(new Function() { @Override diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorBoundedConcurrencyTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorBoundedConcurrencyTest.java index 5dcc0fb4e4..7edef31ecd 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorBoundedConcurrencyTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorBoundedConcurrencyTest.java @@ -227,10 +227,10 @@ public void run() { @Test(timeout = 10000) public void testSubscribeCompletionRaceCondition() { for (int i = 0; i < 50; i++) { - final ReplayProcessor subject = ReplayProcessor.createUnbounded(); + final ReplayProcessor processor = ReplayProcessor.createUnbounded(); final AtomicReference value1 = new AtomicReference(); - subject.subscribe(new Consumer() { + processor.subscribe(new Consumer() { @Override public void accept(String t1) { @@ -249,15 +249,15 @@ public void accept(String t1) { @Override public void run() { - subject.onNext("value"); - subject.onComplete(); + processor.onNext("value"); + processor.onComplete(); } }); - SubjectObserverThread t2 = new SubjectObserverThread(subject); - SubjectObserverThread t3 = new SubjectObserverThread(subject); - SubjectObserverThread t4 = new SubjectObserverThread(subject); - SubjectObserverThread t5 = new SubjectObserverThread(subject); + SubjectObserverThread t2 = new SubjectObserverThread(processor); + SubjectObserverThread t3 = new SubjectObserverThread(processor); + SubjectObserverThread t4 = new SubjectObserverThread(processor); + SubjectObserverThread t5 = new SubjectObserverThread(processor); t2.start(); t3.start(); @@ -300,18 +300,18 @@ public void testRaceForTerminalState() { private static class SubjectObserverThread extends Thread { - private final ReplayProcessor subject; + private final ReplayProcessor processor; private final AtomicReference value = new AtomicReference(); - SubjectObserverThread(ReplayProcessor subject) { - this.subject = subject; + SubjectObserverThread(ReplayProcessor processor) { + this.processor = processor; } @Override public void run() { try { // a timeout exception will happen if we don't get a terminal state - String v = subject.timeout(2000, TimeUnit.MILLISECONDS).blockingSingle(); + String v = processor.timeout(2000, TimeUnit.MILLISECONDS).blockingSingle(); value.set(v); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorConcurrencyTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorConcurrencyTest.java index ff9168339c..62b0817678 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorConcurrencyTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorConcurrencyTest.java @@ -227,10 +227,10 @@ public void run() { @Test(timeout = 10000) public void testSubscribeCompletionRaceCondition() { for (int i = 0; i < 50; i++) { - final ReplayProcessor subject = ReplayProcessor.create(); + final ReplayProcessor processor = ReplayProcessor.create(); final AtomicReference value1 = new AtomicReference(); - subject.subscribe(new Consumer() { + processor.subscribe(new Consumer() { @Override public void accept(String t1) { @@ -249,15 +249,15 @@ public void accept(String t1) { @Override public void run() { - subject.onNext("value"); - subject.onComplete(); + processor.onNext("value"); + processor.onComplete(); } }); - SubjectObserverThread t2 = new SubjectObserverThread(subject); - SubjectObserverThread t3 = new SubjectObserverThread(subject); - SubjectObserverThread t4 = new SubjectObserverThread(subject); - SubjectObserverThread t5 = new SubjectObserverThread(subject); + SubjectObserverThread t2 = new SubjectObserverThread(processor); + SubjectObserverThread t3 = new SubjectObserverThread(processor); + SubjectObserverThread t4 = new SubjectObserverThread(processor); + SubjectObserverThread t5 = new SubjectObserverThread(processor); t2.start(); t3.start(); @@ -300,18 +300,18 @@ public void testRaceForTerminalState() { static class SubjectObserverThread extends Thread { - private final ReplayProcessor subject; + private final ReplayProcessor processor; private final AtomicReference value = new AtomicReference(); - SubjectObserverThread(ReplayProcessor subject) { - this.subject = subject; + SubjectObserverThread(ReplayProcessor processor) { + this.processor = processor; } @Override public void run() { try { // a timeout exception will happen if we don't get a terminal state - String v = subject.timeout(2000, TimeUnit.MILLISECONDS).blockingSingle(); + String v = processor.timeout(2000, TimeUnit.MILLISECONDS).blockingSingle(); value.set(v); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index 6a3f040fb2..e42f801f4a 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -44,25 +44,25 @@ protected FlowableProcessor create() { @Test public void testCompleted() { - ReplayProcessor subject = ReplayProcessor.create(); + ReplayProcessor processor = ReplayProcessor.create(); Subscriber o1 = TestHelper.mockSubscriber(); - subject.subscribe(o1); + processor.subscribe(o1); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); + processor.onComplete(); - subject.onNext("four"); - subject.onComplete(); - subject.onError(new Throwable()); + processor.onNext("four"); + processor.onComplete(); + processor.onError(new Throwable()); assertCompletedSubscriber(o1); // assert that subscribing a 2nd time gets the same data Subscriber o2 = TestHelper.mockSubscriber(); - subject.subscribe(o2); + processor.subscribe(o2); assertCompletedSubscriber(o2); } @@ -137,17 +137,17 @@ public void testCompletedStopsEmittingData() { @Test public void testCompletedAfterError() { - ReplayProcessor subject = ReplayProcessor.create(); + ReplayProcessor processor = ReplayProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.onNext("one"); - subject.onError(testException); - subject.onNext("two"); - subject.onComplete(); - subject.onError(new RuntimeException()); + processor.onNext("one"); + processor.onError(testException); + processor.onNext("two"); + processor.onComplete(); + processor.onError(new RuntimeException()); - subject.subscribe(observer); + processor.subscribe(observer); verify(observer).onSubscribe((Subscription)notNull()); verify(observer, times(1)).onNext("one"); verify(observer, times(1)).onError(testException); @@ -167,24 +167,24 @@ private void assertCompletedSubscriber(Subscriber observer) { @Test public void testError() { - ReplayProcessor subject = ReplayProcessor.create(); + ReplayProcessor processor = ReplayProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); - subject.onNext("three"); - subject.onError(testException); + processor.onNext("one"); + processor.onNext("two"); + processor.onNext("three"); + processor.onError(testException); - subject.onNext("four"); - subject.onError(new Throwable()); - subject.onComplete(); + processor.onNext("four"); + processor.onError(new Throwable()); + processor.onComplete(); assertErrorSubscriber(observer); observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); assertErrorSubscriber(observer); } @@ -198,22 +198,22 @@ private void assertErrorSubscriber(Subscriber observer) { @Test public void testSubscribeMidSequence() { - ReplayProcessor subject = ReplayProcessor.create(); + ReplayProcessor processor = ReplayProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); - subject.subscribe(observer); + processor.subscribe(observer); - subject.onNext("one"); - subject.onNext("two"); + processor.onNext("one"); + processor.onNext("two"); assertObservedUntilTwo(observer); Subscriber anotherSubscriber = TestHelper.mockSubscriber(); - subject.subscribe(anotherSubscriber); + processor.subscribe(anotherSubscriber); assertObservedUntilTwo(anotherSubscriber); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("three"); + processor.onComplete(); assertCompletedSubscriber(observer); assertCompletedSubscriber(anotherSubscriber); @@ -221,24 +221,24 @@ public void testSubscribeMidSequence() { @Test public void testUnsubscribeFirstSubscriber() { - ReplayProcessor subject = ReplayProcessor.create(); + ReplayProcessor processor = ReplayProcessor.create(); Subscriber observer = TestHelper.mockSubscriber(); TestSubscriber ts = new TestSubscriber(observer); - subject.subscribe(ts); + processor.subscribe(ts); - subject.onNext("one"); - subject.onNext("two"); + processor.onNext("one"); + processor.onNext("two"); ts.dispose(); assertObservedUntilTwo(observer); Subscriber anotherSubscriber = TestHelper.mockSubscriber(); - subject.subscribe(anotherSubscriber); + processor.subscribe(anotherSubscriber); assertObservedUntilTwo(anotherSubscriber); - subject.onNext("three"); - subject.onComplete(); + processor.onNext("three"); + processor.onComplete(); assertObservedUntilTwo(observer); assertCompletedSubscriber(anotherSubscriber); @@ -309,15 +309,15 @@ public void onNext(String v) { }; - ReplayProcessor subject = ReplayProcessor.create(); - subject.subscribe(observer1); - subject.onNext("one"); + ReplayProcessor processor = ReplayProcessor.create(); + processor.subscribe(observer1); + processor.onNext("one"); assertEquals("one", lastValueForSubscriber1.get()); - subject.onNext("two"); + processor.onNext("two"); assertEquals("two", lastValueForSubscriber1.get()); // use subscribeOn to make this async otherwise we deadlock as we are using CountDownLatches - subject.subscribeOn(Schedulers.newThread()).subscribe(observer2); + processor.subscribeOn(Schedulers.newThread()).subscribe(observer2); System.out.println("before waiting for one"); @@ -326,7 +326,7 @@ public void onNext(String v) { System.out.println("after waiting for one"); - subject.onNext("three"); + processor.onNext("three"); System.out.println("sent three"); @@ -335,9 +335,9 @@ public void onNext(String v) { System.out.println("about to send onComplete"); - subject.onComplete(); + processor.onComplete(); - System.out.println("completed subject"); + System.out.println("completed processor"); // release makeSlow.countDown(); diff --git a/src/test/java/io/reactivex/processors/SerializedProcessorTest.java b/src/test/java/io/reactivex/processors/SerializedProcessorTest.java index ba981bde07..db60b5c793 100644 --- a/src/test/java/io/reactivex/processors/SerializedProcessorTest.java +++ b/src/test/java/io/reactivex/processors/SerializedProcessorTest.java @@ -30,11 +30,11 @@ public class SerializedProcessorTest { @Test public void testBasic() { - SerializedProcessor subject = new SerializedProcessor(PublishProcessor. create()); + SerializedProcessor processor = new SerializedProcessor(PublishProcessor. create()); TestSubscriber ts = new TestSubscriber(); - subject.subscribe(ts); - subject.onNext("hello"); - subject.onComplete(); + processor.subscribe(ts); + processor.onNext("hello"); + processor.onComplete(); ts.awaitTerminalEvent(); ts.assertValue("hello"); }