diff --git a/spanner/client.go b/spanner/client.go index 1c81ccefba9e..52cc320a74ff 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -569,18 +569,13 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea if sh == nil || sh.getID() == "" || sh.getClient() == nil { // Session handle hasn't been allocated or has been destroyed. sh, err = c.idleSessions.take(ctx) - if t != nil { - // Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true - sh.mu.Lock() - t.mu.Lock() - sh.eligibleForLongRunning = t.isLongRunningTransaction - t.mu.Unlock() - sh.mu.Unlock() - } if err != nil { // If session retrieval fails, just fail the transaction. return err } + + // Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true + t.setSessionEligibilityForLongRunning(sh) } if t.shouldExplicitBegin(attempt) { // Make sure we set the current session handle before calling BeginTransaction. diff --git a/spanner/client_test.go b/spanner/client_test.go index b0bd2bd2cafa..f35b8ff715b5 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -269,8 +269,6 @@ func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_Rem if g, w := sh.eligibleForLongRunning, false; g != w { t.Fatalf("isLongRunningTransaction mismatch\nGot: %v\nWant: %v\n", g, w) } - p.InactiveTransactionRemovalOptions.mu.Lock() - defer p.InactiveTransactionRemovalOptions.mu.Unlock() if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w { t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } @@ -1563,14 +1561,14 @@ func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeSh MaxOpened: 1, InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ actionOnInactiveTransaction: WarnAndClose, - idleTimeThreshold: 30 * time.Millisecond, + idleTimeThreshold: 300 * time.Millisecond, }, }, }) defer teardown() server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{ - MinimumExecutionTime: 20 * time.Millisecond, + MinimumExecutionTime: 200 * time.Millisecond, }) ctx := context.Background() p := client.idleSessions @@ -1603,11 +1601,11 @@ func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeSh t.Fatalf("Session lastUseTime times should not be equal") } - if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 40 { - t.Fatalf("Expected session to be checkedout for more than 40 milliseconds") + if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 400 { + t.Fatalf("Expected session to be checkedout for more than 400 milliseconds") } - if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 40 { - t.Fatalf("Expected session to be checkedout for more than 40 milliseconds") + if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 400 { + t.Fatalf("Expected session to be checkedout for more than 400 milliseconds") } // force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec. // The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime. @@ -1707,8 +1705,8 @@ func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAct if g, w := attempts, 2; g != w { t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w) } - p.InactiveTransactionRemovalOptions.mu.Lock() - defer p.InactiveTransactionRemovalOptions.mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w { t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } @@ -4256,8 +4254,8 @@ func TestClient_WhenLongRunningPartitionedUpdateRequest_TakeNoAction(t *testing. t.Errorf("Row count mismatch\nGot: %v\nWant: %v", g, w) } p := client.idleSessions - p.InactiveTransactionRemovalOptions.mu.Lock() - defer p.InactiveTransactionRemovalOptions.mu.Unlock() + p.mu.Lock() + defer p.mu.Unlock() if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w { t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } diff --git a/spanner/pdml.go b/spanner/pdml.go index e23b7c27da7c..2bdfd72897d8 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -53,16 +53,16 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt } sh, err := c.idleSessions.take(ctx) - // Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running - sh.mu.Lock() - sh.eligibleForLongRunning = true - sh.mu.Unlock() if err != nil { return 0, ToSpannerError(err) } if sh != nil { defer sh.recycle() } + // Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running + sh.mu.Lock() + sh.eligibleForLongRunning = true + sh.mu.Unlock() // Create the parameters and the SQL request, but without a transaction. // The transaction reference will be added by the executePdml method. diff --git a/spanner/session.go b/spanner/session.go index 884036be4051..c23f66541a8e 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -59,7 +59,6 @@ const ( // InactiveTransactionRemovalOptions has configurations for action on long-running transactions. type InactiveTransactionRemovalOptions struct { - mu sync.Mutex // actionOnInactiveTransaction is the configuration to choose action for inactive transactions. // It can be one of Warn, Close, WarnAndClose. actionOnInactiveTransaction ActionOnInactiveTransactionKind @@ -74,9 +73,6 @@ type InactiveTransactionRemovalOptions struct { // variable that keeps track of the last execution time when inactive transactions // were removed by the maintainer task. lastExecutionTime time.Time - // indicates the number of leaked sessions removed from the session pool. - // This is valid only when ActionOnInactiveTransaction is WarnAndClose or ActionOnInactiveTransaction is Close. - numOfLeakedSessionsRemoved uint64 } // sessionHandle is an interface for transactions to access Cloud Spanner @@ -613,6 +609,10 @@ type sessionPool struct { // tagMap is a map of all tags that are associated with the emitted metrics. tagMap *tag.Map + + // indicates the number of leaked sessions removed from the session pool. + // This is valid only when ActionOnInactiveTransaction is WarnAndClose or ActionOnInactiveTransaction is Close in InactiveTransactionRemovalOptions. + numOfLeakedSessionsRemoved uint64 } // newSessionPool creates a new session pool. @@ -761,14 +761,16 @@ func (p *sessionPool) removeLongRunningSessions() { // destroy long-running sessions if p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close { + var leakedSessionsRemovedCount uint64 for _, sh := range longRunningSessions { // removes inner session out of the pool to reduce the probability of two processes trying // to use the same session at the same time. sh.destroy() - p.InactiveTransactionRemovalOptions.mu.Lock() - p.InactiveTransactionRemovalOptions.numOfLeakedSessionsRemoved++ - p.InactiveTransactionRemovalOptions.mu.Unlock() + leakedSessionsRemovedCount++ } + p.mu.Lock() + p.numOfLeakedSessionsRemoved += leakedSessionsRemovedCount + p.mu.Unlock() } } diff --git a/spanner/session_test.go b/spanner/session_test.go index 4ec24afd4006..a90f46f0d344 100644 --- a/spanner/session_test.go +++ b/spanner/session_test.go @@ -474,8 +474,6 @@ func TestSessionLeak_WhenInactiveTransactions_RemoveSessionsFromPool(t *testing. if g, w := p.numOpened, uint64(0); g != w { t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) } - p.InactiveTransactionRemovalOptions.mu.Lock() - defer p.InactiveTransactionRemovalOptions.mu.Unlock() if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w { t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } @@ -541,11 +539,11 @@ func TestMaintainer_LongRunningTransactionsCleanup_IfClose_VerifyInactiveSession // Sleep for maintainer to run long-running cleanup task time.Sleep(30 * time.Millisecond) + // force run task to clean up unexpected long-running sessions + sp.removeLongRunningSessions() sp.mu.Lock() defer sp.mu.Unlock() - sp.InactiveTransactionRemovalOptions.mu.Lock() - defer sp.InactiveTransactionRemovalOptions.mu.Unlock() if g, w := sp.numOfLeakedSessionsRemoved, uint64(2); g != w { t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } @@ -614,8 +612,6 @@ func TestLongRunningTransactionsCleanup_IfClose_VerifyInactiveSessionsClosed(t * sp.mu.Lock() defer sp.mu.Unlock() - sp.InactiveTransactionRemovalOptions.mu.Lock() - defer sp.InactiveTransactionRemovalOptions.mu.Unlock() if g, w := sp.numOfLeakedSessionsRemoved, uint64(2); g != w { t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } @@ -702,8 +698,6 @@ func TestLongRunningTransactionsCleanup_IfLog_VerifyInactiveSessionsOpen(t *test sp.mu.Lock() defer sp.mu.Unlock() - sp.InactiveTransactionRemovalOptions.mu.Lock() - defer sp.InactiveTransactionRemovalOptions.mu.Unlock() if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w { t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } @@ -763,8 +757,6 @@ func TestLongRunningTransactionsCleanup_UtilisationBelowThreshold_VerifyInactive sp.mu.Lock() defer sp.mu.Unlock() - sp.InactiveTransactionRemovalOptions.mu.Lock() - defer sp.InactiveTransactionRemovalOptions.mu.Unlock() // 2/3 sessions are used. Hence utilisation < 95%. if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w { t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) @@ -834,8 +826,6 @@ func TestLongRunningTransactions_WhenAllExpectedlyLongRunning_VerifyInactiveSess sp.mu.Lock() defer sp.mu.Unlock() - sp.InactiveTransactionRemovalOptions.mu.Lock() - defer sp.InactiveTransactionRemovalOptions.mu.Unlock() if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w { t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } @@ -916,8 +906,6 @@ func TestLongRunningTransactions_WhenDurationBelowThreshold_VerifyInactiveSessio sp.mu.Lock() defer sp.mu.Unlock() - sp.InactiveTransactionRemovalOptions.mu.Lock() - defer sp.InactiveTransactionRemovalOptions.mu.Unlock() if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w { t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) } diff --git a/spanner/transaction.go b/spanner/transaction.go index 941ac3eab49c..875162e4e45c 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -54,6 +54,7 @@ type txReadEnv interface { // release should be called at the end of every transactional read to deal // with session recycling. release(error) + setSessionEligibilityForLongRunning(sh *sessionHandle) } // txReadOnly contains methods for doing transactional reads. @@ -676,10 +677,10 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { // Retry the BeginTransaction call if a 'Session not found' is returned. for { sh, err = t.sp.take(ctx) - sh.eligibleForLongRunning = t.isLongRunningTransaction if err != nil { return err } + t.setSessionEligibilityForLongRunning(sh) sh.updateLastUseTime() var md metadata.MD res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.BeginTransactionRequest{ @@ -937,6 +938,16 @@ func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTra return t } +func (t *ReadOnlyTransaction) setSessionEligibilityForLongRunning(sh *sessionHandle) { + if t != nil && sh != nil { + sh.mu.Lock() + t.mu.Lock() + sh.eligibleForLongRunning = t.isLongRunningTransaction + t.mu.Unlock() + sh.mu.Unlock() + } +} + // ReadWriteTransaction provides a locking read-write transaction. // // This type of transaction is the only way to write data into Cloud Spanner; @@ -1136,13 +1147,6 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts if err != nil { return nil, err } - // mark transaction and session to be eligible for long-running - t.mu.Lock() - t.isLongRunningTransaction = true - t.mu.Unlock() - t.sh.mu.Lock() - t.sh.eligibleForLongRunning = true - t.sh.mu.Unlock() // Cloud Spanner will return "Session not found" on bad sessions. sid := sh.getID() @@ -1151,6 +1155,12 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts return nil, errSessionClosed(sh) } + // mark transaction and session to be eligible for long-running + t.mu.Lock() + t.isLongRunningTransaction = true + t.mu.Unlock() + t.setSessionEligibilityForLongRunning(sh) + var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement for _, st := range stmts { params, paramTypes, err := st.convertParams() @@ -1339,6 +1349,16 @@ func (t *ReadWriteTransaction) release(err error) { } } +func (t *ReadWriteTransaction) setSessionEligibilityForLongRunning(sh *sessionHandle) { + if t != nil && sh != nil { + sh.mu.Lock() + t.mu.Lock() + sh.eligibleForLongRunning = t.isLongRunningTransaction + t.mu.Unlock() + sh.mu.Unlock() + } +} + func beginTransaction(ctx context.Context, sid string, client *vkit.Client, opts TransactionOptions) (transactionID, error) { res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ Session: sid, @@ -1411,14 +1431,8 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error { if err != nil { return err } - - sh.mu.Lock() - t.mu.Lock() - // for batch update operations, isLongRunningTransaction will be true - sh.eligibleForLongRunning = t.isLongRunningTransaction - t.mu.Unlock() - sh.mu.Unlock() - + // Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true + t.setSessionEligibilityForLongRunning(sh) continue } else { err = ToSpannerError(err)