diff --git a/spanner/client.go b/spanner/client.go
index f1d18f881f17..9b75de672410 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -287,8 +287,24 @@ func (c *Client) Close() {
 // "time-travel" to prior versions of the database, see the documentation of
 // TimestampBound for details.
 func (c *Client) Single() *ReadOnlyTransaction {
-	t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions}
+	t := &ReadOnlyTransaction{singleUse: true}
+	t.txReadOnly.sp = c.idleSessions
 	t.txReadOnly.txReadEnv = t
+	t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
+		if t.sh == nil {
+			return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
+		}
+		// Remove the session that returned 'Session not found' from the pool.
+		t.sh.destroy()
+		// Reset the transaction, acquire a new session and retry.
+		t.state = txNew
+		sh, _, err := t.acquire(ctx)
+		if err != nil {
+			return err
+		}
+		t.sh = sh
+		return nil
+	}
 	return t
 }
 
@@ -304,9 +320,9 @@ func (c *Client) Single() *ReadOnlyTransaction {
 func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
 	t := &ReadOnlyTransaction{
 		singleUse:       false,
-		sp:              c.idleSessions,
 		txReadyOrClosed: make(chan struct{}),
 	}
+	t.txReadOnly.sp = c.idleSessions
 	t.txReadOnly.txReadEnv = t
 	return t
 }
@@ -365,7 +381,6 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
 			tx:              tx,
 			txReadyOrClosed: make(chan struct{}),
 			state:           txActive,
-			sh:              sh,
 			rts:             rts,
 		},
 		ID: BatchReadOnlyTransactionID{
@@ -374,6 +389,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
 			rts: rts,
 		},
 	}
+	t.txReadOnly.sh = sh
 	t.txReadOnly.txReadEnv = t
 	return t, nil
 }
@@ -389,11 +405,11 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
 			tx:              tid.tid,
 			txReadyOrClosed: make(chan struct{}),
 			state:           txActive,
-			sh:              sh,
 			rts:             tid.rts,
 		},
 		ID: tid,
 	}
+	t.txReadOnly.sh = sh
 	t.txReadOnly.txReadEnv = t
 	return t
 }
@@ -448,14 +464,12 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
 				return err
 			}
 			t = &ReadWriteTransaction{
-				sh: sh,
 				tx: sh.getTransactionID(),
 			}
 		} else {
-			t = &ReadWriteTransaction{
-				sh: sh,
-			}
+			t = &ReadWriteTransaction{}
 		}
+		t.txReadOnly.sh = sh
 		t.txReadOnly.txReadEnv = t
 		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
 			"Starting transaction attempt")
diff --git a/spanner/client_test.go b/spanner/client_test.go
index f04b79059d52..fd8ea0f33469 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -159,6 +159,34 @@ func TestClient_Single_InvalidArgument(t *testing.T) {
 	}
 }
 
+func TestClient_Single_SessionNotFound(t *testing.T) {
+	t.Parallel()
+
+	server, client, teardown := setupMockedTestServer(t)
+	defer teardown()
+	server.TestSpanner.PutExecutionTime(
+		MethodExecuteStreamingSql,
+		SimulatedExecutionTime{Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
+	)
+	ctx := context.Background()
+	iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
+	defer iter.Stop()
+	rowCount := int64(0)
+	for {
+		_, err := iter.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+		rowCount++
+	}
+	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
+		t.Fatalf("row count mismatch\nGot: %v\nWant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
+	}
+}
+
 func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) {
 	t.Parallel()
 	server, client, teardown := setupMockedTestServer(t)
@@ -557,6 +585,23 @@ func TestClient_ReadOnlyTransaction_UnavailableOnExecuteStreamingSql(t *testing.
 	}
 }
 
