diff --git a/src/main/java/rx/internal/operators/BufferUntilSubscriber.java b/src/main/java/rx/internal/operators/BufferUntilSubscriber.java index 313e128cdd..11c85817c4 100644 --- a/src/main/java/rx/internal/operators/BufferUntilSubscriber.java +++ b/src/main/java/rx/internal/operators/BufferUntilSubscriber.java @@ -21,8 +21,7 @@ import rx.Observer; import rx.Subscriber; import rx.functions.Action0; -import rx.observers.EmptyObserver; -import rx.observers.Subscribers; +import rx.observers.Observers; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; @@ -51,9 +50,6 @@ */ public class BufferUntilSubscriber extends Subject { - @SuppressWarnings("rawtypes") - private final static Observer EMPTY_OBSERVER = new EmptyObserver(); - /** * @warn create() undescribed * @return @@ -96,7 +92,7 @@ public void call(final Subscriber s) { s.add(Subscriptions.create(new Action0() { @Override public void call() { - state.observerRef = EMPTY_OBSERVER; + state.observerRef = Observers.empty(); } })); boolean win = false; diff --git a/src/main/java/rx/observers/EmptyObserver.java b/src/main/java/rx/observers/EmptyObserver.java deleted file mode 100644 index 7667fd769b..0000000000 --- a/src/main/java/rx/observers/EmptyObserver.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.observers; - -import rx.Observer; - - -/** - * Observer that does nothing... including swallowing errors. - */ -public class EmptyObserver implements Observer { - - @Override - public void onCompleted() { - - } - - @Override - public void onError(Throwable e) { - - } - - @Override - public void onNext(T args) { - - } - -} diff --git a/src/main/java/rx/observers/Observers.java b/src/main/java/rx/observers/Observers.java index 0d804cbae1..e9bf789372 100644 --- a/src/main/java/rx/observers/Observers.java +++ b/src/main/java/rx/observers/Observers.java @@ -46,9 +46,7 @@ public final void onNext(Object args) { /** * Returns an inert {@link Observer} that does nothing in response to the emissions or notifications from - * any {@code Observable} it subscribes to. This is different, however, from an {@link EmptyObserver}, in - * that it will throw an exception if its {@link Observer#onError onError} method is called (whereas - * {@code EmptyObserver} will swallow the error in such a case). + * any {@code Observable} it subscribes to but will throw an exception if its {@link Observer#onError onError} method is called. * * @return an inert {@code Observer} */ @@ -59,8 +57,8 @@ public static Observer empty() { /** * Creates an {@link Observer} that receives the emissions of any {@code Observable} it subscribes to via - * {@link Observer#onNext onNext} but ignores {@link Observer#onError onError} and - * {@link Observer#onCompleted onCompleted} notifications. + * {@link Observer#onNext onNext} but ignores {@link Observer#onCompleted onCompleted} notifications. + * It will throws an {@link OnErrorNotImplementedException} if {@link Observer#onError onError} is invoked. * * @param onNext * a function that handles each item emitted by an {@code Observable} diff --git a/src/main/java/rx/observers/TestObserver.java b/src/main/java/rx/observers/TestObserver.java index 3c9edad9df..e7f02131b6 100644 --- a/src/main/java/rx/observers/TestObserver.java +++ b/src/main/java/rx/observers/TestObserver.java @@ -36,8 +36,9 @@ public TestObserver(Observer delegate) { this.delegate = delegate; } + @SuppressWarnings("unchecked") public TestObserver() { - this.delegate = Observers.empty(); + this.delegate = (Observer) INERT; } @Override @@ -153,4 +154,23 @@ public void assertTerminalEvent() { } } + // do nothing ... including swallowing errors + private static Observer INERT = new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Object t) { + + } + + }; } diff --git a/src/test/java/rx/observers/TestObserverTest.java b/src/test/java/rx/observers/TestObserverTest.java index 9cfd547657..aa253f2cd2 100644 --- a/src/test/java/rx/observers/TestObserverTest.java +++ b/src/test/java/rx/observers/TestObserverTest.java @@ -119,5 +119,10 @@ public void testWrappingMockWhenUnsubscribeInvolved() { inOrder.verify(mockObserver, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } + + @Test + public void testErrorSwallowed() { + Observable.error(new RuntimeException()).subscribe(new TestObserver()); + } }