Skip to content

Commit

Permalink
Handle slow security policies without blocking gRPC threads. (#10633)
Browse files Browse the repository at this point in the history
* Handle slow security policies without blocking gRPC threads.

- Introduce PendingAuthListener to handle a ListenableFuture<Status>, progressing the gRPC through each stage in sequence once the future completes and is OK.
- Move unit tests away from `checkAuthorizationForService` and into `checkAuthorizationForServiceAsync` since that should be the only method called in production now.
- Some tests in `ServerSecurityPolicyTest` had their expectations updated; they previously called synchornous APIs that transformed failed `ListenableFuture<Status>` into one or another status. Now, we call the sync API, so those transformations do not happen anymore, thus the test needs to deal with failed futures directly.
- I couldn't figure out if this PR needs extra tests. AFAICT `BinderSecurityTest` should already cover the new codepaths, but please let me know otherwise.
  • Loading branch information
mateusazis authored Dec 6, 2023
1 parent 4477269 commit a053889
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracer
executorServicePool,
streamTracerFactories,
BinderInternal.createPolicyChecker(SecurityPolicies.serverInternalOnly()),
InboundParcelablePolicy.DEFAULT);
InboundParcelablePolicy.DEFAULT,
/* shutdownListener=*/ () -> {});

HostServices.configureService(addr,
HostServices.serviceParamsBuilder()
Expand Down
15 changes: 13 additions & 2 deletions binder/src/main/java/io/grpc/binder/BinderServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@
import io.grpc.internal.ServerImplBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;

import java.io.Closeable;
import java.io.File;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

import javax.annotation.Nullable;

/**
* Builder for a server that services requests from an Android Service.
*/
Expand Down Expand Up @@ -72,6 +77,7 @@ public static BinderServerBuilder forPort(int port) {
private ServerSecurityPolicy securityPolicy;
private InboundParcelablePolicy inboundParcelablePolicy;
private boolean isBuilt;
@Nullable private BinderTransportSecurity.ShutdownListener shutdownListener = null;

private BinderServerBuilder(
AndroidComponentAddress listenAddress,
Expand All @@ -85,7 +91,9 @@ private BinderServerBuilder(
schedulerPool,
streamTracerFactories,
BinderInternal.createPolicyChecker(securityPolicy),
inboundParcelablePolicy);
inboundParcelablePolicy,
// 'shutdownListener' should have been set by build()
checkNotNull(shutdownListener));
BinderInternal.setIBinder(binderReceiver, server.getHostBinder());
return server;
});
Expand Down Expand Up @@ -171,7 +179,10 @@ public Server build() {
checkState(!isBuilt, "BinderServerBuilder can only be used to build one server instance.");
isBuilt = true;
// We install the security interceptor last, so it's closest to the transport.
BinderTransportSecurity.installAuthInterceptor(this);
ObjectPool<? extends Executor> executorPool = serverImplBuilder.getExecutorPool();
Executor executor = executorPool.getObject();
BinderTransportSecurity.installAuthInterceptor(this, executor);
shutdownListener = () -> executorPool.returnObject(executor);
return super.build();
}
}
10 changes: 9 additions & 1 deletion binder/src/main/java/io/grpc/binder/internal/BinderServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
private final LeakSafeOneWayBinder hostServiceBinder;
private final BinderTransportSecurity.ServerPolicyChecker serverPolicyChecker;
private final InboundParcelablePolicy inboundParcelablePolicy;
private final BinderTransportSecurity.ShutdownListener transportSecurityShutdownListener;

@GuardedBy("this")
private ServerListener listener;
Expand All @@ -70,18 +71,24 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.
@GuardedBy("this")
private boolean shutdown;

/**
* @param transportSecurityShutdownListener represents resources that should be cleaned up once
* the server shuts down.
*/
public BinderServer(
AndroidComponentAddress listenAddress,
ObjectPool<ScheduledExecutorService> executorServicePool,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
BinderTransportSecurity.ServerPolicyChecker serverPolicyChecker,
InboundParcelablePolicy inboundParcelablePolicy) {
InboundParcelablePolicy inboundParcelablePolicy,
BinderTransportSecurity.ShutdownListener transportSecurityShutdownListener) {
this.listenAddress = listenAddress;
this.executorServicePool = executorServicePool;
this.streamTracerFactories =
ImmutableList.copyOf(checkNotNull(streamTracerFactories, "streamTracerFactories"));
this.serverPolicyChecker = checkNotNull(serverPolicyChecker, "serverPolicyChecker");
this.inboundParcelablePolicy = inboundParcelablePolicy;
this.transportSecurityShutdownListener = transportSecurityShutdownListener;
hostServiceBinder = new LeakSafeOneWayBinder(this);
}

