Skip to content

Commit

Permalink
spanner: retry single use transactions on SessionNotFound
Browse files Browse the repository at this point in the history
Single use read-only transactions should be retried if the query returns
a 'Session not found' error. These queries can safely be retried on a
different session as there is no transaction atomicity that can be
violated. The retry must be executed by the partial result sets stream,
as the query is sent to the server at the first call to
RowIterator.Next.

Updates #1527.

Change-Id: Ia3c77643e42adb2f88dba2cfd5d1bba7f9dbf3be
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/50511
Reviewed-by: Hengfeng Li <[email protected]>
  • Loading branch information
olavloite committed Jan 21, 2020
1 parent 4a4cd86 commit 7a18dc1
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 30 deletions.
30 changes: 22 additions & 8 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,24 @@ func (c *Client) Close() {
// "time-travel" to prior versions of the database, see the documentation of
// TimestampBound for details.
func (c *Client) Single() *ReadOnlyTransaction {
t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions}
t := &ReadOnlyTransaction{singleUse: true}
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
if t.sh == nil {
return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
}
// Remove the session that returned 'Session not found' from the pool.
t.sh.destroy()
// Reset the transaction, acquire a new session and retry.
t.state = txNew
sh, _, err := t.acquire(ctx)
if err != nil {
return err
}
t.sh = sh
return nil
}
return t
}

Expand All @@ -304,9 +320,9 @@ func (c *Client) Single() *ReadOnlyTransaction {
func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t := &ReadOnlyTransaction{
singleUse: false,
sp: c.idleSessions,
txReadyOrClosed: make(chan struct{}),
}
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
return t
}
Expand Down Expand Up @@ -365,7 +381,6 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
tx: tx,
txReadyOrClosed: make(chan struct{}),
state: txActive,
sh: sh,
rts: rts,
},
ID: BatchReadOnlyTransactionID{
Expand All @@ -374,6 +389,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
rts: rts,
},
}
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
return t, nil
}
Expand All @@ -389,11 +405,11 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
tx: tid.tid,
txReadyOrClosed: make(chan struct{}),
state: txActive,
sh: sh,
rts: tid.rts,
},
ID: tid,
}
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
return t
}
Expand Down Expand Up @@ -448,14 +464,12 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
return err
}
t = &ReadWriteTransaction{
sh: sh,
tx: sh.getTransactionID(),
}
} else {
t = &ReadWriteTransaction{
sh: sh,
}
t = &ReadWriteTransaction{}
}
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
"Starting transaction attempt")
Expand Down
45 changes: 45 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,34 @@ func TestClient_Single_InvalidArgument(t *testing.T) {
}
}

func TestClient_Single_SessionNotFound(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
SimulatedExecutionTime{Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
)
ctx := context.Background()
iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
rowCount := int64(0)
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Fatal(err)
}
rowCount++
}
if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
}

func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServer(t)
Expand Down Expand Up @@ -557,6 +585,23 @@ func TestClient_ReadOnlyTransaction_UnavailableOnExecuteStreamingSql(t *testing.
}
}

func TestClient_ReadOnlyTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
t.Parallel()
// Session not found is not retryable for a query on a multi-use read-only
// transaction, as we would need to start a new transaction on a new
// session.
err := testReadOnlyTransaction(t, map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
})
want := spannerErrorf(codes.NotFound, "Session not found")
if err == nil {
t.Fatalf("missing expected error\nGot: nil\nWant: %v", want)
}
if status.Code(err) != status.Code(want) || !strings.Contains(err.Error(), want.Error()) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, want)
}
}

