Skip to content

Commit

Permalink
feat(spanner): retry spanner transactions and mutations when RST_STRE…
Browse files Browse the repository at this point in the history
…AM error (#6699)

* feat(spanner): retry spanner transactions and mutations when RST_STREAM internal errors is returned from backend.

* added test for non-retryable internal error
  • Loading branch information
rahul2393 authored Sep 20, 2022
1 parent f5443e8 commit 1b56cd0
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 35 deletions.
18 changes: 18 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1579,6 +1579,24 @@ func TestClient_ApplyAtLeastOnceInvalidArgument(t *testing.T) {
}
}

func TestClient_ApplyAtLeastOnce_NonRetryableInternalErrors(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
Errors: []error{status.Errorf(codes.Internal, "grpc: error while marshaling: string field contains invalid UTF-8")},
})
_, err := client.Apply(context.Background(), ms, ApplyAtLeastOnce())
if status.Code(err) != codes.Internal {
t.Fatalf("Error mismatch:\ngot: %v\nwant: %v", err, codes.Internal)
}
}

func TestClient_Apply_ApplyOptions(t *testing.T) {
t.Parallel()

Expand Down
6 changes: 3 additions & 3 deletions spanner/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
}

// runWithRetryOnAbortedOrSessionNotFound executes the given function and
// retries it if it returns an Aborted or Session not found error. The retry
// is delayed if the error was Aborted. The delay between retries is the delay
// retries it if it returns an Aborted, Session not found error or certain Internal errors. The retry
// is delayed if the error was Aborted or Internal error. The delay between retries is the delay
// returned by Cloud Spanner, or if none is returned, the calculated delay with
// a minimum of 10ms and maximum of 32s. There is no delay before the retry if
// the error was Session not found.
func runWithRetryOnAbortedOrSessionNotFound(ctx context.Context, f func(context.Context) error) error {
retryer := onCodes(DefaultRetryBackoff, codes.Aborted)
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal)
funcWithRetry := func(ctx context.Context) error {
for {
err := f(ctx)
Expand Down
77 changes: 45 additions & 32 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,51 +1387,64 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
ts time.Time
sh *sessionHandle
)
defer func() {
if sh != nil {
sh.recycle()
}
}()
mPb, err := mutationsProto(ms)
if err != nil {
// Malformed mutation found, just return the error.
return ts, err
}

// Retry-loop for aborted transactions.
// TODO: Replace with generic retryer.
for {
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// No usable session for doing the commit, take one from pool.
sh, err = t.sp.take(ctx)
if err != nil {
// sessionPool.Take already retries for session
// creations/retrivals.
return ts, err
// Make a retryer for Aborted and certain Internal errors.
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal)
// Apply the mutation and retry if the commit is aborted.
applyMutationWithRetry := func(ctx context.Context) error {
for {
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// No usable session for doing the commit, take one from pool.
sh, err = t.sp.take(ctx)
if err != nil {
// sessionPool.Take already retries for session
// creations/retrivals.
return ToSpannerError(err)
}
}
defer sh.recycle()
}
res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
Session: sh.getID(),
Transaction: &sppb.CommitRequest_SingleUseTransaction{
SingleUseTransaction: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadWrite_{
ReadWrite: &sppb.TransactionOptions_ReadWrite{},
res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
Session: sh.getID(),
Transaction: &sppb.CommitRequest_SingleUseTransaction{
SingleUseTransaction: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadWrite_{
ReadWrite: &sppb.TransactionOptions_ReadWrite{},
},
},
},
},
Mutations: mPb,
RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
})
if err != nil && !isAbortedErr(err) {
if isSessionNotFoundError(err) {
// Discard the bad session.
sh.destroy()
Mutations: mPb,
RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
})
if err != nil && !isAbortedErr(err) {
if isSessionNotFoundError(err) {
// Discard the bad session.
sh.destroy()
}
return toSpannerErrorWithCommitInfo(err, true)
} else if err == nil {
if tstamp := res.GetCommitTimestamp(); tstamp != nil {
ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
}
}
return ts, toSpannerErrorWithCommitInfo(err, true)
} else if err == nil {
if tstamp := res.GetCommitTimestamp(); tstamp != nil {
ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
delay, shouldRetry := retryer.Retry(err)
if !shouldRetry {
return err
}
if err := gax.Sleep(ctx, delay); err != nil {
return err
}
break
}
}
return ts, ToSpannerError(err)
return ts, applyMutationWithRetry(ctx)
}

// isAbortedErr returns true if the error indicates that an gRPC call is
Expand Down

0 comments on commit 1b56cd0

Please sign in to comment.