+func TestClient_ReadOnlyTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
+	t.Parallel()
+	// Session not found is not retryable for a query on a multi-use read-only
+	// transaction, as we would need to start a new transaction on a new
+	// session.
+	err := testReadOnlyTransaction(t, map[string]SimulatedExecutionTime{
+		MethodExecuteStreamingSql: {Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
+	})
+	want := spannerErrorf(codes.NotFound, "Session not found")
+	if err == nil {
+		t.Fatalf("missing expected error\nGot: nil\nWant: %v", want)
+	}
+	if status.Code(err) != status.Code(want) || !strings.Contains(err.Error(), want.Error()) {
+		t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, want)
+	}
+}
+
 func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndBeginTransaction(t *testing.T) {
 	t.Parallel()
 	exec := map[string]SimulatedExecutionTime{
diff --git a/spanner/read.go b/spanner/read.go
index 8a7bddf0a08c..f0d16345565f 100644
--- a/spanner/read.go
+++ b/spanner/read.go
@@ -48,11 +48,38 @@ func errEarlyReadEnd() error {
 
 // stream is the internal fault tolerant method for streaming data from Cloud
 // Spanner.
-func stream(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), setTimestamp func(time.Time), release func(error)) *RowIterator {
+func stream(
+	ctx context.Context,
+	logger *log.Logger,
+	rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error),
+	setTimestamp func(time.Time),
+	release func(error),
+) *RowIterator {
+	return streamWithReplaceSessionFunc(
+		ctx,
+		logger,
+		rpc,
+		nil,
+		setTimestamp,
+		release,
+	)
+}
+
+// streamWithReplaceSessionFunc is like stream, but restarts the stream on a
+// new session if replaceSession is non-nil and the session is not found. A
+// non-nil replaceSession should only be passed for single-use transactions.
+func streamWithReplaceSessionFunc(
+	ctx context.Context,
+	logger *log.Logger,
+	rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error),
+	replaceSession func(ctx context.Context) error,
+	setTimestamp func(time.Time),
+	release func(error),
+) *RowIterator {
 	ctx, cancel := context.WithCancel(ctx)
 	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator")
 	return &RowIterator{
-		streamd:      newResumableStreamDecoder(ctx, logger, rpc),
+		streamd:      newResumableStreamDecoder(ctx, logger, rpc, replaceSession),
 		rowd:         &partialResultSetDecoder{},
 		setTimestamp: setTimestamp,
 		release:      release,
@@ -295,6 +322,13 @@ type resumableStreamDecoder struct {
 	// resumable.
 	rpc func(ctx context.Context, restartToken []byte) (streamingReceiver, error)
 
+	// replaceSessionFunc is a function that can be used to replace the session
+	// that is being used to execute the read operation. This function should
+	// only be defined for single-use transactions that can safely retry the
+	// read operation on a new session. If this function is nil, the stream
+	// does not support retrying the query on a new session.
+	replaceSessionFunc func(ctx context.Context) error
+
 	// logger is the logger to use.
 	logger *log.Logger
 
@@ -333,11 +367,12 @@ type resumableStreamDecoder struct {
 // newResumableStreamDecoder creates a new resumeableStreamDecoder instance.
 // Parameter rpc should be a function that creates a new stream beginning at the
 // restartToken if non-nil.
-func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error)) *resumableStreamDecoder {
+func newResumableStreamDecoder(ctx context.Context, logger *log.Logger, rpc func(ct context.Context, restartToken []byte) (streamingReceiver, error), replaceSession func(ctx context.Context) error) *resumableStreamDecoder {
 	return &resumableStreamDecoder{
 		ctx:                         ctx,
 		logger:                      logger,
 		rpc:                         rpc,
+		replaceSessionFunc:          replaceSession,
 		maxBytesBetweenResumeTokens: atomic.LoadInt32(&maxBytesBetweenResumeTokens),
 		backoff:                     DefaultRetryBackoff,
 	}
@@ -530,15 +565,26 @@ func (d *resumableStreamDecoder) tryRecv(retryer gax.Retryer) {
 		d.changeState(finished)
 		return
 	}
-	delay, shouldRetry := retryer.Retry(d.err)
-	if !shouldRetry || d.state != queueingRetryable {
-		d.changeState(aborted)
-		return
-	}
-	if err := gax.Sleep(d.ctx, delay); err != nil {
-		d.err = err
-		d.changeState(aborted)
-		return
-	}
+	if d.replaceSessionFunc != nil && d.state == queueingRetryable && d.resumeToken == nil && isSessionNotFoundError(d.err) {
+		// A 'Session not found' error occurred before any resume token was
+		// received, while the stream is still retryable (queueingRetryable),
+		// and replaceSessionFunc is defined: restart on a new session.
+		if err := d.replaceSessionFunc(d.ctx); err != nil {
+			d.err = err
+			d.changeState(aborted)
+			return
+		}
+	} else {
+		delay, shouldRetry := retryer.Retry(d.err)
+		if !shouldRetry || d.state != queueingRetryable {
+			d.changeState(aborted)
+			return
+		}
+		if err := gax.Sleep(d.ctx, delay); err != nil {
+			d.err = err
+			d.changeState(aborted)
+			return
+		}
+	}
 	// Clear error and retry the stream.
 	d.err = nil
diff --git a/spanner/read_test.go b/spanner/read_test.go
index 26f72e0a885a..74c4122daeda 100644
--- a/spanner/read_test.go
+++ b/spanner/read_test.go
@@ -811,6 +811,7 @@ func TestRsdNonblockingStates(t *testing.T) {
 			ctx,
 			nil,
 			test.rpc,
+			nil,
 		)
 		st := []resumableStreamDecoderState{}
 		var lastErr error
@@ -1080,6 +1081,7 @@ func TestRsdBlockingStates(t *testing.T) {
 			ctx,
 			nil,
 			test.rpc,
+			nil,
 		)
 		// Override backoff to make the test run faster.
 		r.backoff = gax.Backoff{
@@ -1220,6 +1222,7 @@ func TestQueueBytes(t *testing.T) {
 			sr.rpcReceiver = r
 			return sr, err
 		},
+		nil,
 	)
 	go func() {
 		for r.next() {
diff --git a/spanner/transaction.go b/spanner/transaction.go
index ec1002dd1c35..b02a220f8f7e 100644
--- a/spanner/transaction.go
+++ b/spanner/transaction.go
@@ -57,6 +57,19 @@ type txReadOnly struct {
 
 	// Atomic. Only needed for DML statements, but used forall.
 	sequenceNumber int64
+
+	// replaceSessionFunc is a function that can be called to replace the
+	// session that is used by the transaction. This function should only be
+	// defined for single-use transactions that can safely be retried on a
+	// different session. All other transactions will set this function to nil.
+	replaceSessionFunc func(ctx context.Context) error
+
+	// sp is the session pool for allocating a session to execute the read-only
+	// transaction. It is set only once during initialization of the
+	// txReadOnly.
+	sp *sessionPool
+	// sh is the sessionHandle allocated from sp.
+	sh *sessionHandle
 }
 
 // errSessionClosed returns error for using a recycled/destroyed session
@@ -247,13 +260,15 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, mode sppb.E
 		return &RowIterator{err: err}
 	}
 	client := sh.getClient()
-	return stream(
+	return streamWithReplaceSessionFunc(
 		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
 		sh.session.logger,
 		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
 			req.ResumeToken = resumeToken
+			req.Session = t.sh.getID()
 			return client.ExecuteStreamingSql(ctx, req)
 		},
+		t.replaceSessionFunc,
 		t.setTimestamp,
 		t.release)
 }
@@ -338,10 +353,6 @@ type ReadOnlyTransaction struct {
 	txReadOnly
 	// singleUse indicates that the transaction can be used for only one read.
 	singleUse bool
-	// sp is the session pool for allocating a session to execute the read-only
-	// transaction. It is set only once during initialization of the
-	// ReadOnlyTransaction.
-	sp *sessionPool
 	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
 	// ReadOnlyTransaction.
 	tx transactionID
@@ -350,8 +361,6 @@ type ReadOnlyTransaction struct {
 	txReadyOrClosed chan struct{}
 	// state is the current transaction status of the ReadOnly transaction.
 	state txState
-	// sh is the sessionHandle allocated from sp.
-	sh *sessionHandle
 	// rts is the read timestamp returned by transactional reads.
 	rts time.Time
 	// tb is the read staleness bound specification for transactional reads.
@@ -705,9 +714,6 @@ func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTra
 type ReadWriteTransaction struct {
 	// txReadOnly contains methods for performing transactional reads.
 	txReadOnly
-	// sh is the sessionHandle allocated from sp. It is set only once during the
-	// initialization of ReadWriteTransaction.
-	sh *sessionHandle
 	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
 	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
 	// during the initialization of ReadWriteTransaction.