From ef347fd94a98e5efb955d83cae23e405dcb6b01a Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 18 Feb 2022 11:10:48 -0700 Subject: [PATCH] feat: use PingAndWarm request for channel priming --- .../data/v2/BigtableDataSettings.java | 19 +-- .../data/v2/stub/BigtableChannelPrimer.java | 108 +++--------------- .../data/v2/stub/EnhancedBigtableStub.java | 29 ++++- .../v2/stub/EnhancedBigtableStubSettings.java | 12 +- .../v2/BigtableDataClientFactoryTest.java | 58 ++++++---- .../v2/stub/BigtableChannelPrimerTest.java | 69 ++++------- .../v2/stub/EnhancedBigtableStubTest.java | 20 ++-- 7 files changed, 135 insertions(+), 180 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index edde257527..e3e0c0cf34 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -217,8 +217,11 @@ public boolean isRefreshingChannel() { /** * Gets the table ids that will be used to send warmup requests when {@link * #isRefreshingChannel()} is enabled. + * + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public List getPrimingTableIds() { return stubSettings.getPrimedTableIds(); } @@ -377,13 +380,10 @@ public boolean isRefreshingChannel() { } /** - * Configure the tables that can be used to prime a channel during a refresh. - * - *

These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a - * channel is refreshed, it will send a request to each table to warm up the serverside caches - * before admitting the new channel into the channel pool. + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public Builder setPrimingTableIds(String... tableIds) { stubSettings.setPrimedTableIds(tableIds); return this; @@ -392,8 +392,11 @@ public Builder setPrimingTableIds(String... tableIds) { /** * Gets the table ids that will be used to send warmup requests when {@link * #setRefreshingChannel(boolean)} is enabled. + * + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. */ - @BetaApi("Channel priming is not currently stable and may change in the future") + @Deprecated public List getPrimingTableIds() { return stubSettings.getPrimedTableIds(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 93f2cf3ef0..2a68e1aadc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -15,7 +15,6 @@ */ package com.google.cloud.bigtable.data.v2.stub; -import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -23,23 +22,13 @@ import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.auth.Credentials; -import com.google.bigtable.v2.ReadRowsRequest; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; -import com.google.bigtable.v2.TableName; -import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; -import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.bigtable.v2.InstanceName; +import com.google.bigtable.v2.PingAndWarmRequest; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; -import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import org.threeten.bp.Duration; @@ -58,14 +47,9 @@ class BigtableChannelPrimer implements ChannelPrimer { private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30); private final EnhancedBigtableStubSettings settingsTemplate; - private final List tableIds; static BigtableChannelPrimer create( - Credentials credentials, - String projectId, - String instanceId, - String appProfileId, - List tableIds) { + Credentials credentials, String projectId, String instanceId, String appProfileId) { EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() .setProjectId(projectId) @@ -75,28 +59,12 @@ static BigtableChannelPrimer create( .setExecutorProvider( InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()); - // Disable retries for priming request - builder - .readRowSettings() - .setRetrySettings( - builder - .readRowSettings() - .getRetrySettings() - .toBuilder() - .setMaxAttempts(1) - .setJittered(false) - .setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT) - .setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT) - .setTotalTimeout(PRIME_REQUEST_TIMEOUT) - .build()); - return new BigtableChannelPrimer(builder.build(), tableIds); + return new BigtableChannelPrimer(builder.build()); } - private BigtableChannelPrimer( - EnhancedBigtableStubSettings settingsTemplate, List tableIds) { + private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) { Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null"); this.settingsTemplate = settingsTemplate; - this.tableIds = ImmutableList.copyOf(tableIds); } @Override @@ -110,25 +78,7 @@ public void primeChannel(ManagedChannel managedChannel) { } private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException { - if (tableIds.isEmpty()) { - waitForChannelReady(managedChannel); - } else { - sendPrimeRequests(managedChannel); - } - } - - private void waitForChannelReady(ManagedChannel managedChannel) { - for (int i = 0; i < 30; i++) { - ConnectivityState connectivityState = managedChannel.getState(true); - if (connectivityState == ConnectivityState.READY) { - break; - } - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - break; - } - } + sendPrimeRequests(managedChannel); } private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException { @@ -141,41 +91,21 @@ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException .build(); try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) { - Map> primeFutures = new HashMap<>(); - - // Prime all of the table ids in parallel - for (String tableId : tableIds) { - ApiFuture f = - stub.createReadRowsRawCallable(new DefaultRowAdapter()) - .first() - .futureCall( - ReadRowsRequest.newBuilder() - .setTableName( - TableName.format( - primingSettings.getProjectId(), - primingSettings.getInstanceId(), - tableId)) - .setAppProfileId(primingSettings.getAppProfileId()) - .setRows(RowSet.newBuilder().addRowKeys(PRIMING_ROW_KEY).build()) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build()); + PingAndWarmRequest request = + PingAndWarmRequest.newBuilder() + .setName( + InstanceName.format( + primingSettings.getProjectId(), primingSettings.getInstanceId())) + .setAppProfileId(primingSettings.getAppProfileId()) + .build(); - primeFutures.put(tableId, f); - } - - // Wait for all of the prime requests to complete. - for (Map.Entry> entry : primeFutures.entrySet()) { - try { - entry.getValue().get(); - } catch (Throwable e) { - if (e instanceof ExecutionException) { - e = e.getCause(); - } - LOG.warning( - String.format( - "Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage())); + try { + stub.pingAndWarmCallable().call(request); + } catch (Throwable e) { + if (e instanceof ExecutionException) { + e = e.getCause(); } + LOG.warning(String.format("Failed to prime channel: %s", e)); } } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index d8daaa80e6..24a0583be4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -51,6 +51,8 @@ import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadModifyWriteRowRequest; import com.google.bigtable.v2.ReadModifyWriteRowResponse; import com.google.bigtable.v2.ReadRowsRequest; @@ -103,6 +105,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -140,6 +143,7 @@ public class EnhancedBigtableStub implements AutoCloseable { private final UnaryCallable bulkMutateRowsCallable; private final UnaryCallable checkAndMutateRowCallable; private final UnaryCallable readModifyWriteRowCallable; + private final UnaryCallable pingAndWarmCallable; public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) throws IOException { @@ -180,8 +184,7 @@ public static EnhancedBigtableStubSettings finalizeSettings( credentials, settings.getProjectId(), settings.getInstanceId(), - settings.getAppProfileId(), - settings.getPrimedTableIds())) + settings.getAppProfileId())) .build()); } @@ -276,6 +279,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext bulkMutateRowsCallable = createBulkMutateRowsCallable(); checkAndMutateRowCallable = createCheckAndMutateRowCallable(); readModifyWriteRowCallable = createReadModifyWriteRowCallable(); + pingAndWarmCallable = createPingAndWarmCallable(); } // @@ -801,6 +805,23 @@ private UnaryCallable createUserFacin return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } + + private UnaryCallable createPingAndWarmCallable() { + UnaryCallable pingAndWarm = + GrpcRawCallableFactory.createUnaryCallable( + GrpcCallSettings.newBuilder() + .setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod()) + .setParamsExtractor( + new RequestParamsExtractor() { + @Override + public Map extract(PingAndWarmRequest request) { + return ImmutableMap.of("app_profile_id", request.getAppProfileId()); + } + }) + .build(), + Collections.emptySet()); + return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext()); + } // // @@ -845,6 +866,10 @@ public UnaryCallable checkAndMutateRowCallable( public UnaryCallable readModifyWriteRowCallable() { return readModifyWriteRowCallable; } + + UnaryCallable pingAndWarmCallable() { + return pingAndWarmCallable; + } // private SpanName getSpanName(String methodName) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 395ba52b08..139e3bf66b 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -752,8 +752,13 @@ public boolean isRefreshingChannel() { return isRefreshingChannel; } - /** Gets the tables that will be primed during a channel refresh. */ - @BetaApi("Channel priming is not currently stable and might change in the future") + /** + * Gets the tables that will be primed during a channel refresh. + * + * @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up + * requests will be sent to all table ids of the instance. + */ + @Deprecated public List getPrimedTableIds() { return primedTableIds; } @@ -831,8 +836,7 @@ public EnhancedBigtableStubSettings build() { // Use shared credentials this.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); channelProviderBuilder.setChannelPrimer( - BigtableChannelPrimer.create( - credentials, projectId, instanceId, appProfileId, primedTableIds)); + BigtableChannelPrimer.create(credentials, projectId, instanceId, appProfileId)); this.setTransportChannelProvider(channelProviderBuilder.build()); } return new EnhancedBigtableStubSettings(this); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index fd75bed956..071e54cfb5 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -26,17 +26,17 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.WatchdogProvider; import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.InstanceName; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.protobuf.ByteString; import io.grpc.Attributes; import io.grpc.BindableService; import io.grpc.ServerInterceptor; @@ -231,14 +231,12 @@ public void testCreateForInstanceWithAppProfileHasCorrectSettings() throws Excep @Test public void testCreateWithRefreshingChannel() throws Exception { - String[] tableIds = {"fake-table1", "fake-table2"}; int poolSize = 3; BigtableDataSettings.Builder builder = BigtableDataSettings.newBuilderForEmulator(port) .setProjectId(DEFAULT_PROJECT_ID) .setInstanceId(DEFAULT_INSTANCE_ID) .setAppProfileId(DEFAULT_APP_PROFILE_ID) - .setPrimingTableIds(tableIds) .setRefreshingChannel(true); builder .stubSettings() @@ -265,26 +263,17 @@ public void testCreateWithRefreshingChannel() throws Exception { assertThat(setUpAttributes).hasSize(poolSize); // Make sure that prime requests were sent only once per table per connection - assertThat(service.readRowsRequests).hasSize(poolSize * tableIds.length); - List expectedRequests = new LinkedList<>(); - for (String tableId : tableIds) { - for (int i = 0; i < poolSize; i++) { - expectedRequests.add( - ReadRowsRequest.newBuilder() - .setTableName( - String.format( - "projects/%s/instances/%s/tables/%s", - DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID, tableId)) - .setAppProfileId(DEFAULT_APP_PROFILE_ID) - .setRows( - RowSet.newBuilder() - .addRowKeys(ByteString.copyFromUtf8("nonexistent-priming-row"))) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build()); - } + assertThat(service.pingAndWarmRequests).hasSize(poolSize); + List expectedRequests = new LinkedList<>(); + for (int i = 0; i < poolSize; i++) { + expectedRequests.add( + PingAndWarmRequest.newBuilder() + .setName(InstanceName.format(DEFAULT_PROJECT_ID, DEFAULT_INSTANCE_ID)) + .setAppProfileId(DEFAULT_APP_PROFILE_ID) + .build()); } - assertThat(service.readRowsRequests).containsExactly(expectedRequests.toArray()); + + assertThat(service.pingAndWarmRequests).containsExactly(expectedRequests.toArray()); // Wait for all the connections to close asynchronously factory.close(); @@ -323,6 +312,7 @@ private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase { volatile MutateRowRequest lastRequest; BlockingQueue readRowsRequests = new LinkedBlockingDeque<>(); + BlockingQueue pingAndWarmRequests = new LinkedBlockingDeque<>(); private ApiFunction readRowsCallback = new ApiFunction() { @Override @@ -331,6 +321,14 @@ public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { } }; + private ApiFunction pingAndWarmCallback = + new ApiFunction() { + @Override + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { + return PingAndWarmResponse.getDefaultInstance(); + } + }; + @Override public void readRows( ReadRowsRequest request, StreamObserver responseObserver) { @@ -350,6 +348,18 @@ public void mutateRow( responseObserver.onNext(MutateRowResponse.getDefaultInstance()); responseObserver.onCompleted(); } + + @Override + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { + try { + pingAndWarmRequests.add(request); + responseObserver.onNext(pingAndWarmCallback.apply(request)); + responseObserver.onCompleted(); + } catch (RuntimeException e) { + responseObserver.onError(e); + } + } } private static class BuilderAnswer implements Answer { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java index 3748a5adff..4d506a5348 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimerTest.java @@ -21,12 +21,9 @@ import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase; -import com.google.bigtable.v2.ReadRowsRequest; -import com.google.bigtable.v2.ReadRowsResponse; -import com.google.bigtable.v2.RowFilter; -import com.google.bigtable.v2.RowSet; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.cloud.bigtable.data.v2.FakeServiceHelper; -import com.google.common.collect.ImmutableList; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; @@ -74,8 +71,7 @@ public void setup() throws IOException { OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), "fake-project", "fake-instance", - "fake-app-profile", - ImmutableList.of("table1", "table2")); + "fake-app-profile"); channel = ManagedChannelBuilder.forAddress("localhost", serviceHelper.getPort()) @@ -105,63 +101,45 @@ public void testCredentials() { @Test public void testRequests() { - final Queue requests = new ConcurrentLinkedQueue<>(); + final Queue requests = new ConcurrentLinkedQueue<>(); - fakeService.readRowsCallback = - new ApiFunction() { + fakeService.pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest req) { + public PingAndWarmResponse apply(PingAndWarmRequest req) { requests.add(req); - return ReadRowsResponse.getDefaultInstance(); + return PingAndWarmResponse.getDefaultInstance(); } }; primer.primeChannel(channel); assertThat(requests) .containsExactly( - ReadRowsRequest.newBuilder() - .setTableName("projects/fake-project/instances/fake-instance/tables/table1") + PingAndWarmRequest.newBuilder() + .setName("projects/fake-project/instances/fake-instance") .setAppProfileId("fake-app-profile") - .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) - .build(), - ReadRowsRequest.newBuilder() - .setTableName("projects/fake-project/instances/fake-instance/tables/table2") - .setAppProfileId("fake-app-profile") - .setRows(RowSet.newBuilder().addRowKeys(BigtableChannelPrimer.PRIMING_ROW_KEY)) - .setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build()) - .setRowsLimit(1) .build()); } @Test public void testErrorsAreLogged() { - fakeService.readRowsCallback = - new ApiFunction() { + fakeService.pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest req) { + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { throw new StatusRuntimeException(Status.FAILED_PRECONDITION); } }; primer.primeChannel(channel); - assertThat(logHandler.logs).hasSize(2); + assertThat(logHandler.logs).hasSize(1); for (LogRecord log : logHandler.logs) { assertThat(log.getMessage()).contains("FAILED_PRECONDITION"); } } @Test - public void testErrorsAreLoggedForBasic() { - BigtableChannelPrimer basicPrimer = - BigtableChannelPrimer.create( - OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)), - "fake-project", - "fake-instance", - "fake-app-profile", - ImmutableList.of()); - + public void testChannelErrorsAreLogged() { ManagedChannel channel = Mockito.mock( ManagedChannel.class, new ThrowsException(new UnsupportedOperationException())); @@ -169,7 +147,7 @@ public void testErrorsAreLoggedForBasic() { assertThat(logHandler.logs).hasSize(1); for (LogRecord log : logHandler.logs) { - assertThat(log.getMessage()).contains("Unexpected"); + assertThat(log.getMessage()).contains("UnsupportedOperationException"); } } @@ -188,20 +166,19 @@ public Listener interceptCall( } static class FakeService extends BigtableImplBase { - private ApiFunction readRowsCallback = - new ApiFunction() { + private ApiFunction pingAndWarmCallback = + new ApiFunction() { @Override - public ReadRowsResponse apply(ReadRowsRequest readRowsRequest) { - return ReadRowsResponse.getDefaultInstance(); + public PingAndWarmResponse apply(PingAndWarmRequest pingAndWarmRequest) { + return PingAndWarmResponse.getDefaultInstance(); } }; @Override - public void readRows( - ReadRowsRequest request, StreamObserver responseObserver) { - + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { try { - responseObserver.onNext(readRowsCallback.apply(request)); + responseObserver.onNext(pingAndWarmCallback.apply(request)); responseObserver.onCompleted(); } catch (RuntimeException e) { responseObserver.onError(e); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index ae045123f1..4f5e5eb157 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -35,6 +35,8 @@ import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.PingAndWarmRequest; +import com.google.bigtable.v2.PingAndWarmResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; @@ -265,15 +267,10 @@ public void testCreateReadRowsRawCallable() throws InterruptedException { @Test public void testChannelPrimerConfigured() throws IOException { EnhancedBigtableStubSettings settings = - defaultSettings - .toBuilder() - .setRefreshingChannel(true) - .setPrimedTableIds("table1", "table2") - .build(); + defaultSettings.toBuilder().setRefreshingChannel(true).build(); try (EnhancedBigtableStub ignored = EnhancedBigtableStub.create(settings)) { - // priming will issue a request per table on startup - assertThat(fakeDataService.requests).hasSize(2); + assertThat(fakeDataService.pingRequests).hasSize(1); } } @@ -519,6 +516,7 @@ public Listener interceptCall( private static class FakeDataService extends BigtableGrpc.BigtableImplBase { final BlockingQueue requests = Queues.newLinkedBlockingDeque(); + final BlockingQueue pingRequests = Queues.newLinkedBlockingDeque(); @SuppressWarnings("unchecked") ReadRowsRequest popLastRequest() throws InterruptedException { @@ -553,5 +551,13 @@ public void readRows( .build()); responseObserver.onCompleted(); } + + @Override + public void pingAndWarm( + PingAndWarmRequest request, StreamObserver responseObserver) { + pingRequests.add(request); + responseObserver.onNext(PingAndWarmResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } } }