Skip to content

Commit

Permalink
spanner: Retry "Session not found" for read-only transactions
Browse files Browse the repository at this point in the history
Session not found errors should be retried by taking a new session from
the pool and retrying the gRPC call when that is possible. This change
fixes this for multi-use read-only transactions when the error occurs
on the BeginTransaction call. This method can safely be retried on a
new session, as the user has not yet been able to execute any queries
on the transaction. This is also the most probable moment for a
Session not found error to occur for a multi-use read-only transaction.

A Session not found error halfway through a multi-use read-only
transaction could in theory be mitigated by starting a new read-only
transaction on a new session with the same read timestamp as the
transaction that produced the error, retrying the query that returned the
error on the new transaction, and updating the internal transaction reference
to ensure any future queries that the user executes on the transaction
will use the new transaction. This is not included in this change.

Updates #1527.

Change-Id: Ibc48e558bf07e8066996c6aaad864c4450abae66
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/44051
Reviewed-by: kokoro <[email protected]>
Reviewed-by: Emmanuel Odeke <[email protected]>
  • Loading branch information
olavloite committed Sep 9, 2019
1 parent 89c7d46 commit 5d17c75
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 57 deletions.
43 changes: 23 additions & 20 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,20 @@ func TestClient_Single_InvalidArgument(t *testing.T) {
}

// testSingleQuery starts a mocked test server, registers serverError on it via
// SetError when serverError is non-nil, and runs the test query on the
// transaction returned by client.Single(). It returns whatever
// executeTestQuery returns.
func testSingleQuery(t *testing.T, serverError error) error {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
if serverError != nil {
server.TestSpanner.SetError(serverError)
}
iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
tx := client.Single()
return executeTestQuery(t, tx)
}

func executeTestQuery(t *testing.T, tx *ReadOnlyTransaction) error {
ctx := context.Background()
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
var rowCount int64
for {
row, err := iter.Next()
if err == iterator.Done {
Expand All @@ -133,6 +139,10 @@ func testSingleQuery(t *testing.T, serverError error) error {
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
rowCount++
}
if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
t.Fatalf("Row count mismatch\ngot: %v\nwant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
return nil
}
Expand Down Expand Up @@ -200,6 +210,16 @@ func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndInvalidArgument
}
}

func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) {
t.Parallel()
exec := map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{gstatus.Error(codes.NotFound, "Session not found")}},
}
if err := testReadOnlyTransaction(t, exec); err != nil {
t.Fatal(err)
}
}

func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
Expand All @@ -208,24 +228,7 @@ func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedEx
}
tx := client.ReadOnlyTransaction()
defer tx.Close()
ctx := context.Background()
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
var singerID, albumID int64
var albumTitle string
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
}
return nil
return executeTestQuery(t, tx)
}

func TestClient_ReadWriteTransaction(t *testing.T) {
Expand Down
9 changes: 3 additions & 6 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const (
MethodGetSession string = "GET_SESSION"
MethodExecuteSql string = "EXECUTE_SQL"
MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL"
MethodStreamingRead string = "EXECUTE_STREAMING_READ"
)

// StatementResult represents a mocked result on the test server. The result can
Expand Down Expand Up @@ -703,13 +704,9 @@ func (s *inMemSpannerServer) Read(ctx context.Context, req *spannerpb.ReadReques
}

// StreamingRead is not implemented by the in-memory server. Unless one of the
// earlier checks returns an error first, it always returns
// codes.Unimplemented.
func (s *inMemSpannerServer) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error {
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
return gstatus.Error(codes.Unavailable, "server has been stopped")
// Any error produced by simulateExecutionTime for MethodStreamingRead is
// returned to the caller as-is.
if err := s.simulateExecutionTime(MethodStreamingRead, req); err != nil {
return err
}
s.receivedRequests <- req
s.mu.Unlock()
return gstatus.Error(codes.Unimplemented, "Method not yet implemented")
}

Expand Down
45 changes: 34 additions & 11 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,18 +363,41 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
sh.recycle()
}
}()
sh, err = t.sp.take(ctx)
if err != nil {
return err
}
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadOnly_{
ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
},

