-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This commit adds BlockingSingle, the blocking version of rx.Single. BlockingSingle has the following methods: i `from(Single)` -- factory method for creating a `BlockingSingle` from a `Single` - `get()` -- returns the value emitted from the Single - `get(Func1<T,Boolean> predicate)` -- returns the value if it matches the provided predicate - `toFuture()` -- returns a `java.util.concurrent.Future` Adds Single.toBlocking
- Loading branch information
Showing
7 changed files
with
380 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/** | ||
* Copyright 2015 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package rx.internal.util; | ||
|
||
import rx.Subscription; | ||
import rx.annotations.Experimental; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
|
||
/** | ||
* Utility functions relating to blocking types. | ||
* <p/> | ||
* Not intended to be part of the public API. | ||
*/ | ||
@Experimental | ||
public final class BlockingUtils { | ||
|
||
private BlockingUtils() { } | ||
|
||
/** | ||
* Blocks and waits for a {@link Subscription} to complete. | ||
* | ||
* @param latch a CountDownLatch | ||
* @param subscription the Subscription to wait on. | ||
*/ | ||
@Experimental | ||
public static void awaitForComplete(CountDownLatch latch, Subscription subscription) { | ||
if (latch.getCount() == 0) { | ||
// Synchronous observable completes before awaiting for it. | ||
// Skip await so InterruptedException will never be thrown. | ||
return; | ||
} | ||
// block until the subscription completes and then return | ||
try { | ||
latch.await(); | ||
} catch (InterruptedException e) { | ||
subscription.unsubscribe(); | ||
// set the interrupted flag again so callers can still get it | ||
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780 | ||
Thread.currentThread().interrupt(); | ||
// using Runtime so it is not checked | ||
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/** | ||
* Copyright 2015 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package rx.singles; | ||
|
||
import rx.Single; | ||
import rx.SingleSubscriber; | ||
import rx.Subscription; | ||
import rx.annotations.Experimental; | ||
import rx.internal.operators.BlockingOperatorToFuture; | ||
import rx.internal.util.BlockingUtils; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
/** | ||
* {@code BlockingSingle} is a blocking "version" of {@link Single} that provides blocking | ||
* operators. | ||
* <p/> | ||
* You construct a {@code BlockingSingle} from a {@code Single} with {@link #from(Single)} | ||
* or {@link Single#toBlocking()}. | ||
*/ | ||
@Experimental | ||
public class BlockingSingle<T> { | ||
private final Single<? extends T> single; | ||
|
||
private BlockingSingle(Single<? extends T> single) { | ||
this.single = single; | ||
} | ||
|
||
/** | ||
* Converts a {@link Single} into a {@code BlockingSingle}. | ||
* | ||
* @param single the {@link Single} you want to convert | ||
* @return a {@code BlockingSingle} version of {@code single} | ||
*/ | ||
@Experimental | ||
public static <T> BlockingSingle<T> from(Single<? extends T> single) { | ||
return new BlockingSingle<T>(single); | ||
} | ||
|
||
/** | ||
* Returns the item emitted by this {@code BlockingSingle}. | ||
* <p/> | ||
* If the underlying {@link Single} returns successfully, the value emitted | ||
* by the {@link Single} is returned. If the {@link Single} emits an error, | ||
* the throwable emitted ({@link SingleSubscriber#onError(Throwable)}) is | ||
* thrown. | ||
* | ||
* @return the value emitted by this {@code BlockingSingle} | ||
*/ | ||
@Experimental | ||
public T value() { | ||
final AtomicReference<T> returnItem = new AtomicReference<T>(); | ||
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>(); | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
Subscription subscription = single.subscribe(new SingleSubscriber<T>() { | ||
@Override | ||
public void onSuccess(T value) { | ||
returnItem.set(value); | ||
latch.countDown(); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable error) { | ||
returnException.set(error); | ||
latch.countDown(); | ||
} | ||
}); | ||
|
||
BlockingUtils.awaitForComplete(latch, subscription); | ||
Throwable throwable = returnException.get(); | ||
if (throwable != null) { | ||
if (throwable instanceof RuntimeException) { | ||
throw (RuntimeException) throwable; | ||
} | ||
throw new RuntimeException(throwable); | ||
} | ||
return returnItem.get(); | ||
} | ||
|
||
/** | ||
* Returns a {@link Future} representing the value emitted by this {@code BlockingSingle}. | ||
* | ||
* @return a {@link Future} that returns the value | ||
*/ | ||
@Experimental | ||
public Future<T> toFuture() { | ||
return BlockingOperatorToFuture.toFuture(single.toObservable()); | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/** | ||
* Copyright 2015 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package rx.internal.util; | ||
|
||
import static org.mockito.Mockito.*; | ||
import static org.junit.Assert.*; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
import org.junit.Test; | ||
|
||
import rx.Observable; | ||
import rx.Subscriber; | ||
import rx.Subscription; | ||
import rx.schedulers.Schedulers; | ||
|
||
/** | ||
* Test suite for {@link BlockingUtils}. | ||
*/ | ||
public class BlockingUtilsTest { | ||
@Test | ||
public void awaitCompleteShouldReturnIfCountIsZero() { | ||
Subscription subscription = mock(Subscription.class); | ||
CountDownLatch latch = new CountDownLatch(0); | ||
BlockingUtils.awaitForComplete(latch, subscription); | ||
verifyZeroInteractions(subscription); | ||
} | ||
|
||
@Test | ||
public void awaitCompleteShouldReturnOnEmpty() { | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
Subscriber<Object> subscription = createSubscription(latch); | ||
Observable<Object> observable = Observable.empty().subscribeOn(Schedulers.newThread()); | ||
observable.subscribe(subscription); | ||
BlockingUtils.awaitForComplete(latch, subscription); | ||
} | ||
|
||
@Test | ||
public void awaitCompleteShouldReturnOnError() { | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
Subscriber<Object> subscription = createSubscription(latch); | ||
Observable<Object> observable = Observable.error(new RuntimeException()).subscribeOn(Schedulers.newThread()); | ||
observable.subscribe(subscription); | ||
BlockingUtils.awaitForComplete(latch, subscription); | ||
} | ||
|
||
@Test | ||
public void shouldThrowRuntimeExceptionOnThreadInterrupted() throws Exception { | ||
final CountDownLatch latch = new CountDownLatch(1); | ||
final Subscription subscription = mock(Subscription.class); | ||
final AtomicReference<Exception> caught = new AtomicReference<Exception>(); | ||
Thread thread = new Thread(new Runnable() { | ||
@Override | ||
public void run() { | ||
Thread.currentThread().interrupt(); | ||
try { | ||
BlockingUtils.awaitForComplete(latch, subscription); | ||
} catch (RuntimeException e) { | ||
caught.set(e); | ||
} | ||
} | ||
}); | ||
thread.run(); | ||
verify(subscription).unsubscribe(); | ||
Exception actual = caught.get(); | ||
assertNotNull(actual); | ||
assertNotNull(actual.getCause()); | ||
assertTrue(actual.getCause() instanceof InterruptedException); | ||
} | ||
|
||
|
||
private static <T> Subscriber<T> createSubscription(final CountDownLatch latch) { | ||
return new Subscriber<T>() { | ||
@Override | ||
public void onNext(T t) { | ||
//no-oop | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
latch.countDown(); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
latch.countDown(); | ||
} | ||
}; | ||
} | ||
} |
Oops, something went wrong.