func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndBeginTransaction(t *testing.T) {
t.Parallel()
exec := map[string]SimulatedExecutionTime{
Expand Down
70 changes: 58 additions & 12 deletions spanner/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,38 @@ func errEarlyReadEnd() error {

// stream is the internal fault tolerant method for streaming data from Cloud
// Spanner.
func stream(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), setTimestamp func(time.Time), release func(error)) *RowIterator {
func stream(
ctx context.Context,
logger *log.Logger,
rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error),
setTimestamp func(time.Time),
release func(error),
) *RowIterator {
return streamWithReplaceSessionFunc(
ctx,
logger,
rpc,
nil,
setTimestamp,
release,
)
}

// this stream method will automatically retry the stream on a new session if
// the replaceSessionFunc function has been defined. This function should only be
// used for single-use transactions.
func streamWithReplaceSessionFunc(
ctx context.Context,
logger *log.Logger,
rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error),
replaceSession func(ctx context.Context) error,
setTimestamp func(time.Time),
release func(error),
) *RowIterator {
ctx, cancel := context.WithCancel(ctx)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator")
return &RowIterator{
streamd: newResumableStreamDecoder(ctx, logger, rpc),
streamd: newResumableStreamDecoder(ctx, logger, rpc, replaceSession),
rowd: &partialResultSetDecoder{},
setTimestamp: setTimestamp,
release: release,
Expand Down Expand Up @@ -295,6 +322,13 @@ type resumableStreamDecoder struct {
// resumable.
rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error)

// replaceSessionFunc is a function that can be used to replace the session
// that is being used to execute the read operation. This function should
// only be defined for single-use transactions that can safely retry the
// read operation on a new session. If this function is nil, the stream
// does not support retrying the query on a new session.
replaceSessionFunc func(ctx context.Context) error

// logger is the logger to use.
logger *log.Logger

Expand Down Expand Up @@ -333,11 +367,12 @@ type resumableStreamDecoder struct {
// newResumableStreamDecoder creates a new resumeableStreamDecoder instance.
// Parameter rpc should be a function that creates a new stream beginning at the
// restartToken if non-nil.
func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error)) *resumableStreamDecoder {
func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder {
return &resumableStreamDecoder{
ctx: ctx,
logger: logger,
rpc: rpc,
replaceSessionFunc: replaceSession,
maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens),
backoff: DefaultRetryBackoff,
}
Expand Down Expand Up @@ -530,15 +565,26 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
d.changeState(finished)
return
}
delay, shouldRetry := retryer.Retry(d.err)
if !shouldRetry || d.state != queueingRetryable {
d.changeState(aborted)
return
}
if err := gax.Sleep(d.ctx, delay); err != nil {
d.err = err
d.changeState(aborted)
return
if d.replaceSessionFunc != nil && isSessionNotFoundError(d.err) && d.resumeToken == nil {
// A 'Session not found' error occurred before we received a resume
// token and a replaceSessionFunc function is defined. Try to restart
// the stream on a new session.
if err := d.replaceSessionFunc(d.ctx); err != nil {
d.err = err
d.changeState(aborted)
return
}
} else {
delay, shouldRetry := retryer.Retry(d.err)
if !shouldRetry || d.state != queueingRetryable {
d.changeState(aborted)
return
}
if err := gax.Sleep(d.ctx, delay); err != nil {
d.err = err
d.changeState(aborted)
return
}
}
// Clear error and retry the stream.
d.err = nil
Expand Down
3 changes: 3 additions & 0 deletions spanner/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,7 @@ func TestRsdNonblockingStates(t *testing.T) {
ctx,
nil,
test.rpc,
nil,
)
st := []resumableStreamDecoderState{}
var lastErr error
Expand Down Expand Up @@ -1080,6 +1081,7 @@ func TestRsdBlockingStates(t *testing.T) {
ctx,
nil,
test.rpc,
nil,
)
// Override backoff to make the test run faster.
r.backoff = gax.Backoff{
Expand Down Expand Up @@ -1220,6 +1222,7 @@ func TestQueueBytes(t *testing.T) {
sr.rpcReceiver = r
return sr, err
},
nil,
)
go func() {
for r.next() {
Expand Down
26 changes: 16 additions & 10 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ type txReadOnly struct {

// Atomic. Only needed for DML statements, but used forall.
sequenceNumber int64

// replaceSessionFunc is a function that can be called to replace the
// session that is used by the transaction. This function should only be
// defined for single-use transactions that can safely be retried on a
// different session. All other transactions will set this function to nil.
replaceSessionFunc func(ctx context.Context) error

// sp is the session pool for allocating a session to execute the read-only
// transaction. It is set only once during initialization of the
// txReadOnly.
sp *sessionPool
// sh is the sessionHandle allocated from sp.
sh *sessionHandle
}

// errSessionClosed returns error for using a recycled/destroyed session
Expand Down Expand Up @@ -247,13 +260,15 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, mode sppb.E
return &RowIterator{err: err}
}
client := sh.getClient()
return stream(
return streamWithReplaceSessionFunc(
contextWithOutgoingMetadata(ctx, sh.getMetadata()),
sh.session.logger,
func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
req.ResumeToken = resumeToken
req.Session = t.sh.getID()
return client.ExecuteStreamingSql(ctx, req)
},
t.replaceSessionFunc,
t.setTimestamp,
t.release)
}
Expand Down Expand Up @@ -338,10 +353,6 @@ type ReadOnlyTransaction struct {
txReadOnly
// singleUse indicates that the transaction can be used for only one read.
singleUse bool
// sp is the session pool for allocating a session to execute the read-only
// transaction. It is set only once during initialization of the
// ReadOnlyTransaction.
sp *sessionPool
// tx is the transaction ID in Cloud Spanner that uniquely identifies the
// ReadOnlyTransaction.
tx transactionID
Expand All @@ -350,8 +361,6 @@ type ReadOnlyTransaction struct {
txReadyOrClosed chan struct{}
// state is the current transaction status of the ReadOnly transaction.
state txState
// sh is the sessionHandle allocated from sp.
sh *sessionHandle
// rts is the read timestamp returned by transactional reads.
rts time.Time
// tb is the read staleness bound specification for transactional reads.
Expand Down Expand Up @@ -705,9 +714,6 @@ func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTra
type ReadWriteTransaction struct {
// txReadOnly contains methods for performing transactional reads.
txReadOnly
// sh is the sessionHandle allocated from sp. It is set only once during the
// initialization of ReadWriteTransaction.
sh *sessionHandle
// tx is the transaction ID in Cloud Spanner that uniquely identifies the
// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
// during the initialization of ReadWriteTransaction.
Expand Down

0 comments on commit 7a18dc1

Please sign in to comment.