Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Single.fromCallable() #3418

Merged
merged 1 commit into from
Oct 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package rx;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -605,6 +606,43 @@ public final static <T> Single<T> from(Future<? extends T> future, Scheduler sch
return new Single<T>(OnSubscribeToObservableFuture.toObservableFuture(future)).subscribeOn(scheduler);
}

/**
* Returns a {@link Single} that invokes passed function and emits its result for each new Observer that subscribes.
* <p>
* Allows you to defer execution of passed function until Observer subscribes to the {@link Single}.
* It makes passed function "lazy".
* Result of the function invocation will be emitted by the {@link Single}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param func
* function which execution should be deferred, it will be invoked when Observer will subscribe to the {@link Single}.
* @param <T>
* the type of the item emitted by the {@link Single}.
* @return a {@link Single} whose {@link Observer}s' subscriptions trigger an invocation of the given function.
*/
@Experimental
public static <T> Single<T> fromCallable(final Callable<? extends T> func) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should start out as experimental unless the RxJava contributors want to fast-track this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added @Experimental

return create(new OnSubscribe<T>() {
@Override
public void call(SingleSubscriber<? super T> singleSubscriber) {
final T value;

try {
value = func.call();
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
singleSubscriber.onError(t);
return;
}

singleSubscriber.onSuccess(value);
}
});
}

/**
* Returns a {@code Single} that emits a specified item.
* <p>
Expand Down
40 changes: 40 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -530,4 +532,42 @@ public void doOnErrorShouldThrowCompositeExceptionIfOnErrorActionThrows() {

verify(action).call(error);
}

@Test
public void shouldEmitValueFromCallable() throws Exception {
Callable<String> callable = mock(Callable.class);

when(callable.call()).thenReturn("value");

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.fromCallable(callable)
.subscribe(testSubscriber);

testSubscriber.assertValue("value");
testSubscriber.assertNoErrors();

verify(callable).call();
}

@Test
public void shouldPassErrorFromCallable() throws Exception {
Callable<String> callable = mock(Callable.class);

Throwable error = new IllegalStateException();

when(callable.call()).thenThrow(error);

TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.fromCallable(callable)
.subscribe(testSubscriber);

testSubscriber.assertNoValues();
testSubscriber.assertError(error);

verify(callable).call();
}
}