From db0bd31046a1f014481aadafc840f28c3f80dc27 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Tue, 31 Oct 2023 06:47:39 +0000 Subject: [PATCH 1/4] feat(spanner): add new field and add relevant tests --- spanner/client_test.go | 77 ++++++++++++++++++++++++++++++++++++++--- spanner/session.go | 16 +++++++-- spanner/session_test.go | 38 ++++++++++---------- spanner/transaction.go | 8 +++++ 4 files changed, 113 insertions(+), 26 deletions(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index 4594811f903f..0eb66755c27d 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -230,9 +230,9 @@ func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_Rem iter := single.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) p := client.idleSessions sh := single.sh - // simulate session to be checked out for more than 60mins + // simulate session to be last used before 60 mins sh.mu.Lock() - sh.checkoutTime = time.Now().Add(-time.Hour) + sh.lastUseTime = time.Now().Add(-time.Hour) sh.mu.Unlock() // force run task to clean up unexpected long-running sessions @@ -1460,10 +1460,10 @@ func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionSh return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w) } - // Simulate the session to be checked out for more than 60 mins. + // Simulate the session to be last used before 60 mins. // The background task cleans up this long-running session. tx.sh.mu.Lock() - tx.sh.checkoutTime = time.Now().Add(-time.Hour) + tx.sh.lastUseTime = time.Now().Add(-time.Hour) if g, w := tx.sh.eligibleForLongRunning, false; g != w { tx.sh.mu.Unlock() return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w) @@ -1489,6 +1489,73 @@ func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionSh } } +func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) { + t.Parallel() + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + idleTimeThreshold: 3 * time.Second, + }, + }, + }) + defer teardown() + server.TestSpanner.PutExecutionTime(MethodExecuteSql, + SimulatedExecutionTime{ + MinimumExecutionTime: 2 * time.Second, + }) + ctx := context.Background() + p := client.idleSessions + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + // Execute first operation on the transaction + _, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo)) + if err != nil { + return err + } + + // Get the session last use time. + tx.sh.mu.Lock() + sessionPrevLastUseTime := tx.sh.lastUseTime + tx.sh.mu.Unlock() + + // Execute second operation on the transaction + _, err = tx.Update(ctx, NewStatement(UpdateBarSetFoo)) + if err != nil { + return err + } + // Get the latest session last use time + tx.sh.mu.Lock() + sessionLatestLastUseTime := tx.sh.lastUseTime + sessionCheckoutTime := tx.sh.checkoutTime + tx.sh.mu.Unlock() + + // sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime. + // This is because session lastUse time should be updated whenever a new operation is being executed on the transaction. + if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Seconds() <= 0 { + t.Fatalf("Session lastUseTime times should not be equal") + } + + if (time.Now().Sub(sessionPrevLastUseTime)).Seconds() < 4 { + t.Fatalf("Expected session to be checkedout for more than 4 seconds") + } + if (time.Now().Sub(sessionCheckoutTime)).Seconds() < 4 { + t.Fatalf("Expected session to be checkedout for more than 4 seconds") + } + // force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec. + // The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime. + p.removeLongRunningSessions() + if p.numOfLeakedSessionsRemoved > 0 { + t.Fatalf("Expected session to not get cleaned by background maintainer") + } + return nil + }) + if err != nil { + t.Fatal(err) + } +} + func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) { t.Parallel() @@ -1546,7 +1613,7 @@ func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAct if attempts == 2 { // Simulate the session to be long-running. The background task should not clean up this long-running session. tx.sh.mu.Lock() - tx.sh.checkoutTime = time.Now().Add(-time.Hour) + tx.sh.lastUseTime = time.Now().Add(-time.Hour) if g, w := tx.sh.eligibleForLongRunning, true; g != w { tx.sh.mu.Unlock() return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w) diff --git a/spanner/session.go b/spanner/session.go index 7a58379a0cc7..884036be4051 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -90,6 +90,8 @@ type sessionHandle struct { session *session // checkoutTime is the time the session was checked out of the pool. checkoutTime time.Time + // lastUseTime is the time the session was last used after checked out of the pool. + lastUseTime time.Time // trackedSessionHandle is the linked list node which links the session to // the list of tracked session handles. trackedSessionHandle is only set if // TrackSessionHandles has been enabled in the session pool configuration. @@ -118,6 +120,7 @@ func (sh *sessionHandle) recycle() { sh.session = nil sh.trackedSessionHandle = nil sh.checkoutTime = time.Time{} + sh.lastUseTime = time.Time{} sh.stack = nil sh.mu.Unlock() s.recycle() @@ -187,6 +190,7 @@ func (sh *sessionHandle) destroy() { sh.session = nil sh.trackedSessionHandle = nil sh.checkoutTime = time.Time{} + sh.lastUseTime = time.Time{} sh.stack = nil sh.mu.Unlock() @@ -199,6 +203,14 @@ func (sh *sessionHandle) destroy() { s.destroy(false) } +func (sh *sessionHandle) updateLastUseTime() { + sh.mu.Lock() + defer sh.mu.Unlock() + if sh.session != nil { + sh.lastUseTime = time.Now() + } +} + // session wraps a Cloud Spanner session ID through which transactions are // created and executed. type session struct { @@ -712,7 +724,7 @@ func (p *sessionPool) getLongRunningSessionsLocked() []*sessionHandle { for element != nil { sh := element.Value.(*sessionHandle) sh.mu.Lock() - diff := time.Now().Sub(sh.checkoutTime) + diff := time.Now().Sub(sh.lastUseTime) if !sh.eligibleForLongRunning && diff.Seconds() >= p.idleTimeThreshold.Seconds() { if (p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose) && !sh.isSessionLeakLogged { if p.actionOnInactiveTransaction == Warn { @@ -880,7 +892,7 @@ var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canc // stack if the session pool has been configured to track the call stacks of // sessions being checked out of the pool. func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) { - sh = &sessionHandle{session: s, checkoutTime: time.Now()} + sh = &sessionHandle{session: s, checkoutTime: time.Now(), lastUseTime: time.Now()} if p.TrackSessionHandles || p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close { p.mu.Lock() sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh) diff --git a/spanner/session_test.go b/spanner/session_test.go index 964cf2ff2ac1..4ec24afd4006 100644 --- a/spanner/session_test.go +++ b/spanner/session_test.go @@ -455,8 +455,8 @@ func TestSessionLeak_WhenInactiveTransactions_RemoveSessionsFromPool(t *testing. t.Fatalf("isLongRunningTransaction mismatch\nGot: %v\nWant: %v\n", g, w) } - // Mock the session checkout time to be greater than 60 mins - single.sh.checkoutTime = time.Now().Add(-time.Hour) + // Mock the session lastUseTime to be greater than 60 mins + single.sh.lastUseTime = time.Now().Add(-time.Hour) single.sh.mu.Unlock() // force run task to clean up unexpected long-running sessions @@ -526,17 +526,17 @@ func TestMaintainer_LongRunningTransactionsCleanup_IfClose_VerifyInactiveSession sp.mu.Unlock() s1.mu.Lock() s1.eligibleForLongRunning = false - s1.checkoutTime = time.Now().Add(-time.Hour) + s1.lastUseTime = time.Now().Add(-time.Hour) s1.mu.Unlock() s2.mu.Lock() s2.eligibleForLongRunning = false - s2.checkoutTime = time.Now().Add(-time.Hour) + s2.lastUseTime = time.Now().Add(-time.Hour) s2.mu.Unlock() s3.mu.Lock() s3.eligibleForLongRunning = true - s3.checkoutTime = time.Now().Add(-time.Hour) + s3.lastUseTime = time.Now().Add(-time.Hour) s3.mu.Unlock() // Sleep for maintainer to run long-running cleanup task @@ -596,17 +596,17 @@ func TestLongRunningTransactionsCleanup_IfClose_VerifyInactiveSessionsClosed(t * sp.mu.Unlock() s1.mu.Lock() s1.eligibleForLongRunning = false - s1.checkoutTime = time.Now().Add(-time.Hour) + s1.lastUseTime = time.Now().Add(-time.Hour) s1.mu.Unlock() s2.mu.Lock() s2.eligibleForLongRunning = false - s2.checkoutTime = time.Now().Add(-time.Hour) + s2.lastUseTime = time.Now().Add(-time.Hour) s2.mu.Unlock() s3.mu.Lock() s3.eligibleForLongRunning = true - s3.checkoutTime = time.Now().Add(-time.Hour) + s3.lastUseTime = time.Now().Add(-time.Hour) s3.mu.Unlock() // force run task to clean up unexpected long-running sessions @@ -666,17 +666,17 @@ func TestLongRunningTransactionsCleanup_IfLog_VerifyInactiveSessionsOpen(t *test sp.mu.Unlock() s1.mu.Lock() s1.eligibleForLongRunning = false - s1.checkoutTime = time.Now().Add(-time.Hour) + s1.lastUseTime = time.Now().Add(-time.Hour) s1.mu.Unlock() s2.mu.Lock() s2.eligibleForLongRunning = false - s2.checkoutTime = time.Now().Add(-time.Hour) + s2.lastUseTime = time.Now().Add(-time.Hour) s2.mu.Unlock() s3.mu.Lock() s3.eligibleForLongRunning = true - s3.checkoutTime = time.Now().Add(-time.Hour) + s3.lastUseTime = time.Now().Add(-time.Hour) s3.mu.Unlock() // force run task to clean up unexpected long-running sessions @@ -750,12 +750,12 @@ func TestLongRunningTransactionsCleanup_UtilisationBelowThreshold_VerifyInactive sp.mu.Unlock() s1.mu.Lock() s1.eligibleForLongRunning = false - s1.checkoutTime = time.Now().Add(-time.Hour) + s1.lastUseTime = time.Now().Add(-time.Hour) s1.mu.Unlock() s2.mu.Lock() s2.eligibleForLongRunning = false - s2.checkoutTime = time.Now().Add(-time.Hour) + s2.lastUseTime = time.Now().Add(-time.Hour) s2.mu.Unlock() // force run task to clean up unexpected long-running sessions @@ -816,17 +816,17 @@ func TestLongRunningTransactions_WhenAllExpectedlyLongRunning_VerifyInactiveSess sp.mu.Unlock() s1.mu.Lock() s1.eligibleForLongRunning = true - s1.checkoutTime = time.Now().Add(-time.Hour) + s1.lastUseTime = time.Now().Add(-time.Hour) s1.mu.Unlock() s2.mu.Lock() s2.eligibleForLongRunning = true - s2.checkoutTime = time.Now().Add(-time.Hour) + s2.lastUseTime = time.Now().Add(-time.Hour) s2.mu.Unlock() s3.mu.Lock() s3.eligibleForLongRunning = true - s3.checkoutTime = time.Now().Add(-time.Hour) + s3.lastUseTime = time.Now().Add(-time.Hour) s3.mu.Unlock() // force run task to clean up unexpected long-running sessions @@ -886,17 +886,17 @@ func TestLongRunningTransactions_WhenDurationBelowThreshold_VerifyInactiveSessio sp.mu.Unlock() s1.mu.Lock() s1.eligibleForLongRunning = false - s1.checkoutTime = time.Now().Add(-50 * time.Minute) + s1.lastUseTime = time.Now().Add(-50 * time.Minute) s1.mu.Unlock() s2.mu.Lock() s2.eligibleForLongRunning = false - s2.checkoutTime = time.Now().Add(-50 * time.Minute) + s2.lastUseTime = time.Now().Add(-50 * time.Minute) s2.mu.Unlock() s3.mu.Lock() s3.eligibleForLongRunning = true - s3.checkoutTime = time.Now().Add(-50 * time.Minute) + s3.lastUseTime = time.Now().Add(-50 * time.Minute) s3.mu.Unlock() // force run task to clean up unexpected long-running sessions diff --git a/spanner/transaction.go b/spanner/transaction.go index 6bbd6bd4d018..c4b04881c80b 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -825,6 +825,9 @@ func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHand }, } t.mu.Unlock() + if sh != nil { + sh.updateLastUseTime() + } return sh, ts, nil } state := t.state @@ -1267,6 +1270,9 @@ func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sp }, } t.mu.Unlock() + if sh != nil { + sh.updateLastUseTime() + } return sh, ts, nil default: state := t.state @@ -1479,6 +1485,7 @@ func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions if sid == "" || client == nil { return resp, errSessionClosed(t.sh) } + t.sh.updateLastUseTime() var md metadata.MD res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata(), t.disableRouteToLeader), &sppb.CommitRequest{ @@ -1527,6 +1534,7 @@ func (t *ReadWriteTransaction) rollback(ctx context.Context) { if sid == "" || client == nil { return } + t.sh.updateLastUseTime() err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata(), t.disableRouteToLeader), &sppb.RollbackRequest{ Session: sid, TransactionId: t.tx, From b024b5f1a5b775c90d42a6bb9fd57db3d81726fc Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Tue, 31 Oct 2023 10:17:30 +0000 Subject: [PATCH 2/4] feat(spanner): add test for read only transaction --- spanner/client_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/spanner/client_test.go b/spanner/client_test.go index 0eb66755c27d..de62c398246f 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -1038,6 +1038,72 @@ func TestClient_ReadOnlyTransaction_ReadOptions(t *testing.T) { } } +func TestClient_ReadOnlyTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) { + t.Parallel() + + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + idleTimeThreshold: 3 * time.Second, + }, + }, + }) + defer teardown() + server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, + SimulatedExecutionTime{ + MinimumExecutionTime: 2 * time.Second, + }) + server.TestSpanner.PutExecutionTime(MethodStreamingRead, + SimulatedExecutionTime{ + MinimumExecutionTime: 2 * time.Second, + }) + ctx := context.Background() + p := client.idleSessions + + roTxn := client.ReadOnlyTransaction() + defer roTxn.Close() + iter := roTxn.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) + iter.Next() + iter.Stop() + + // Get the session last use time. + roTxn.sh.mu.Lock() + sessionPrevLastUseTime := roTxn.sh.lastUseTime + roTxn.sh.mu.Unlock() + + iter = roTxn.Read(ctx, "FOO", AllKeys(), []string{"BAR"}) + iter.Next() + iter.Stop() + + // Get the latest session last use time + roTxn.sh.mu.Lock() + sessionLatestLastUseTime := roTxn.sh.lastUseTime + sessionCheckoutTime := roTxn.sh.checkoutTime + roTxn.sh.mu.Unlock() + + // sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime. + // This is because session lastUse time should be updated whenever a new operation is being executed on the transaction. + if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Seconds() <= 0 { + t.Fatalf("Session lastUseTime times should not be equal") + } + + if (time.Now().Sub(sessionPrevLastUseTime)).Seconds() < 4 { + t.Fatalf("Expected session to be checkedout for more than 4 seconds") + } + if (time.Now().Sub(sessionCheckoutTime)).Seconds() < 4 { + t.Fatalf("Expected session to be checkedout for more than 4 seconds") + } + // force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec. + // The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime. + p.removeLongRunningSessions() + if p.numOfLeakedSessionsRemoved > 0 { + t.Fatalf("Expected session to not get cleaned by background maintainer") + } +} + func setQueryOptionsEnvVars(opts *sppb.ExecuteSqlRequest_QueryOptions) func() { os.Setenv("SPANNER_OPTIMIZER_VERSION", opts.OptimizerVersion) os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", opts.OptimizerStatisticsPackage) From 7431b6e215fdce96fce0d6acee3bb577f9f538e4 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 1 Nov 2023 06:27:42 +0000 Subject: [PATCH 3/4] feat(spanner): update lastUseTime before invoking the RPC --- spanner/batch.go | 3 +++ spanner/client.go | 2 ++ spanner/pdml.go | 3 +++ spanner/transaction.go | 18 ++++++++++++------ 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/spanner/batch.go b/spanner/batch.go index e1ed21f23cbc..988382d8402b 100644 --- a/spanner/batch.go +++ b/spanner/batch.go @@ -133,6 +133,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex return nil, err } var md metadata.MD + sh.updateLastUseTime() resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.PartitionReadRequest{ Session: sid, Transaction: ts, @@ -203,6 +204,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement Params: params, ParamTypes: paramTypes, } + sh.updateLastUseTime() resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), req, gax.WithGRPCOptions(grpc.Header(&md))) if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { @@ -306,6 +308,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R // Might happen if transaction is closed in the middle of a API call. return &RowIterator{err: errSessionClosed(sh)} } + sh.updateLastUseTime() // Read or query partition. if p.rreq != nil { rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { diff --git a/spanner/client.go b/spanner/client.go index 2a26e1d4c481..1c81ccefba9e 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -427,6 +427,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound return nil, err } sh = &sessionHandle{session: s} + sh.updateLastUseTime() // Begin transaction. res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), true), &sppb.BeginTransactionRequest{ @@ -854,6 +855,7 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) { var md metadata.MD + sh.updateLastUseTime() stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{ Session: sh.getID(), MutationGroups: mgsPb, diff --git a/spanner/pdml.go b/spanner/pdml.go index 4dfde7efb370..e23b7c27da7c 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -108,6 +108,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt // Note that PDML transactions cannot be committed or rolled back. func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) { var md metadata.MD + sh.updateLastUseTime() // Begin transaction. res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{ Session: sh.getID(), @@ -122,6 +123,8 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq req.Transaction = &sppb.TransactionSelector{ Selector: &sppb.TransactionSelector_Id{Id: res.Id}, } + + sh.updateLastUseTime() resultSet, err := sh.getClient().ExecuteSql(ctx, req, gax.WithGRPCOptions(grpc.Header(&md))) if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil { err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql") diff --git a/spanner/transaction.go b/spanner/transaction.go index c4b04881c80b..941ac3eab49c 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -248,6 +248,9 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), sh.session.logger, func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { + if t.sh != nil { + t.sh.updateLastUseTime() + } client, err := client.StreamingRead(ctx, &sppb.ReadRequest{ Session: t.sh.getID(), @@ -496,6 +499,8 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que req.ResumeToken = resumeToken req.Session = t.sh.getID() req.Transaction = t.getTransactionSelector() + t.sh.updateLastUseTime() + client, err := client.ExecuteStreamingSql(ctx, req) if err != nil { if _, ok := req.Transaction.GetSelector().(*sppb.TransactionSelector_Begin); ok { @@ -675,6 +680,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { if err != nil { return err } + sh.updateLastUseTime() var md metadata.MD res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.BeginTransactionRequest{ Session: sh.getID(), @@ -825,9 +831,6 @@ func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHand }, } t.mu.Unlock() - if sh != nil { - sh.updateLastUseTime() - } return sh, ts, nil } state := t.state @@ -1068,6 +1071,7 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts hasInlineBeginTransaction = true } + sh.updateLastUseTime() var md metadata.MD resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), req, gax.WithGRPCOptions(grpc.Header(&md))) @@ -1165,6 +1169,7 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts hasInlineBeginTransaction = true } + sh.updateLastUseTime() var md metadata.MD resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.ExecuteBatchDmlRequest{ Session: sh.getID(), @@ -1270,9 +1275,6 @@ func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sp }, } t.mu.Unlock() - if sh != nil { - sh.updateLastUseTime() - } return sh, ts, nil default: state := t.state @@ -1399,6 +1401,9 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error { }() // Retry the BeginTransaction call if a 'Session not found' is returned. for { + if sh != nil { + sh.updateLastUseTime() + } tx, err = beginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), sh.getID(), sh.getClient(), t.txOpts) if isSessionNotFoundError(err) { sh.destroy() @@ -1737,6 +1742,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta return ToSpannerError(err) } } + sh.updateLastUseTime() res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.CommitRequest{ Session: sh.getID(), Transaction: &sppb.CommitRequest_SingleUseTransaction{ From 2b1fae72242b183a5fc344a0cf12e19885f65e41 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 2 Nov 2023 10:35:11 +0000 Subject: [PATCH 4/4] feat(spanner): reduce time to ms range in unit tests --- spanner/client_test.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index de62c398246f..a7cf599e85d0 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -1047,18 +1047,18 @@ func TestClient_ReadOnlyTransaction_WhenMultipleOperations_SessionLastUseTimeSho MaxOpened: 1, InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ actionOnInactiveTransaction: WarnAndClose, - idleTimeThreshold: 3 * time.Second, + idleTimeThreshold: 30 * time.Millisecond, }, }, }) defer teardown() server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, SimulatedExecutionTime{ - MinimumExecutionTime: 2 * time.Second, + MinimumExecutionTime: 20 * time.Millisecond, }) server.TestSpanner.PutExecutionTime(MethodStreamingRead, SimulatedExecutionTime{ - MinimumExecutionTime: 2 * time.Second, + MinimumExecutionTime: 20 * time.Millisecond, }) ctx := context.Background() p := client.idleSessions @@ -1086,15 +1086,15 @@ func TestClient_ReadOnlyTransaction_WhenMultipleOperations_SessionLastUseTimeSho // sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime. // This is because session lastUse time should be updated whenever a new operation is being executed on the transaction. - if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Seconds() <= 0 { + if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Milliseconds() <= 0 { t.Fatalf("Session lastUseTime times should not be equal") } - if (time.Now().Sub(sessionPrevLastUseTime)).Seconds() < 4 { - t.Fatalf("Expected session to be checkedout for more than 4 seconds") + if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 40 { + t.Fatalf("Expected session to be checkedout for more than 40 milliseconds") } - if (time.Now().Sub(sessionCheckoutTime)).Seconds() < 4 { - t.Fatalf("Expected session to be checkedout for more than 4 seconds") + if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 40 { + t.Fatalf("Expected session to be checkedout for more than 40 milliseconds") } // force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec. // The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime. @@ -1563,14 +1563,14 @@ func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeSh MaxOpened: 1, InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ actionOnInactiveTransaction: WarnAndClose, - idleTimeThreshold: 3 * time.Second, + idleTimeThreshold: 30 * time.Millisecond, }, }, }) defer teardown() server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{ - MinimumExecutionTime: 2 * time.Second, + MinimumExecutionTime: 20 * time.Millisecond, }) ctx := context.Background() p := client.idleSessions @@ -1599,15 +1599,15 @@ func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeSh // sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime. // This is because session lastUse time should be updated whenever a new operation is being executed on the transaction. - if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Seconds() <= 0 { + if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Milliseconds() <= 0 { t.Fatalf("Session lastUseTime times should not be equal") } - if (time.Now().Sub(sessionPrevLastUseTime)).Seconds() < 4 { - t.Fatalf("Expected session to be checkedout for more than 4 seconds") + if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 40 { + t.Fatalf("Expected session to be checkedout for more than 40 milliseconds") } - if (time.Now().Sub(sessionCheckoutTime)).Seconds() < 4 { - t.Fatalf("Expected session to be checkedout for more than 4 seconds") + if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 40 { + t.Fatalf("Expected session to be checkedout for more than 40 milliseconds") } // force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec. // The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime.