From e697a1ea59bd41a091faab13b83ff7639689a3b3 Mon Sep 17 00:00:00 2001 From: Jim King Date: Fri, 21 Jun 2024 12:51:12 -0700 Subject: [PATCH] feat: Add RESOURCE_EXHAUSTED to retryable transaction codes Update formatting --- spanner/retry.go | 163 ++++++++++++++++----------------- spanner/retry_test.go | 205 +++++++++++++++++++++--------------------- 2 files changed, 185 insertions(+), 183 deletions(-) diff --git a/spanner/retry.go b/spanner/retry.go index 2df5beb250d2..68692eded989 100644 --- a/spanner/retry.go +++ b/spanner/retry.go @@ -16,75 +16,77 @@ limitations under the License. package spanner -import ( - "context" - "strings" - "time" +import( + "context" + "strings" + "time" - "cloud.google.com/go/internal/trace" - "github.com/golang/protobuf/ptypes" - "github.com/googleapis/gax-go/v2" - "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) + "cloud.google.com/go/internal/trace" + "github.com/golang/protobuf/ptypes" + "github.com/googleapis/gax-go/v2" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status") -const ( - retryInfoKey = "google.rpc.retryinfo-bin" -) + const(retryInfoKey = "google.rpc.retryinfo-bin") -// DefaultRetryBackoff is used for retryers as a fallback value when the server -// did not return any retry information. -var DefaultRetryBackoff = gax.Backoff{ - Initial: 20 * time.Millisecond, - Max: 32 * time.Second, - Multiplier: 1.3, -} + // DefaultRetryBackoff is used for retryers as a fallback value when the + // server did not return any retry information. + var DefaultRetryBackoff = + gax.Backoff{ + Initial : 20 * time.Millisecond, + Max : 32 * time.Second, + Multiplier : 1.3, + } -// spannerRetryer extends the generic gax Retryer, but also checks for any -// retry info returned by Cloud Spanner and uses that if present. -type spannerRetryer struct { - gax.Retryer + // spannerRetryer extends the generic gax Retryer, but also checks for + // any retry info returned by Cloud Spanner and uses that if present. + type spannerRetryer struct { + gax.Retryer } // onCodes returns a spannerRetryer that will retry on the specified error // codes. For Internal errors, only errors that have one of a list of known // descriptions should be retried. func onCodes(bo gax.Backoff, cc ...codes.Code) gax.Retryer { - return &spannerRetryer{ - Retryer: gax.OnCodes(cc, bo), - } + return &spannerRetryer { + Retryer: + gax.OnCodes(cc, bo), + } } // Retry returns the retry delay returned by Cloud Spanner if that is present. // Otherwise it returns the retry delay calculated by the generic gax Retryer. -func (r *spannerRetryer) Retry(err error) (time.Duration, bool) { - if status.Code(err) == codes.Internal && - !strings.Contains(err.Error(), "stream terminated by RST_STREAM") && - // See b/25451313. - !strings.Contains(err.Error(), "HTTP/2 error code: INTERNAL_ERROR") && - // See b/27794742. - !strings.Contains(err.Error(), "Connection closed with unknown cause") && - !strings.Contains(err.Error(), "Received unexpected EOS on DATA frame from server") { - return 0, false - } +func(r *spannerRetryer) Retry(err error)(time.Duration, bool) { + if status + .Code(err) == codes.Internal && + !strings.Contains(err.Error(), "stream terminated by RST_STREAM") && + // See b/25451313. + !strings.Contains(err.Error(), "HTTP/2 error code: INTERNAL_ERROR") && + // See b/27794742. + !strings.Contains(err.Error(), + "Connection closed with unknown cause") && + !strings.Contains(err.Error(), + "Received unexpected EOS on DATA frame from server"){ + return 0, false} - delay, shouldRetry := r.Retryer.Retry(err) - if !shouldRetry { - return 0, false - } - if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay { - delay = serverDelay - } - return delay, true + delay, + shouldRetry : = r.Retryer.Retry(err) if !shouldRetry { + return 0, false + } + if serverDelay + , hasServerDelay : = ExtractRetryDelay(err); + hasServerDelay { delay = serverDelay } + return delay, true } -// runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound executes the given function and -// retries it if it returns an Aborted, Session not found error or certain Internal errors. The retry -// is delayed if the error was Aborted or Internal error. The delay between retries is the delay -// returned by Cloud Spanner, or if none is returned, the calculated delay with -// a minimum of 10ms and maximum of 32s. There is no delay before the retry if -// the error was Session not found or failed inline begin transaction. +// runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound executes the given +// function and retries it if it returns an Aborted, Session not found error or +// certain Internal errors. The retry is delayed if the error was Aborted or +// Internal error. The delay between retries is the delay returned by Cloud +// Spanner, or if none is returned, the calculated delay with a minimum of 10ms +// and maximum of 32s. There is no delay before the retry if the error was +// Session not found or failed inline begin transaction. func runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx context.Context, f func(context.Context) error) error { retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.ResourceExhausted, codes.Internal) funcWithRetry := func(ctx context.Context) error { @@ -124,35 +126,36 @@ func runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx context.Conte return err } trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", delay) - if err := gax.Sleep(ctx, delay); err != nil { - return err - } - } - } - return funcWithRetry(ctx) + if err := gax.Sleep(ctx, delay); +err != nil { return err } +} +} +return funcWithRetry(ctx) } // ExtractRetryDelay extracts retry backoff from a *spanner.Error if present. -func ExtractRetryDelay(err error) (time.Duration, bool) { - var se *Error - var s *status.Status - // Unwrap status error. - if errorAs(err, &se) { - s = status.Convert(se.Unwrap()) - } else { - s = status.Convert(err) - } - if s == nil { - return 0, false - } - for _, detail := range s.Details() { - if retryInfo, ok := detail.(*errdetails.RetryInfo); ok { - delay, err := ptypes.Duration(retryInfo.RetryDelay) - if err != nil { - return 0, false - } - return delay, true - } - } - return 0, false +func ExtractRetryDelay(err error)(time.Duration, bool) { + var se *Error var s *status.Status + // Unwrap status error. + if errorAs (err, &se) { + s = status.Convert(se.Unwrap()) + } + else { + s = status.Convert(err) + } + if s + == nil { return 0, false } + for + _, detail : = range s.Details() { + if retryInfo + , ok : = detail.(*errdetails.RetryInfo); + ok { + delay, + err : = ptypes.Duration(retryInfo.RetryDelay) if err != nil { + return 0, false + } + return delay, true + } + } + return 0, false } diff --git a/spanner/retry_test.go b/spanner/retry_test.go index 039b74f53eb5..dca659f156d9 100644 --- a/spanner/retry_test.go +++ b/spanner/retry_test.go @@ -16,127 +16,126 @@ limitations under the License. package spanner -import ( - "context" - "testing" - "time" +import( + "context" + "testing" + "time" - "github.com/golang/protobuf/ptypes" - "github.com/googleapis/gax-go/v2" - edpb "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) + "github.com/golang/protobuf/ptypes" + "github.com/googleapis/gax-go/v2" edpb + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status") -func TestRetryInfo(t *testing.T) { - s := status.New(codes.Aborted, "") - s, err := s.WithDetails(&edpb.RetryInfo{ - RetryDelay: ptypes.DurationProto(time.Second), - }) - if err != nil { - t.Fatalf("Error setting retry details: %v", err) - } - gotDelay, ok := ExtractRetryDelay(toSpannerErrorWithCommitInfo(s.Err(), true)) - if !ok || !testEqual(time.Second, gotDelay) { - t.Errorf(" = <%t, %v>, want ", ok, gotDelay, time.Second) - } + func TestRetryInfo(t *testing.T) { +s: + = status.New(codes.Aborted, "") s, err : = s.WithDetails(&edpb.RetryInfo{ + RetryDelay : ptypes.DurationProto(time.Second), + }) if err != nil{t.Fatalf("Error setting retry details: %v", err)} gotDelay, + ok + : = ExtractRetryDelay( + toSpannerErrorWithCommitInfo(s.Err(), true)) if !ok || + !testEqual(time.Second, gotDelay) { + t.Errorf(" = <%t, %v>, want ", ok, gotDelay, + time.Second) + } } func TestRetryInfoResourceExhausted(t *testing.T) { - s := status.New(codes.ResourceExhausted, "") - s, err := s.WithDetails(&edpb.RetryInfo{ - RetryDelay: ptypes.DurationProto(time.Second), - }) - if err != nil { - t.Fatalf("Error setting retry details: %v", err) - } - gotDelay, ok := ExtractRetryDelay(toSpannerErrorWithCommitInfo(s.Err(), true)) - if !ok || !testEqual(time.Second, gotDelay) { - t.Errorf(" = <%t, %v>, want ", ok, gotDelay, time.Second) - } +s: + = status.New(codes.ResourceExhausted, "") s, + err : = s.WithDetails(&edpb.RetryInfo{ + RetryDelay : ptypes.DurationProto(time.Second), + }) if err != + nil{t.Fatalf("Error setting retry details: %v", err)} gotDelay, + ok : = ExtractRetryDelay( + toSpannerErrorWithCommitInfo(s.Err(), true)) if !ok || + !testEqual(time.Second, gotDelay) { + t.Errorf(" = <%t, %v>, want ", ok, gotDelay, + time.Second) + } } func TestRetryInfoInWrappedError(t *testing.T) { - s := status.New(codes.Aborted, "") - s, err := s.WithDetails(&edpb.RetryInfo{ - RetryDelay: ptypes.DurationProto(time.Second), - }) - if err != nil { - t.Fatalf("Error setting retry details: %v", err) - } - gotDelay, ok := ExtractRetryDelay( - &wrappedTestError{wrapped: toSpannerErrorWithCommitInfo(s.Err(), true), msg: "Error that is wrapping a Spanner error"}, - ) - if !ok || !testEqual(time.Second, gotDelay) { - t.Errorf(" = <%t, %v>, want ", ok, gotDelay, time.Second) - } +s: + = status.New(codes.Aborted, "") s, err : = s.WithDetails(&edpb.RetryInfo{ + RetryDelay : ptypes.DurationProto(time.Second), + }) if err != nil{t.Fatalf("Error setting retry details: %v", err)} gotDelay, + ok + : = ExtractRetryDelay(&wrappedTestError{ + wrapped : toSpannerErrorWithCommitInfo(s.Err(), true), + msg : "Error that is wrapping a Spanner error" + }, ) if !ok || + !testEqual(time.Second, gotDelay) { + t.Errorf(" = <%t, %v>, want ", ok, gotDelay, + time.Second) + } } func TestRetryInfoInWrappedErrorResourceExhausted(t *testing.T) { - s := status.New(codes.ResourceExhausted, "") - s, err := s.WithDetails(&edpb.RetryInfo{ - RetryDelay: ptypes.DurationProto(time.Second), - }) - if err != nil { - t.Fatalf("Error setting retry details: %v", err) - } - gotDelay, ok := ExtractRetryDelay( - &wrappedTestError{wrapped: toSpannerErrorWithCommitInfo(s.Err(), true), msg: "Error that is wrapping a Spanner error"}, - ) - if !ok || !testEqual(time.Second, gotDelay) { - t.Errorf(" = <%t, %v>, want ", ok, gotDelay, time.Second) - } +s: + = status.New(codes.ResourceExhausted, "") s, + err : = s.WithDetails(&edpb.RetryInfo{ + RetryDelay : ptypes.DurationProto(time.Second), + }) if err != + nil{t.Fatalf("Error setting retry details: %v", err)} gotDelay, + ok : = ExtractRetryDelay(&wrappedTestError{ + wrapped : toSpannerErrorWithCommitInfo(s.Err(), true), + msg : "Error that is wrapping a Spanner error" + }, ) if !ok || + !testEqual(time.Second, gotDelay) { + t.Errorf(" = <%t, %v>, want ", ok, gotDelay, + time.Second) + } } func TestRetryInfoTransactionOutcomeUnknownError(t *testing.T) { - err := toSpannerErrorWithCommitInfo(context.DeadlineExceeded, true) - if gotDelay, ok := ExtractRetryDelay(err); ok { - t.Errorf("Got unexpected delay\nGot: %v\nWant: %v", gotDelay, 0) - } - want := &TransactionOutcomeUnknownError{status.FromContextError(context.DeadlineExceeded).Err()} - if !testEqual(err.(*Error).err.Error(), want.Error()) { - t.Errorf("Missing expected TransactionOutcomeUnknownError wrapped error") - } +err: + = toSpannerErrorWithCommitInfo(context.DeadlineExceeded, true) if gotDelay, + ok : = ExtractRetryDelay(err); + ok{t.Errorf("Got unexpected delay\nGot: %v\nWant: %v", gotDelay, 0)} want + : = &TransactionOutcomeUnknownError { + status.FromContextError(context.DeadlineExceeded).Err() + } + if !testEqual (err.(*Error).err.Error(), want.Error()) { + t.Errorf("Missing expected TransactionOutcomeUnknownError wrapped error") + } } func TestRetryerRespectsServerDelay(t *testing.T) { - t.Parallel() - serverDelay := 50 * time.Millisecond - s := status.New(codes.Aborted, "transaction was aborted") - s, err := s.WithDetails(&edpb.RetryInfo{ - RetryDelay: ptypes.DurationProto(serverDelay), - }) - if err != nil { - t.Fatalf("Error setting retry details: %v", err) - } - retryer := onCodes(gax.Backoff{}, codes.Aborted) - err = toSpannerErrorWithCommitInfo(s.Err(), true) - maxSeenDelay, shouldRetry := retryer.Retry(err) - if !shouldRetry { - t.Fatalf("expected shouldRetry to be true") - } - if maxSeenDelay != serverDelay { - t.Fatalf("Retry delay mismatch:\ngot: %v\nwant: %v", maxSeenDelay, serverDelay) - } + t.Parallel() serverDelay : = 50 *time.Millisecond s + : = status.New(codes.Aborted, "transaction was aborted") s, + err + : = s.WithDetails(&edpb.RetryInfo{ + RetryDelay : ptypes.DurationProto(serverDelay), + }) if err != nil{t.Fatalf("Error setting retry details: %v", err)} retryer + : = onCodes(gax.Backoff{}, codes.Aborted) err = + toSpannerErrorWithCommitInfo(s.Err(), true) maxSeenDelay, + shouldRetry : = retryer.Retry(err) if !shouldRetry { + t.Fatalf("expected shouldRetry to be true") + } + if maxSeenDelay + != serverDelay { + t.Fatalf("Retry delay mismatch:\ngot: %v\nwant: %v", maxSeenDelay, + serverDelay) + } } func TestRetryerRespectsServerDelayResourceExhausted(t *testing.T) { - t.Parallel() - serverDelay := 50 * time.Millisecond - s := status.New(codes.ResourceExhausted, "transaction failed fast") - s, err := s.WithDetails(&edpb.RetryInfo{ - RetryDelay: ptypes.DurationProto(serverDelay), - }) - if err != nil { - t.Fatalf("Error setting retry details: %v", err) - } - retryer := onCodes(gax.Backoff{}, codes.ResourceExhausted) - err = toSpannerErrorWithCommitInfo(s.Err(), true) - maxSeenDelay, shouldRetry := retryer.Retry(err) - if !shouldRetry { - t.Fatalf("expected shouldRetry to be true") - } - if maxSeenDelay != serverDelay { - t.Fatalf("Retry delay mismatch:\ngot: %v\nwant: %v", maxSeenDelay, serverDelay) - } + t.Parallel() serverDelay : = 50 *time.Millisecond s + : = status.New(codes.ResourceExhausted, "transaction failed fast") s, + err + : = s.WithDetails(&edpb.RetryInfo{ + RetryDelay : ptypes.DurationProto(serverDelay), + }) if err != nil{t.Fatalf("Error setting retry details: %v", err)} retryer + : = onCodes(gax.Backoff{}, codes.ResourceExhausted) err = + toSpannerErrorWithCommitInfo(s.Err(), true) maxSeenDelay, + shouldRetry : = retryer.Retry(err) if !shouldRetry { + t.Fatalf("expected shouldRetry to be true") + } + if maxSeenDelay + != serverDelay { + t.Fatalf("Retry delay mismatch:\ngot: %v\nwant: %v", maxSeenDelay, + serverDelay) + } }