Skip to content

Commit

Permalink
feat: use PingAndWarm request for channel priming (#1179)
Browse files Browse the repository at this point in the history
Switching channel priming from sending fake ReadRowsRequest to PingAndWarm request, which on the server side will list all the tables for an instance. In the settings we won't need to specify the table Ids to prime.
  • Loading branch information
mutianf authored Jul 19, 2022
1 parent 2aa490c commit 6629821
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,10 @@ public boolean isRefreshingChannel() {
}

/**
* Gets the table ids that will be used to send warmup requests when {@link
* #isRefreshingChannel()} is enabled.
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}
Expand Down Expand Up @@ -377,23 +377,20 @@ public boolean isRefreshingChannel() {
}

/**
* Configure the tables that can be used to prime a channel during a refresh.
*
* <p>These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a
* channel is refreshed, it will send a request to each table to warm up the serverside caches
* before admitting the new channel into the channel pool.
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public Builder setPrimingTableIds(String... tableIds) {
stubSettings.setPrimedTableIds(tableIds);
return this;
}

/**
* Gets the table ids that will be used to send warmup requests when {@link
* #setRefreshingChannel(boolean)} is enabled.
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,20 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.auth.Credentials;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.RowSet;
import com.google.bigtable.v2.TableName;
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

/**
* A channel warmer that ensures that a Bigtable channel is ready to be used before being added to
Expand All @@ -54,18 +41,10 @@
class BigtableChannelPrimer implements ChannelPrimer {
private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());

static ByteString PRIMING_ROW_KEY = ByteString.copyFromUtf8("nonexistent-priming-row");
private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30);

private final EnhancedBigtableStubSettings settingsTemplate;
private final List<String> tableIds;

static BigtableChannelPrimer create(
Credentials credentials,
String projectId,
String instanceId,
String appProfileId,
List<String> tableIds) {
Credentials credentials, String projectId, String instanceId, String appProfileId) {
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(projectId)
Expand All @@ -75,28 +54,12 @@ static BigtableChannelPrimer create(
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());

// Disable retries for priming request
builder
.readRowSettings()
.setRetrySettings(
builder
.readRowSettings()
.getRetrySettings()
.toBuilder()
.setMaxAttempts(1)
.setJittered(false)
.setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
.build());
return new BigtableChannelPrimer(builder.build(), tableIds);
return new BigtableChannelPrimer(builder.build());
}

private BigtableChannelPrimer(
EnhancedBigtableStubSettings settingsTemplate, List<String> tableIds) {
private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) {
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
this.settingsTemplate = settingsTemplate;
this.tableIds = ImmutableList.copyOf(tableIds);
}

@Override
Expand All @@ -110,25 +73,7 @@ public void primeChannel(ManagedChannel managedChannel) {
}

private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
if (tableIds.isEmpty()) {
waitForChannelReady(managedChannel);
} else {
sendPrimeRequests(managedChannel);
}
}

private void waitForChannelReady(ManagedChannel managedChannel) {
for (int i = 0; i < 30; i++) {
ConnectivityState connectivityState = managedChannel.getState(true);
if (connectivityState == ConnectivityState.READY) {
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
break;
}
}
sendPrimeRequests(managedChannel);
}

private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
Expand All @@ -141,41 +86,24 @@ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException
.build();

try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
Map<String, ApiFuture<?>> primeFutures = new HashMap<>();

// Prime all of the table ids in parallel
for (String tableId : tableIds) {
ApiFuture<Row> f =
stub.createReadRowsRawCallable(new DefaultRowAdapter())
.first()
.futureCall(
ReadRowsRequest.newBuilder()
.setTableName(
TableName.format(
primingSettings.getProjectId(),
primingSettings.getInstanceId(),
tableId))
.setAppProfileId(primingSettings.getAppProfileId())
.setRows(RowSet.newBuilder().addRowKeys(PRIMING_ROW_KEY).build())
.setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build())
.setRowsLimit(1)
.build());
PingAndWarmRequest request =
PingAndWarmRequest.newBuilder()
.setName(
NameUtil.formatInstanceName(
primingSettings.getProjectId(), primingSettings.getInstanceId()))
.setAppProfileId(primingSettings.getAppProfileId())
.build();

primeFutures.put(tableId, f);
}

// Wait for all of the prime requests to complete.
for (Map.Entry<String, ApiFuture<?>> entry : primeFutures.entrySet()) {
try {
entry.getValue().get();
} catch (Throwable e) {
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(
String.format(
"Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage()));
try {
stub.pingAndWarmCallable().call(request);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(String.format("Failed to prime channel: %s", e));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -104,6 +106,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -141,6 +144,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
Expand Down Expand Up @@ -181,8 +185,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId(),
settings.getPrimedTableIds()))
settings.getAppProfileId()))
.build());
}

Expand Down Expand Up @@ -284,6 +287,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext
bulkMutateRowsCallable = createBulkMutateRowsCallable();
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
pingAndWarmCallable = createPingAndWarmCallable();
}

// <editor-fold desc="Callable creators">
Expand Down Expand Up @@ -810,6 +814,25 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<PingAndWarmRequest, PingAndWarmResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
.setParamsExtractor(
new RequestParamsExtractor<PingAndWarmRequest>() {
@Override
public Map<String, String> extract(PingAndWarmRequest request) {
return ImmutableMap.of(
"name", request.getName(),
"app_profile_id", request.getAppProfileId());
}
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}
// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand Down Expand Up @@ -854,6 +877,10 @@ public UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable(
public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return readModifyWriteRowCallable;
}

UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
return pingAndWarmCallable;
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down
Loading

0 comments on commit 6629821

Please sign in to comment.