Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented: ForEach #131

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,40 @@ def class ObservableTests {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

@Test
public void testForEach() {
Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(3);
verify(a, times(1)).received(2);
verify(a, times(1)).received(5);
verify(a, times(1)).received(4);
}

@Test
public void testForEachWithComplete() {
Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> a.received(result)}, {a.received('done')});
verify(a, times(1)).received(1);
verify(a, times(1)).received(3);
verify(a, times(1)).received(2);
verify(a, times(1)).received(5);
verify(a, times(1)).received(4);
verify(a, times(1)).received("done");
}

@Test
public void testForEachWithCompleteAndError() {
Observable.toObservable(1, 3, 2, 5, 4).forEach({ result -> throw new RuntimeException('err')}, {a.received('done')}, {err -> a.received(err.message)});
verify(a, times(0)).received(1);
verify(a, times(0)).received(3);
verify(a, times(0)).received(2);
verify(a, times(0)).received(5);
verify(a, times(0)).received(4);
verify(a, times(1)).received("err");
verify(a, times(0)).received("done");
}


def class TestFactory {
int counter = 1;
Expand Down
177 changes: 177 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import rx.operators.OperationConcat;
import rx.operators.OperationFilter;
import rx.operators.OperationForEach;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
Expand Down Expand Up @@ -1781,6 +1782,122 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
});
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext) {
OperationForEach.forEach(sequence, onNext);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
*/
public static <T> void forEach(final Observable<T> sequence, final Object onNext) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(onNext);
OperationForEach.forEach(sequence,
new Action1<T>() {

@Override
public void call(T t1) {
_f.call(t1);

}
});
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted) {
OperationForEach.forEach(sequence, onNext, onCompleted);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
*/
public static <T> void forEach(final Observable<T> sequence, final Object onNext, final Object onCompleted) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(onNext);
@SuppressWarnings("rawtypes")
final FuncN _f2 = Functions.from(onCompleted);
OperationForEach.forEach(sequence,
new Action1<T>() {

@Override
public void call(T t1) {
_f.call(t1);

}
}, new Action0() {

@Override
public void call() {
_f2.call();
}
});
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
* @param onError
*/
public static <T> void forEach(final Observable<T> sequence, final Action1<T> onNext, final Action0 onCompleted,
final Action1<Exception> onError) {
OperationForEach.forEach(sequence, onNext, onCompleted, onError);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
* @param onError
*/
public static <T> void forEach(final Observable<T> sequence, final Object onNext, final Object onCompleted,
final Object onError) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(onNext);
@SuppressWarnings("rawtypes")
final FuncN _f2 = Functions.from(onCompleted);
@SuppressWarnings("rawtypes")
final FuncN _f3 = Functions.from(onError);
OperationForEach.forEach(sequence,
new Action1<T>() {

@Override
public void call(T t1) {
_f.call(t1);

}
}, new Action0() {

@Override
public void call() {
_f2.call();
}
}, new Action1<Exception>() {

@Override
public void call(Exception t1) {
_f3.call(t1);
}
});
}

/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
* <p>
Expand Down Expand Up @@ -2303,6 +2420,66 @@ public Observable<T> take(final int num) {
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
*/
public void forEach(final Action1<T> onNext) {
forEach(this, onNext);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
*/
public <T> void forEach(final Object onNext) {
forEach(this, onNext);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
*/
public void forEach(final Action1<T> onNext, final Action0 onCompleted) {
forEach(this, onNext, onCompleted);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
*/
public void forEach(final Object onNext, final Object onCompleted) {
forEach(this, onNext, onCompleted);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
* @param onError
*/
public void forEach(final Action1<T> onNext, final Action0 onCompleted, final Action1<Exception> onError) {
forEach(this, onNext, onCompleted, onError);
}

/**
* Invokes an action for each element in the sequence.
*
* @param onNext
* @param onCompleted
* @param onError
*/
public void forEach(final Object onNext, final Object onCompleted, final Object onError) {
forEach(this, onNext, onCompleted, onError);
}

/*
* Returns an Observable that emits the last <code>count</code> items emitted by the source
* Observable.
*
Expand Down
Loading