From 6629821ea3200d3a5b93c9d45aab6d57485fcebf Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 19 Jul 2022 12:24:16 -0400 Subject: [PATCH] feat: use PingAndWarm request for channel priming (#1179) Switching channel priming from sending fake ReadRowsRequest to PingAndWarm request, which on the server side will list all the tables for an instance. In the settings we won't need to specify the table Ids to prime. --- .../data/v2/BigtableDataSettings.java | 21 ++-- .../data/v2/stub/BigtableChannelPrimer.java | 116 ++++-------------- .../data/v2/stub/EnhancedBigtableStub.java | 31 ++++- .../v2/stub/EnhancedBigtableStubSettings.java | 61 +++++++-- .../v2/BigtableDataClientFactoryTest.java | 54 ++++---- .../v2/stub/BigtableChannelPrimerTest.java | 69 ++++------- .../EnhancedBigtableStubSettingsTest.java | 11 ++ .../v2/stub/EnhancedBigtableStubTest.java | 20 +-- 8 files changed, 187 insertions(+), 196 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index edde257527..0ee0d8b24a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -215,10 +215,10 @@ public boolean isRefreshingChannel() { } /** - * Gets the table ids that will be used to send warmup requests when {@link - * #isRefreshingChannel()} is enabled. + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public List getPrimingTableIds() { return stubSettings.getPrimedTableIds(); } @@ -377,23 +377,20 @@ public boolean isRefreshingChannel() { } /** - * Configure the tables that can be used to prime a channel during a refresh. - * - *

These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a - * channel is refreshed, it will send a request to each table to warm up the serverside caches - * before admitting the new channel into the channel pool. + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public Builder setPrimingTableIds(String... tableIds) { stubSettings.setPrimedTableIds(tableIds); return this; } /** - * Gets the table ids that will be used to send warmup requests when {@link - * #setRefreshingChannel(boolean)} is enabled. + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public List getPrimingTableIds() { return stubSettings.getPrimedTableIds(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 93f2cf3ef0..303f52a8e2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -15,7 +15,6 @@ */ package com.google.cloud.bigtable.data.v2.stub; -import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -23,25 +22,13 @@ import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.auth.Credentials; -import com.google.bigtable.v2.ReadRowsRequest; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; -import com.google.bigtable.v2.TableName; -import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; -import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.protobuf.ByteString; -import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; -import org.threeten.bp.Duration; /** * A channel warmer that ensures that a Bigtable channel is ready to be used before being added to @@ -54,18 +41,10 @@ class BigtableChannelPrimer implements ChannelPrimer { private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString()); - static ByteString PRIMING_ROW_KEY = ByteString.copyFromUtf8("nonexistent-priming-row"); - private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30); - private final EnhancedBigtableStubSettings settingsTemplate; - private final List tableIds; static BigtableChannelPrimer create( - Credentials credentials, - String projectId, - String instanceId, - String appProfileId, - List tableIds) { + Credentials credentials, String projectId, String instanceId, String appProfileId) { EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() .setProjectId(projectId) @@ -75,28 +54,12 @@ static BigtableChannelPrimer create( .setExecutorProvider( InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()); - // Disable retries for priming request - builder - .readRowSettings() - .setRetrySettings( - builder - .readRowSettings() - .getRetrySettings() - .toBuilder() - .setMaxAttempts(1) - .setJittered(false) - .setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT) - .setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT) - .setTotalTimeout(PRIME_REQUEST_TIMEOUT) - .build()); - return new BigtableChannelPrimer(builder.build(), tableIds); + return new BigtableChannelPrimer(builder.build()); } - private BigtableChannelPrimer( - EnhancedBigtableStubSettings settingsTemplate, List tableIds) { + private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) { Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null"); this.settingsTemplate = settingsTemplate; - this.tableIds = ImmutableList.copyOf(tableIds); } @Override @@ -110,25 +73,7 @@ public void primeChannel(ManagedChannel managedChannel) { } private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException { - if (tableIds.isEmpty()) { - waitForChannelReady(managedChannel); - } else { - sendPrimeRequests(managedChannel); - } - } - - private void waitForChannelReady(ManagedChannel managedChannel) { - for (int i = 0; i < 30; i++) { - ConnectivityState connectivityState = managedChannel.getState(true); - if (connectivityState == ConnectivityState.READY) { - break; - } - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - break; - } - } + sendPrimeRequests(managedChannel); } private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { @@ -141,41 +86,24 @@ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException .build(); try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) { - Map> primeFutures = new HashMap<>(); - - // Prime all of the table ids in parallel - for (String tableId : tableIds) { - ApiFuture f = - stub.createReadRowsRawCallable(new DefaultRowAdapter()) - .first() - .futureCall( - ReadRowsRequest.newBuilder() - .setTableName( - TableName.format( - primingSettings.getProjectId(), - primingSettings.getInstanceId(), - tableId)) - .setAppProfileId(primingSettings.getAppProfileId()) - .setRows(RowSet.newBuilder().addRowKeys(PRIMING_ROW_KEY).build()) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build()); + PingAndWarmRequest request = + PingAndWarmRequest.newBuilder() + .setName( + NameUtil.formatInstanceName( + primingSettings.getProjectId(), primingSettings.getInstanceId())) + .setAppProfileId(primingSettings.getAppProfileId()) + .build(); - primeFutures.put(tableId, f); - } - - // Wait for all of the prime requests to complete. - for (Map.Entry> entry : primeFutures.entrySet()) { - try { - entry.getValue().get(); - } catch (Throwable e) { - if (e instanceof ExecutionException) { - e = e.getCause(); - } - LOG.warning( - String.format( - "Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage())); + try { + stub.pingAndWarmCallable().call(request); + } catch (Throwable e) { + // TODO: Not sure if we should swallow the error here. We are pre-emptively swapping + // channels if the new + // channel is bad. + if (e instanceof ExecutionException) { + e = e.getCause(); } + LOG.warning(String.format("Failed to prime channel: %s", e)); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index ec237aabf7..1550127e23 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -51,6 +51,8 @@ import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadModifyWriteRowRequest; import com.google.bigtable.v2.ReadModifyWriteRowResponse; import com.google.bigtable.v2.ReadRowsRequest; @@ -104,6 +106,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -141,6 +144,7 @@ public class EnhancedBigtableStub implements AutoCloseable { private final UnaryCallable bulkMutateRowsCallable; private final UnaryCallable checkAndMutateRowCallable; private final UnaryCallable readModifyWriteRowCallable; + private final UnaryCallable pingAndWarmCallable; public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) throws IOException { @@ -181,8 +185,7 @@ public static EnhancedBigtableStubSettings finalizeSettings( credentials, settings.getProjectId(), settings.getInstanceId(), - settings.getAppProfileId(), - settings.getPrimedTableIds())) + settings.getAppProfileId())) .build()); } @@ -284,6 +287,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext bulkMutateRowsCallable = createBulkMutateRowsCallable(); checkAndMutateRowCallable = createCheckAndMutateRowCallable(); readModifyWriteRowCallable = createReadModifyWriteRowCallable(); + pingAndWarmCallable = createPingAndWarmCallable(); } // @@ -810,6 +814,25 @@ private UnaryCallable createUserFacin return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } + + private UnaryCallable createPingAndWarmCallable() { + UnaryCallable pingAndWarm = + GrpcRawCallableFactory.createUnaryCallable( + GrpcCallSettings.newBuilder() + .setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod()) + .setParamsExtractor( + new RequestParamsExtractor() { + @Override + public Map extract(PingAndWarmRequest request) { + return ImmutableMap.of( + "name", request.getName(), + "app_profile_id", request.getAppProfileId()); + } + }) + .build(), + Collections.emptySet()); + return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext()); + } // // @@ -854,6 +877,10 @@ public UnaryCallable checkAndMutateRowCallable( public UnaryCallable readModifyWriteRowCallable() { return readModifyWriteRowCallable; } + + UnaryCallable pingAndWarmCallable() { + return pingAndWarmCallable; + } // private SpanName getSpanName(String methodName) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 395ba52b08..f2ae486d9e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -33,6 +33,7 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.auth.Credentials; +import com.google.bigtable.v2.PingAndWarmRequest; import com.google.cloud.bigtable.Version; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; @@ -113,6 +114,9 @@ public class EnhancedBigtableStubSettings extends StubSettings READ_ROWS_RETRY_CODES = ImmutableSet.builder().addAll(IDEMPOTENT_RETRY_CODES).add(Code.ABORTED).build(); + // Priming request should have a shorter timeout + private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30); + private static final RetrySettings READ_ROWS_RETRY_SETTINGS = RetrySettings.newBuilder() .setInitialRetryDelay(Duration.ofMillis(10)) @@ -173,6 +177,7 @@ public class EnhancedBigtableStubSettings extends StubSettings checkAndMutateRowSettings; private final UnaryCallSettings readModifyWriteRowSettings; + private final UnaryCallSettings pingAndWarmSettings; private EnhancedBigtableStubSettings(Builder builder) { super(builder); @@ -208,6 +213,7 @@ private EnhancedBigtableStubSettings(Builder builder) { bulkReadRowsSettings = builder.bulkReadRowsSettings.build(); checkAndMutateRowSettings = builder.checkAndMutateRowSettings.build(); readModifyWriteRowSettings = builder.readModifyWriteRowSettings.build(); + pingAndWarmSettings = builder.pingAndWarmSettings.build(); } /** Create a new builder. */ @@ -236,8 +242,11 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } - /** Gets the tables that will be primed during a channel refresh. */ - @BetaApi("Channel priming is not currently stable and might change in the future") + /** + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. + */ + @Deprecated public List getPrimedTableIds() { return primedTableIds; } @@ -491,6 +500,15 @@ public UnaryCallSettings readModifyWriteRowSettings() { return readModifyWriteRowSettings; } + /** + * Returns the object with the settings used for calls to PingAndWarm. + * + *

By default the retries are disabled for PingAndWarm and deadline is set to 30 seconds. + */ + UnaryCallSettings pingAndWarmSettings() { + return pingAndWarmSettings; + } + /** Returns a builder containing all the values of this settings class. */ public Builder toBuilder() { return new Builder(this); @@ -515,6 +533,7 @@ public static class Builder extends StubSettings.Builder checkAndMutateRowSettings; private final UnaryCallSettings.Builder readModifyWriteRowSettings; + private final UnaryCallSettings.Builder pingAndWarmSettings; /** * Initializes a new Builder with sane defaults for all settings. @@ -626,6 +645,15 @@ private Builder() { readModifyWriteRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); copyRetrySettings(baseDefaults.readModifyWriteRowSettings(), readModifyWriteRowSettings); + + pingAndWarmSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); + pingAndWarmSettings.setRetrySettings( + RetrySettings.newBuilder() + .setMaxAttempts(1) + .setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT) + .setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT) + .setTotalTimeout(PRIME_REQUEST_TIMEOUT) + .build()); } private Builder(EnhancedBigtableStubSettings settings) { @@ -646,6 +674,7 @@ private Builder(EnhancedBigtableStubSettings settings) { bulkReadRowsSettings = settings.bulkReadRowsSettings.toBuilder(); checkAndMutateRowSettings = settings.checkAndMutateRowSettings.toBuilder(); readModifyWriteRowSettings = settings.readModifyWriteRowSettings.toBuilder(); + pingAndWarmSettings = settings.pingAndWarmSettings.toBuilder(); } // @@ -727,9 +756,8 @@ public String getAppProfileId() { /** * Sets if channels will gracefully refresh connections to Cloud Bigtable service. * - *

When enabled, this will wait for the connection to complete the SSL handshake. The effect - * can be enhanced by configuring table ids that can be used warm serverside caches using {@link - * #setPrimedTableIds(String...)}. + *

When enabled, this will wait for the connection to complete the SSL handshake and warm up + * serverside caches for all the tables of the instance. * * @see com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder#setRefreshingChannel */ @@ -739,8 +767,11 @@ public Builder setRefreshingChannel(boolean isRefreshingChannel) { return this; } - /** Configures which tables will be primed when a connection is created. */ - @BetaApi("Channel priming is not currently stable and might change in the future") + /** + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. + */ + @Deprecated public Builder setPrimedTableIds(String... tableIds) { this.primedTableIds = ImmutableList.copyOf(tableIds); return this; @@ -752,8 +783,11 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } - /** Gets the tables that will be primed during a channel refresh. */ - @BetaApi("Channel priming is not currently stable and might change in the future") + /** + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. + */ + @Deprecated public List getPrimedTableIds() { return primedTableIds; } @@ -809,6 +843,11 @@ public UnaryCallSettings.Builder readModifyWriteRowSett return readModifyWriteRowSettings; } + /** Returns the builder with the settings used for calls to PingAndWarm. */ + public UnaryCallSettings.Builder pingAndWarmSettings() { + return pingAndWarmSettings; + } + @SuppressWarnings("unchecked") public EnhancedBigtableStubSettings build() { Preconditions.checkState(projectId != null, "Project id must be set"); @@ -831,8 +870,7 @@ public EnhancedBigtableStubSettings build() { // Use shared credentials this.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); channelProviderBuilder.setChannelPrimer( - BigtableChannelPrimer.create( - credentials, projectId, instanceId, appProfileId, primedTableIds)); + BigtableChannelPrimer.create(credentials, projectId, instanceId, appProfileId)); this.setTransportChannelProvider(channelProviderBuilder.build()); } return new EnhancedBigtableStubSettings(this); @@ -857,6 +895,7 @@ public String toString() { .add("bulkReadRowsSettings", bulkReadRowsSettings) .add("checkAndMutateRowSettings", checkAndMutateRowSettings) .add("readModifyWriteRowSettings", readModifyWriteRowSettings) + .add("pingAndWarmSettings", pingAndWarmSettings) .add("parent", super.toString()) .toString(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index 67d4183b8b..ebda860851 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -26,16 +26,16 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.WatchdogProvider; import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.InstanceName; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; import io.grpc.Attributes; import io.grpc.Server; import io.grpc.ServerTransportFilter; @@ -224,14 +224,12 @@ public void testCreateForInstanceWithAppProfileHasCorrectSettings() throws Excep @Test public void testCreateWithRefreshingChannel() throws Exception { - String[] tableIds = {"fake-table1", "fake-table2"}; int poolSize = 3; BigtableDataSettings.Builder builder = BigtableDataSettings.newBuilderForEmulator(server.getPort()) .setProjectId(DEFAULT_PROJECT_ID) .setInstanceId(DEFAULT_INSTANCE_ID) .setAppProfileId(DEFAULT_APP_PROFILE_ID) - .setPrimingTableIds(tableIds) .setRefreshingChannel(true); builder .stubSettings() @@ -258,26 +256,17 @@ public void testCreateWithRefreshingChannel() throws Exception { assertThat(setUpAttributes).hasSize(poolSize); // Make sure that prime requests were sent only once per table per connection - assertThat(service.readRowsRequests).hasSize(poolSize * tableIds.length); - List expectedRequests = new LinkedList<>(); - for (String tableId : tableIds) { - for (int i = 0; i < poolSize; i++) { - expectedRequests.add( - ReadRowsRequest.newBuilder() - .setTableName( - String.format( - "projects/%s/instances/%s/tables/%s", - DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID, tableId)) - .setAppProfileId(DEFAULT_APP_PROFILE_ID) - .setRows( - RowSet.newBuilder() - .addRowKeys(ByteString.copyFromUtf8("nonexistent-priming-row"))) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build()); - } + assertThat(service.pingAndWarmRequests).hasSize(poolSize); + List expectedRequests = new LinkedList<>(); + for (int i = 0; i < poolSize; i++) { + expectedRequests.add( + PingAndWarmRequest.newBuilder() + .setName(InstanceName.format(DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID)) + .setAppProfileId(DEFAULT_APP_PROFILE_ID) + .build()); } - assertThat(service.readRowsRequests).containsExactly(expectedRequests.toArray()); + + assertThat(service.pingAndWarmRequests).containsExactly(expectedRequests.toArray()); // Wait for all the connections to close asynchronously factory.close(); @@ -316,6 +305,7 @@ private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase { volatile MutateRowRequest lastRequest; BlockingQueue readRowsRequests = new LinkedBlockingDeque<>(); + BlockingQueue pingAndWarmRequests = new LinkedBlockingDeque<>(); private ApiFunction readRowsCallback = new ApiFunction() { @Override @@ -324,6 +314,14 @@ public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { } }; + private ApiFunction pingAndWarmCallback = + new ApiFunction() { + @Override + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { + return PingAndWarmResponse.getDefaultInstance(); + } + }; + @Override public void readRows( ReadRowsRequest request, StreamObserver responseObserver) { @@ -343,6 +341,14 @@ public void mutateRow( responseObserver.onNext(MutateRowResponse.getDefaultInstance()); responseObserver.onCompleted(); } + + @Override + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { + pingAndWarmRequests.add(request); + responseObserver.onNext(pingAndWarmCallback.apply(request)); + responseObserver.onCompleted(); + } } private static class BuilderAnswer implements Answer { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java index 7195e4105f..e1f22bebbd 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -21,12 +21,9 @@ import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase; -import com.google.bigtable.v2.ReadRowsRequest; -import com.google.bigtable.v2.ReadRowsResponse; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; -import com.google.common.collect.ImmutableList; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; @@ -75,8 +72,7 @@ public void setup() throws IOException { OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), "fake-project", "fake-instance", - "fake-app-profile", - ImmutableList.of("table1", "table2")); + "fake-app-profile"); channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build(); @@ -104,63 +100,45 @@ public void testCredentials() { @Test public void testRequests() { - final Queue requests = new ConcurrentLinkedQueue<>(); + final Queue requests = new ConcurrentLinkedQueue<>(); - fakeService.readRowsCallback = - new ApiFunction() { + fakeService.pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest req) { + public PingAndWarmResponse apply(PingAndWarmRequest req) { requests.add(req); - return ReadRowsResponse.getDefaultInstance(); + return PingAndWarmResponse.getDefaultInstance(); } }; primer.primeChannel(channel); assertThat(requests) .containsExactly( - ReadRowsRequest.newBuilder() - .setTableName("projects/fake-project/instances/fake-instance/tables/table1") + PingAndWarmRequest.newBuilder() + .setName("projects/fake-project/instances/fake-instance") .setAppProfileId("fake-app-profile") - .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build(), - ReadRowsRequest.newBuilder() - .setTableName("projects/fake-project/instances/fake-instance/tables/table2") - .setAppProfileId("fake-app-profile") - .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) .build()); } @Test public void testErrorsAreLogged() { - fakeService.readRowsCallback = - new ApiFunction() { + fakeService.pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest req) { + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { throw new StatusRuntimeException(Status.FAILED_PRECONDITION); } }; primer.primeChannel(channel); - assertThat(logHandler.logs).hasSize(2); + assertThat(logHandler.logs).hasSize(1); for (LogRecord log : logHandler.logs) { assertThat(log.getMessage()).contains("FAILED_PRECONDITION"); } } @Test - public void testErrorsAreLoggedForBasic() { - BigtableChannelPrimer basicPrimer = - BigtableChannelPrimer.create( - OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), - "fake-project", - "fake-instance", - "fake-app-profile", - ImmutableList.of()); - + public void testChannelErrorsAreLogged() { ManagedChannel channel = Mockito.mock( ManagedChannel.class, new ThrowsException(new UnsupportedOperationException())); @@ -168,7 +146,7 @@ public void testErrorsAreLoggedForBasic() { assertThat(logHandler.logs).hasSize(1); for (LogRecord log : logHandler.logs) { - assertThat(log.getMessage()).contains("Unexpected"); + assertThat(log.getMessage()).contains("UnsupportedOperationException"); } } @@ -187,20 +165,19 @@ public Listener interceptCall( } static class FakeService extends BigtableImplBase { - private ApiFunction readRowsCallback = - new ApiFunction() { + private ApiFunction pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { - return ReadRowsResponse.getDefaultInstance(); + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { + return PingAndWarmResponse.getDefaultInstance(); } }; @Override - public void readRows( - ReadRowsRequest request, StreamObserver responseObserver) { - + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { try { - responseObserver.onNext(readRowsCallback.apply(request)); + responseObserver.onNext(pingAndWarmCallback.apply(request)); responseObserver.onCompleted(); } catch (RuntimeException e) { responseObserver.onError(e); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index c4e5ea2e40..d5a22884b6 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -28,6 +28,7 @@ import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.WatchdogProvider; import com.google.auth.Credentials; +import com.google.bigtable.v2.PingAndWarmRequest; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; @@ -645,6 +646,15 @@ public void checkAndMutateRowSettingsAreSane() { assertThat(builder.getRetryableCodes()).isEmpty(); } + @Test + public void pingAndWarmRetriesAreDisabled() { + UnaryCallSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder().pingAndWarmSettings(); + + assertThat(builder.getRetrySettings().getMaxAttempts()).isAtMost(1); + assertThat(builder.getRetrySettings().getInitialRpcTimeout()).isAtMost(Duration.ofSeconds(30)); + } + private void verifyRetrySettingAreSane(Set retryCodes, RetrySettings retrySettings) { assertThat(retryCodes).containsAtLeast(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE); @@ -701,6 +711,7 @@ public void isRefreshingChannelFalseValueTest() { "bulkReadRowsSettings", "checkAndMutateRowSettings", "readModifyWriteRowSettings", + "pingAndWarmSettings", }; @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index bab2b55e76..8176435f15 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -35,6 +35,8 @@ import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; @@ -261,15 +263,10 @@ public void testCreateReadRowsRawCallable() throws InterruptedException { @Test public void testChannelPrimerConfigured() throws IOException { EnhancedBigtableStubSettings settings = - defaultSettings - .toBuilder() - .setRefreshingChannel(true) - .setPrimedTableIds("table1", "table2") - .build(); + defaultSettings.toBuilder().setRefreshingChannel(true).build(); try (EnhancedBigtableStub ignored = EnhancedBigtableStub.create(settings)) { - // priming will issue a request per table on startup - assertThat(fakeDataService.requests).hasSize(2); + assertThat(fakeDataService.pingRequests).hasSize(1); } } @@ -515,6 +512,7 @@ public Listener interceptCall( private static class FakeDataService extends BigtableGrpc.BigtableImplBase { final BlockingQueue requests = Queues.newLinkedBlockingDeque(); + final BlockingQueue pingRequests = Queues.newLinkedBlockingDeque(); @SuppressWarnings("unchecked") ReadRowsRequest popLastRequest() throws InterruptedException { @@ -549,5 +547,13 @@ public void readRows( .build()); responseObserver.onCompleted(); } + + @Override + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { + pingRequests.add(request); + responseObserver.onNext(PingAndWarmResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } } }