diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index edde257527..0ee0d8b24a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -215,10 +215,10 @@ public boolean isRefreshingChannel() { } /** - * Gets the table ids that will be used to send warmup requests when {@link - * #isRefreshingChannel()} is enabled. + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public List getPrimingTableIds() { return stubSettings.getPrimedTableIds(); } @@ -377,23 +377,20 @@ public boolean isRefreshingChannel() { } /** - * Configure the tables that can be used to prime a channel during a refresh. - * - *

These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a - * channel is refreshed, it will send a request to each table to warm up the serverside caches - * before admitting the new channel into the channel pool. + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public Builder setPrimingTableIds(String... tableIds) { stubSettings.setPrimedTableIds(tableIds); return this; } /** - * Gets the table ids that will be used to send warmup requests when {@link - * #setRefreshingChannel(boolean)} is enabled. + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public List getPrimingTableIds() { return stubSettings.getPrimedTableIds(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 93f2cf3ef0..303f52a8e2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -15,7 +15,6 @@ */ package com.google.cloud.bigtable.data.v2.stub; -import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -23,25 +22,13 @@ import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.auth.Credentials; -import com.google.bigtable.v2.ReadRowsRequest; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; -import com.google.bigtable.v2.TableName; -import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; -import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.protobuf.ByteString; -import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; -import org.threeten.bp.Duration; /** * A channel warmer that ensures that a Bigtable channel is ready to be used before being added to @@ -54,18 +41,10 @@ class BigtableChannelPrimer implements ChannelPrimer { private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString()); - static ByteString PRIMING_ROW_KEY = ByteString.copyFromUtf8("nonexistent-priming-row"); - private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30); - private final EnhancedBigtableStubSettings settingsTemplate; - private final List tableIds; static BigtableChannelPrimer create( - Credentials credentials, - String projectId, - String instanceId, - String appProfileId, - List tableIds) { + Credentials credentials, String projectId, String instanceId, String appProfileId) { EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() .setProjectId(projectId) @@ -75,28 +54,12 @@ static BigtableChannelPrimer create( .setExecutorProvider( InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()); - // Disable retries for priming request - builder - .readRowSettings() - .setRetrySettings( - builder - .readRowSettings() - .getRetrySettings() - .toBuilder() - .setMaxAttempts(1) - .setJittered(false) - .setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT) - .setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT) - .setTotalTimeout(PRIME_REQUEST_TIMEOUT) - .build()); - return new BigtableChannelPrimer(builder.build(), tableIds); + return new BigtableChannelPrimer(builder.build()); } - private BigtableChannelPrimer( - EnhancedBigtableStubSettings settingsTemplate, List tableIds) { + private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) { Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null"); this.settingsTemplate = settingsTemplate; - this.tableIds = ImmutableList.copyOf(tableIds); } @Override @@ -110,25 +73,7 @@ public void primeChannel(ManagedChannel managedChannel) { } private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException { - if (tableIds.isEmpty()) { - waitForChannelReady(managedChannel); - } else { - sendPrimeRequests(managedChannel); - } - } - - private void waitForChannelReady(ManagedChannel managedChannel) { - for (int i = 0; i < 30; i++) { - ConnectivityState connectivityState = managedChannel.getState(true); - if (connectivityState == ConnectivityState.READY) { - break; - } - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - break; - } - } + sendPrimeRequests(managedChannel); } private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { @@ -141,41 +86,24 @@ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException .build(); try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) { - Map> primeFutures = new HashMap<>(); - - // Prime all of the table ids in parallel - for (String tableId : tableIds) { - ApiFuture f = - stub.createReadRowsRawCallable(new DefaultRowAdapter()) - .first() - .futureCall( - ReadRowsRequest.newBuilder() - .setTableName( - TableName.format( - primingSettings.getProjectId(), - primingSettings.getInstanceId(), - tableId)) - .setAppProfileId(primingSettings.getAppProfileId()) - .setRows(RowSet.newBuilder().addRowKeys(PRIMING_ROW_KEY).build()) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build()); + PingAndWarmRequest request = + PingAndWarmRequest.newBuilder() + .setName( + NameUtil.formatInstanceName( + primingSettings.getProjectId(), primingSettings.getInstanceId())) + .setAppProfileId(primingSettings.getAppProfileId()) + .build(); - primeFutures.put(tableId, f); - } - - // Wait for all of the prime requests to complete. - for (Map.Entry> entry : primeFutures.entrySet()) { - try { - entry.getValue().get(); - } catch (Throwable e) { - if (e instanceof ExecutionException) { - e = e.getCause(); - } - LOG.warning( - String.format( - "Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage())); + try { + stub.pingAndWarmCallable().call(request); + } catch (Throwable e) { + // TODO: Not sure if we should swallow the error here. We are pre-emptively swapping + // channels if the new + // channel is bad. + if (e instanceof ExecutionException) { + e = e.getCause(); } + LOG.warning(String.format("Failed to prime channel: %s", e)); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index ec237aabf7..1550127e23 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -51,6 +51,8 @@ import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadModifyWriteRowRequest; import com.google.bigtable.v2.ReadModifyWriteRowResponse; import com.google.bigtable.v2.ReadRowsRequest; @@ -104,6 +106,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -141,6 +144,7 @@ public class EnhancedBigtableStub implements AutoCloseable { private final UnaryCallable bulkMutateRowsCallable; private final UnaryCallable checkAndMutateRowCallable; private final UnaryCallable readModifyWriteRowCallable; + private final UnaryCallable pingAndWarmCallable; public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) throws IOException { @@ -181,8 +185,7 @@ public static EnhancedBigtableStubSettings finalizeSettings( credentials, settings.getProjectId(), settings.getInstanceId(), - settings.getAppProfileId(), - settings.getPrimedTableIds())) + settings.getAppProfileId())) .build()); } @@ -284,6 +287,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext bulkMutateRowsCallable = createBulkMutateRowsCallable(); checkAndMutateRowCallable = createCheckAndMutateRowCallable(); readModifyWriteRowCallable = createReadModifyWriteRowCallable(); + pingAndWarmCallable = createPingAndWarmCallable(); } // @@ -810,6 +814,25 @@ private UnaryCallable createUserFacin return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } + + private UnaryCallable createPingAndWarmCallable() { + UnaryCallable pingAndWarm = + GrpcRawCallableFactory.createUnaryCallable( + GrpcCallSettings.newBuilder() + .setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod()) + .setParamsExtractor( + new RequestParamsExtractor() { + @Override + public Map extract(PingAndWarmRequest request) { + return ImmutableMap.of( + "name", request.getName(), + "app_profile_id", request.getAppProfileId()); + } + }) + .build(), + Collections.emptySet()); + return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext()); + } // // @@ -854,6 +877,10 @@ public UnaryCallable checkAndMutateRowCallable( public UnaryCallable readModifyWriteRowCallable() { return readModifyWriteRowCallable; } + + UnaryCallable pingAndWarmCallable() { + return pingAndWarmCallable; + } // private SpanName getSpanName(String methodName) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 395ba52b08..f2ae486d9e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -33,6 +33,7 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.auth.Credentials; +import com.google.bigtable.v2.PingAndWarmRequest; import com.google.cloud.bigtable.Version; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; @@ -113,6 +114,9 @@ public class EnhancedBigtableStubSettings extends StubSettings READ_ROWS_RETRY_CODES = ImmutableSet.builder().addAll(IDEMPOTENT_RETRY_CODES).add(Code.ABORTED).build(); + // Priming request should have a shorter timeout + private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30); + private static final RetrySettings READ_ROWS_RETRY_SETTINGS = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(10)) @@ -173,6 +177,7 @@ public class EnhancedBigtableStubSettings extends StubSettings checkAndMutateRowSettings; private final UnaryCallSettings readModifyWriteRowSettings; + private final UnaryCallSettings pingAndWarmSettings; private EnhancedBigtableStubSettings(Builder builder) { super(builder); @@ -208,6 +213,7 @@ private EnhancedBigtableStubSettings(Builder builder) { bulkReadRowsSettings = builder.bulkReadRowsSettings.build(); checkAndMutateRowSettings = builder.checkAndMutateRowSettings.build(); readModifyWriteRowSettings = builder.readModifyWriteRowSettings.build(); + pingAndWarmSettings = builder.pingAndWarmSettings.build(); } /** Create a new builder. */ @@ -236,8 +242,11 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } - /** Gets the tables that will be primed during a channel refresh. */ - @BetaApi("Channel priming is not currently stable and might change in the future") + /** + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. + */ + @Deprecated public List getPrimedTableIds() { return primedTableIds; } @@ -491,6 +500,15 @@ public UnaryCallSettings readModifyWriteRowSettings() { return readModifyWriteRowSettings; } + /** + * Returns the object with the settings used for calls to PingAndWarm. + * + *

By default the retries are disabled for PingAndWarm and deadline is set to 30 seconds. + */ + UnaryCallSettings pingAndWarmSettings() { + return pingAndWarmSettings; + } + /** Returns a builder containing all the values of this settings class. */ public Builder toBuilder() { return new Builder(this); @@ -515,6 +533,7 @@ public static class Builder extends StubSettings.Builder checkAndMutateRowSettings; private final UnaryCallSettings.Builder readModifyWriteRowSettings; + private final UnaryCallSettings.Builder pingAndWarmSettings; /** * Initializes a new Builder with sane defaults for all settings. @@ -626,6 +645,15 @@ private Builder() { readModifyWriteRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); copyRetrySettings(baseDefaults.readModifyWriteRowSettings(), readModifyWriteRowSettings); + + pingAndWarmSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); + pingAndWarmSettings.setRetrySettings( + RetrySettings.newBuilder() + .setMaxAttempts(1) + .setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT) + .setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT) + .setTotalTimeout(PRIME_REQUEST_TIMEOUT) + .build()); } private Builder(EnhancedBigtableStubSettings settings) { @@ -646,6 +674,7 @@ private Builder(EnhancedBigtableStubSettings settings) { bulkReadRowsSettings = settings.bulkReadRowsSettings.toBuilder(); checkAndMutateRowSettings = settings.checkAndMutateRowSettings.toBuilder(); readModifyWriteRowSettings = settings.readModifyWriteRowSettings.toBuilder(); + pingAndWarmSettings = settings.pingAndWarmSettings.toBuilder(); } // @@ -727,9 +756,8 @@ public String getAppProfileId() { /** * Sets if channels will gracefully refresh connections to Cloud Bigtable service. * - *

When enabled, this will wait for the connection to complete the SSL handshake. The effect - * can be enhanced by configuring table ids that can be used warm serverside caches using {@link - * #setPrimedTableIds(String...)}. + *

When enabled, this will wait for the connection to complete the SSL handshake and warm up + * serverside caches for all the tables of the instance. * * @see com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setRefreshingChannel */ @@ -739,8 +767,11 @@ public Builder setRefreshingChannel(boolean isRefreshingChannel) { return this; } - /** Configures which tables will be primed when a connection is created. */ - @BetaApi("Channel priming is not currently stable and might change in the future") + /** + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. + */ + @Deprecated public Builder setPrimedTableIds(String... tableIds) { this.primedTableIds = ImmutableList.copyOf(tableIds); return this; @@ -752,8 +783,11 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } - /** Gets the tables that will be primed during a channel refresh. */ - @BetaApi("Channel priming is not currently stable and might change in the future") + /** + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. + */ + @Deprecated public List getPrimedTableIds() { return primedTableIds; } @@ -809,6 +843,11 @@ public UnaryCallSettings.Builder readModifyWriteRowSett return readModifyWriteRowSettings; } + /** Returns the builder with the settings used for calls to PingAndWarm. */ + public UnaryCallSettings.Builder pingAndWarmSettings() { + return pingAndWarmSettings; + } + @SuppressWarnings("unchecked") public EnhancedBigtableStubSettings build() { Preconditions.checkState(projectId != null, "Project id must be set"); @@ -831,8 +870,7 @@ public EnhancedBigtableStubSettings build() { // Use shared credentials this.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); channelProviderBuilder.setChannelPrimer( - BigtableChannelPrimer.create( - credentials, projectId, instanceId, appProfileId, primedTableIds)); + BigtableChannelPrimer.create(credentials, projectId, instanceId, appProfileId)); this.setTransportChannelProvider(channelProviderBuilder.build()); } return new EnhancedBigtableStubSettings(this); @@ -857,6 +895,7 @@ public String toString() { .add("bulkReadRowsSettings", bulkReadRowsSettings) .add("checkAndMutateRowSettings", checkAndMutateRowSettings) .add("readModifyWriteRowSettings", readModifyWriteRowSettings) + .add("pingAndWarmSettings", pingAndWarmSettings) .add("parent", super.toString()) .toString(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index 67d4183b8b..ebda860851 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -26,16 +26,16 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.WatchdogProvider; import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.InstanceName; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; import io.grpc.Attributes; import io.grpc.Server; import io.grpc.ServerTransportFilter; @@ -224,14 +224,12 @@ public void testCreateForInstanceWithAppProfileHasCorrectSettings() throws Excep @Test public void testCreateWithRefreshingChannel() throws Exception { - String[] tableIds = {"fake-table1", "fake-table2"}; int poolSize = 3; BigtableDataSettings.Builder builder = BigtableDataSettings.newBuilderForEmulator(server.getPort()) .setProjectId(DEFAULT_PROJECT_ID) .setInstanceId(DEFAULT_INSTANCE_ID) .setAppProfileId(DEFAULT_APP_PROFILE_ID) - .setPrimingTableIds(tableIds) .setRefreshingChannel(true); builder .stubSettings() @@ -258,26 +256,17 @@ public void testCreateWithRefreshingChannel() throws Exception { assertThat(setUpAttributes).hasSize(poolSize); // Make sure that prime requests were sent only once per table per connection - assertThat(service.readRowsRequests).hasSize(poolSize * tableIds.length); - List expectedRequests = new LinkedList<>(); - for (String tableId : tableIds) { - for (int i = 0; i < poolSize; i++) { - expectedRequests.add( - ReadRowsRequest.newBuilder() - .setTableName( - String.format( - "projects/%s/instances/%s/tables/%s", - DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID, tableId)) - .setAppProfileId(DEFAULT_APP_PROFILE_ID) - .setRows( - RowSet.newBuilder() - .addRowKeys(ByteString.copyFromUtf8("nonexistent-priming-row"))) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build()); - } + assertThat(service.pingAndWarmRequests).hasSize(poolSize); + List expectedRequests = new LinkedList<>(); + for (int i = 0; i < poolSize; i++) { + expectedRequests.add( + PingAndWarmRequest.newBuilder() + .setName(InstanceName.format(DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID)) + .setAppProfileId(DEFAULT_APP_PROFILE_ID) + .build()); } - assertThat(service.readRowsRequests).containsExactly(expectedRequests.toArray()); + + assertThat(service.pingAndWarmRequests).containsExactly(expectedRequests.toArray()); // Wait for all the connections to close asynchronously factory.close(); @@ -316,6 +305,7 @@ private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase { volatile MutateRowRequest lastRequest; BlockingQueue readRowsRequests = new LinkedBlockingDeque<>(); + BlockingQueue pingAndWarmRequests = new LinkedBlockingDeque<>(); private ApiFunction readRowsCallback = new ApiFunction() { @Override @@ -324,6 +314,14 @@ public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { } }; + private ApiFunction pingAndWarmCallback = + new ApiFunction() { + @Override + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { + return PingAndWarmResponse.getDefaultInstance(); + } + }; + @Override public void readRows( ReadRowsRequest request, StreamObserver responseObserver) { @@ -343,6 +341,14 @@ public void mutateRow( responseObserver.onNext(MutateRowResponse.getDefaultInstance()); responseObserver.onCompleted(); } + + @Override + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { + pingAndWarmRequests.add(request); + responseObserver.onNext(pingAndWarmCallback.apply(request)); + responseObserver.onCompleted(); + } } private static class BuilderAnswer implements Answer { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java index 7195e4105f..e1f22bebbd 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -21,12 +21,9 @@ import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase; -import com.google.bigtable.v2.ReadRowsRequest; -import com.google.bigtable.v2.ReadRowsResponse; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; -import com.google.common.collect.ImmutableList; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; @@ -75,8 +72,7 @@ public void setup() throws IOException { OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), "fake-project", "fake-instance", - "fake-app-profile", - ImmutableList.of("table1", "table2")); + "fake-app-profile"); channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build(); @@ -104,63 +100,45 @@ public void testCredentials() { @Test public void testRequests() { - final Queue requests = new ConcurrentLinkedQueue<>(); + final Queue requests = new ConcurrentLinkedQueue<>(); - fakeService.readRowsCallback = - new ApiFunction() { + fakeService.pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest req) { + public PingAndWarmResponse apply(PingAndWarmRequest req) { requests.add(req); - return ReadRowsResponse.getDefaultInstance(); + return PingAndWarmResponse.getDefaultInstance(); } }; primer.primeChannel(channel); assertThat(requests) .containsExactly( - ReadRowsRequest.newBuilder() - .setTableName("projects/fake-project/instances/fake-instance/tables/table1") + PingAndWarmRequest.newBuilder() + .setName("projects/fake-project/instances/fake-instance") .setAppProfileId("fake-app-profile") - .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build(), - ReadRowsRequest.newBuilder() - .setTableName("projects/fake-project/instances/fake-instance/tables/table2") - .setAppProfileId("fake-app-profile") - .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) .build()); } @Test public void testErrorsAreLogged() { - fakeService.readRowsCallback = - new ApiFunction() { + fakeService.pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest req) { + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { throw new StatusRuntimeException(Status.FAILED_PRECONDITION); } }; primer.primeChannel(channel); - assertThat(logHandler.logs).hasSize(2); + assertThat(logHandler.logs).hasSize(1); for (LogRecord log : logHandler.logs) { assertThat(log.getMessage()).contains("FAILED_PRECONDITION"); } } @Test - public void testErrorsAreLoggedForBasic() { - BigtableChannelPrimer basicPrimer = - BigtableChannelPrimer.create( - OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), - "fake-project", - "fake-instance", - "fake-app-profile", - ImmutableList.of()); - + public void testChannelErrorsAreLogged() { ManagedChannel channel = Mockito.mock( ManagedChannel.class, new ThrowsException(new UnsupportedOperationException())); @@ -168,7 +146,7 @@ public void testErrorsAreLoggedForBasic() { assertThat(logHandler.logs).hasSize(1); for (LogRecord log : logHandler.logs) { - assertThat(log.getMessage()).contains("Unexpected"); + assertThat(log.getMessage()).contains("UnsupportedOperationException"); } } @@ -187,20 +165,19 @@ public Listener interceptCall( } static class FakeService extends BigtableImplBase { - private ApiFunction readRowsCallback = - new ApiFunction() { + private ApiFunction pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { - return ReadRowsResponse.getDefaultInstance(); + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { + return PingAndWarmResponse.getDefaultInstance(); } }; @Override - public void readRows( - ReadRowsRequest request, StreamObserver responseObserver) { - + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { try { - responseObserver.onNext(readRowsCallback.apply(request)); + responseObserver.onNext(pingAndWarmCallback.apply(request)); responseObserver.onCompleted(); } catch (RuntimeException e) { responseObserver.onError(e); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index c4e5ea2e40..d5a22884b6 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -28,6 +28,7 @@ import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.WatchdogProvider; import com.google.auth.Credentials; +import com.google.bigtable.v2.PingAndWarmRequest; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; @@ -645,6 +646,15 @@ public void checkAndMutateRowSettingsAreSane() { assertThat(builder.getRetryableCodes()).isEmpty(); } + @Test + public void pingAndWarmRetriesAreDisabled() { + UnaryCallSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder().pingAndWarmSettings(); + + assertThat(builder.getRetrySettings().getMaxAttempts()).isAtMost(1); + assertThat(builder.getRetrySettings().getInitialRpcTimeout()).isAtMost(Duration.ofSeconds(30)); + } + private void verifyRetrySettingAreSane(Set retryCodes, RetrySettings retrySettings) { assertThat(retryCodes).containsAtLeast(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE); @@ -701,6 +711,7 @@ public void isRefreshingChannelFalseValueTest() { "bulkReadRowsSettings", "checkAndMutateRowSettings", "readModifyWriteRowSettings", + "pingAndWarmSettings", }; @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index bab2b55e76..8176435f15 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -35,6 +35,8 @@ import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; @@ -261,15 +263,10 @@ public void testCreateReadRowsRawCallable() throws InterruptedException { @Test public void testChannelPrimerConfigured() throws IOException { EnhancedBigtableStubSettings settings = - defaultSettings - .toBuilder() - .setRefreshingChannel(true) - .setPrimedTableIds("table1", "table2") - .build(); + defaultSettings.toBuilder().setRefreshingChannel(true).build(); try (EnhancedBigtableStub ignored = EnhancedBigtableStub.create(settings)) { - // priming will issue a request per table on startup - assertThat(fakeDataService.requests).hasSize(2); + assertThat(fakeDataService.pingRequests).hasSize(1); } } @@ -515,6 +512,7 @@ public Listener interceptCall( private static class FakeDataService extends BigtableGrpc.BigtableImplBase { final BlockingQueue requests = Queues.newLinkedBlockingDeque(); + final BlockingQueue pingRequests = Queues.newLinkedBlockingDeque(); @SuppressWarnings("unchecked") ReadRowsRequest popLastRequest() throws InterruptedException { @@ -549,5 +547,13 @@ public void readRows( .build()); responseObserver.onCompleted(); } + + @Override + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { + pingRequests.add(request); + responseObserver.onNext(PingAndWarmResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } } }