diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index da33d262d8..be9ab4fc54 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -25,6 +25,7 @@ import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; +import rx.functions.Functions; import rx.operators.BlockingOperatorLatest; import rx.operators.BlockingOperatorMostRecent; import rx.operators.BlockingOperatorNext; @@ -381,17 +382,7 @@ public T single(Func1 predicate) { * @see MSDN: Observable.SingleOrDefault */ public T singleOrDefault(T defaultValue) { - Iterator it = this.toIterable().iterator(); - - if (!it.hasNext()) { - return defaultValue; - } - - T result = it.next(); - if (it.hasNext()) { - throw new IllegalArgumentException("Sequence contains too many elements"); - } - return result; + return from(o.map(Functions.identity()).singleOrDefault(defaultValue)).single(); } /** diff --git a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java index d9312e651f..75c047b099 100644 --- a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java +++ b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java @@ -16,10 +16,13 @@ package rx.observables; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Before; @@ -28,10 +31,14 @@ import org.mockito.MockitoAnnotations; import rx.Observable; +import rx.Observable.OnSubscribe; import rx.Subscriber; import rx.exceptions.TestException; +import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; +import rx.schedulers.Schedulers; +import rx.subscriptions.Subscriptions; public class BlockingObservableTest { @@ -377,4 +384,30 @@ public Boolean call(String args) { }); assertEquals("default", first); } + + @Test + public void testSingleOrDefaultUnsubscribe() throws InterruptedException { + final CountDownLatch unsubscribe = new CountDownLatch(1); + Observable o = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + subscriber.add(Subscriptions.create(new Action0() { + @Override + public void call() { + unsubscribe.countDown(); + } + })); + subscriber.onNext(1); + subscriber.onNext(2); + // Don't call `onCompleted` to emulate an infinite stream + } + }).subscribeOn(Schedulers.newThread()); + try { + o.toBlocking().singleOrDefault(-1); + fail("Expected IllegalArgumentException because there are 2 elements"); + } catch (IllegalArgumentException e) { + // Expected + } + assertTrue("Timeout means `unsubscribe` is not called", unsubscribe.await(30, TimeUnit.SECONDS)); + } }