Skip to content

Commit

Permalink
ReplaySubject UnitTests
Browse files Browse the repository at this point in the history
Shows bug handling terminal state.
  • Loading branch information
benjchristensen committed Dec 11, 2013
1 parent 73fc535 commit 72bf445
Showing 1 changed file with 91 additions and 4 deletions.
95 changes: 91 additions & 4 deletions rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -56,6 +56,93 @@ public void testCompleted() {
assertCompletedObserver(o2);
}

@Test
public void testCompletedStopsEmittingData() {
ReplaySubject<Integer> channel = ReplaySubject.create();
@SuppressWarnings("unchecked")
Observer<Object> observerA = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observerB = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observerC = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observerD = mock(Observer.class);

Subscription a = channel.subscribe(observerA);
Subscription b = channel.subscribe(observerB);

InOrder inOrderA = inOrder(observerA);
InOrder inOrderB = inOrder(observerB);
InOrder inOrderC = inOrder(observerC);
InOrder inOrderD = inOrder(observerD);

channel.onNext(42);

// both A and B should have received 42 from before subscription
inOrderA.verify(observerA).onNext(42);
inOrderB.verify(observerB).onNext(42);

a.unsubscribe();

// a should receive no more
inOrderA.verifyNoMoreInteractions();

channel.onNext(4711);

// only be should receive 4711 at this point
inOrderB.verify(observerB).onNext(4711);

channel.onCompleted();

// B is subscribed so should receive onCompleted
inOrderB.verify(observerB).onCompleted();

Subscription c = channel.subscribe(observerC);

// when C subscribes it should receive 42, 4711, onCompleted
inOrderC.verify(observerC).onNext(42);
inOrderC.verify(observerC).onNext(4711);
inOrderC.verify(observerC).onCompleted();

// if further events are propagated they should be ignored
channel.onNext(13);
channel.onNext(14);
channel.onNext(15);
channel.onError(new RuntimeException());

// a new subscription should only receive what was emitted prior to terminal state onCompleted
Subscription d = channel.subscribe(observerD);

inOrderD.verify(observerD).onNext(42);
inOrderD.verify(observerD).onNext(4711);
inOrderD.verify(observerD).onCompleted();

Mockito.verifyNoMoreInteractions(observerA);
Mockito.verifyNoMoreInteractions(observerB);
Mockito.verifyNoMoreInteractions(observerC);
Mockito.verifyNoMoreInteractions(observerD);

}

@Test
public void testCompletedAfterError() {
ReplaySubject<String> subject = ReplaySubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);

subject.onNext("one");
subject.onError(testException);
subject.onNext("two");
subject.onCompleted();
subject.onError(new RuntimeException());

subject.subscribe(aObserver);
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onError(testException);
verifyNoMoreInteractions(aObserver);
}

private void assertCompletedObserver(Observer<String> aObserver) {
InOrder inOrder = inOrder(aObserver);

Expand Down

0 comments on commit 72bf445

Please sign in to comment.