Skip to content

Commit

Permalink
feat: Add RESOURCE_EXHAUSTED to retryable transaction codes
Browse files Browse the repository at this point in the history
Update formatting
  • Loading branch information
Vizerai committed Jun 21, 2024
1 parent d380fe8 commit e697a1e
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 183 deletions.
163 changes: 83 additions & 80 deletions spanner/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,75 +16,77 @@ limitations under the License.

package spanner

import (
"context"
"strings"
"time"
import(
"context"
"strings"
"time"

"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/ptypes"
"github.com/googleapis/gax-go/v2"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/ptypes"
"github.com/googleapis/gax-go/v2"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status")

const (
retryInfoKey = "google.rpc.retryinfo-bin"
)
const(retryInfoKey = "google.rpc.retryinfo-bin")

// DefaultRetryBackoff is used for retryers as a fallback value when the server
// did not return any retry information.
var DefaultRetryBackoff = gax.Backoff{
Initial: 20 * time.Millisecond,
Max: 32 * time.Second,
Multiplier: 1.3,
}
// DefaultRetryBackoff is used for retryers as a fallback value when the
// server did not return any retry information.
var DefaultRetryBackoff =
gax.Backoff{
Initial : 20 * time.Millisecond,
Max : 32 * time.Second,
Multiplier : 1.3,
}

// spannerRetryer extends the generic gax Retryer, but also checks for any
// retry info returned by Cloud Spanner and uses that if present.
type spannerRetryer struct {
gax.Retryer
// spannerRetryer extends the generic gax Retryer, but also checks for
// any retry info returned by Cloud Spanner and uses that if present.
type spannerRetryer struct {
gax.Retryer
}

// onCodes returns a spannerRetryer that will retry on the specified error
// codes. For Internal errors, only errors that have one of a list of known
// descriptions should be retried.
func onCodes(bo gax.Backoff, cc ...codes.Code) gax.Retryer {
return &spannerRetryer{
Retryer: gax.OnCodes(cc, bo),
}
return &spannerRetryer {
Retryer:
gax.OnCodes(cc, bo),
}
}

// Retry returns the retry delay returned by Cloud Spanner if that is present.
// Otherwise it returns the retry delay calculated by the generic gax Retryer.
func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
if status.Code(err) == codes.Internal &&
!strings.Contains(err.Error(), "stream terminated by RST_STREAM") &&
// See b/25451313.
!strings.Contains(err.Error(), "HTTP/2 error code: INTERNAL_ERROR") &&
// See b/27794742.
!strings.Contains(err.Error(), "Connection closed with unknown cause") &&
!strings.Contains(err.Error(), "Received unexpected EOS on DATA frame from server") {
return 0, false
}
func(r *spannerRetryer) Retry(err error)(time.Duration, bool) {
if status
.Code(err) == codes.Internal &&
!strings.Contains(err.Error(), "stream terminated by RST_STREAM") &&
// See b/25451313.
!strings.Contains(err.Error(), "HTTP/2 error code: INTERNAL_ERROR") &&
// See b/27794742.
!strings.Contains(err.Error(),
"Connection closed with unknown cause") &&
!strings.Contains(err.Error(),
"Received unexpected EOS on DATA frame from server"){
return 0, false}

delay, shouldRetry := r.Retryer.Retry(err)
if !shouldRetry {
return 0, false
}
if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
delay = serverDelay
}
return delay, true
delay,
shouldRetry : = r.Retryer.Retry(err) if !shouldRetry {
return 0, false
}
if serverDelay
, hasServerDelay : = ExtractRetryDelay(err);
hasServerDelay { delay = serverDelay }
return delay, true
}

// runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound executes the given function and
// retries it if it returns an Aborted, Session not found error or certain Internal errors. The retry
// is delayed if the error was Aborted or Internal error. The delay between retries is the delay
// returned by Cloud Spanner, or if none is returned, the calculated delay with
// a minimum of 10ms and maximum of 32s. There is no delay before the retry if
// the error was Session not found or failed inline begin transaction.
// runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound executes the given
// function and retries it if it returns an Aborted, Session not found error or
// certain Internal errors. The retry is delayed if the error was Aborted or
// Internal error. The delay between retries is the delay returned by Cloud
// Spanner, or if none is returned, the calculated delay with a minimum of 10ms
// and maximum of 32s. There is no delay before the retry if the error was
// Session not found or failed inline begin transaction.
func runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx context.Context, f func(context.Context) error) error {
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.ResourceExhausted, codes.Internal)
funcWithRetry := func(ctx context.Context) error {
Expand Down Expand Up @@ -124,35 +126,36 @@ func runWithRetryOnAbortedOrFailedInlineBeginOrSessionNotFound(ctx context.Conte
return err
}
trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", delay)
if err := gax.Sleep(ctx, delay); err != nil {
return err
}
}
}
return funcWithRetry(ctx)
if err := gax.Sleep(ctx, delay);
err != nil { return err }
}
}
return funcWithRetry(ctx)
}

// ExtractRetryDelay extracts retry backoff from a *spanner.Error if present.
func ExtractRetryDelay(err error) (time.Duration, bool) {
var se *Error
var s *status.Status
// Unwrap status error.
if errorAs(err, &se) {
s = status.Convert(se.Unwrap())
} else {
s = status.Convert(err)
}
if s == nil {
return 0, false
}
for _, detail := range s.Details() {
if retryInfo, ok := detail.(*errdetails.RetryInfo); ok {
delay, err := ptypes.Duration(retryInfo.RetryDelay)
if err != nil {
return 0, false
}
return delay, true
}
}
return 0, false
func ExtractRetryDelay(err error)(time.Duration, bool) {
var se *Error var s *status.Status
// Unwrap status error.
if errorAs (err, &se) {
s = status.Convert(se.Unwrap())
}
else {
s = status.Convert(err)
}
if s
== nil { return 0, false }
for
_, detail : = range s.Details() {
if retryInfo
, ok : = detail.(*errdetails.RetryInfo);
ok {
delay,
err : = ptypes.Duration(retryInfo.RetryDelay) if err != nil {
return 0, false
}
return delay, true
}
}
return 0, false
}
Loading

0 comments on commit e697a1e

Please sign in to comment.