Skip to content

Commit

Permalink
feat: use PingAndWarm request for channel priming
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Feb 18, 2022
1 parent 9cc6fbc commit ef347fd
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,11 @@ public boolean isRefreshingChannel() {
/**
* Gets the table ids that will be used to send warmup requests when {@link
* #isRefreshingChannel()} is enabled.
*
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}
Expand Down Expand Up @@ -377,13 +380,10 @@ public boolean isRefreshingChannel() {
}

/**
* Configure the tables that can be used to prime a channel during a refresh.
*
* <p>These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a
* channel is refreshed, it will send a request to each table to warm up the serverside caches
* before admitting the new channel into the channel pool.
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public Builder setPrimingTableIds(String... tableIds) {
stubSettings.setPrimedTableIds(tableIds);
return this;
Expand All @@ -392,8 +392,11 @@ public Builder setPrimingTableIds(String... tableIds) {
/**
* Gets the table ids that will be used to send warmup requests when {@link
* #setRefreshingChannel(boolean)} is enabled.
*
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@BetaApi("Channel priming is not currently stable and may change in the future")
@Deprecated
public List<String> getPrimingTableIds() {
return stubSettings.getPrimedTableIds();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,20 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.auth.Credentials;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.RowSet;
import com.google.bigtable.v2.TableName;
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.threeten.bp.Duration;

Expand All @@ -58,14 +47,9 @@ class BigtableChannelPrimer implements ChannelPrimer {
private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30);

private final EnhancedBigtableStubSettings settingsTemplate;
private final List<String> tableIds;

static BigtableChannelPrimer create(
Credentials credentials,
String projectId,
String instanceId,
String appProfileId,
List<String> tableIds) {
Credentials credentials, String projectId, String instanceId, String appProfileId) {
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(projectId)
Expand All @@ -75,28 +59,12 @@ static BigtableChannelPrimer create(
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());

// Disable retries for priming request
builder
.readRowSettings()
.setRetrySettings(
builder
.readRowSettings()
.getRetrySettings()
.toBuilder()
.setMaxAttempts(1)
.setJittered(false)
.setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT)
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
.build());
return new BigtableChannelPrimer(builder.build(), tableIds);
return new BigtableChannelPrimer(builder.build());
}

private BigtableChannelPrimer(
EnhancedBigtableStubSettings settingsTemplate, List<String> tableIds) {
private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) {
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
this.settingsTemplate = settingsTemplate;
this.tableIds = ImmutableList.copyOf(tableIds);
}

@Override
Expand All @@ -110,25 +78,7 @@ public void primeChannel(ManagedChannel managedChannel) {
}

private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
if (tableIds.isEmpty()) {
waitForChannelReady(managedChannel);
} else {
sendPrimeRequests(managedChannel);
}
}

private void waitForChannelReady(ManagedChannel managedChannel) {
for (int i = 0; i < 30; i++) {
ConnectivityState connectivityState = managedChannel.getState(true);
if (connectivityState == ConnectivityState.READY) {
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
break;
}
}
sendPrimeRequests(managedChannel);
}

private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
Expand All @@ -141,41 +91,21 @@ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException
.build();

try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
Map<String, ApiFuture<?>> primeFutures = new HashMap<>();

// Prime all of the table ids in parallel
for (String tableId : tableIds) {
ApiFuture<Row> f =
stub.createReadRowsRawCallable(new DefaultRowAdapter())
.first()
.futureCall(
ReadRowsRequest.newBuilder()
.setTableName(
TableName.format(
primingSettings.getProjectId(),
primingSettings.getInstanceId(),
tableId))
.setAppProfileId(primingSettings.getAppProfileId())
.setRows(RowSet.newBuilder().addRowKeys(PRIMING_ROW_KEY).build())
.setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build())
.setRowsLimit(1)
.build());
PingAndWarmRequest request =
PingAndWarmRequest.newBuilder()
.setName(
InstanceName.format(
primingSettings.getProjectId(), primingSettings.getInstanceId()))
.setAppProfileId(primingSettings.getAppProfileId())
.build();

primeFutures.put(tableId, f);
}

// Wait for all of the prime requests to complete.
for (Map.Entry<String, ApiFuture<?>> entry : primeFutures.entrySet()) {
try {
entry.getValue().get();
} catch (Throwable e) {
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(
String.format(
"Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage()));
try {
stub.pingAndWarmCallable().call(request);
} catch (Throwable e) {
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(String.format("Failed to prime channel: %s", e));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -103,6 +105,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -140,6 +143,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
Expand Down Expand Up @@ -180,8 +184,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId(),
settings.getPrimedTableIds()))
settings.getAppProfileId()))
.build());
}

Expand Down Expand Up @@ -276,6 +279,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext
bulkMutateRowsCallable = createBulkMutateRowsCallable();
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
pingAndWarmCallable = createPingAndWarmCallable();
}

// <editor-fold desc="Callable creators">
Expand Down Expand Up @@ -801,6 +805,23 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<PingAndWarmRequest, PingAndWarmResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
.setParamsExtractor(
new RequestParamsExtractor<PingAndWarmRequest>() {
@Override
public Map<String, String> extract(PingAndWarmRequest request) {
return ImmutableMap.of("app_profile_id", request.getAppProfileId());
}
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}
// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand Down Expand Up @@ -845,6 +866,10 @@ public UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable(
public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return readModifyWriteRowCallable;
}

UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
return pingAndWarmCallable;
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,13 @@ public boolean isRefreshingChannel() {
return isRefreshingChannel;
}

/** Gets the tables that will be primed during a channel refresh. */
@BetaApi("Channel priming is not currently stable and might change in the future")
/**
* Gets the tables that will be primed during a channel refresh.
*
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
* requests will be sent to all table ids of the instance.
*/
@Deprecated
public List<String> getPrimedTableIds() {
return primedTableIds;
}
Expand Down Expand Up @@ -831,8 +836,7 @@ public EnhancedBigtableStubSettings build() {
// Use shared credentials
this.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
channelProviderBuilder.setChannelPrimer(
BigtableChannelPrimer.create(
credentials, projectId, instanceId, appProfileId, primedTableIds));
BigtableChannelPrimer.create(credentials, projectId, instanceId, appProfileId));
this.setTransportChannelProvider(channelProviderBuilder.build());
}
return new EnhancedBigtableStubSettings(this);
Expand Down
Loading

0 comments on commit ef347fd

Please sign in to comment.