diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 13d0cc3b57..c4985ba84a 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -164,6 +164,9 @@ public void onError(Throwable e) public void onNext(T args) { synchronized (subscriptions) { + if (isDone) { + return; + } history.add(args); for (Observer observer : new ArrayList>(subscriptions.values())) { observer.onNext(args); diff --git a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java index fef45ca63d..005b56ba5a 100644 --- a/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -56,6 +56,93 @@ public void testCompleted() { assertCompletedObserver(o2); } + @Test + public void testCompletedStopsEmittingData() { + ReplaySubject channel = ReplaySubject.create(); + @SuppressWarnings("unchecked") + Observer observerA = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observerB = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observerC = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observerD = mock(Observer.class); + + Subscription a = channel.subscribe(observerA); + Subscription b = channel.subscribe(observerB); + + InOrder inOrderA = inOrder(observerA); + InOrder inOrderB = inOrder(observerB); + InOrder inOrderC = inOrder(observerC); + InOrder inOrderD = inOrder(observerD); + + channel.onNext(42); + + // both A and B should have received 42 from before subscription + inOrderA.verify(observerA).onNext(42); + inOrderB.verify(observerB).onNext(42); + + a.unsubscribe(); + + // a should receive no more + inOrderA.verifyNoMoreInteractions(); + + channel.onNext(4711); + + // only be should receive 4711 at this point + inOrderB.verify(observerB).onNext(4711); + + channel.onCompleted(); + + // B is subscribed so should receive onCompleted + inOrderB.verify(observerB).onCompleted(); + + Subscription c = channel.subscribe(observerC); + + // when C subscribes it should receive 42, 4711, onCompleted + inOrderC.verify(observerC).onNext(42); + inOrderC.verify(observerC).onNext(4711); + inOrderC.verify(observerC).onCompleted(); + + // if further events are propagated they should be ignored + channel.onNext(13); + channel.onNext(14); + channel.onNext(15); + channel.onError(new RuntimeException()); + + // a new subscription should only receive what was emitted prior to terminal state onCompleted + Subscription d = channel.subscribe(observerD); + + inOrderD.verify(observerD).onNext(42); + inOrderD.verify(observerD).onNext(4711); + inOrderD.verify(observerD).onCompleted(); + + Mockito.verifyNoMoreInteractions(observerA); + Mockito.verifyNoMoreInteractions(observerB); + Mockito.verifyNoMoreInteractions(observerC); + Mockito.verifyNoMoreInteractions(observerD); + + } + + @Test + public void testCompletedAfterError() { + ReplaySubject subject = ReplaySubject.create(); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + + subject.onNext("one"); + subject.onError(testException); + subject.onNext("two"); + subject.onCompleted(); + subject.onError(new RuntimeException()); + + subject.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onError(testException); + verifyNoMoreInteractions(aObserver); + } + private void assertCompletedObserver(Observer aObserver) { InOrder inOrder = inOrder(aObserver);