From 1b56cd0ec31bc32362259fc722907e092bae081a Mon Sep 17 00:00:00 2001 From: rahul2393 Date: Tue, 20 Sep 2022 19:01:31 +0530 Subject: [PATCH] feat(spanner): retry spanner transactions and mutations when RST_STREAM error (#6699) * feat(spanner): retry spanner transactions and mutations when RST_STREAM internal errors are returned from the backend. * added test for non-retryable internal error --- spanner/client_test.go | 18 ++++++++++ spanner/retry.go | 6 ++-- spanner/transaction.go | 77 ++++++++++++++++++++++++------------------ 3 files changed, 66 insertions(+), 35 deletions(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index 31e4d5f92341..2f1ea80ff9c9 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -1579,6 +1579,24 @@ func TestClient_ApplyAtLeastOnceInvalidArgument(t *testing.T) { } } +func TestClient_ApplyAtLeastOnce_NonRetryableInternalErrors(t *testing.T) { + t.Parallel() + server, client, teardown := setupMockedTestServer(t) + defer teardown() + ms := []*Mutation{ + Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), + Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), + } + server.TestSpanner.PutExecutionTime(MethodCommitTransaction, + SimulatedExecutionTime{ + Errors: []error{status.Errorf(codes.Internal, "grpc: error while marshaling: string field contains invalid UTF-8")}, + }) + _, err := client.Apply(context.Background(), ms, ApplyAtLeastOnce()) + if status.Code(err) != codes.Internal { + t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.Internal) + } +} + func TestClient_Apply_ApplyOptions(t *testing.T) { t.Parallel() diff --git a/spanner/retry.go b/spanner/retry.go index fd127e8dfef0..d16cff5bf613 100644 --- a/spanner/retry.go +++ b/spanner/retry.go @@ -80,13 +80,13 @@ func (r *spannerRetryer) Retry(err error) (time.Duration, bool) { } // runWithRetryOnAbortedOrSessionNotFound executes the given 
function and -// retries it if it returns an Aborted or Session not found error. The retry -// is delayed if the error was Aborted. The delay between retries is the delay +// retries it if it returns an Aborted, Session not found error or certain Internal errors. The retry +// is delayed if the error was Aborted or Internal error. The delay between retries is the delay // returned by Cloud Spanner, or if none is returned, the calculated delay with // a minimum of 10ms and maximum of 32s. There is no delay before the retry if // the error was Session not found. func runWithRetryOnAbortedOrSessionNotFound(ctx context.Context, f func(context.Context) error) error { - retryer := onCodes(DefaultRetryBackoff, codes.Aborted) + retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal) funcWithRetry := func(ctx context.Context) error { for { err := f(ctx) diff --git a/spanner/transaction.go b/spanner/transaction.go index bf1e10944229..4fec6d7efce1 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -1387,51 +1387,64 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta ts time.Time sh *sessionHandle ) + defer func() { + if sh != nil { + sh.recycle() + } + }() mPb, err := mutationsProto(ms) if err != nil { // Malformed mutation found, just return the error. return ts, err } - // Retry-loop for aborted transactions. - // TODO: Replace with generic retryer. - for { - if sh == nil || sh.getID() == "" || sh.getClient() == nil { - // No usable session for doing the commit, take one from pool. - sh, err = t.sp.take(ctx) - if err != nil { - // sessionPool.Take already retries for session - // creations/retrivals. - return ts, err + // Make a retryer for Aborted and certain Internal errors. + retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal) + // Apply the mutation and retry if the commit is aborted. 
+ applyMutationWithRetry := func(ctx context.Context) error { + for { + if sh == nil || sh.getID() == "" || sh.getClient() == nil { + // No usable session for doing the commit, take one from pool. + sh, err = t.sp.take(ctx) + if err != nil { + // sessionPool.Take already retries for session + // creations/retrievals. + return ToSpannerError(err) + } } - defer sh.recycle() - } - res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ - Session: sh.getID(), - Transaction: &sppb.CommitRequest_SingleUseTransaction{ - SingleUseTransaction: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_ReadWrite_{ - ReadWrite: &sppb.TransactionOptions_ReadWrite{}, + res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ + Session: sh.getID(), + Transaction: &sppb.CommitRequest_SingleUseTransaction{ + SingleUseTransaction: &sppb.TransactionOptions{ + Mode: &sppb.TransactionOptions_ReadWrite_{ + ReadWrite: &sppb.TransactionOptions_ReadWrite{}, + }, }, }, - }, - Mutations: mPb, - RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag), - }) - if err != nil && !isAbortedErr(err) { - if isSessionNotFoundError(err) { - // Discard the bad session. + Mutations: mPb, + RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag), + }) + if err != nil && !isAbortedErr(err) { + if isSessionNotFoundError(err) { + // Discard the bad session. 
+ sh.destroy() + } + return toSpannerErrorWithCommitInfo(err, true) + } else if err == nil { + if tstamp := res.GetCommitTimestamp(); tstamp != nil { + ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) + } } - return ts, toSpannerErrorWithCommitInfo(err, true) - } else if err == nil { - if tstamp := res.GetCommitTimestamp(); tstamp != nil { - ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) + delay, shouldRetry := retryer.Retry(err) + if !shouldRetry { + return err + } + if err := gax.Sleep(ctx, delay); err != nil { + return err } - break } } - return ts, ToSpannerError(err) + return ts, applyMutationWithRetry(ctx) } // isAbortedErr returns true if the error indicates that an gRPC call is