// Create transaction options.
readOnlyOptions := buildTransactionOptionsReadOnly(t.getTimestampBound(), true)
transactionOptions := &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadOnly_{
ReadOnly: readOnlyOptions,
},
})
}
// Retry TakeSession and BeginTransaction on Session not found.
retryOnNotFound := gax.OnCodes([]codes.Code{codes.NotFound}, gax.Backoff{})
beginTxWithRetry := func(ctx context.Context) (*sppb.Transaction, error) {
for {
sh, err = t.sp.take(ctx)
if err != nil {
return nil, err
}
client := sh.getClient()
ctx := contextWithOutgoingMetadata(ctx, sh.getMetadata())
res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: transactionOptions,
})
if err == nil {
return res, nil
}
// We should not wait before retrying.
if _, shouldRetry := retryOnNotFound.Retry(err); !shouldRetry {
return nil, err
}
// Delete session and then retry with a new one.
sh.destroy()
}
}

res, err := beginTxWithRetry(ctx)
if err == nil {
tx = res.Id
if res.ReadTimestamp != nil {
Expand Down
58 changes: 38 additions & 20 deletions spanner/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,46 +195,64 @@ func TestApply_RetryOnAbort(t *testing.T) {
}
}

// Tests that NotFound errors cause failures, and aren't retried.
// Tests that NotFound errors cause failures, and aren't retried, except for
// BeginTransaction.
func TestTransaction_NotFound(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()

// NotFound error with the message "Session not found"; it is registered on
// the mock server for individual methods below.
wantErr := spannerErrorf(codes.NotFound, "Session not found")
errSessionNotFound := spannerErrorf(codes.NotFound, "Session not found")
// BeginTransaction should retry automatically.
server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
SimulatedExecutionTime{
Errors: []error{wantErr, wantErr, wantErr},
Errors: []error{errSessionNotFound},
})
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
Errors: []error{wantErr, wantErr, wantErr},
})

txn := client.ReadOnlyTransaction()
defer txn.Close()

// NOTE(review): acquire presumably triggers BeginTransaction on first use;
// confirm against transaction.go.
if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
t.Fatalf("Expect acquire to fail, got %v, want %v.", got, wantErr)
if _, _, got := txn.acquire(ctx); got != nil {
t.Fatalf("Expect acquire to succeed, got %v, want nil.", got)
}
txn.Close()

// The failure should recycle the session, we expect it to be used in
// following requests.
if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
t.Fatalf("Expect Query to fail, got %v, want %v.", got.err, wantErr)
// Query should fail with Session not found.
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
Errors: []error{errSessionNotFound},
})
txn = client.ReadOnlyTransaction()
iter := txn.Query(ctx, NewStatement("SELECT 1"))
// The error is checked on the first call to Next.
_, got := iter.Next()
if !testEqual(errSessionNotFound, got) {
t.Fatalf("Expect Query to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
}
iter.Stop()

if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
t.Fatalf("Expect Read to fail, got %v, want %v.", got.err, wantErr)
// Read should fail with Session not found.
server.TestSpanner.PutExecutionTime(MethodStreamingRead,
SimulatedExecutionTime{
Errors: []error{errSessionNotFound},
})
txn = client.ReadOnlyTransaction()
iter = txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"})
_, got = iter.Next()
if !testEqual(errSessionNotFound, got) {
t.Fatalf("Expect Read to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
}
iter.Stop()

// Commit should fail with Session not found.
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
Errors: []error{errSessionNotFound},
})

// Two insert mutations on the Accounts table, applied with ApplyAtLeastOnce.
ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(wantErr, got) {
t.Fatalf("Expect Apply to fail, got %v, want %v.", got, wantErr)
if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(errSessionNotFound, got) {
t.Fatalf("Expect Apply to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
}
}

Expand Down

0 comments on commit 5d17c75

Please sign in to comment.