From f692dbf4dfc2cf076383bc5a0a1ba205f7b39ee5 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Thu, 5 Dec 2024 10:50:55 +0530 Subject: [PATCH 1/5] chore(spanner): add support for multiplexed session with read write transactions. --- spanner/client.go | 69 ++++++++++------- .../internal/testutil/inmem_spanner_server.go | 47 +++++++++++- spanner/kokoro/presubmit.sh | 1 + spanner/read.go | 47 +++++++----- spanner/session.go | 7 +- spanner/transaction.go | 27 ++++++- spanner/transaction_test.go | 74 +++++++++++++++++++ 7 files changed, 220 insertions(+), 52 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index f95740bed97d..bd5e2d7f9912 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -107,19 +107,20 @@ func parseDatabaseName(db string) (project, instance, database string, err error // Client is a client for reading and writing data to a Cloud Spanner database. // A client is safe to use concurrently, except for its Close method. type Client struct { - sc *sessionClient - idleSessions *sessionPool - logger *log.Logger - qo QueryOptions - ro ReadOptions - ao []ApplyOption - txo TransactionOptions - bwo BatchWriteOptions - ct *commonTags - disableRouteToLeader bool - dro *sppb.DirectedReadOptions - otConfig *openTelemetryConfig - metricsTracerFactory *builtinMetricsTracerFactory + sc *sessionClient + idleSessions *sessionPool + logger *log.Logger + qo QueryOptions + ro ReadOptions + ao []ApplyOption + txo TransactionOptions + bwo BatchWriteOptions + ct *commonTags + disableRouteToLeader bool + enableMultiplexSessionForRW bool + dro *sppb.DirectedReadOptions + otConfig *openTelemetryConfig + metricsTracerFactory *builtinMetricsTracerFactory } // DatabaseName returns the full name of a database, e.g., @@ -478,6 +479,13 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf if config.EnableEndToEndTracing || endToEndTracingEnvironmentVariable == "true" { md.Append(endToEndTracingHeader, "true") } + //TODO: Uncomment this once the feature is enabled. + //if isMultiplexForRW := os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"); isMultiplexForRW != "" { + // config.enableMultiplexSessionForRW, err = strconv.ParseBool(isMultiplexForRW) + // if err != nil { + // return nil, spannerErrorf(codes.InvalidArgument, "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW must be either true or false") + // } + //} // Create a session client. sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions) @@ -524,19 +532,20 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf } c = &Client{ - sc: sc, - idleSessions: sp, - logger: config.Logger, - qo: getQueryOptions(config.QueryOptions), - ro: config.ReadOptions, - ao: config.ApplyOptions, - txo: config.TransactionOptions, - bwo: config.BatchWriteOptions, - ct: getCommonTags(sc), - disableRouteToLeader: config.DisableRouteToLeader, - dro: config.DirectedReadOptions, - otConfig: otConfig, - metricsTracerFactory: metricsTracerFactory, + sc: sc, + idleSessions: sp, + logger: config.Logger, + qo: getQueryOptions(config.QueryOptions), + ro: config.ReadOptions, + ao: config.ApplyOptions, + txo: config.TransactionOptions, + bwo: config.BatchWriteOptions, + ct: getCommonTags(sc), + disableRouteToLeader: config.DisableRouteToLeader, + dro: config.DirectedReadOptions, + otConfig: otConfig, + metricsTracerFactory: metricsTracerFactory, + enableMultiplexSessionForRW: config.enableMultiplexSessionForRW, } return c, nil } @@ -1000,8 +1009,12 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea err error ) if sh == nil || sh.getID() == "" || sh.getClient() == nil { - // Session handle hasn't been allocated or has been destroyed. - sh, err = c.idleSessions.take(ctx) + if c.enableMultiplexSessionForRW { + sh, err = c.idleSessions.takeMultiplexed(ctx) + } else { + // Session handle hasn't been allocated or has been destroyed. + sh, err = c.idleSessions.take(ctx) + } if err != nil { // If session retrieval fails, just fail the transaction. return err diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index ae73b82230a1..19126b1a90c3 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -25,6 +25,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "cloud.google.com/go/spanner/apiv1/spannerpb" @@ -332,7 +333,8 @@ type inMemSpannerServer struct { // counters. transactionCounters map[string]*uint64 // The transactions that have been created on this mock server. - transactions map[string]*spannerpb.Transaction + transactions map[string]*spannerpb.Transaction + multiplexedSessionTransactions map[string]*Transaction // The transactions that have been (manually) aborted on the server. abortedTransactions map[string]bool // The transactions that are marked as PartitionedDMLTransaction @@ -358,6 +360,18 @@ type inMemSpannerServer struct { freezed chan struct{} } +type Transaction struct { + sequence *atomic.Int32 + transaction *spannerpb.Transaction +} + +func (t *Transaction) getPreCommitToken(operation string) *spannerpb.MultiplexedSessionPrecommitToken { + return &spannerpb.MultiplexedSessionPrecommitToken{ + SeqNum: t.sequence.Add(1), + PrecommitToken: []byte(fmt.Sprintf("precommit-token-%v-%v", operation, t.sequence.Load())), + } +} + // NewInMemSpannerServer creates a new in-mem test server. func NewInMemSpannerServer() InMemSpannerServer { res := &inMemSpannerServer{} @@ -520,6 +534,7 @@ func (s *inMemSpannerServer) initDefaults() { s.sessions = make(map[string]*spannerpb.Session) s.sessionLastUseTime = make(map[string]time.Time) s.transactions = make(map[string]*spannerpb.Transaction) + s.multiplexedSessionTransactions = make(map[string]*Transaction) s.abortedTransactions = make(map[string]bool) s.partitionedDmlTransactions = make(map[string]bool) s.transactionCounters = make(map[string]*uint64) @@ -596,6 +611,9 @@ func (s *inMemSpannerServer) beginTransaction(session *spannerpb.Session, option ReadTimestamp: getCurrentTimestamp(), } s.mu.Lock() + if options.GetReadWrite() != nil && session.Multiplexed { + s.multiplexedSessionTransactions[id] = &Transaction{transaction: res, sequence: new(atomic.Int32)} + } s.transactions[id] = res s.partitionedDmlTransactions[id] = options.GetPartitionedDml() != nil s.mu.Unlock() @@ -633,6 +651,7 @@ func (s *inMemSpannerServer) removeTransaction(tx *spannerpb.Transaction) { s.mu.Lock() defer s.mu.Unlock() delete(s.transactions, string(tx.Id)) + delete(s.multiplexedSessionTransactions, string(tx.Id)) delete(s.partitionedDmlTransactions, string(tx.Id)) } @@ -869,9 +888,21 @@ func (s *inMemSpannerServer) ExecuteSql(ctx context.Context, req *spannerpb.Exec case StatementResultError: return nil, statementResult.Err case StatementResultResultSet: + // if request's session is multiplexed and transaction is Read/Write then add Pre-commit Token in Metadata + if statementResult.ResultSet != nil { + statementResult.ResultSet.PrecommitToken = s.multiplexedSessionTransactions[string(id)].getPreCommitToken("ResultSetPrecommitToken") + } return statementResult.ResultSet, nil case StatementResultUpdateCount: - return statementResult.convertUpdateCountToResultSet(!isPartitionedDml), nil + res := statementResult.convertUpdateCountToResultSet(!isPartitionedDml) + // if request's session is multiplexed and transaction is Read/Write then add Pre-commit Token in Metadata + s.mu.Lock() + txn, ok := s.multiplexedSessionTransactions[string(id)] + s.mu.Unlock() + if ok { + res.PrecommitToken = txn.getPreCommitToken("ResultSetPrecommitToken") + } + return res, nil } return nil, gstatus.Error(codes.Internal, "Unknown result type") } @@ -937,6 +968,12 @@ func (s *inMemSpannerServer) executeStreamingSQL(req *spannerpb.ExecuteSqlReques return nextPartialResultSetError.Err } } + s.mu.Lock() + txn, ok := s.multiplexedSessionTransactions[string(id)] + s.mu.Unlock() + if ok { + part.PrecommitToken = txn.getPreCommitToken("PartialResultSetPrecommitToken") + } if err := stream.Send(part); err != nil { return err } @@ -996,6 +1033,12 @@ func (s *inMemSpannerServer) ExecuteBatchDml(ctx context.Context, req *spannerpb resp.ResultSets[idx] = statementResult.convertUpdateCountToResultSet(!isPartitionedDml) } } + s.mu.Lock() + txn, ok := s.multiplexedSessionTransactions[string(id)] + s.mu.Unlock() + if ok { + resp.PrecommitToken = txn.getPreCommitToken("ExecuteBatchDmlResponsePrecommitToken") + } return resp, nil } diff --git a/spanner/kokoro/presubmit.sh b/spanner/kokoro/presubmit.sh index f92440377a06..f5ac7c1b389a 100755 --- a/spanner/kokoro/presubmit.sh +++ b/spanner/kokoro/presubmit.sh @@ -46,6 +46,7 @@ exit_code=0 case $JOB_TYPE in integration-with-multiplexed-session ) GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS=true + GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW=true echo "running presubmit with multiplexed sessions enabled: $GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS" ;; esac diff --git a/spanner/read.go b/spanner/read.go index eefd44b4843a..d87c29efcee3 100644 --- a/spanner/read.go +++ b/spanner/read.go @@ -67,6 +67,7 @@ func stream( func(err error) error { return err }, + nil, setTimestamp, release, ) @@ -83,20 +84,22 @@ func streamWithReplaceSessionFunc( replaceSession func(ctx context.Context) error, setTransactionID func(transactionID), updateTxState func(err error) error, + updatePrecommitToken func(token *sppb.MultiplexedSessionPrecommitToken), setTimestamp func(time.Time), release func(error), ) *RowIterator { ctx, cancel := context.WithCancel(ctx) ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.RowIterator") return &RowIterator{ - meterTracerFactory: meterTracerFactory, - streamd: newResumableStreamDecoder(ctx, logger, rpc, replaceSession), - rowd: &partialResultSetDecoder{}, - setTransactionID: setTransactionID, - updateTxState: updateTxState, - setTimestamp: setTimestamp, - release: release, - cancel: cancel, + meterTracerFactory: meterTracerFactory, + streamd: newResumableStreamDecoder(ctx, logger, rpc, replaceSession), + rowd: &partialResultSetDecoder{}, + setTransactionID: setTransactionID, + updatePrecommitToken: updatePrecommitToken, + updateTxState: updateTxState, + setTimestamp: setTimestamp, + release: release, + cancel: cancel, } } @@ -127,18 +130,19 @@ type RowIterator struct { // RowIterator.Next() returned an error that is not equal to iterator.Done. Metadata *sppb.ResultSetMetadata - ctx context.Context - meterTracerFactory *builtinMetricsTracerFactory - streamd *resumableStreamDecoder - rowd *partialResultSetDecoder - setTransactionID func(transactionID) - updateTxState func(err error) error - setTimestamp func(time.Time) - release func(error) - cancel func() - err error - rows []*Row - sawStats bool + ctx context.Context + meterTracerFactory *builtinMetricsTracerFactory + streamd *resumableStreamDecoder + rowd *partialResultSetDecoder + setTransactionID func(transactionID) + updateTxState func(err error) error + updatePrecommitToken func(token *sppb.MultiplexedSessionPrecommitToken) + setTimestamp func(time.Time) + release func(error) + cancel func() + err error + rows []*Row + sawStats bool } // this is for safety from future changes to RowIterator making sure that it implements rowIterator interface. @@ -189,6 +193,9 @@ func (r *RowIterator) Next() (*Row, error) { } r.setTransactionID = nil } + if r.updatePrecommitToken != nil { + r.updatePrecommitToken(prs.GetPrecommitToken()) + } if prs.Stats != nil { r.sawStats = true r.QueryPlan = prs.Stats.QueryPlan diff --git a/spanner/session.go b/spanner/session.go index e07f67dedf3e..7f17f515c859 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -507,6 +507,9 @@ type SessionPoolConfig struct { // Defaults to false. TrackSessionHandles bool + // enableMultiplexSessionForRW is a flag to enable multiplexed session for read/write transactions. + enableMultiplexSessionForRW bool + // healthCheckSampleInterval is how often the health checker samples live // session (for use in maintaining session pool size). // @@ -703,6 +706,7 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, if isMultiplexed != "" && isMultiplexed != "true" && isMultiplexed != "false" { return nil, spannerErrorf(codes.InvalidArgument, "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS must be either true or false") } + pool := &sessionPool{ sc: sc, valid: true, @@ -713,7 +717,7 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, mw: newMaintenanceWindow(config.MaxOpened), rand: rand.New(rand.NewSource(time.Now().UnixNano())), otConfig: sc.otConfig, - enableMultiplexSession: isMultiplexed == "true", + enableMultiplexSession: isMultiplexed == "true" || config.enableMultiplexSessionForRW, } _, instance, database, err := parseDatabaseName(sc.database) @@ -1291,6 +1295,7 @@ func (p *sessionPool) takeMultiplexed(ctx context.Context) (*sessionHandle, erro if isUnimplementedError(err) { logf(p.sc.logger, "Multiplexed session is not enabled on this project, continuing with regular sessions") p.enableMultiplexSession = false + p.enableMultiplexSessionForRW = false } else { p.mu.Unlock() // If the error is a timeout, there is a chance that the session was diff --git a/spanner/transaction.go b/spanner/transaction.go index a33d03628685..1ffe0e64e0ed 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -50,6 +50,8 @@ type txReadEnv interface { getTransactionSelector() *sppb.TransactionSelector // sets the transactionID setTransactionID(id transactionID) + // updatePrecommitToken updates the precommit token for the transaction + updatePrecommitToken(token *sppb.MultiplexedSessionPrecommitToken) // sets the transaction's read timestamp setTimestamp(time.Time) // release should be called at the end of every transactional read to deal @@ -355,6 +357,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key func(err error) error { return t.updateTxState(err) }, + t.updatePrecommitToken, t.setTimestamp, t.release, ) @@ -642,6 +645,7 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que func(err error) error { return t.updateTxState(err) }, + t.updatePrecommitToken, t.setTimestamp, t.release) } @@ -869,6 +873,11 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { return err } +// no-op for ReadOnlyTransaction. +func (t *ReadOnlyTransaction) updatePrecommitToken(token *sppb.MultiplexedSessionPrecommitToken) { + return +} + // acquire implements txReadEnv.acquire. func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { if err := checkNestedTxn(ctx); err != nil { @@ -1153,7 +1162,9 @@ type ReadWriteTransaction struct { // tx is the transaction ID in Cloud Spanner that uniquely identifies the // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() // during the initialization of ReadWriteTransaction. - tx transactionID + tx transactionID + precommitToken *sppb.MultiplexedSessionPrecommitToken + // txReadyOrClosed is for broadcasting that transaction ID has been returned // by Cloud Spanner or that transaction is closed. txReadyOrClosed chan struct{} @@ -1250,6 +1261,7 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts return 0, errInlineBeginTransactionFailed() } } + t.updatePrecommitToken(resultSet.GetPrecommitToken()) if resultSet.Stats == nil { return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) } @@ -1369,6 +1381,7 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts t.setTransactionID(nil) return counts, errInlineBeginTransactionFailed() } + t.updatePrecommitToken(resp.PrecommitToken) if resp.Status != nil && resp.Status.Code != 0 { return counts, t.txReadOnly.updateTxState(spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)) } @@ -1484,6 +1497,17 @@ func (t *ReadWriteTransaction) setTransactionID(tx transactionID) { t.txReadyOrClosed = make(chan struct{}) } +func (t *ReadWriteTransaction) updatePrecommitToken(token *sppb.MultiplexedSessionPrecommitToken) { + if token == nil { + return + } + t.mu.Lock() + defer t.mu.Unlock() + if t.precommitToken == nil || token.SeqNum > t.precommitToken.SeqNum { + t.precommitToken = token + } +} + // release implements txReadEnv.release. func (t *ReadWriteTransaction) release(err error) { t.mu.Lock() @@ -1674,6 +1698,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions Transaction: &sppb.CommitRequest_TransactionId{ TransactionId: t.tx, }, + PrecommitToken: t.precommitToken, RequestOptions: createRequestOptions(t.txOpts.CommitPriority, "", t.txOpts.TransactionTag), Mutations: mPb, ReturnCommitStats: options.ReturnCommitStats, diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go index 3c12ac7eca3c..68067c1f5069 100644 --- a/spanner/transaction_test.go +++ b/spanner/transaction_test.go @@ -302,6 +302,80 @@ func TestReadWriteTransaction_ErrorReturned(t *testing.T) { } } +func TestReadWriteTransaction_PrecommitToken(t *testing.T) { + t.Parallel() + ctx := context.Background() + server, client, teardown := setupMockedTestServer(t) + defer teardown() + client.enableMultiplexSessionForRW = true + + type testCase struct { + name string + query bool + update bool + batchUpdate bool + expectedPrecommitToken string + expectedSequenceNumber int32 + } + + testCases := []testCase{ + {"Only Query", true, false, false, "PartialResultSetPrecommitToken", 3}, //since mock server is returning 3 rows + {"Query and Update", true, true, false, "ResultSetPrecommitToken", 4}, + {"Query, Update, and Batch Update", true, true, true, "ExecuteBatchDmlResponsePrecommitToken", 5}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + if tc.query { + iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) + defer iter.Stop() + for { + _, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + } + } + + if tc.update { + if _, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}); err != nil { + return err + } + } + + if tc.batchUpdate { + if _, err := tx.BatchUpdate(ctx, []Statement{{SQL: UpdateBarSetFoo}}); err != nil { + return err + } + } + + return nil + }) + if err != nil { + t.Fatalf("%s failed: %v", tc.name, err) + } + + requests := drainRequestsFromServer(server.TestSpanner) + commitReq := requests[len(requests)-1].(*sppb.CommitRequest) + if commitReq.PrecommitToken == nil || len(commitReq.PrecommitToken.GetPrecommitToken()) == 0 { + t.Fatalf("Expected commit request to contain a valid precommitToken, got: %v", commitReq.PrecommitToken) + } + // Validate that the precommit token contains the test argument. + if !strings.Contains(string(commitReq.PrecommitToken.GetPrecommitToken()), tc.expectedPrecommitToken) { + t.Fatalf("Precommit token does not contain the expected test argument") + } + // Validate that the sequence number is as expected. + if got, want := commitReq.PrecommitToken.GetSeqNum(), tc.expectedSequenceNumber; got != want { + t.Fatalf("Precommit token sequence number mismatch: got %d, want %d", got, want) + } + }) + } +} + func TestBatchDML_WithMultipleDML(t *testing.T) { t.Parallel() ctx := context.Background() From 8aaea61adb88f78ed4025e716a9a2ccf4bb1d8e5 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Thu, 19 Dec 2024 13:28:37 +0530 Subject: [PATCH 2/5] fix tests --- spanner/transaction_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go index 684864784418..010b5cb40173 100644 --- a/spanner/transaction_test.go +++ b/spanner/transaction_test.go @@ -366,7 +366,13 @@ func TestReadWriteTransaction_PrecommitToken(t *testing.T) { } requests := drainRequestsFromServer(server.TestSpanner) - commitReq := requests[len(requests)-1].(*sppb.CommitRequest) + var commitReq *sppb.CommitRequest + for _, req := range requests { + if c, ok := req.(*sppb.CommitRequest); ok { + commitReq = c + break + } + } if commitReq.PrecommitToken == nil || len(commitReq.PrecommitToken.GetPrecommitToken()) == 0 { t.Fatalf("Expected commit request to contain a valid precommitToken, got: %v", commitReq.PrecommitToken) } From 9380c32555c91c4bf46e5c5f79a951b86c83b8da Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Fri, 20 Dec 2024 15:34:58 +0530 Subject: [PATCH 3/5] incorporate changes --- spanner/client.go | 11 +++ .../internal/testutil/inmem_spanner_server.go | 44 ++++++--- spanner/session.go | 12 +-- spanner/transaction_test.go | 94 +++++++++++++++++++ 4 files changed, 138 insertions(+), 23 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index a8e157c7bdb1..557cd508f898 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -487,12 +487,20 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf if config.EnableEndToEndTracing || endToEndTracingEnvironmentVariable == "true" { md.Append(endToEndTracingHeader, "true") } + + if isMultiplexed := strings.ToLower(os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS")); isMultiplexed != "" { + config.SessionPoolConfig.enableMultiplexSession, err = strconv.ParseBool(isMultiplexed) + if err != nil { + return nil, spannerErrorf(codes.InvalidArgument, "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS must be either true or false") + } + } //TODO: Uncomment this once the feature is enabled. //if isMultiplexForRW := os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"); isMultiplexForRW != "" { // config.enableMultiplexSessionForRW, err = strconv.ParseBool(isMultiplexForRW) // if err != nil { // return nil, spannerErrorf(codes.InvalidArgument, "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW must be either true or false") // } + // config.enableMultiplexSessionForRW = config.enableMultiplexSessionForRW && config.SessionPoolConfig.enableMultiplexSession //} // Create a session client. @@ -1063,6 +1071,9 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea resp, err = t.runInTransaction(ctx, f) return err }) + if isUnimplementedError(err) { + c.enableMultiplexSessionForRW = false + } return resp, err } diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index c3b382d5a703..1155efa52ff9 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -335,7 +335,7 @@ type inMemSpannerServer struct { transactionCounters map[string]*uint64 // The transactions that have been created on this mock server. transactions map[string]*spannerpb.Transaction - multiplexedSessionTransactions map[string]*Transaction + multiplexedSessionTransactions map[string]*atomic.Int32 // The transactions that have been (manually) aborted on the server. abortedTransactions map[string]bool // The transactions that are marked as PartitionedDMLTransaction @@ -537,7 +537,7 @@ func (s *inMemSpannerServer) initDefaults() { s.sessions = make(map[string]*spannerpb.Session) s.sessionLastUseTime = make(map[string]time.Time) s.transactions = make(map[string]*spannerpb.Transaction) - s.multiplexedSessionTransactions = make(map[string]*Transaction) + s.multiplexedSessionTransactions = make(map[string]*atomic.Int32) s.abortedTransactions = make(map[string]bool) s.partitionedDmlTransactions = make(map[string]bool) s.transactionCounters = make(map[string]*uint64) @@ -615,7 +615,7 @@ func (s *inMemSpannerServer) beginTransaction(session *spannerpb.Session, option } s.mu.Lock() if options.GetReadWrite() != nil && session.Multiplexed { - s.multiplexedSessionTransactions[id] = &Transaction{transaction: res, sequence: new(atomic.Int32)} + s.multiplexedSessionTransactions[id] = new(atomic.Int32) } s.transactions[id] = res s.partitionedDmlTransactions[id] = options.GetPartitionedDml() != nil @@ -895,22 +895,28 @@ func (s *inMemSpannerServer) ExecuteSql(ctx context.Context, req *spannerpb.Exec // if request's session is multiplexed and transaction is Read/Write then add Pre-commit Token in Metadata if statementResult.ResultSet != nil { s.mu.Lock() - txn, ok := s.multiplexedSessionTransactions[string(id)] - s.mu.Unlock() + sequence, ok := s.multiplexedSessionTransactions[string(id)] if ok { - statementResult.ResultSet.PrecommitToken = txn.getPreCommitToken("ResultSetPrecommitToken") + statementResult.ResultSet.PrecommitToken = &spannerpb.MultiplexedSessionPrecommitToken{ + SeqNum: sequence.Add(1), + PrecommitToken: []byte(fmt.Sprintf("precommit-token-ResultSetPrecommitToken-%v", sequence.Load())), + } } + s.mu.Unlock() } return statementResult.ResultSet, nil case StatementResultUpdateCount: res := statementResult.convertUpdateCountToResultSet(!isPartitionedDml) // if request's session is multiplexed and transaction is Read/Write then add Pre-commit Token in Metadata s.mu.Lock() - txn, ok := s.multiplexedSessionTransactions[string(id)] - s.mu.Unlock() + sequence, ok := s.multiplexedSessionTransactions[string(id)] if ok { - res.PrecommitToken = txn.getPreCommitToken("ResultSetPrecommitToken") + res.PrecommitToken = &spannerpb.MultiplexedSessionPrecommitToken{ + SeqNum: sequence.Add(1), + PrecommitToken: []byte(fmt.Sprintf("precommit-token-ResultSetPrecommitToken-%v", sequence.Load())), + } } + s.mu.Unlock() return res, nil } return nil, gstatus.Error(codes.Internal, "Unknown result type") @@ -977,12 +983,17 @@ func (s *inMemSpannerServer) executeStreamingSQL(req *spannerpb.ExecuteSqlReques return nextPartialResultSetError.Err } } + // For every PartialResultSet, if request's session is multiplexed and transaction is Read/Write then add Pre-commit Token in Metadata + // and increment the sequence number s.mu.Lock() - txn, ok := s.multiplexedSessionTransactions[string(id)] - s.mu.Unlock() + sequence, ok := s.multiplexedSessionTransactions[string(id)] if ok { - part.PrecommitToken = txn.getPreCommitToken("PartialResultSetPrecommitToken") + part.PrecommitToken = &spannerpb.MultiplexedSessionPrecommitToken{ + SeqNum: sequence.Add(1), + PrecommitToken: []byte(fmt.Sprintf("precommit-token-PartialResultSetPrecommitToken-%v", sequence.Load())), + } } + s.mu.Unlock() if err := stream.Send(part); err != nil { return err } @@ -1043,11 +1054,14 @@ func (s *inMemSpannerServer) ExecuteBatchDml(ctx context.Context, req *spannerpb } } s.mu.Lock() - txn, ok := s.multiplexedSessionTransactions[string(id)] - s.mu.Unlock() + sequence, ok := s.multiplexedSessionTransactions[string(id)] if ok { - resp.PrecommitToken = txn.getPreCommitToken("ExecuteBatchDmlResponsePrecommitToken") + resp.PrecommitToken = &spannerpb.MultiplexedSessionPrecommitToken{ + SeqNum: sequence.Add(1), + PrecommitToken: []byte(fmt.Sprintf("precommit-token-ExecuteBatchDmlResponsePrecommitToken-%v", sequence.Load())), + } } + s.mu.Unlock() return resp, nil } diff --git a/spanner/session.go b/spanner/session.go index 7f17f515c859..8b02721bb896 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -24,7 +24,6 @@ import ( "log" "math" "math/rand" - "os" "runtime/debug" "strings" "sync" @@ -507,7 +506,9 @@ type SessionPoolConfig struct { // Defaults to false. TrackSessionHandles bool - // enableMultiplexSessionForRW is a flag to enable multiplexed session for read/write transactions. + enableMultiplexSession bool + + // enableMultiplexSessionForRW is a flag to enable multiplexed session for read/write transactions, is used in testing enableMultiplexSessionForRW bool // healthCheckSampleInterval is how often the health checker samples live @@ -702,10 +703,6 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, if config.MultiplexSessionCheckInterval == 0 { config.MultiplexSessionCheckInterval = 10 * time.Minute } - isMultiplexed := strings.ToLower(os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS")) - if isMultiplexed != "" && isMultiplexed != "true" && isMultiplexed != "false" { - return nil, spannerErrorf(codes.InvalidArgument, "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS must be either true or false") - } pool := &sessionPool{ sc: sc, @@ -717,7 +714,7 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, mw: newMaintenanceWindow(config.MaxOpened), rand: rand.New(rand.NewSource(time.Now().UnixNano())), otConfig: sc.otConfig, - enableMultiplexSession: isMultiplexed == "true" || config.enableMultiplexSessionForRW, + enableMultiplexSession: config.enableMultiplexSession, } _, instance, database, err := parseDatabaseName(sc.database) @@ -1295,7 +1292,6 @@ func (p *sessionPool) takeMultiplexed(ctx context.Context) (*sessionHandle, erro if isUnimplementedError(err) { logf(p.sc.logger, "Multiplexed session is not enabled on this project, continuing with regular sessions") p.enableMultiplexSession = false - p.enableMultiplexSessionForRW = false } else { p.mu.Unlock() // If the error is a timeout, there is a chance that the session was diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go index 010b5cb40173..5cd57d2f7339 100644 --- a/spanner/transaction_test.go +++ b/spanner/transaction_test.go @@ -302,6 +302,99 @@ func TestReadWriteTransaction_ErrorReturned(t *testing.T) { } } +func TestClient_ReadWriteTransaction_UnimplementedErrorWithMultiplexedSessionSwitchesToRegular(t *testing.T) { + ctx := context.Background() + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + DisableNativeMetrics: true, + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + enableMultiplexSession: true, + enableMultiplexSessionForRW: true, + }, + }) + defer teardown() + + // Simulate an unimplemented error for the first transaction attempt. + server.TestSpanner.PutExecutionTime( + MethodExecuteStreamingSql, + SimulatedExecutionTime{Errors: []error{status.Error(codes.Unimplemented, "Unimplemented method")}}, + ) + + // Attempt the first read-write transaction, which should return an Unimplemented error. + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + iter := tx.Query(ctx, NewStatement(SelectFooFromBar)) + defer iter.Stop() + for { + _, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + } + return nil + }) + + requests := drainRequestsFromServer(server.TestSpanner) + foundMultiplexedSession := false + for _, req := range requests { + if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok { + if strings.Contains(sqlReq.Session, "multiplexed") { + foundMultiplexedSession = true + break + } + } + } + + // Assert that the error is an Unimplemented error. + if status.Code(err) != codes.Unimplemented { + t.Fatalf("Expected Unimplemented error, got: %v", err) + } + if !foundMultiplexedSession { + t.Fatalf("Expected first transaction to use a multiplexed session, but it did not") + } + + server.TestSpanner.Reset() + + // Attempt a second read-write transaction. + _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + iter := tx.Query(ctx, NewStatement(SelectFooFromBar)) + defer iter.Stop() + for { + _, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } + } + return nil + }) + + if err != nil { + t.Fatalf("Unexpected error in second transaction: %v", err) + } + + // Check that the second transaction used a multiplexed session. + requests = drainRequestsFromServer(server.TestSpanner) + foundMultiplexedSession = false + for _, req := range requests { + if sqlReq, ok := req.(*sppb.CommitRequest); ok { + if strings.Contains(sqlReq.Session, "multiplexed") { + foundMultiplexedSession = true + break + } + } + } + + if foundMultiplexedSession { + t.Fatalf("Expected second transaction to use a regular session, but it did not") + } +} + func TestReadWriteTransaction_PrecommitToken(t *testing.T) { t.Parallel() ctx := context.Background() @@ -310,6 +403,7 @@ func TestReadWriteTransaction_PrecommitToken(t *testing.T) { SessionPoolConfig: SessionPoolConfig{ MinOpened: 1, MaxOpened: 1, + enableMultiplexSession: true, enableMultiplexSessionForRW: true, }, }) From f7ae1bf5b6b6d266dcf260f9cad7a31f34cc3bd5 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Fri, 20 Dec 2024 17:59:04 +0530 Subject: [PATCH 4/5] disable multiplxed session for ReadWrite only when unimplemented error is because of multiplex from server --- spanner/client.go | 66 ++++++------- .../internal/testutil/inmem_spanner_server.go | 78 +++++---------- spanner/session.go | 16 ++-- spanner/transaction_test.go | 95 +++++++++---------- 4 files changed, 112 insertions(+), 143 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index 557cd508f898..1ce356266108 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -107,20 +107,20 @@ func parseDatabaseName(db string) (project, instance, database string, err error // Client is a client for reading and writing data to a Cloud Spanner database. // A client is safe to use concurrently, except for its Close method. type Client struct { - sc *sessionClient - idleSessions *sessionPool - logger *log.Logger - qo QueryOptions - ro ReadOptions - ao []ApplyOption - txo TransactionOptions - bwo BatchWriteOptions - ct *commonTags - disableRouteToLeader bool - enableMultiplexSessionForRW bool - dro *sppb.DirectedReadOptions - otConfig *openTelemetryConfig - metricsTracerFactory *builtinMetricsTracerFactory + sc *sessionClient + idleSessions *sessionPool + logger *log.Logger + qo QueryOptions + ro ReadOptions + ao []ApplyOption + txo TransactionOptions + bwo BatchWriteOptions + ct *commonTags + disableRouteToLeader bool + enableMultiplexedSessionForRW bool + dro *sppb.DirectedReadOptions + otConfig *openTelemetryConfig + metricsTracerFactory *builtinMetricsTracerFactory } // DatabaseName returns the full name of a database, e.g., @@ -496,11 +496,11 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf } //TODO: Uncomment this once the feature is enabled. //if isMultiplexForRW := os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"); isMultiplexForRW != "" { - // config.enableMultiplexSessionForRW, err = strconv.ParseBool(isMultiplexForRW) + // config.enableMultiplexedSessionForRW, err = strconv.ParseBool(isMultiplexForRW) // if err != nil { // return nil, spannerErrorf(codes.InvalidArgument, "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW must be either true or false") // } - // config.enableMultiplexSessionForRW = config.enableMultiplexSessionForRW && config.SessionPoolConfig.enableMultiplexSession + // config.enableMultiplexedSessionForRW = config.enableMultiplexedSessionForRW && config.SessionPoolConfig.enableMultiplexSession //} // Create a session client. @@ -548,20 +548,20 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf } c = &Client{ - sc: sc, - idleSessions: sp, - logger: config.Logger, - qo: getQueryOptions(config.QueryOptions), - ro: config.ReadOptions, - ao: config.ApplyOptions, - txo: config.TransactionOptions, - bwo: config.BatchWriteOptions, - ct: getCommonTags(sc), - disableRouteToLeader: config.DisableRouteToLeader, - dro: config.DirectedReadOptions, - otConfig: otConfig, - metricsTracerFactory: metricsTracerFactory, - enableMultiplexSessionForRW: config.enableMultiplexSessionForRW, + sc: sc, + idleSessions: sp, + logger: config.Logger, + qo: getQueryOptions(config.QueryOptions), + ro: config.ReadOptions, + ao: config.ApplyOptions, + txo: config.TransactionOptions, + bwo: config.BatchWriteOptions, + ct: getCommonTags(sc), + disableRouteToLeader: config.DisableRouteToLeader, + dro: config.DirectedReadOptions, + otConfig: otConfig, + metricsTracerFactory: metricsTracerFactory, + enableMultiplexedSessionForRW: config.enableMultiplexedSessionForRW, } return c, nil } @@ -1025,7 +1025,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea err error ) if sh == nil || sh.getID() == "" || sh.getClient() == nil { - if c.enableMultiplexSessionForRW { + if c.enableMultiplexedSessionForRW { sh, err = c.idleSessions.takeMultiplexed(ctx) } else { // Session handle hasn't been allocated or has been destroyed. @@ -1071,8 +1071,8 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea resp, err = t.runInTransaction(ctx, f) return err }) - if isUnimplementedError(err) { - c.enableMultiplexSessionForRW = false + if isUnimplementedErrorForMultiplexedRW(err) { + c.enableMultiplexedSessionForRW = false } return resp, err } diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index 1155efa52ff9..86770e4d2948 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -334,8 +334,8 @@ type inMemSpannerServer struct { // counters. transactionCounters map[string]*uint64 // The transactions that have been created on this mock server. - transactions map[string]*spannerpb.Transaction - multiplexedSessionTransactions map[string]*atomic.Int32 + transactions map[string]*spannerpb.Transaction + multiplexedSessionTransactionsToSeqNo map[string]*atomic.Int32 // The transactions that have been (manually) aborted on the server. abortedTransactions map[string]bool // The transactions that are marked as PartitionedDMLTransaction @@ -361,20 +361,6 @@ type inMemSpannerServer struct { freezed chan struct{} } -// Transaction is a wrapper around a spannerpb.Transaction that also contains -// a sequence number that is used to generate precommit tokens. -type Transaction struct { - sequence *atomic.Int32 - transaction *spannerpb.Transaction -} - -func (t *Transaction) getPreCommitToken(operation string) *spannerpb.MultiplexedSessionPrecommitToken { - return &spannerpb.MultiplexedSessionPrecommitToken{ - SeqNum: t.sequence.Add(1), - PrecommitToken: []byte(fmt.Sprintf("precommit-token-%v-%v", operation, t.sequence.Load())), - } -} - // NewInMemSpannerServer creates a new in-mem test server. func NewInMemSpannerServer() InMemSpannerServer { res := &inMemSpannerServer{} @@ -537,12 +523,25 @@ func (s *inMemSpannerServer) initDefaults() { s.sessions = make(map[string]*spannerpb.Session) s.sessionLastUseTime = make(map[string]time.Time) s.transactions = make(map[string]*spannerpb.Transaction) - s.multiplexedSessionTransactions = make(map[string]*atomic.Int32) + s.multiplexedSessionTransactionsToSeqNo = make(map[string]*atomic.Int32) s.abortedTransactions = make(map[string]bool) s.partitionedDmlTransactions = make(map[string]bool) s.transactionCounters = make(map[string]*uint64) } +func (s *inMemSpannerServer) getPreCommitToken(transactionID, operation string) *spannerpb.MultiplexedSessionPrecommitToken { + s.mu.Lock() + defer s.mu.Unlock() + sequence, ok := s.multiplexedSessionTransactionsToSeqNo[transactionID] + if !ok { + return nil + } + return &spannerpb.MultiplexedSessionPrecommitToken{ + SeqNum: sequence.Add(1), + PrecommitToken: []byte(fmt.Sprintf("precommit-token-%v-%v", operation, sequence.Load())), + } +} + func (s *inMemSpannerServer) generateSessionNameLocked(database string, isMultiplexed bool) string { s.sessionCounter++ if isMultiplexed { @@ -615,7 +614,7 @@ func (s *inMemSpannerServer) beginTransaction(session *spannerpb.Session, option } s.mu.Lock() if options.GetReadWrite() != nil && session.Multiplexed { - s.multiplexedSessionTransactions[id] = new(atomic.Int32) + s.multiplexedSessionTransactionsToSeqNo[id] = new(atomic.Int32) } s.transactions[id] = res s.partitionedDmlTransactions[id] = options.GetPartitionedDml() != nil @@ -654,7 +653,7 @@ func (s *inMemSpannerServer) removeTransaction(tx *spannerpb.Transaction) { s.mu.Lock() defer s.mu.Unlock() delete(s.transactions, string(tx.Id)) - delete(s.multiplexedSessionTransactions, string(tx.Id)) + delete(s.multiplexedSessionTransactionsToSeqNo, string(tx.Id)) delete(s.partitionedDmlTransactions, string(tx.Id)) } @@ -894,29 +893,12 @@ func (s *inMemSpannerServer) ExecuteSql(ctx context.Context, req *spannerpb.Exec // if request's session is multiplexed and transaction is Read/Write then add Pre-commit Token in Metadata if statementResult.ResultSet != nil { - s.mu.Lock() - sequence, ok := s.multiplexedSessionTransactions[string(id)] - if ok { - statementResult.ResultSet.PrecommitToken = &spannerpb.MultiplexedSessionPrecommitToken{ - SeqNum: sequence.Add(1), - PrecommitToken: []byte(fmt.Sprintf("precommit-token-ResultSetPrecommitToken-%v", sequence.Load())), - } - } - s.mu.Unlock() + statementResult.ResultSet.PrecommitToken = s.getPreCommitToken(string(id), "ResultSetPrecommitToken") } return statementResult.ResultSet, nil case StatementResultUpdateCount: res := statementResult.convertUpdateCountToResultSet(!isPartitionedDml) - // if request's session is multiplexed and transaction is Read/Write then add Pre-commit Token in Metadata - s.mu.Lock() - sequence, ok := s.multiplexedSessionTransactions[string(id)] - if ok { - res.PrecommitToken = &spannerpb.MultiplexedSessionPrecommitToken{ - SeqNum: sequence.Add(1), - PrecommitToken: []byte(fmt.Sprintf("precommit-token-ResultSetPrecommitToken-%v", sequence.Load())), - } - } - s.mu.Unlock() + res.PrecommitToken = s.getPreCommitToken(string(id), "ResultSetPrecommitToken") return res, nil } return nil, gstatus.Error(codes.Internal, "Unknown result type") @@ -985,15 +967,7 @@ func (s *inMemSpannerServer) executeStreamingSQL(req *spannerpb.ExecuteSqlReques } // For every PartialResultSet, if request's session is multiplexed and transaction is Read/Write then add Pre-commit Token in Metadata // and increment the sequence number - s.mu.Lock() - sequence, ok := s.multiplexedSessionTransactions[string(id)] - if ok { - part.PrecommitToken = &spannerpb.MultiplexedSessionPrecommitToken{ - SeqNum: sequence.Add(1), - PrecommitToken: []byte(fmt.Sprintf("precommit-token-PartialResultSetPrecommitToken-%v", sequence.Load())), - } - } - s.mu.Unlock() + part.PrecommitToken = s.getPreCommitToken(string(id), "PartialResultSetPrecommitToken") if err := stream.Send(part); err != nil { return err } @@ -1053,15 +1027,7 @@ func (s *inMemSpannerServer) ExecuteBatchDml(ctx context.Context, req *spannerpb resp.ResultSets[idx] = statementResult.convertUpdateCountToResultSet(!isPartitionedDml) } } - s.mu.Lock() - sequence, ok := s.multiplexedSessionTransactions[string(id)] - if ok { - resp.PrecommitToken = &spannerpb.MultiplexedSessionPrecommitToken{ - SeqNum: sequence.Add(1), - PrecommitToken: []byte(fmt.Sprintf("precommit-token-ExecuteBatchDmlResponsePrecommitToken-%v", sequence.Load())), - } - } - s.mu.Unlock() + resp.PrecommitToken = s.getPreCommitToken(string(id), "ExecuteBatchDmlResponsePrecommitToken") return resp, nil } diff --git a/spanner/session.go b/spanner/session.go index 8b02721bb896..2165fdee04d8 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -508,8 +508,8 @@ type SessionPoolConfig struct { enableMultiplexSession bool - // enableMultiplexSessionForRW is a flag to enable multiplexed session for read/write transactions, is used in testing - enableMultiplexSessionForRW bool + // enableMultiplexedSessionForRW is a flag to enable multiplexed session for read/write transactions, is used in testing + enableMultiplexedSessionForRW bool // healthCheckSampleInterval is how often the health checker samples live // session (for use in maintaining session pool size). @@ -1945,15 +1945,19 @@ func isSessionNotFoundError(err error) bool { return strings.Contains(err.Error(), "Session not found") } -// isUnimplementedError returns true if the gRPC error code is Unimplemented. func isUnimplementedError(err error) bool { if err == nil { return false } - if ErrCode(err) == codes.Unimplemented { - return true + return ErrCode(err) == codes.Unimplemented +} + +// isUnimplementedErrorForMultiplexedRW returns true if the gRPC error code is Unimplemented and related to use of multiplexed session with ReadWrite txn. +func isUnimplementedErrorForMultiplexedRW(err error) bool { + if err == nil { + return false } - return false + return ErrCode(err) == codes.Unimplemented && strings.Contains(err.Error(), "Transaction type read_write not supported with multiplexed sessions") } func isFailedInlineBeginTransaction(err error) bool { diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go index 5cd57d2f7339..1acd7f72f390 100644 --- a/spanner/transaction_test.go +++ b/spanner/transaction_test.go @@ -307,59 +307,58 @@ func TestClient_ReadWriteTransaction_UnimplementedErrorWithMultiplexedSessionSwi server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ DisableNativeMetrics: true, SessionPoolConfig: SessionPoolConfig{ - MinOpened: 1, - MaxOpened: 1, - enableMultiplexSession: true, - enableMultiplexSessionForRW: true, + MinOpened: 1, + MaxOpened: 1, + enableMultiplexSession: true, + enableMultiplexedSessionForRW: true, }, }) defer teardown() - // Simulate an unimplemented error for the first transaction attempt. - server.TestSpanner.PutExecutionTime( - MethodExecuteStreamingSql, - SimulatedExecutionTime{Errors: []error{status.Error(codes.Unimplemented, "Unimplemented method")}}, - ) - - // Attempt the first read-write transaction, which should return an Unimplemented error. - _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { - iter := tx.Query(ctx, NewStatement(SelectFooFromBar)) - defer iter.Stop() - for { - _, err := iter.Next() - if err == iterator.Done { - break + for _, sumulatdError := range []error{ + status.Error(codes.Unimplemented, "other Unimplemented error which should not turn off multiplexed session"), + status.Error(codes.Unimplemented, "Transaction type read_write not supported with multiplexed sessions")} { + server.TestSpanner.PutExecutionTime( + MethodExecuteStreamingSql, + SimulatedExecutionTime{Errors: []error{sumulatdError}}, + ) + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + iter := tx.Query(ctx, NewStatement(SelectFooFromBar)) + defer iter.Stop() + for { + _, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return err + } } - if err != nil { - return err + return nil + }) + requests := drainRequestsFromServer(server.TestSpanner) + foundMultiplexedSession := false + for _, req := range requests { + if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok { + if strings.Contains(sqlReq.Session, "multiplexed") { + foundMultiplexedSession = true + break + } } } - return nil - }) - requests := drainRequestsFromServer(server.TestSpanner) - foundMultiplexedSession := false - for _, req := range requests { - if sqlReq, ok := req.(*sppb.ExecuteSqlRequest); ok { - if strings.Contains(sqlReq.Session, "multiplexed") { - foundMultiplexedSession = true - break - } + // Assert that the error is an Unimplemented error. + if status.Code(err) != codes.Unimplemented { + t.Fatalf("Expected Unimplemented error, got: %v", err) } + if !foundMultiplexedSession { + t.Fatalf("Expected first transaction to use a multiplexed session, but it did not") + } + server.TestSpanner.Reset() } - // Assert that the error is an Unimplemented error. - if status.Code(err) != codes.Unimplemented { - t.Fatalf("Expected Unimplemented error, got: %v", err) - } - if !foundMultiplexedSession { - t.Fatalf("Expected first transaction to use a multiplexed session, but it did not") - } - - server.TestSpanner.Reset() - // Attempt a second read-write transaction. - _, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { iter := tx.Query(ctx, NewStatement(SelectFooFromBar)) defer iter.Stop() for { @@ -378,9 +377,9 @@ func TestClient_ReadWriteTransaction_UnimplementedErrorWithMultiplexedSessionSwi t.Fatalf("Unexpected error in second transaction: %v", err) } - // Check that the second transaction used a multiplexed session. - requests = drainRequestsFromServer(server.TestSpanner) - foundMultiplexedSession = false + // Check that the second transaction used a regular session. + requests := drainRequestsFromServer(server.TestSpanner) + foundMultiplexedSession := false for _, req := range requests { if sqlReq, ok := req.(*sppb.CommitRequest); ok { if strings.Contains(sqlReq.Session, "multiplexed") { @@ -401,10 +400,10 @@ func TestReadWriteTransaction_PrecommitToken(t *testing.T) { server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ DisableNativeMetrics: true, SessionPoolConfig: SessionPoolConfig{ - MinOpened: 1, - MaxOpened: 1, - enableMultiplexSession: true, - enableMultiplexSessionForRW: true, + MinOpened: 1, + MaxOpened: 1, + enableMultiplexSession: true, + enableMultiplexedSessionForRW: true, }, }) defer teardown() From c115d30119235031b512ee92228c4d36e3288a34 Mon Sep 17 00:00:00 2001 From: rahul yadav Date: Mon, 23 Dec 2024 10:56:54 +0530 Subject: [PATCH 5/5] re-trigger