diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml
index c2ef87ebfd..6c562e5d16 100644
--- a/google-cloud-bigtable/pom.xml
+++ b/google-cloud-bigtable/pom.xml
@@ -87,6 +87,10 @@
com.google.api.grpcproto-google-iam-v1
+
+ com.google.auth
+ google-auth-library-credentials
+ com.google.authgoogle-auth-library-oauth2-http
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
index 437ebc653b..8dd0fa6d97 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
@@ -26,6 +26,7 @@
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.common.base.Strings;
import io.grpc.ManagedChannelBuilder;
+import java.util.List;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
@@ -185,11 +186,20 @@ public String getAppProfileId() {
}
/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
- @BetaApi("This API depends on experimental gRPC APIs")
+ @BetaApi("Channel priming is not currently stable and may change in the future")
public boolean isRefreshingChannel() {
return stubSettings.isRefreshingChannel();
}
+ /**
+ * Gets the table ids that will be used to send warmup requests when {@link
+ * #isRefreshingChannel()} is enabled.
+ */
+ @BetaApi("Channel priming is not currently stable and may change in the future")
+ public List getPrimingTableIds() {
+ return stubSettings.getPrimedTableIds();
+ }
+
/** Returns the underlying RPC settings. */
public EnhancedBigtableStubSettings getStubSettings() {
return stubSettings;
@@ -307,18 +317,40 @@ public CredentialsProvider getCredentialsProvider() {
* connections, which causes the client to renegotiate the gRPC connection in the request path,
* which causes periodic spikes in latency
*/
- @BetaApi("This API depends on experimental gRPC APIs")
+ @BetaApi("Channel priming is not currently stable and may change in the future")
public Builder setRefreshingChannel(boolean isRefreshingChannel) {
stubSettings.setRefreshingChannel(isRefreshingChannel);
return this;
}
/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
- @BetaApi("This API depends on experimental gRPC APIs")
+ @BetaApi("Channel priming is not currently stable and may change in the future")
public boolean isRefreshingChannel() {
return stubSettings.isRefreshingChannel();
}
+ /**
+ * Configure the tables that can be used to prime a channel during a refresh.
+ *
+ *
These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a
+ * channel is refreshed, it will send a request to each table to warm up the serverside caches
+ * before admitting the new channel into the channel pool.
+ */
+ @BetaApi("Channel priming is not currently stable and may change in the future")
+ public Builder setPrimingTableIds(String... tableIds) {
+ stubSettings.setPrimedTableIds(tableIds);
+ return this;
+ }
+
+ /**
+ * Gets the table ids that will be used to send warmup requests when {@link
+ * #setRefreshingChannel(boolean)} is enabled.
+ */
+ @BetaApi("Channel priming is not currently stable and may change in the future")
+ public List getPrimingTableIds() {
+ return stubSettings.getPrimedTableIds();
+ }
+
/**
* Returns the underlying settings for making RPC calls. The settings should be changed with
* care.
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java
deleted file mode 100644
index e34ecd750d..0000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannel.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.internal;
-
-import com.google.api.core.BetaApi;
-import com.google.api.core.InternalApi;
-import com.google.api.gax.grpc.ChannelPrimer;
-import io.grpc.ConnectivityState;
-import io.grpc.ManagedChannel;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Establish a connection to the Cloud Bigtable service on managedChannel
- *
- *
This class is considered an internal implementation detail and not meant to be used by
- * applications.
- */
-@BetaApi("This API depends on gRPC experimental API")
-@InternalApi
-public final class RefreshChannel implements ChannelPrimer {
-
- /**
- * primeChannel establishes a connection to Cloud Bigtable service. This typically take less than
- * 1s. In case of service failure, an upper limit of 10s prevents primeChannel from looping
- * forever.
- */
- @Override
- public void primeChannel(ManagedChannel managedChannel) {
- for (int i = 0; i < 10; i++) {
- ConnectivityState connectivityState = managedChannel.getState(true);
- if (connectivityState == ConnectivityState.READY) {
- break;
- }
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- break;
- }
- }
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java
new file mode 100644
index 0000000000..15be8f7309
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub;
+
+import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.BetaApi;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.InstantiatingExecutorProvider;
+import com.google.api.gax.grpc.ChannelPrimer;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import org.threeten.bp.Duration;
+
+/**
+ * A channel warmer that ensures that a Bigtable channel is ready to be used before being added to
+ * the active {@link com.google.api.gax.grpc.ChannelPool}.
+ *
+ *
This implementation is subject to change in the future, but currently it will prime the
+ * channel by sending a ReadRow request for a hardcoded, non-existent row key.
+ */
+@BetaApi("Channel priming is not currently stable and might change in the future")
+class BigtableChannelPrimer implements ChannelPrimer {
+ private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());
+
+ static ByteString PRIMING_ROW_KEY = ByteString.copyFromUtf8("nonexistent-priming-row");
+ private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30);
+
+ private final EnhancedBigtableStubSettings settingsTemplate;
+ private final List tableIds;
+
+ static BigtableChannelPrimer create(
+ Credentials credentials,
+ String projectId,
+ String instanceId,
+ String appProfileId,
+ List tableIds) {
+ EnhancedBigtableStubSettings.Builder builder =
+ EnhancedBigtableStubSettings.newBuilder()
+ .setProjectId(projectId)
+ .setInstanceId(instanceId)
+ .setAppProfileId(appProfileId)
+ .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
+ .setExecutorProvider(
+ InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());
+
+ // Disable retries for priming request
+ builder
+ .readRowSettings()
+ .setRetrySettings(
+ builder
+ .readRowSettings()
+ .getRetrySettings()
+ .toBuilder()
+ .setMaxAttempts(1)
+ .setJittered(false)
+ .setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT)
+ .setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT)
+ .setTotalTimeout(PRIME_REQUEST_TIMEOUT)
+ .build());
+ return new BigtableChannelPrimer(builder.build(), tableIds);
+ }
+
+ private BigtableChannelPrimer(
+ EnhancedBigtableStubSettings settingsTemplate, List tableIds) {
+ Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
+ this.settingsTemplate = settingsTemplate;
+ this.tableIds = ImmutableList.copyOf(tableIds);
+ }
+
+ @Override
+ public void primeChannel(ManagedChannel managedChannel) {
+ try {
+ primeChannelUnsafe(managedChannel);
+ } catch (IOException | RuntimeException e) {
+ LOG.warning(
+ String.format("Unexpected error while trying to prime a channel: %s", e.getMessage()));
+ }
+ }
+
+ private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
+ if (tableIds.isEmpty()) {
+ waitForChannelReady(managedChannel);
+ } else {
+ sendPrimeRequests(managedChannel);
+ }
+ }
+
+ private void waitForChannelReady(ManagedChannel managedChannel) {
+ for (int i = 0; i < 30; i++) {
+ ConnectivityState connectivityState = managedChannel.getState(true);
+ if (connectivityState == ConnectivityState.READY) {
+ break;
+ }
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+
+ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
+ // Wrap the channel in a temporary stub
+ EnhancedBigtableStubSettings primingSettings =
+ settingsTemplate
+ .toBuilder()
+ .setTransportChannelProvider(
+ FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)))
+ .build();
+
+ try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
+ Map> primeFutures = new HashMap<>();
+
+ // Prime all of the table ids in parallel
+ for (String tableId : tableIds) {
+ ApiFuture f =
+ stub.readRowCallable()
+ .futureCall(Query.create(tableId).rowKey(PRIMING_ROW_KEY).filter(FILTERS.block()));
+
+ primeFutures.put(tableId, f);
+ }
+
+ // Wait for all of the prime requests to complete.
+ for (Map.Entry> entry : primeFutures.entrySet()) {
+ try {
+ entry.getValue().get();
+ } catch (Throwable e) {
+ if (e instanceof ExecutionException) {
+ e = e.getCause();
+ }
+ LOG.warning(
+ String.format(
+ "Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage()));
+ }
+ }
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index 8d9d2fc70c..d729d6244d 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -20,10 +20,12 @@
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.core.BackgroundResource;
+import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
@@ -38,6 +40,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.api.gax.tracing.TracedUnaryCallable;
+import com.google.auth.Credentials;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
@@ -120,65 +123,93 @@ public class EnhancedBigtableStub implements AutoCloseable {
public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
- ClientContext clientContext = ClientContext.create(settings);
+ settings = finalizeSettings(settings, Tags.getTagger(), Stats.getStatsRecorder());
- return new EnhancedBigtableStub(
- settings, clientContext, Tags.getTagger(), Stats.getStatsRecorder());
+ return new EnhancedBigtableStub(settings, ClientContext.create(settings));
}
- @InternalApi("Visible for testing")
- public EnhancedBigtableStub(
- EnhancedBigtableStubSettings settings,
- ClientContext clientContext,
- Tagger tagger,
- StatsRecorder statsRecorder) {
- this.settings = settings;
+ public static EnhancedBigtableStubSettings finalizeSettings(
+ EnhancedBigtableStubSettings settings, Tagger tagger, StatsRecorder stats)
+ throws IOException {
+ EnhancedBigtableStubSettings.Builder builder = settings.toBuilder();
+
+ // TODO: this implementation is on the cusp of unwieldy, if we end up adding more features
+ // consider splitting it up by feature.
+
+ // Inject channel priming
+ if (settings.isRefreshingChannel()) {
+ // Fix the credentials so that they can be shared
+ Credentials credentials = null;
+ if (settings.getCredentialsProvider() != null) {
+ credentials = settings.getCredentialsProvider().getCredentials();
+ }
+ builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+
+ // Inject the primer
+ InstantiatingGrpcChannelProvider transportProvider =
+ (InstantiatingGrpcChannelProvider) settings.getTransportChannelProvider();
+
+ builder.setTransportChannelProvider(
+ transportProvider
+ .toBuilder()
+ .setChannelPrimer(
+ BigtableChannelPrimer.create(
+ credentials,
+ settings.getProjectId(),
+ settings.getInstanceId(),
+ settings.getAppProfileId(),
+ settings.getPrimedTableIds()))
+ .build());
+ }
- this.clientContext =
- clientContext
- .toBuilder()
- .setTracerFactory(
- new CompositeTracerFactory(
- ImmutableList.of(
- // Add OpenCensus Tracing
- new OpencensusTracerFactory(
- ImmutableMap.builder()
- // Annotate traces with the same tags as metrics
- .put(
- RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(),
- settings.getProjectId())
- .put(
- RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(),
- settings.getInstanceId())
- .put(
- RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(),
- settings.getAppProfileId())
- // Also annotate traces with library versions
- .put("gax", GaxGrpcProperties.getGaxGrpcVersion())
- .put("grpc", GaxGrpcProperties.getGrpcVersion())
- .put(
- "gapic",
- GaxProperties.getLibraryVersion(
- EnhancedBigtableStubSettings.class))
- .build()),
- // Add OpenCensus Metrics
- MetricsTracerFactory.create(
- tagger,
- statsRecorder,
- ImmutableMap.builder()
- .put(
- RpcMeasureConstants.BIGTABLE_PROJECT_ID,
- TagValue.create(settings.getProjectId()))
- .put(
- RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
- TagValue.create(settings.getInstanceId()))
- .put(
- RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
- TagValue.create(settings.getAppProfileId()))
- .build()),
- // Add user configured tracer
- clientContext.getTracerFactory())))
- .build();
+ // Inject Opencensus instrumentation
+ builder.setTracerFactory(
+ new CompositeTracerFactory(
+ ImmutableList.of(
+ // Add OpenCensus Tracing
+ new OpencensusTracerFactory(
+ ImmutableMap.builder()
+ // Annotate traces with the same tags as metrics
+ .put(
+ RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(),
+ settings.getProjectId())
+ .put(
+ RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(),
+ settings.getInstanceId())
+ .put(
+ RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(),
+ settings.getAppProfileId())
+ // Also annotate traces with library versions
+ .put("gax", GaxGrpcProperties.getGaxGrpcVersion())
+ .put("grpc", GaxGrpcProperties.getGrpcVersion())
+ .put(
+ "gapic",
+ GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))
+ .build()),
+ // Add OpenCensus Metrics
+ MetricsTracerFactory.create(
+ tagger,
+ stats,
+ ImmutableMap.builder()
+ .put(
+ RpcMeasureConstants.BIGTABLE_PROJECT_ID,
+ TagValue.create(settings.getProjectId()))
+ .put(
+ RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
+ TagValue.create(settings.getInstanceId()))
+ .put(
+ RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
+ TagValue.create(settings.getAppProfileId()))
+ .build()),
+ // Add user configured tracer
+ settings.getTracerFactory())));
+
+ return builder.build();
+ }
+
+ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext clientContext) {
+ this.settings = settings;
+ this.clientContext = clientContext;
this.requestContext =
RequestContext.create(
settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId());
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
index 1906228a30..d843265d1e 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
@@ -28,7 +28,6 @@
import com.google.api.gax.rpc.StubSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
-import com.google.cloud.bigtable.data.v2.internal.RefreshChannel;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
@@ -150,6 +149,7 @@ public class EnhancedBigtableStubSettings extends StubSettings primedTableIds;
private final ServerStreamingCallSettings readRowsSettings;
private final UnaryCallSettings readRowSettings;
@@ -188,6 +188,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
instanceId = builder.instanceId;
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;
+ primedTableIds = builder.primedTableIds;
// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
@@ -226,6 +227,12 @@ public boolean isRefreshingChannel() {
return isRefreshingChannel;
}
+ /** Gets the tables that will be primed during a channel refresh. */
+ @BetaApi("Channel priming is not currently stable and might change in the future")
+ public List getPrimedTableIds() {
+ return primedTableIds;
+ }
+
/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
@@ -483,6 +490,7 @@ public static class Builder extends StubSettings.Builder primedTableIds;
private final ServerStreamingCallSettings.Builder readRowsSettings;
private final UnaryCallSettings.Builder readRowSettings;
@@ -505,6 +513,7 @@ public static class Builder extends StubSettings.BuilderWhen enabled, this will wait for the connection to complete the SSL handshake. The effect
+ * can be enhanced by configuring table ids that can be used warm serverside caches using {@link
+ * #setPrimedTableIds(String...)}.
*
* @see com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setRefreshingChannel
*/
@@ -709,12 +723,25 @@ public Builder setRefreshingChannel(boolean isRefreshingChannel) {
return this;
}
+ /** Configures which tables will be primed when a connection is created. */
+ @BetaApi("Channel priming is not currently stable and might change in the future")
+ public Builder setPrimedTableIds(String... tableIds) {
+ this.primedTableIds = ImmutableList.copyOf(tableIds);
+ return this;
+ }
+
/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
@BetaApi("This API depends on experimental gRPC APIs")
public boolean isRefreshingChannel() {
return isRefreshingChannel;
}
+ /** Gets the tables that will be primed during a channel refresh. */
+ @BetaApi("Channel priming is not currently stable and might change in the future")
+ public List getPrimedTableIds() {
+ return primedTableIds;
+ }
+
/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder readRowsSettings() {
return readRowsSettings;
@@ -760,17 +787,10 @@ public EnhancedBigtableStubSettings build() {
Preconditions.checkState(projectId != null, "Project id must be set");
Preconditions.checkState(instanceId != null, "Instance id must be set");
- // Set ChannelPrimer on TransportChannelProvider so channels will gracefully refresh
- // connections to Cloud Bigtable service
if (isRefreshingChannel) {
Preconditions.checkArgument(
getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider,
"refreshingChannel only works with InstantiatingGrpcChannelProviders");
- InstantiatingGrpcChannelProvider.Builder channelBuilder =
- ((InstantiatingGrpcChannelProvider) getTransportChannelProvider())
- .toBuilder()
- .setChannelPrimer(new RefreshChannel());
- setTransportChannelProvider(channelBuilder.build());
}
return new EnhancedBigtableStubSettings(this);
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java
deleted file mode 100644
index c41fa4d2a5..0000000000
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RefreshChannelTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.internal;
-
-import io.grpc.ConnectivityState;
-import io.grpc.ManagedChannel;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-@RunWith(JUnit4.class)
-public class RefreshChannelTest {
- // RefreshChannel should establish connection to the server through managedChannel.getState(true)
- @Test
- public void testGetStateIsCalled() {
- RefreshChannel refreshChannel = new RefreshChannel();
- ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class);
-
- Mockito.doReturn(ConnectivityState.READY).when(managedChannel).getState(true);
-
- refreshChannel.primeChannel(managedChannel);
- Mockito.verify(managedChannel, Mockito.atLeastOnce()).getState(true);
- }
-}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java
new file mode 100644
index 0000000000..42d13a7ab1
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.core.ApiFunction;
+import com.google.auth.oauth2.AccessToken;
+import com.google.auth.oauth2.OAuth2Credentials;
+import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.RowFilter;
+import com.google.bigtable.v2.RowSet;
+import com.google.common.collect.ImmutableList;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Handler;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.internal.stubbing.answers.ThrowsException;
+
+@RunWith(JUnit4.class)
+public class BigtableChannelPrimerTest {
+ private static final String TOKEN_VALUE = "fake-token";
+
+ int port;
+ Server server;
+ FakeService fakeService;
+ MetadataInterceptor metadataInterceptor;
+ BigtableChannelPrimer primer;
+ ManagedChannel channel;
+ private LogHandler logHandler;
+
+ @Before
+ public void setup() throws IOException {
+ try (ServerSocket ss = new ServerSocket(0)) {
+ port = ss.getLocalPort();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ fakeService = new FakeService();
+ metadataInterceptor = new MetadataInterceptor();
+ server =
+ ServerBuilder.forPort(port).intercept(metadataInterceptor).addService(fakeService).build();
+ server.start();
+
+ primer =
+ BigtableChannelPrimer.create(
+ OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)),
+ "fake-project",
+ "fake-instance",
+ "fake-app-profile",
+ ImmutableList.of("table1", "table2"));
+
+ channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();
+
+ logHandler = new LogHandler();
+ Logger.getLogger(BigtableChannelPrimer.class.toString()).addHandler(logHandler);
+ }
+
+ @After
+ public void teardown() {
+ Logger.getLogger(BigtableChannelPrimer.class.toString()).removeHandler(logHandler);
+ channel.shutdown();
+ server.shutdown();
+ }
+
+ @Test
+ public void testCredentials() {
+ primer.primeChannel(channel);
+
+ for (Metadata metadata : metadataInterceptor.metadataList) {
+ assertThat(metadata.get(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER)))
+ .isEqualTo("Bearer " + TOKEN_VALUE);
+ }
+ channel.shutdown();
+ }
+
+ @Test
+ public void testRequests() {
+ final Queue requests = new ConcurrentLinkedQueue<>();
+
+ fakeService.readRowsCallback =
+ new ApiFunction() {
+ @Override
+ public ReadRowsResponse apply(ReadRowsRequest req) {
+ requests.add(req);
+ return ReadRowsResponse.getDefaultInstance();
+ }
+ };
+ primer.primeChannel(channel);
+
+ assertThat(requests)
+ .containsExactly(
+ ReadRowsRequest.newBuilder()
+ .setTableName("projects/fake-project/instances/fake-instance/tables/table1")
+ .setAppProfileId("fake-app-profile")
+ .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY))
+ .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build())
+ .setRowsLimit(1)
+ .build(),
+ ReadRowsRequest.newBuilder()
+ .setTableName("projects/fake-project/instances/fake-instance/tables/table2")
+ .setAppProfileId("fake-app-profile")
+ .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY))
+ .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build())
+ .setRowsLimit(1)
+ .build());
+ }
+
+ @Test
+ public void testErrorsAreLogged() {
+ fakeService.readRowsCallback =
+ new ApiFunction() {
+ @Override
+ public ReadRowsResponse apply(ReadRowsRequest req) {
+ throw new StatusRuntimeException(Status.FAILED_PRECONDITION);
+ }
+ };
+ primer.primeChannel(channel);
+
+ assertThat(logHandler.logs).hasSize(2);
+ for (LogRecord log : logHandler.logs) {
+ assertThat(log.getMessage()).contains("FAILED_PRECONDITION");
+ }
+ }
+
+ @Test
+ public void testErrorsAreLoggedForBasic() {
+ BigtableChannelPrimer basicPrimer =
+ BigtableChannelPrimer.create(
+ OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)),
+ "fake-project",
+ "fake-instance",
+ "fake-app-profile",
+ ImmutableList.of());
+
+ ManagedChannel channel =
+ Mockito.mock(
+ ManagedChannel.class, new ThrowsException(new UnsupportedOperationException()));
+ primer.primeChannel(channel);
+
+ assertThat(logHandler.logs).hasSize(1);
+ for (LogRecord log : logHandler.logs) {
+ assertThat(log.getMessage()).contains("Unexpected");
+ }
+ }
+
+ private static class MetadataInterceptor implements ServerInterceptor {
+ ConcurrentLinkedQueue metadataList = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public Listener interceptCall(
+ ServerCall serverCall,
+ Metadata metadata,
+ ServerCallHandler serverCallHandler) {
+ metadataList.add(metadata);
+
+ return serverCallHandler.startCall(serverCall, metadata);
+ }
+ }
+
+ static class FakeService extends BigtableImplBase {
+ private ApiFunction readRowsCallback =
+ new ApiFunction() {
+ @Override
+ public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) {
+ return ReadRowsResponse.getDefaultInstance();
+ }
+ };
+
+ @Override
+ public void readRows(
+ ReadRowsRequest request, StreamObserver responseObserver) {
+
+ try {
+ responseObserver.onNext(readRowsCallback.apply(request));
+ responseObserver.onCompleted();
+ } catch (RuntimeException e) {
+ responseObserver.onError(e);
+ }
+ }
+ }
+
+ private static class LogHandler extends Handler {
+ private ConcurrentLinkedQueue logs = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public void publish(LogRecord record) {
+ logs.add(record);
+ }
+
+ @Override
+ public void flush() {}
+
+ @Override
+ public void close() throws SecurityException {}
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
index be2d9c2a0f..b823930fb6 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
@@ -18,14 +18,13 @@
import static com.google.common.truth.Truth.assertThat;
import com.google.api.gax.core.NoCredentialsProvider;
-import com.google.api.gax.grpc.testing.InProcessServer;
-import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.admin.v2.internal.NameUtil;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Query;
@@ -34,8 +33,11 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
+import java.net.ServerSocket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@@ -49,37 +51,40 @@ public class EnhancedBigtableStubTest {
private static final String PROJECT_ID = "fake-project";
private static final String INSTANCE_ID = "fake-instance";
- private static final String FAKE_HOST_NAME = "fake-stub-host:123";
private static final String TABLE_NAME =
NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table");
private static final String APP_PROFILE_ID = "app-profile-id";
- private InProcessServer> server;
+ private Server server;
private FakeDataService fakeDataService;
+ private EnhancedBigtableStubSettings defaultSettings;
private EnhancedBigtableStub enhancedBigtableStub;
@Before
public void setUp() throws IOException, IllegalAccessException, InstantiationException {
+ int port;
+ try (ServerSocket ss = new ServerSocket(0)) {
+ port = ss.getLocalPort();
+ }
fakeDataService = new FakeDataService();
- server = new InProcessServer<>(fakeDataService, FAKE_HOST_NAME);
+ server = ServerBuilder.forPort(port).addService(fakeDataService).build();
server.start();
- EnhancedBigtableStubSettings enhancedBigtableStubSettings =
- EnhancedBigtableStubSettings.newBuilder()
+ defaultSettings =
+ BigtableDataSettings.newBuilderForEmulator(port)
.setProjectId(PROJECT_ID)
.setInstanceId(INSTANCE_ID)
.setAppProfileId(APP_PROFILE_ID)
.setCredentialsProvider(NoCredentialsProvider.create())
- .setEndpoint(FAKE_HOST_NAME)
- .setTransportChannelProvider(LocalChannelProvider.create(FAKE_HOST_NAME))
- .build();
+ .build()
+ .getStubSettings();
- enhancedBigtableStub = EnhancedBigtableStub.create(enhancedBigtableStubSettings);
+ enhancedBigtableStub = EnhancedBigtableStub.create(defaultSettings);
}
@After
public void tearDown() {
- server.stop();
+ server.shutdown();
}
@Test
@@ -117,6 +122,21 @@ public void testCreateReadRowsRawCallable() throws InterruptedException {
assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest2);
}
+ @Test
+ public void testChannelPrimerConfigured() throws IOException {
+ EnhancedBigtableStubSettings settings =
+ defaultSettings
+ .toBuilder()
+ .setRefreshingChannel(true)
+ .setPrimedTableIds("table1", "table2")
+ .build();
+
+ try (EnhancedBigtableStub ignored = EnhancedBigtableStub.create(settings)) {
+ // priming will issue a request per table on startup
+ assertThat(fakeDataService.requests).hasSize(2);
+ }
+ }
+
private static class FakeDataService extends BigtableGrpc.BigtableImplBase {
final BlockingQueue