diff --git a/rxjava-core/src/main/java/rx/operators/OperationMap.java b/rxjava-core/src/main/java/rx/operators/OperationMap.java index 9eb2520420..940147b0b8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMap.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMap.java @@ -15,12 +15,16 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; @@ -33,6 +37,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.concurrency.Schedulers; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -59,17 +64,12 @@ public final class OperationMap { * @return a sequence that is the result of applying the transformation function to each item in the input sequence. */ public static OnSubscribeFunc map(final Observable sequence, final Func1 func) { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - return new MapObservable(sequence, new Func2() { + return mapWithIndex(sequence, new Func2() { @Override public R call(T value, @SuppressWarnings("unused") Integer unused) { return func.call(value); } - }).onSubscribe(observer); - } - }; + }); } /** @@ -136,7 +136,8 @@ public MapObservable(Observable sequence, Func2 observer) { - return sequence.subscribe(new Observer() { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(sequence.subscribe(new SafeObserver(subscription, new Observer() { @Override public void onNext(T value) { observer.onNext(func.call(value, index)); @@ -152,7 +153,7 @@ public void onError(Throwable ex) { public void onCompleted() { observer.onCompleted(); } - }); + }))); } } @@ -366,6 +367,41 @@ public String call(String s) { assertEquals(1, c2.get()); } + @Test(expected = IllegalArgumentException.class) + public void testMapWithIssue417() { + Observable.from(1).observeOn(Schedulers.threadPoolForComputation()) + .map(new Func1() { + public Integer call(Integer arg0) { + throw new IllegalArgumentException("any error"); + } + }).toBlockingObservable().single(); + } + + @Test + public void testMapWithErrorInFuncAndThreadPoolScheduler() throws InterruptedException { + // The error will throw in one of threads in the thread pool. + // If map does not handle it, the error will disappear. + // so map needs to handle the error by itself. + final CountDownLatch latch = new CountDownLatch(1); + Observable m = Observable.from("one") + .observeOn(Schedulers.threadPoolForComputation()) + .map(new Func1() { + public String call(String arg0) { + try { + throw new IllegalArgumentException("any error"); + } finally { + latch.countDown(); + } + } + }); + + m.subscribe(stringObserver); + latch.await(); + InOrder inorder = inOrder(stringObserver); + inorder.verify(stringObserver, times(1)).onError(any(IllegalArgumentException.class)); + inorder.verifyNoMoreInteractions(); + } + private static Map getMap(String prefix) { Map m = new HashMap(); m.put("firstName", prefix + "First");