Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented mapWithIndex #381

Merged
merged 1 commit into from
Sep 14, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3031,11 +3031,29 @@ public Observable<T> where(Func1<? super T, Boolean> predicate) {
* a function to apply to each item emitted by the Observable
* @return an Observable that emits the items from the source Observable, transformed by the
* given function
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244306%28v=vs.103%29.aspx">MSDN: Observable.Select</a>
*/
public <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(OperationMap.map(this, func));
}

/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable and emits the result.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/map.png">
*
* @param func
* a function to apply to each item emitted by the Observable. The function takes the
* index of the emitted item as additional parameter.
* @return an Observable that emits the items from the source Observable, transformed by the
* given function
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244311%28v=vs.103%29.aspx">MSDN: Observable.Select</a>
*/
public <R> Observable<R> mapWithIndex(Func2<? super T, Integer, ? extends R> func) {
return create(OperationMap.mapWithIndex(this, func));
}

/**
* Creates a new Observable by applying a function that you supply to each item emitted by
* the source Observable, where that function returns an Observable, and then merging those
Expand Down
166 changes: 126 additions & 40 deletions rxjava-core/src/main/java/rx/operators/OperationMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand All @@ -33,6 +34,7 @@
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Applies a function of your choosing to every item emitted by an Observable, and returns this
Expand All @@ -56,8 +58,42 @@ public final class OperationMap {
* the type of the output sequence.
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
*/
public static <T, R> OnSubscribeFunc<R> map(Observable<? extends T> sequence, Func1<? super T, ? extends R> func) {
return new MapObservable<T, R>(sequence, func);
public static <T, R> OnSubscribeFunc<R> map(final Observable<? extends T> sequence, final Func1<? super T, ? extends R> func) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(Observer<? super R> observer) {
return new MapObservable<T, R>(sequence, new Func2<T, Integer, R>() {
@Override
public R call(T value, @SuppressWarnings("unused") Integer unused) {
return func.call(value);
}
}).onSubscribe(observer);
}
};
}

/**
* Accepts a sequence and a transformation function. Returns a sequence that is the result of
* applying the transformation function to each item in the sequence.
*
* @param sequence
* the input sequence.
* @param func
* a function to apply to each item in the sequence. The function gets the index of the emitted item
* as additional parameter.
* @param <T>
* the type of the input sequence.
* @param <R>
* the type of the output sequence.
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
*/
public static <T, R> OnSubscribeFunc<R> mapWithIndex(final Observable<? extends T> sequence, final Func2<? super T, Integer, ? extends R> func) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(Observer<? super R> observer) {
return new MapObservable<T, R>(sequence, func).onSubscribe(observer);
}
};
}

/**
Expand Down Expand Up @@ -89,56 +125,50 @@ public static <T, R> OnSubscribeFunc<R> mapMany(Observable<? extends T> sequence
* the type of the output sequence.
*/
private static class MapObservable<T, R> implements OnSubscribeFunc<R> {
public MapObservable(Observable<? extends T> sequence, Func1<? super T, ? extends R> func) {
public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer, ? extends R> func) {
this.sequence = sequence;
this.func = func;
}

private Observable<? extends T> sequence;

private Func1<? super T, ? extends R> func;

public Subscription onSubscribe(Observer<? super R> observer) {
return sequence.subscribe(new MapObserver<T, R>(observer, func));
}
}

/**
* An observer that applies a transformation function to each item and forwards the result to an inner observer.
*
* @param <T>
* the type of the observer items.
* @param <R>
* the type of the inner observer items.
*/
private static class MapObserver<T, R> implements Observer<T> {
public MapObserver(Observer<? super R> observer, Func1<? super T, ? extends R> func) {
this.observer = observer;
this.func = func;
}

Observer<? super R> observer;

Func1<? super T, ? extends R> func;
private final Observable<? extends T> sequence;
private final Func2<? super T, Integer, ? extends R> func;
private int index;

public void onNext(T value) {
// let the exception be thrown if func fails as a SafeObserver wrapping this will handle it
observer.onNext(func.call(value));
}
@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
return sequence.subscribe(new Observer<T>() {
@Override
public void onNext(T value) {
observer.onNext(func.call(value, index));
index++;
}

public void onError(Throwable ex) {
observer.onError(ex);
}
@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

public void onCompleted() {
observer.onCompleted();
@Override
public void onCompleted() {
observer.onCompleted();
}
});
}
}

