Skip to content

Commit

Permalink
apacheGH-40745: [Java][FlightRPC] Support configuring backpressure th…
Browse files Browse the repository at this point in the history
…reshold (apache#41051)

### Rationale for this change

gRPC uses a default backpressure threshold that is too low for services that send large amounts of data such as Arrow Flight. This causes excessive blocking and reduces throughput.

### What changes are included in this PR?

* Update to grpc-java 1.63.0
* Add to FlightServer.Builder an option to set the number of bytes queued before blocking due to backpressure. Set the default to 10MB instead of gRPC's default of 64K.
* Add a ServerInterceptor for automating setting the backpressure threshold on ServerCalls.

### Are these changes tested?

Tested through existing unit tests.

### Are there any user-facing changes?

The FlightServer.Builder class has an extra configuration option to let users change the backpressure threshold themselves.

* GitHub Issue: apache#40745

Authored-by: James Duong <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
jduo authored and verma-kartik committed Apr 11, 2024
1 parent 8aee618 commit ee2ca5a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.arrow.flight.auth2.Auth2Constants;
import org.apache.arrow.flight.auth2.CallHeaderAuthenticator;
import org.apache.arrow.flight.auth2.ServerCallHeaderAuthMiddleware;
import org.apache.arrow.flight.grpc.ServerBackpressureThresholdInterceptor;
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter;
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter.KeyFactory;
import org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -79,6 +80,9 @@ public class FlightServer implements AutoCloseable {
/** The maximum size of an individual gRPC message. This effectively disables the limit. */
static final int MAX_GRPC_MESSAGE_SIZE = Integer.MAX_VALUE;

/** The default number of bytes that can be queued on an output stream before blocking. */
public static final int DEFAULT_BACKPRESSURE_THRESHOLD = 10 * 1024 * 1024; // 10MB

/** Create a new instance from a gRPC server. For internal use only. */
private FlightServer(Location location, Server server, ExecutorService grpcExecutor) {
this.location = location;
Expand Down Expand Up @@ -179,6 +183,7 @@ public static final class Builder {
private CallHeaderAuthenticator headerAuthenticator = CallHeaderAuthenticator.NO_OP;
private ExecutorService executor = null;
private int maxInboundMessageSize = MAX_GRPC_MESSAGE_SIZE;
private int backpressureThreshold = DEFAULT_BACKPRESSURE_THRESHOLD;
private InputStream certChain;
private InputStream key;
private InputStream mTlsCACert;
Expand Down Expand Up @@ -300,6 +305,7 @@ public FlightServer build() {
.addService(
ServerInterceptors.intercept(
flightService,
new ServerBackpressureThresholdInterceptor(backpressureThreshold),
new ServerAuthInterceptor(authHandler)));

// Allow hooking into the gRPC builder. This is not guaranteed to be available on all Arrow versions or
Expand Down Expand Up @@ -336,6 +342,15 @@ public Builder maxInboundMessageSize(int maxMessageSize) {
return this;
}

/**
* Set the number of bytes that may be queued on a server output stream before writes are blocked.
*/
public Builder backpressureThreshold(int backpressureThreshold) {
Preconditions.checkArgument(backpressureThreshold > 0);
this.backpressureThreshold = backpressureThreshold;
return this;
}

/**
* A small utility function to ensure that InputStream attributes.
* are closed if they are not null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight.grpc;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

/**
* An interceptor for specifying the number of bytes that can be queued before a call with an output stream
* gets blocked by backpressure.
*/
public class ServerBackpressureThresholdInterceptor implements ServerInterceptor {

private final int numBytes;

public ServerBackpressureThresholdInterceptor(int numBytes) {
this.numBytes = numBytes;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
call.setOnReadyThreshold(numBytes);
return next.startCall(call, headers);
}
}
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<dep.slf4j.version>2.0.11</dep.slf4j.version>
<dep.guava-bom.version>33.0.0-jre</dep.guava-bom.version>
<dep.netty-bom.version>4.1.108.Final</dep.netty-bom.version>
<dep.grpc-bom.version>1.62.2</dep.grpc-bom.version>
<dep.grpc-bom.version>1.63.0</dep.grpc-bom.version>
<dep.protobuf-bom.version>3.23.1</dep.protobuf-bom.version>
<dep.jackson-bom.version>2.17.0</dep.jackson-bom.version>
<dep.hadoop.version>3.4.0</dep.hadoop.version>
Expand Down

0 comments on commit ee2ca5a

Please sign in to comment.