Skip to content

Commit

Permalink
fix: retry rst stream in mutations (#1327)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf authored Jul 29, 2022
1 parent 30e7b90 commit 1a5b3a2
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 9 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable</className>
</difference>
<!-- InternalApi that was moved -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable</className>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
Expand All @@ -26,14 +25,12 @@
/**
* This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
*/
@InternalApi
public final class ReadRowsConvertExceptionCallable<ReadRowsRequest, RowT>
final class ConvertExceptionCallable<ReadRowsRequest, RowT>
extends ServerStreamingCallable<ReadRowsRequest, RowT> {

private final ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable;

public ReadRowsConvertExceptionCallable(
ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable) {
public ConvertExceptionCallable(ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable) {
this.innerCallable = innerCallable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
Expand Down Expand Up @@ -414,7 +413,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
new ReadRowsConvertExceptionCallable<>(withStatsHeaders);
new ConvertExceptionCallable<>(withStatsHeaders);

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(convertException, rowAdapter);
Expand Down Expand Up @@ -704,6 +703,13 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(base);

// Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
// and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> convertException =
new ConvertExceptionCallable<>(withStatsHeaders);

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
Expand All @@ -714,7 +720,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
withStatsHeaders,
convertException,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.InternalException;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.common.collect.Queues;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcServerRule;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class MutateRowsRetryTest {

@Rule public GrpcServerRule serverRule = new GrpcServerRule();

private FakeBigtableService service;
private BigtableDataClient client;

private AtomicInteger attemptCounter = new AtomicInteger();

@Before
public void setUp() throws IOException {
service = new FakeBigtableService();
serverRule.getServiceRegistry().addService(service);

BigtableDataSettings.Builder settings =
BigtableDataSettings.newBuilder()
.setProjectId("fake-project")
.setInstanceId("fake-instance")
.setCredentialsProvider(NoCredentialsProvider.create());

settings
.stubSettings()
.setTransportChannelProvider(
FixedTransportChannelProvider.create(
GrpcTransportChannel.create(serverRule.getChannel())))
.build();

this.client = BigtableDataClient.create(settings.build());
}

@Test
public void testRetryRstStream() {
ApiException exception =
new InternalException(
new StatusRuntimeException(
Status.INTERNAL.withDescription(
"INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")),
GrpcStatusCode.of(Status.Code.INTERNAL),
false);

service.expectations.add(exception);

try {
client.bulkMutateRows(
BulkMutation.create("fake-table")
.add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v")));
} catch (ApiException e) {
Assert.fail("Rst stream errors should be retried");
}

Assert.assertEquals(attemptCounter.get(), 2);
}

private class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
Queue<Exception> expectations = Queues.newArrayDeque();

@Override
public void mutateRows(
MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
attemptCounter.incrementAndGet();
if (expectations.isEmpty()) {
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
responseObserver.onCompleted();
} else {
Exception expectedRpc = expectations.poll();
responseObserver.onError(expectedRpc);
}
}
}
}

0 comments on commit 1a5b3a2

Please sign in to comment.