Skip to content

Commit

Permalink
feat: extend channel priming logic to also send fake requests (#398)
Browse files Browse the repository at this point in the history
* feat: extend channel priming logic to also send fake requests

* finish

* remove debug

* copyright

* remove debug code from test

* fix deps

* address some feedback

* add additional error handling test + some comments
  • Loading branch information
igorbernstein2 authored Sep 1, 2020
1 parent 08f7d84 commit 6f1ead2
Show file tree
Hide file tree
Showing 10 changed files with 593 additions and 179 deletions.
4 changes: 4 additions & 0 deletions google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-iam-v1</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.common.base.Strings;
import io.grpc.ManagedChannelBuilder;
import java.util.List;
import java.util.logging.Logger;
import javax.annotation.Nonnull;

Expand Down Expand Up @@ -185,11 +186,20 @@ public String getAppProfileId() {
}

/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
@BetaApi("This API depends on experimental gRPC APIs")
@BetaApi("Channel priming is not currently stable and may change in the future")
public boolean isRefreshingChannel() {
return stubSettings.isRefreshingChannel();
}

/**
* Gets the table ids that will be used to send warmup requests when {@link
* #isRefreshingChannel()} is enabled.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}

/** Returns the underlying RPC settings. */
public EnhancedBigtableStubSettings getStubSettings() {
return stubSettings;
Expand Down Expand Up @@ -307,18 +317,40 @@ public CredentialsProvider getCredentialsProvider() {
* connections, which causes the client to renegotiate the gRPC connection in the request path,
* which causes periodic spikes in latency
*/
@BetaApi("This API depends on experimental gRPC APIs")
@BetaApi("Channel priming is not currently stable and may change in the future")
public Builder setRefreshingChannel(boolean isRefreshingChannel) {
stubSettings.setRefreshingChannel(isRefreshingChannel);
return this;
}

/** Gets if channels will gracefully refresh connections to Cloud Bigtable service */
@BetaApi("This API depends on experimental gRPC APIs")
@BetaApi("Channel priming is not currently stable and may change in the future")
public boolean isRefreshingChannel() {
return stubSettings.isRefreshingChannel();
}

/**
* Configure the tables that can be used to prime a channel during a refresh.
*
* <p>These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a
* channel is refreshed, it will send a request to each table to warm up the serverside caches
* before admitting the new channel into the channel pool.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
public Builder setPrimingTableIds(String... tableIds) {
stubSettings.setPrimedTableIds(tableIds);
return this;
}

/**
* Gets the table ids that will be used to send warmup requests when {@link
* #setRefreshingChannel(boolean)} is enabled.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}

/**
* Returns the underlying settings for making RPC calls. The settings should be changed with
* care.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS;

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.auth.Credentials;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

/**
* A channel warmer that ensures that a Bigtable channel is ready to be used before being added to
* the active {@link com.google.api.gax.grpc.ChannelPool}.
*
* <p>This implementation is subject to change in the future, but currently it will prime the
* channel by sending a ReadRow request for a hardcoded, non-existent row key.
*/
@BetaApi("Channel priming is not currently stable and might change in the future")
class BigtableChannelPrimer implements ChannelPrimer {
private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());

static ByteString PRIMING_ROW_KEY = ByteString.copyFromUtf8("nonexistent-priming-row");
private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30);

private final EnhancedBigtableStubSettings settingsTemplate;
private final List<String> tableIds;

static BigtableChannelPrimer create(
Credentials credentials,
String projectId,
String instanceId,
String appProfileId,
List<String> tableIds) {
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(projectId)
.setInstanceId(instanceId)
.setAppProfileId(appProfileId)
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());

// Disable retries for priming request
builder
.readRowSettings()
.setRetrySettings(
builder
.readRowSettings()
.getRetrySettings()
.toBuilder()
.setMaxAttempts(1)
.setJittered(false)
.setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
.build());
return new BigtableChannelPrimer(builder.build(), tableIds);
}

private BigtableChannelPrimer(
EnhancedBigtableStubSettings settingsTemplate, List<String> tableIds) {
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
this.settingsTemplate = settingsTemplate;
this.tableIds = ImmutableList.copyOf(tableIds);
}

@Override
public void primeChannel(ManagedChannel managedChannel) {
try {
primeChannelUnsafe(managedChannel);
} catch (IOException | RuntimeException e) {
LOG.warning(
String.format("Unexpected error while trying to prime a channel: %s", e.getMessage()));
}
}

private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
if (tableIds.isEmpty()) {
waitForChannelReady(managedChannel);
} else {
sendPrimeRequests(managedChannel);
}
}

private void waitForChannelReady(ManagedChannel managedChannel) {
for (int i = 0; i < 30; i++) {
ConnectivityState connectivityState = managedChannel.getState(true);
if (connectivityState == ConnectivityState.READY) {
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
break;
}
}
}

private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
// Wrap the channel in a temporary stub
EnhancedBigtableStubSettings primingSettings =
settingsTemplate
.toBuilder()
.setTransportChannelProvider(
FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)))
.build();

try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
Map<String, ApiFuture<?>> primeFutures = new HashMap<>();

// Prime all of the table ids in parallel
for (String tableId : tableIds) {
ApiFuture<Row> f =
stub.readRowCallable()
.futureCall(Query.create(tableId).rowKey(PRIMING_ROW_KEY).filter(FILTERS.block()));

primeFutures.put(tableId, f);
}

// Wait for all of the prime requests to complete.
for (Map.Entry<String, ApiFuture<?>> entry : primeFutures.entrySet()) {
try {
entry.getValue().get();
} catch (Throwable e) {
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(
String.format(
"Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage()));
}
}
}
}
}
Loading

0 comments on commit 6f1ead2

Please sign in to comment.