Skip to content

Commit

Permalink
Remote: Add interoperability between Rx and ListenableFuture.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 360611295
  • Loading branch information
coeuvre authored and copybara-github committed Mar 3, 2021
1 parent 2cb919f commit 7a62c2d
Show file tree
Hide file tree
Showing 5 changed files with 481 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/actions:execution_requirements",
"//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info",
"//src/main/java/com/google/devtools/build/lib/authandtls",
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/remote:ExecutionStatusException",
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/options",
Expand All @@ -28,6 +27,7 @@ java_library(
"//src/main/protobuf:failure_details_java_proto",
"//third_party:guava",
"//third_party:jsr305",
"//third_party:rxjava3",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
"//third_party/protobuf:protobuf_java_util",
Expand Down
177 changes: 177 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.util;

import static com.google.common.base.Preconditions.checkState;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.core.CompletableOnSubscribe;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/** Methods for interoperating between Rx and ListenableFuture. */
public class RxFutures {

private RxFutures() {}

/**
* Returns a {@link Completable} that is complete once the supplied {@link ListenableFuture} has
* completed.
*
* <p>A {@link ListenableFuture>} represents some computation that is already in progress. We use
* {@link Callable} here to defer the execution of the thing that produces ListenableFuture until
* there is subscriber.
*
* <p>Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple
* subscriptions are not allowed.
*
* <p>Disposes the Completable to cancel the underlying ListenableFuture.
*/
public static Completable toCompletable(
Callable<ListenableFuture<Void>> callable, Executor executor) {
return Completable.create(new OnceCompletableOnSubscribe(callable, executor));
}

private static class OnceCompletableOnSubscribe implements CompletableOnSubscribe {
private final AtomicBoolean subscribed = new AtomicBoolean(false);

private final Callable<ListenableFuture<Void>> callable;
private final Executor executor;

private OnceCompletableOnSubscribe(
Callable<ListenableFuture<Void>> callable, Executor executor) {
this.callable = callable;
this.executor = executor;
}

@Override
public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable {
try {
checkState(!subscribed.getAndSet(true), "This completable cannot be subscribed to twice");
ListenableFuture<Void> future = callable.call();
Futures.addCallback(
future,
new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void t) {
emitter.onComplete();
}

@Override
public void onFailure(Throwable throwable) {
/*
* CancellationException can be thrown in two cases:
* 1. The ListenableFuture itself is cancelled.
* 2. Completable is disposed by downstream.
*
* This check is used to prevent propagating CancellationException to downstream
* when it has already disposed the Completable.
*/
if (throwable instanceof CancellationException && emitter.isDisposed()) {
return;
}

emitter.onError(throwable);
}
},
executor);
emitter.setCancellable(() -> future.cancel(true));
} catch (Throwable t) {
// We failed to construct and listen to the LF. Following RxJava's own behaviour, prefer
// to pass RuntimeExceptions and Errors down to the subscriber except for certain
// "fatal" exceptions.
Exceptions.throwIfFatal(t);
executor.execute(() -> emitter.onError(t));
}
}
}

/**
* Returns a {@link ListenableFuture} that is complete once the {@link Completable} has completed.
*
* <p>Errors are also propagated. If the {@link ListenableFuture} is canceled, the subscription to
* the {@link Completable} will automatically be cancelled.
*/
public static ListenableFuture<Void> toListenableFuture(Completable completable) {
CompletableFuture future = new CompletableFuture();
completable.subscribe(
new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
future.setCancelCallback(d);
}

@Override
public void onComplete() {
// Making the Completable as complete.
future.set(null);
}

@Override
public void onError(Throwable e) {
future.setException(e);
}
});
return future;
}

private static final class CompletableFuture extends AbstractFuture<Void> {
private final AtomicReference<Disposable> cancelCallback = new AtomicReference<>();

private void setCancelCallback(Disposable cancelCallback) {
this.cancelCallback.set(cancelCallback);
// Just in case it was already canceled before we set the callback.
doCancelIfCancelled();
}

private void doCancelIfCancelled() {
if (isCancelled()) {
Disposable callback = cancelCallback.getAndSet(null);
if (callback != null) {
callback.dispose();
}
}
}

@Override
protected void afterDone() {
doCancelIfCancelled();
}

// Allow set to be called by other members.
@Override
protected boolean set(@Nullable Void t) {
return super.set(t);
}

// Allow setException to be called by other members.
@Override
protected boolean setException(Throwable throwable) {
return super.setException(throwable);
}
}
}
4 changes: 4 additions & 0 deletions src/test/java/com/google/devtools/build/lib/remote/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/util/io",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//src/test/java/com/google/devtools/build/lib/actions/util",
"//third_party:guava",
"//third_party:junit4",
"//third_party:rxjava3",
"//third_party:truth",
"//third_party/protobuf:protobuf_java",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
Expand Down
Loading

0 comments on commit 7a62c2d

Please sign in to comment.