Expand Down Expand Up @@ -125,6 +132,7 @@ public synchronized void shutdown() {
hostServiceBinder.detach();
listener.serverShutdown();
executorService = executorServicePool.returnObject(executorService);
transportSecurityShutdownListener.onServerShutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package io.grpc.binder.internal;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import io.grpc.Attributes;
import io.grpc.Internal;
import io.grpc.Metadata;
Expand All @@ -28,9 +31,12 @@
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;

/**
* Manages security for an Android Service hosted gRPC server.
Expand All @@ -49,10 +55,11 @@ private BinderTransportSecurity() {}
* Install a security policy on an about-to-be created server.
*
* @param serverBuilder The ServerBuilder being used to create the server.
* @param executor The executor in which the authorization result will be handled.
*/
@Internal
public static void installAuthInterceptor(ServerBuilder<?> serverBuilder) {
serverBuilder.intercept(new ServerAuthInterceptor());
public static void installAuthInterceptor(ServerBuilder<?> serverBuilder, Executor executor) {
serverBuilder.intercept(new ServerAuthInterceptor(executor));
}

/**
Expand All @@ -78,30 +85,82 @@ public static void attachAuthAttrs(
* Authentication state is fetched from the call attributes, inherited from the transport.
*/
private static final class ServerAuthInterceptor implements ServerInterceptor {

private final Executor executor;

ServerAuthInterceptor(Executor executor) {
this.executor = executor;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Status authStatus =
ListenableFuture<Status> authStatusFuture =
call.getAttributes()
.get(TRANSPORT_AUTHORIZATION_STATE)
.checkAuthorization(call.getMethodDescriptor());

// Most SecurityPolicy will have synchronous implementations that provide an
// immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations
// and asynchronous code if the authorization result is already present.
if (!authStatusFuture.isDone()) {
return newServerCallListenerForPendingAuthResult(authStatusFuture, call, headers, next);
}

Status authStatus;
try {
authStatus = Futures.getDone(authStatusFuture);
} catch (ExecutionException e) {
// Failed futures are treated as an internal error rather than a security rejection.
authStatus = Status.INTERNAL.withCause(e);
}

if (authStatus.isOk()) {
return next.startCall(call, headers);
} else {
call.close(authStatus, new Metadata());
return new ServerCall.Listener<ReqT>() {};
}
}

private <ReqT, RespT> ServerCall.Listener<ReqT> newServerCallListenerForPendingAuthResult(
ListenableFuture<Status> authStatusFuture,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
PendingAuthListener<ReqT, RespT> listener = new PendingAuthListener<>();
Futures.addCallback(
authStatusFuture,
new FutureCallback<Status>() {
@Override
public void onSuccess(Status authStatus) {
if (!authStatus.isOk()) {
call.close(authStatus, new Metadata());
return;
}

listener.startCall(call, headers, next);
}

@Override
public void onFailure(Throwable t) {
call.close(
Status.INTERNAL.withCause(t).withDescription("Authorization future failed"),
new Metadata());
}
}, executor);
return listener;
}
}

/**
* Maintaines the authorization state for a single transport instance. This class lives for the
* Maintains the authorization state for a single transport instance. This class lives for the
* lifetime of a single transport.
*/
private static final class TransportAuthorizationState {
private final int uid;
private final ServerPolicyChecker serverPolicyChecker;
private final ConcurrentHashMap<String, Status> serviceAuthorization;
private final ConcurrentHashMap<String, ListenableFuture<Status>> serviceAuthorization;

TransportAuthorizationState(int uid, ServerPolicyChecker serverPolicyChecker) {
this.uid = uid;
Expand All @@ -111,32 +170,27 @@ private static final class TransportAuthorizationState {

/** Get whether we're authorized to make this call. */
@CheckReturnValue
Status checkAuthorization(MethodDescriptor<?, ?> method) {
ListenableFuture<Status> checkAuthorization(MethodDescriptor<?, ?> method) {
String serviceName = method.getServiceName();
// Only cache decisions if the method can be sampled for tracing,
// which is true for all generated methods. Otherwise, programatically
// created methods could casue this cahe to grow unbounded.
// which is true for all generated methods. Otherwise, programmatically
// created methods could cause this cache to grow unbounded.
boolean useCache = method.isSampledToLocalTracing();
Status authorization;
if (useCache) {
authorization = serviceAuthorization.get(serviceName);
@Nullable ListenableFuture<Status> authorization = serviceAuthorization.get(serviceName);
if (authorization != null) {
return authorization;
}
}
try {
// TODO(10566): provide a synchronous version of "checkAuthorization" to avoid blocking the
// calling thread on the completion of the future.
authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName).get();
} catch (ExecutionException e) {
// Do not cache this failure since it may be transient.
return Status.fromThrowable(e);
} catch (InterruptedException e) {
// Do not cache this failure since it may be transient.
Thread.currentThread().interrupt();
return Status.CANCELLED.withCause(e);
}
// Under high load, this may trigger a large number of concurrent authorization checks that
// perform essentially the same work and have the potential of exhausting the resources they
// depend on. This was a non-issue in the past with synchronous policy checks due to the
// fixed-size nature of the thread pool this method runs under.
//
// TODO(10669): evaluate if there should be at most a single pending authorization check per
// (uid, serviceName) pair at any given time.
ListenableFuture<Status> authorization =
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
if (useCache) {
serviceAuthorization.putIfAbsent(serviceName, authorization);
}
Expand Down Expand Up @@ -167,4 +221,12 @@ public interface ServerPolicyChecker {
*/
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName);
}

/**
* A listener invoked when the {@link io.grpc.binder.internal.BinderServer} shuts down, allowing
* resources to be potentially cleaned up.
*/
public interface ShutdownListener {
void onServerShutdown();
}
}
102 changes: 102 additions & 0 deletions binder/src/main/java/io/grpc/binder/internal/PendingAuthListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package io.grpc.binder.internal;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.Status;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

/**
* A {@link ServerCall.Listener} that can be returned by a {@link io.grpc.ServerInterceptor} to
* asynchronously advance the gRPC pending resolving a possibly asynchronous security policy check.
*/
final class PendingAuthListener<ReqT, RespT> extends ServerCall.Listener<ReqT> {

private final ConcurrentLinkedQueue<ListenerConsumer<ReqT>> pendingSteps =
new ConcurrentLinkedQueue<>();
private final AtomicReference<ServerCall.Listener<ReqT>> delegateRef =
new AtomicReference<>(null);

PendingAuthListener() {}

void startCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
ServerCall.Listener<ReqT> delegate;
try {
delegate = next.startCall(call, headers);
} catch (RuntimeException e) {
call.close(
Status
.INTERNAL
.withCause(e)
.withDescription("Failed to start server call after authorization check"),
new Metadata());
return;
}
delegateRef.set(delegate);
maybeRunPendingSteps();
}

/**
* Runs any enqueued step in this ServerCall listener as long as the authorization check is
* complete. Otherwise, no-op and returns immediately.
*/
private void maybeRunPendingSteps() {
@Nullable ServerCall.Listener<ReqT> delegate = delegateRef.get();
if (delegate == null) {
return;
}

// This section is synchronized so that no 2 threads may attempt to retrieve elements from the
// queue in order but end up executing the steps out of order.
synchronized (this) {
ListenerConsumer<ReqT> nextStep;
while ((nextStep = pendingSteps.poll()) != null) {
nextStep.accept(delegate);
}
}
}

@Override
public void onCancel() {
pendingSteps.offer(ServerCall.Listener::onCancel);
maybeRunPendingSteps();
}

@Override
public void onComplete() {
pendingSteps.offer(ServerCall.Listener::onComplete);
maybeRunPendingSteps();
}

@Override
public void onHalfClose() {
pendingSteps.offer(ServerCall.Listener::onHalfClose);
maybeRunPendingSteps();
}

@Override
public void onMessage(ReqT message) {
pendingSteps.offer(delegate -> delegate.onMessage(message));
maybeRunPendingSteps();
}

@Override
public void onReady() {
pendingSteps.offer(ServerCall.Listener::onReady);
maybeRunPendingSteps();
}

/**
* Similar to Java8's {@link java.util.function.Consumer}, but redeclared in order to support
* Android SDK 21.
*/
private interface ListenerConsumer<ReqT> {
void accept(ServerCall.Listener<ReqT> listener);
}
}
Loading

0 comments on commit a053889

Please sign in to comment.