Skip to content

Commit

Permalink
Fixed issue ReactiveX#417
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Oct 25, 2013
1 parent 685215f commit 8fe4367
Showing 1 changed file with 48 additions and 12 deletions.
60 changes: 48 additions & 12 deletions rxjava-core/src/main/java/rx/operators/OperationMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
*/
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
Expand All @@ -33,6 +37,7 @@
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand All @@ -59,17 +64,12 @@ public final class OperationMap {
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
*/
public static <T, R> OnSubscribeFunc<R> map(final Observable<? extends T> sequence, final Func1<? super T, ? extends R> func) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(Observer<? super R> observer) {
return new MapObservable<T, R>(sequence, new Func2<T, Integer, R>() {
return mapWithIndex(sequence, new Func2<T, Integer, R>() {
@Override
public R call(T value, @SuppressWarnings("unused") Integer unused) {
return func.call(value);
}
}).onSubscribe(observer);
}
};
});
}

/**
Expand Down Expand Up @@ -136,7 +136,8 @@ public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer,

@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
return sequence.subscribe(new Observer<T>() {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(sequence.subscribe(new SafeObserver<T>(subscription, new Observer<T>() {
@Override
public void onNext(T value) {
observer.onNext(func.call(value, index));
Expand All @@ -152,7 +153,7 @@ public void onError(Throwable ex) {
public void onCompleted() {
observer.onCompleted();
}
});
})));
}
}

Expand Down Expand Up @@ -366,6 +367,41 @@ public String call(String s) {
assertEquals(1, c2.get());
}

@Test(expected = IllegalArgumentException.class)
public void testMapWithIssue417() {
Observable.from(1).observeOn(Schedulers.threadPoolForComputation())
.map(new Func1<Integer, Integer>() {
public Integer call(Integer arg0) {
throw new IllegalArgumentException("any error");
}
}).toBlockingObservable().single();
}

@Test
public void testMapWithErrorInFuncAndThreadPoolScheduler() throws InterruptedException {
// The error will throw in one of threads in the thread pool.
// If map does not handle it, the error will disappear.
// so map needs to handle the error by itself.
final CountDownLatch latch = new CountDownLatch(1);
Observable<String> m = Observable.from("one")
.observeOn(Schedulers.threadPoolForComputation())
.map(new Func1<String, String>() {
public String call(String arg0) {
try {
throw new IllegalArgumentException("any error");
} finally {
latch.countDown();
}
}
});

m.subscribe(stringObserver);
latch.await();
InOrder inorder = inOrder(stringObserver);
inorder.verify(stringObserver, times(1)).onError(any(IllegalArgumentException.class));
inorder.verifyNoMoreInteractions();
}

private static Map<String, String> getMap(String prefix) {
Map<String, String> m = new HashMap<String, String>();
m.put("firstName", prefix + "First");
Expand Down

0 comments on commit 8fe4367

Please sign in to comment.