Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

binder: Dispatch transact() calls on an Executor when FLAG_ONEWAY would not be respected. #8987

Merged
merged 2 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public void socketStats() throws Exception {}
@Override
public void flowControlPushBack() throws Exception {}

@Test
@Ignore("Not yet implemented. See https://github.com/grpc/grpc-java/issues/8931")
@Override
public void serverNotListening() throws Exception {}

@Test
@Ignore("This test isn't appropriate for BinderTransport.")
@Override
Expand Down
90 changes: 35 additions & 55 deletions binder/src/main/java/io/grpc/binder/internal/BinderTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected enum TransportState {
@Nullable
protected Status shutdownStatus;

@Nullable private IBinder outgoingBinder;
@Nullable private OneWayBinderProxy outgoingBinder;

private final FlowController flowController;

Expand Down Expand Up @@ -278,10 +278,10 @@ final void setState(TransportState newState) {
}

@GuardedBy("this")
protected boolean setOutgoingBinder(IBinder binder) {
protected boolean setOutgoingBinder(OneWayBinderProxy binder) {
this.outgoingBinder = binder;
try {
binder.linkToDeath(this, 0);
binder.getDelegate().linkToDeath(this, 0);
return true;
} catch (RemoteException re) {
return false;
Expand Down Expand Up @@ -326,39 +326,30 @@ final void sendSetupTransaction() {
}

@GuardedBy("this")
final void sendSetupTransaction(IBinder iBinder) {
Parcel parcel = Parcel.obtain();
try {
parcel.writeInt(WIRE_FORMAT_VERSION);
parcel.writeStrongBinder(incomingBinder);
if (!iBinder.transact(SETUP_TRANSPORT, parcel, null, IBinder.FLAG_ONEWAY)) {
shutdownInternal(
Status.UNAVAILABLE.withDescription("Failed sending SETUP_TRANSPORT transaction"), true);
jdcormie marked this conversation as resolved.
Show resolved Hide resolved
}
final void sendSetupTransaction(OneWayBinderProxy iBinder) {
try (ParcelHolder parcel = ParcelHolder.obtain()) {
parcel.get().writeInt(WIRE_FORMAT_VERSION);
parcel.get().writeStrongBinder(incomingBinder);
iBinder.transact(SETUP_TRANSPORT, parcel);
} catch (RemoteException re) {
shutdownInternal(statusFromRemoteException(re), true);
} finally {
parcel.recycle();
}
}

@GuardedBy("this")
private final void sendShutdownTransaction() {
if (outgoingBinder != null) {
try {
outgoingBinder.unlinkToDeath(this, 0);
outgoingBinder.getDelegate().unlinkToDeath(this, 0);
} catch (NoSuchElementException e) {
// Ignore.
}
Parcel parcel = Parcel.obtain();
try {
try (ParcelHolder parcel = ParcelHolder.obtain()) {
// Send empty flags to avoid a memory leak linked to empty parcels (b/207778694).
parcel.writeInt(0);
outgoingBinder.transact(SHUTDOWN_TRANSPORT, parcel, null, IBinder.FLAG_ONEWAY);
parcel.get().writeInt(0);
outgoingBinder.transact(SHUTDOWN_TRANSPORT, parcel);
} catch (RemoteException re) {
// Ignore.
} finally {
parcel.recycle();
}
}
}
Expand All @@ -369,14 +360,11 @@ protected synchronized void sendPing(int id) throws StatusException {
} else if (outgoingBinder == null) {
throw Status.FAILED_PRECONDITION.withDescription("Transport not ready.").asException();
} else {
Parcel parcel = Parcel.obtain();
try {
parcel.writeInt(id);
outgoingBinder.transact(PING, parcel, null, IBinder.FLAG_ONEWAY);
try (ParcelHolder parcel = ParcelHolder.obtain()) {
parcel.get().writeInt(id);
outgoingBinder.transact(PING, parcel);
} catch (RemoteException re) {
throw statusFromRemoteException(re).asException();
} finally {
parcel.recycle();
}
}
}
Expand All @@ -401,12 +389,10 @@ final void unregisterCall(int callId) {
}
}

final void sendTransaction(int callId, Parcel parcel) throws StatusException {
int dataSize = parcel.dataSize();
final void sendTransaction(int callId, ParcelHolder parcel) throws StatusException {
int dataSize = parcel.get().dataSize();
try {
if (!outgoingBinder.transact(callId, parcel, null, IBinder.FLAG_ONEWAY)) {
throw Status.UNAVAILABLE.withDescription("Failed sending transaction").asException();
}
outgoingBinder.transact(callId, parcel);
} catch (RemoteException re) {
throw statusFromRemoteException(re).asException();
}
Expand All @@ -416,16 +402,13 @@ final void sendTransaction(int callId, Parcel parcel) throws StatusException {
}

final void sendOutOfBandClose(int callId, Status status) {
Parcel parcel = Parcel.obtain();
try {
parcel.writeInt(0); // Placeholder for flags. Will be filled in below.
int flags = TransactionUtils.writeStatus(parcel, status);
TransactionUtils.fillInFlags(parcel, flags | TransactionUtils.FLAG_OUT_OF_BAND_CLOSE);
try (ParcelHolder parcel = ParcelHolder.obtain()) {
parcel.get().writeInt(0); // Placeholder for flags. Will be filled in below.
int flags = TransactionUtils.writeStatus(parcel.get(), status);
TransactionUtils.fillInFlags(parcel.get(), flags | TransactionUtils.FLAG_OUT_OF_BAND_CLOSE);
sendTransaction(callId, parcel);
} catch (StatusException e) {
logger.log(Level.WARNING, "Failed sending oob close transaction", e);
} finally {
parcel.recycle();
}
}

Expand Down Expand Up @@ -496,10 +479,12 @@ protected Inbound<?> createInbound(int callId) {
protected void handleSetupTransport(Parcel parcel) {}

@GuardedBy("this")
private final void handlePing(Parcel parcel) {
private final void handlePing(Parcel requestParcel) {
int id = requestParcel.readInt();
if (transportState == TransportState.READY) {
try {
outgoingBinder.transact(PING_RESPONSE, parcel, null, IBinder.FLAG_ONEWAY);
try (ParcelHolder replyParcel = ParcelHolder.obtain()) {
replyParcel.get().writeInt(id);
outgoingBinder.transact(PING_RESPONSE, replyParcel);
} catch (RemoteException re) {
// Ignore.
}
Expand All @@ -510,21 +495,15 @@ private final void handlePing(Parcel parcel) {
protected void handlePingResponse(Parcel parcel) {}

@GuardedBy("this")
private void sendAcknowledgeBytes(IBinder iBinder) {
private void sendAcknowledgeBytes(OneWayBinderProxy iBinder) {
// Send a transaction to acknowledge reception of incoming data.
long n = numIncomingBytes.get();
acknowledgedIncomingBytes = n;
Parcel parcel = Parcel.obtain();
try {
parcel.writeLong(n);
if (!iBinder.transact(ACKNOWLEDGE_BYTES, parcel, null, IBinder.FLAG_ONEWAY)) {
shutdownInternal(
Status.UNAVAILABLE.withDescription("Failed sending ack bytes transaction"), true);
}
try (ParcelHolder parcel = ParcelHolder.obtain()) {
parcel.get().writeLong(n);
iBinder.transact(ACKNOWLEDGE_BYTES, parcel);
} catch (RemoteException re) {
shutdownInternal(statusFromRemoteException(re), true);
} finally {
parcel.recycle();
}
}

Expand Down Expand Up @@ -607,7 +586,7 @@ void releaseExecutors() {

@Override
public synchronized void onBound(IBinder binder) {
sendSetupTransaction(binder);
sendSetupTransaction(OneWayBinderProxy.wrap(binder, offloadExecutor));
}

@Override
Expand Down Expand Up @@ -748,7 +727,7 @@ private void checkSecurityPolicy(IBinder binder) {
if (inState(TransportState.SETUP)) {
if (!authorization.isOk()) {
shutdownInternal(authorization, true);
} else if (!setOutgoingBinder(binder)) {
} else if (!setOutgoingBinder(OneWayBinderProxy.wrap(binder, offloadExecutor))) {
shutdownInternal(
Status.UNAVAILABLE.withDescription("Failed to observe outgoing binder"), true);
} else {
Expand Down Expand Up @@ -827,7 +806,8 @@ public BinderServerTransport(
IBinder callbackBinder) {
super(executorServicePool, attributes, buildLogId(attributes));
this.streamTracerFactories = streamTracerFactories;
setOutgoingBinder(callbackBinder);
// TODO(jdcormie): Plumb in the Server's executor() and use it here instead.
setOutgoingBinder(OneWayBinderProxy.wrap(callbackBinder, getScheduledExecutorService()));
}

public synchronized void setServerTransportListener(ServerTransportListener serverTransportListener) {
Expand Down
134 changes: 134 additions & 0 deletions binder/src/main/java/io/grpc/binder/internal/OneWayBinderProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package io.grpc.binder.internal;

import android.os.Binder;
import android.os.IBinder;
import android.os.Parcel;
import android.os.RemoteException;
import io.grpc.internal.SerializingExecutor;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Wraps an {@link IBinder} with a safe and uniformly asynchronous transaction API.
*
* <p>The android.os.Binder implementation of {@link IBinder} is problematic for clients that want
jdcormie marked this conversation as resolved.
Show resolved Hide resolved
* "oneway" transaction semantics because it implements transact() by invoking onTransact() on the
* caller's thread, even when the {@link IBinder#FLAG_ONEWAY} flag is set. Even though this behavior
* is documented, it's surprising and dangerous. Wrap your {@link IBinder}s with an instance of this
jdcormie marked this conversation as resolved.
Show resolved Hide resolved
* class to ensure the following out-of-process "oneway" semantics are always in effect:
*
* <ul>
* <li>transact() merely enqueues the transaction for processing. It doesn't wait for onTransact()
* to complete.
* <li>transact() may fail for programming errors or transport-layer errors that are immediately
* obvious on the caller's side, but never for an Exception or false return value from
* onTransact().
* <li>onTransact() runs without holding any of the locks held by the thread calling transact().
* <li>onTransact() calls are dispatched one at a time in the same happens-before order as the
* corresponding calls to transact().
* </ul>
*
* <p>NB: One difference that this class can't conceal is that calls to onTransact() are serialized
* per {@link OneWayBinderProxy} instance, not per instance of the wrapped {@link IBinder}. An
* android.os.Binder with in-process callers could still receive concurrent calls to onTransact() on
* different threads if callers used different {@link OneWayBinderProxy} instances or if that Binder
* also had out-of-process callers.
*/
public abstract class OneWayBinderProxy {
private static final Logger logger = Logger.getLogger(OneWayBinderProxy.class.getName());
protected final IBinder delegate;

private OneWayBinderProxy(IBinder iBinder) {
this.delegate = iBinder;
}

/**
* Returns a new instance of {@link OneWayBinderProxy} that wraps {@code iBinder}.
*
* @param iBinder the binder to wrap
* @param executor a non-direct Executor used to dispatch calls to onTransact(), if necessary
* @return a new instance of {@link OneWayBinderProxy}
*/
public static OneWayBinderProxy wrap(IBinder iBinder, Executor executor) {
jdcormie marked this conversation as resolved.
Show resolved Hide resolved
return (iBinder instanceof Binder)
? new InProcessImpl(iBinder, executor)
: new OutOfProcessImpl(iBinder);
}

/**
* Enqueues a transaction for the wrapped {@link IBinder} with guaranteed "oneway" semantics.
*
* <p>NB: Unlike {@link IBinder#transact}, implementations of this method take ownership of the
* {@code data} Parcel. When this method returns, {@code data} will normally be empty, but callers
* should still unconditionally {@link ParcelHolder#close()} it to avoid a leak in case they or
* the implementation throws before ownership is transferred.
*
* @param code identifies the type of this transaction
* @param data a non-empty container of the Parcel to be sent
* @throws RemoteException if the transaction could not even be queued for dispatch on the server.
* Failures from {@link Binder#onTransact} are *never* reported this way.
*/
public abstract void transact(int code, ParcelHolder data) throws RemoteException;

/**
* Returns the wrapped {@link IBinder} for the purpose of calling methods other than {@link
* IBinder#transact(int, Parcel, Parcel, int)}.
*/
public IBinder getDelegate() {
return delegate;
}

static class OutOfProcessImpl extends OneWayBinderProxy {
OutOfProcessImpl(IBinder iBinder) {
super(iBinder);
}

@Override
public void transact(int code, ParcelHolder data) throws RemoteException {
if (!transactAndRecycleParcel(code, data.release())) {
// This cannot happen (see g/android-binder/c/jM4NvS234Rw) but, just in case, let the caller
// handle it along with all the other possible transport-layer errors.
throw new RemoteException("BinderProxy#transact(" + code + ", FLAG_ONEWAY) returned false");
}
}
}

protected boolean transactAndRecycleParcel(int code, Parcel data) throws RemoteException {
try {
return delegate.transact(code, data, null, IBinder.FLAG_ONEWAY);
} finally {
data.recycle();
}
}

static class InProcessImpl extends OneWayBinderProxy {
private final SerializingExecutor executor;

InProcessImpl(IBinder binder, Executor executor) {
super(binder);
this.executor = new SerializingExecutor(executor);
}

@Override
public void transact(int code, ParcelHolder wrappedParcel) {
// Transfer ownership, taking care to handle any RuntimeException from execute().
Parcel parcel = wrappedParcel.get();
executor.execute(
() -> {
try {
if (!transactAndRecycleParcel(code, parcel)) {
// onTransact() in our same process returned this. Ignore it, just like Android
// would have if the android.os.Binder was in another process.
logger.log(Level.FINEST, "A oneway transaction was not understood - ignoring");
}
} catch (Exception e) {
// onTransact() in our same process threw this. Ignore it, just like Android would
// have if the android.os.Binder was in another process.
logger.log(Level.FINEST, "A oneway transaction threw - ignoring", e);
}
});
wrappedParcel.release();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdcormie, @markb74, this release() is suspicious. Should it be done within the executor instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OneWayBinderProxy contract says that callers of transact() must unconditionally call close() on 'wrappedParcel'. If we called release() inside the Runnable, the non-thread-safe wrappedParcel might be accessed concurrently from multiple threads.

The idea here is to transfer ownership to the Runnable as a raw Parcel in an Exception safe way. There are two cases. Either:

  1. execute() returns successfully, the Runnable promises to eventually recycle() 'parcel' in transactAndRecycleParcel(), and we release ownership of 'wrappedParcel' on the very next line.
    OR
  2. execute() throws RejectedExecutionException. In this case, the Runnable never runs and wrappedParcel.release() is never called so the caller of OneWayBinderProxy#transact() will recycle the Parcel when it calls ParcelHolder#close() in its finally.

LMK if you see a problem or have a better suggestion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, release() is not like a close(). It steals the ownership instead of releasing the object. (The holder releases ownership because it is transferring to the caller; the caller is not releasing ownership overall which is what I'm used to with a release method, where release is similar to unref/close.)

I think this is fine then. Thank you!

}
}
}
24 changes: 11 additions & 13 deletions binder/src/main/java/io/grpc/binder/internal/Outbound.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,14 @@ final void send() throws StatusException {
@GuardedBy("this")
@SuppressWarnings("fallthrough")
protected final void sendInternal() throws StatusException {
Parcel parcel = Parcel.obtain();
int flags = 0;
parcel.writeInt(0); // Placeholder for flags. Will be filled in below.
parcel.writeInt(transactionIndex++);
try {
try (ParcelHolder parcel = ParcelHolder.obtain()) {
int flags = 0;
parcel.get().writeInt(0); // Placeholder for flags. Will be filled in below.
parcel.get().writeInt(transactionIndex++);
switch (outboundState) {
case INITIAL:
flags |= TransactionUtils.FLAG_PREFIX;
flags |= writePrefix(parcel);
flags |= writePrefix(parcel.get());
onOutboundState(State.PREFIX_SENT);
if (!messageAvailable() && !suffixReady) {
break;
Expand All @@ -239,7 +238,7 @@ protected final void sendInternal() throws StatusException {
InputStream messageStream = peekNextMessage();
if (messageStream != null) {
flags |= TransactionUtils.FLAG_MESSAGE_DATA;
flags |= writeMessageData(parcel, messageStream);
flags |= writeMessageData(parcel.get(), messageStream);
} else {
checkState(suffixReady);
}
Expand All @@ -252,20 +251,19 @@ protected final void sendInternal() throws StatusException {
// Fall-through.
case ALL_MESSAGES_SENT:
flags |= TransactionUtils.FLAG_SUFFIX;
flags |= writeSuffix(parcel);
flags |= writeSuffix(parcel.get());
onOutboundState(State.SUFFIX_SENT);
break;
default:
throw new AssertionError();
}
TransactionUtils.fillInFlags(parcel, flags);
TransactionUtils.fillInFlags(parcel.get(), flags);
int dataSize = parcel.get().dataSize();
transport.sendTransaction(callId, parcel);
statsTraceContext.outboundWireSize(parcel.dataSize());
statsTraceContext.outboundUncompressedSize(parcel.dataSize());
statsTraceContext.outboundWireSize(dataSize);
statsTraceContext.outboundUncompressedSize(dataSize);
} catch (IOException e) {
throw Status.INTERNAL.withCause(e).asException();
} finally {
parcel.recycle();
}
}

Expand Down
Loading