From 1a5b3a215b5388678241cadec26a962a512157ac Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 29 Jul 2022 15:04:11 -0400 Subject: [PATCH] fix: retry rst stream in mutations (#1327) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # ☕️ If you write sample code, please follow the [samples format]( https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md). --- .../clirr-ignored-differences.xml | 5 + ...ble.java => ConvertExceptionCallable.java} | 9 +- .../data/v2/stub/EnhancedBigtableStub.java | 12 +- .../stub/mutaterows/MutateRowsRetryTest.java | 116 ++++++++++++++++++ 4 files changed, 133 insertions(+), 9 deletions(-) rename google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/{readrows/ReadRowsConvertExceptionCallable.java => ConvertExceptionCallable.java} (90%) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index 588327d0de..8a3edd69c0 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -71,4 +71,9 @@ 8001 com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable + + + 8001 + com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable + diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java similarity index 90% rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java index 0c58f66441..ed50532fae 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java @@ -13,9 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.google.cloud.bigtable.data.v2.stub.readrows; +package com.google.cloud.bigtable.data.v2.stub; -import com.google.api.core.InternalApi; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; @@ -26,14 +25,12 @@ /** * This callable converts the "Received rst stream" exception into a retryable {@link ApiException}. */ -@InternalApi -public final class ReadRowsConvertExceptionCallable +final class ConvertExceptionCallable extends ServerStreamingCallable { private final ServerStreamingCallable innerCallable; - public ReadRowsConvertExceptionCallable( - ServerStreamingCallable innerCallable) { + public ConvertExceptionCallable(ServerStreamingCallable innerCallable) { this.innerCallable = innerCallable; } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 1550127e23..301ecd66b5 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -86,7 +86,6 @@ import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; -import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; @@ -414,7 +413,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ReadRowsConvertExceptionCallable<>(withStatsHeaders); + new ConvertExceptionCallable<>(withStatsHeaders); ServerStreamingCallable merging = new RowMergingCallable<>(convertException, rowAdapter); @@ -704,6 +703,13 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { ServerStreamingCallable withStatsHeaders = new StatsHeadersServerStreamingCallable<>(base); + // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient + // and + // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code + // which by default is not retryable. Convert the exception so it can be retried in the client. + ServerStreamingCallable convertException = + new ConvertExceptionCallable<>(withStatsHeaders); + RetryAlgorithm retryAlgorithm = new RetryAlgorithm<>( new ApiResultRetryAlgorithm(), @@ -714,7 +720,7 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { return new MutateRowsRetryingCallable( clientContext.getDefaultCallContext(), - withStatsHeaders, + convertException, retryingExecutor, settings.bulkMutateRowsSettings().getRetryableCodes()); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java new file mode 100644 index 0000000000..19ab6413a0 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.mutaterows; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.InternalException; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.common.collect.Queues; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MutateRowsRetryTest { + + @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + + private FakeBigtableService service; + private BigtableDataClient client; + + private AtomicInteger attemptCounter = new AtomicInteger(); + + @Before + public void setUp() throws IOException { + service = new FakeBigtableService(); + serverRule.getServiceRegistry().addService(service); + + BigtableDataSettings.Builder settings = + BigtableDataSettings.newBuilder() + .setProjectId("fake-project") + .setInstanceId("fake-instance") + .setCredentialsProvider(NoCredentialsProvider.create()); + + settings + .stubSettings() + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(serverRule.getChannel()))) + .build(); + + this.client = BigtableDataClient.create(settings.build()); + } + + @Test + public void testRetryRstStream() { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")), + GrpcStatusCode.of(Status.Code.INTERNAL), + false); + + service.expectations.add(exception); + + try { + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v"))); + } catch (ApiException e) { + Assert.fail("Rst stream errors should be retried"); + } + + Assert.assertEquals(attemptCounter.get(), 2); + } + + private class FakeBigtableService extends BigtableGrpc.BigtableImplBase { + Queue expectations = Queues.newArrayDeque(); + + @Override + public void mutateRows( + MutateRowsRequest request, StreamObserver responseObserver) { + attemptCounter.incrementAndGet(); + if (expectations.isEmpty()) { + responseObserver.onNext(MutateRowsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } else { + Exception expectedRpc = expectations.poll(); + responseObserver.onError(expectedRpc); + } + } + } +}