public static class UnitTest {
@Mock
Observer<String> stringObserver;

@Mock
Observer<String> stringObserver2;

final static Func2<String, Integer, String> APPEND_INDEX = new Func2<String, Integer, String>() {
@Override
public String call(String value, Integer index) {
return value + index;
}
};

@Before
public void before() {
MockitoAnnotations.initMocks(this);
Expand All @@ -164,9 +194,42 @@ public String call(Map<String, String> map) {
verify(stringObserver, times(1)).onNext("OneFirst");
verify(stringObserver, times(1)).onNext("TwoFirst");
verify(stringObserver, times(1)).onCompleted();
}

@Test
public void testMapWithIndex() {
Observable<String> w = Observable.from("a", "b", "c");
Observable<String> m = Observable.create(mapWithIndex(w, APPEND_INDEX));
m.subscribe(stringObserver);
InOrder inOrder = inOrder(stringObserver);
inOrder.verify(stringObserver, times(1)).onNext("a0");
inOrder.verify(stringObserver, times(1)).onNext("b1");
inOrder.verify(stringObserver, times(1)).onNext("c2");
inOrder.verify(stringObserver, times(1)).onCompleted();
verify(stringObserver, never()).onError(any(Throwable.class));
}

@Test
public void testMapWithIndexAndMultipleSubscribers() {
Observable<String> w = Observable.from("a", "b", "c");
Observable<String> m = Observable.create(mapWithIndex(w, APPEND_INDEX));
m.subscribe(stringObserver);
m.subscribe(stringObserver2);
InOrder inOrder = inOrder(stringObserver);
inOrder.verify(stringObserver, times(1)).onNext("a0");
inOrder.verify(stringObserver, times(1)).onNext("b1");
inOrder.verify(stringObserver, times(1)).onNext("c2");
inOrder.verify(stringObserver, times(1)).onCompleted();
verify(stringObserver, never()).onError(any(Throwable.class));

InOrder inOrder2 = inOrder(stringObserver2);
inOrder2.verify(stringObserver2, times(1)).onNext("a0");
inOrder2.verify(stringObserver2, times(1)).onNext("b1");
inOrder2.verify(stringObserver2, times(1)).onNext("c2");
inOrder2.verify(stringObserver2, times(1)).onCompleted();
verify(stringObserver2, never()).onError(any(Throwable.class));
}

@Test
public void testMapMany() {
/* simulate a top-level async call which returns IDs */
Expand Down Expand Up @@ -246,12 +309,34 @@ public String call(Map<String, String> map) {

}

@Test
public void testMapWithError() {
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
return s;
}
}));

m.subscribe(stringObserver);
verify(stringObserver, times(1)).onNext("one");
verify(stringObserver, never()).onNext("two");
verify(stringObserver, never()).onNext("three");
verify(stringObserver, never()).onCompleted();
verify(stringObserver, times(1)).onError(any(Throwable.class));
}

@Test
public void testMapWithSynchronousObservableContainingError() {
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
final AtomicInteger c1 = new AtomicInteger();
final AtomicInteger c2 = new AtomicInteger();
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s))
throw new RuntimeException("Forced Failure");
Expand All @@ -260,6 +345,7 @@ public String call(String s) {
return s;
}
})).map(new Func1<String, String>() {
@Override
public String call(String s) {
System.out.println("SecondMapper:" + s);
c2.incrementAndGet();
Expand All @@ -280,7 +366,7 @@ public String call(String s) {
assertEquals(1, c2.get());
}

private Map<String, String> getMap(String prefix) {
private static Map<String, String> getMap(String prefix) {
Map<String, String> m = new HashMap<String, String>();
m.put("firstName", prefix + "First");
m.put("lastName", prefix + "Last");
Expand Down