Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator: forEach #147

Merged
merged 6 commits into from
Feb 15, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Arrays;

import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand Down Expand Up @@ -213,6 +214,44 @@ def class ObservableTests {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

@Test
public void testForEach() {
Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(1)).received(3);
}

@Test
public void testForEachWithError() {
try {
Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')});
fail("we expect an exception to be thrown");
}catch(Exception e) {
// do nothing as we expect this
}
}

def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {

public Subscription call(final Observer<Integer> observer) {
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(50)
}catch(Exception e) {
// ignore
}
observer.onNext(1);
observer.onNext(2);
observer.onNext(3);
observer.onCompleted();
}
}).start();
return Observable.noOpSubscription();
}
}

def class TestFactory {
int counter = 1;
Expand Down
83 changes: 81 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -332,6 +334,84 @@ public void onNext(T args) {
});
}

/**
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
* <p>
* NOTE: This will block even if the Observable is asynchronous.
* <p>
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
*
* @param onNext
* {@link Action1}
* @throws RuntimeException
* if error occurs
*/
public void forEach(final Action1<T> onNext) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionFromOnError = new AtomicReference<Exception>();

subscribe(new Observer<T>() {
public void onCompleted() {
latch.countDown();
}

public void onError(Exception e) {
/*
* If we receive an onError event we set the reference on the outer thread
* so we can git it and throw after the latch.await().
*
* We do this instead of throwing directly since this may be on a different thread and the latch is still waiting.
*/
exceptionFromOnError.set(e);
latch.countDown();
}

public void onNext(T args) {
onNext.call(args);
}
});
// block until the subscription completes and then return
try {
latch.await();
} catch (InterruptedException e) {
// set the interrupted flag again so callers can still get it
// for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780
Thread.currentThread().interrupt();
// using Runtime so it is not checked
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
}

if (exceptionFromOnError.get() != null) {
if (exceptionFromOnError.get() instanceof RuntimeException) {
throw (RuntimeException) exceptionFromOnError.get();
} else {
throw new RuntimeException(exceptionFromOnError.get());
}
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public void forEach(final Object o) {
if (o instanceof Action1) {
// in case a dynamic language is not correctly handling the overloaded methods and we receive an Action1 just forward to the correct method.
forEach((Action1) o);
}

// lookup and memoize onNext
if (o == null) {
throw new RuntimeException("onNext must be implemented");
}
final FuncN onNext = Functions.from(o);

forEach(new Action1() {

public void call(Object args) {
onNext.call(args);
}

});
}

/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from onError.
*
Expand Down Expand Up @@ -2543,7 +2623,6 @@ public void testSequenceEqual() {
verify(result, times(1)).onNext(false);
}



}

}
9 changes: 7 additions & 2 deletions rxjava-core/src/main/java/rx/util/AtomicObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ public void onError(Exception e) {

@Override
public void onNext(T args) {
if (!isFinished.get()) {
actual.onNext(args);
try {
if (!isFinished.get()) {
actual.onNext(args);
}
}catch(Exception e) {
// handle errors if the onNext implementation fails, not just if the Observable fails
onError(e);
}
}

Expand Down