Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add gRPC-GCP channel pool as an option #1227

Merged
merged 18 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@
</build>

<dependencies>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>grpc-gcp</artifactId>
<version>1.0.0</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tend to use the versions specified in java-shared-dependencies. By that, I mean we usually do NOT specify version dependencies (like here), but instead rely on the versions provided by the POM import of that repository.

Could we do the same for this artifact (if it is part of any BOMs specified in the shared dependencies)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a part of any BOMs. If I add it to the java-shared-dependencies will all the projects depending on it automatically get grpc-gcp as a dependency or they must explicitly add it to their dependencies?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They must explicitly add it to their dependencies, but they won't need to specify a version. This makes sure that we are always using compatible library versions.

</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.admin.database.v1.DatabaseAdminSettings;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.instance.v1.InstanceAdminSettings;
Expand Down Expand Up @@ -103,6 +102,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final InstanceAdminStubSettings instanceAdminStubSettings;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final Duration partitionedDmlTimeout;
private final boolean grpcGcpExtensionEnabled;
private final GcpManagedChannelOptions grpcGcpOptions;
private final boolean autoThrottleAdministrativeRequests;
private final RetrySettings retryAdministrativeRequestsSettings;
private final boolean trackTransactionStarter;
Expand Down Expand Up @@ -554,6 +555,8 @@ private SpannerOptions(Builder builder) {
throw SpannerExceptionFactory.newSpannerException(e);
}
partitionedDmlTimeout = builder.partitionedDmlTimeout;
grpcGcpExtensionEnabled = builder.grpcGcpExtensionEnabled;
grpcGcpOptions = builder.grpcGcpOptions;
autoThrottleAdministrativeRequests = builder.autoThrottleAdministrativeRequests;
retryAdministrativeRequestsSettings = builder.retryAdministrativeRequestsSettings;
trackTransactionStarter = builder.trackTransactionStarter;
Expand Down Expand Up @@ -658,6 +661,8 @@ public static class Builder
private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder =
DatabaseAdminStubSettings.newBuilder();
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
private boolean grpcGcpExtensionEnabled = false;
private GcpManagedChannelOptions grpcGcpOptions;
private RetrySettings retryAdministrativeRequestsSettings =
DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS;
private boolean autoThrottleAdministrativeRequests = false;
Expand Down Expand Up @@ -707,6 +712,8 @@ private Builder() {
this.instanceAdminStubSettingsBuilder = options.instanceAdminStubSettings.toBuilder();
this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder();
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
this.grpcGcpExtensionEnabled = options.grpcGcpExtensionEnabled;
this.grpcGcpOptions = options.grpcGcpOptions;
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
this.retryAdministrativeRequestsSettings = options.retryAdministrativeRequestsSettings;
this.trackTransactionStarter = options.trackTransactionStarter;
Expand Down Expand Up @@ -1035,6 +1042,28 @@ public Builder setHost(String host) {
return this;
}

/** Enables gRPC-GCP extension with the default settings. */
public Builder enableGrpcGcpExtension() {
this.grpcGcpExtensionEnabled = true;
return this;
}

/**
* Enables gRPC-GCP extension and uses provided options for configuration. The metric registry
* and default Spanner metric labels will be added automatically.
*/
public Builder enableGrpcGcpExtension(GcpManagedChannelOptions options) {
this.grpcGcpExtensionEnabled = true;
this.grpcGcpOptions = options;
return this;
}

/** Disables gRPC-GCP extension. */
public Builder disableGrpcGcpExtension() {
this.grpcGcpExtensionEnabled = false;
return this;
}

/**
* Sets the host of an emulator to use. By default the value is read from an environment
* variable. If the environment variable is not set, this will be <code>null</code>.
Expand Down Expand Up @@ -1128,6 +1157,14 @@ public Duration getPartitionedDmlTimeout() {
return partitionedDmlTimeout;
}

public boolean isGrpcGcpExtensionEnabled() {
return grpcGcpExtensionEnabled;
}

public GcpManagedChannelOptions getGrpcGcpOptions() {
return grpcGcpOptions;
}

public boolean isAutoThrottleAdministrativeRequests() {
return autoThrottleAdministrativeRequests;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.NanoClock;
Expand Down Expand Up @@ -54,6 +55,9 @@
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.grpc.GcpManagedChannelBuilder;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException;
import com.google.cloud.spanner.ErrorCode;
Expand All @@ -80,6 +84,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Resources;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.iam.v1.GetIamPolicyRequest;
Expand Down Expand Up @@ -156,10 +161,13 @@
import com.google.spanner.v1.Transaction;
import io.grpc.CallCredentials;
import io.grpc.Context;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.opencensus.metrics.Metrics;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -249,6 +257,7 @@ private void awaitTermination() throws InterruptedException {
private static final String CLIENT_LIBRARY_LANGUAGE = "spanner-java";
public static final String DEFAULT_USER_AGENT =
CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class);
private static final String API_FILE = "grpc-gcp-apiconfig.json";

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
Expand Down Expand Up @@ -368,6 +377,9 @@ public GapicSpannerRpc(final SpannerOptions options) {
// whether the attempt is allowed is totally controlled by service owner.
.setAttemptDirectPath(true);

// If it is enabled in options uses the channel pool provided by the gRPC-GCP extension.
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);

TransportChannelProvider channelProvider =
MoreObjects.firstNonNull(
options.getChannelProvider(), defaultChannelProviderBuilder.build());
Expand Down Expand Up @@ -509,6 +521,62 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
}
}

private static String parseGrpcGcpApiConfig() {
try {
return Resources.toString(
GapicSpannerRpc.class.getResource(API_FILE), Charset.forName("UTF8"));
} catch (IOException e) {
throw newSpannerException(e);
}
}

// Enhance metric options for gRPC-GCP extension. Adds metric registry if not specified.
private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) {
GcpManagedChannelOptions grpcGcpOptions =
MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions());
GcpMetricsOptions metricsOptions =
MoreObjects.firstNonNull(
grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build());
GcpMetricsOptions.Builder metricsOptionsBuilder = GcpMetricsOptions.newBuilder(metricsOptions);
if (metricsOptions.getMetricRegistry() == null) {
metricsOptionsBuilder.withMetricRegistry(Metrics.getMetricRegistry());
}
// TODO: Add default labels with values: client_id, database, instance_id.
nimf marked this conversation as resolved.
Show resolved Hide resolved
if (metricsOptions.getNamePrefix().equals("")) {
metricsOptionsBuilder.withNamePrefix("cloud.google.com/java/spanner/gcp-channel-pool/");
nimf marked this conversation as resolved.
Show resolved Hide resolved
}
return GcpManagedChannelOptions.newBuilder(grpcGcpOptions)
.withMetricsOptions(metricsOptionsBuilder.build())
.build();
}

@SuppressWarnings("rawtypes")
private static void maybeEnableGrpcGcpExtension(
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder,
final SpannerOptions options) {
if (!options.isGrpcGcpExtensionEnabled()) {
return;
}

final String jsonApiConfig = parseGrpcGcpApiConfig();
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetrics(options);

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction =
channelBuilder -> {
if (options.getChannelConfigurator() != null) {
channelBuilder = options.getChannelConfigurator().apply(channelBuilder);
}
return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(grpcGcpOptions)
.setPoolSize(options.getNumChannels());
};

// Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1.
// Enable gRPC-GCP channel pool via the channel configurator.
defaultChannelProviderBuilder.setPoolSize(1).setChannelConfigurator(apiFunction);
}

private static HeaderProvider headerProviderWithUserAgentFrom(HeaderProvider headerProvider) {
final Map<String, String> headersWithUserAgent = new HashMap<>(headerProvider.getHeaders());
String userAgent = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"channelPool": {
"maxSize": 3,
nimf marked this conversation as resolved.
Show resolved Hide resolved
"maxConcurrentStreamsLowWatermark": 0
},
"method": [
{
"name": ["google.spanner.v1.Spanner/CreateSession"],
"affinity" : {
"command": "BIND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/BatchCreateSessions"],
nimf marked this conversation as resolved.
Show resolved Hide resolved
nimf marked this conversation as resolved.
Show resolved Hide resolved
"affinity" : {
"command": "BIND",
"affinityKey": "session.name"
}
},
{
"name": ["google.spanner.v1.Spanner/GetSession"],
"affinity": {
"command": "BOUND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/DeleteSession"],
"affinity": {
"command": "UNBIND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteSql"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteBatchDml"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteStreamingSql"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Read"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/StreamingRead"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/BeginTransaction"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Commit"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/PartitionRead"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/PartitionQuery"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Rollback"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
}
]
}
Loading