From 461d11e913414e9de822e5f1acdf19c8f3f953d5 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Mon, 30 Oct 2023 15:19:07 +0530 Subject: [PATCH] feat(spanner): long running transaction clean up - disabled (#8177) * feat(spanner): long running transaction clean up to prevent session leaks * feat(spanner): code refactoring in session.go file * fix(spanner): fix vet * feat(spanner): refactor client.go file * feat(spanner): add lock when updating session handle * feat(spanner): code refactoring in transaction.go file * test(spanner): remove test * feat(spanner): code refactor * feat(spanner): refactor nit comments * feat(spanner): reduce sleep timing to milli seconds for unit tests * feat(spanner): update idleTimeThresholdSecs field to time.Duration * feat(spanner): make the log messages conditional based on type of action for inactive transactions * feat(spanner): combine get and remove long running sessions in a single function to avoid sleep statements during unit tests * feat(spanner): modify presubmit condition to run tests for changed modules. Currently tests are not getting run as part of github presubmits. * feat(spanner): revert presubmit.sh fix * feat(spanner): update doc * feat(spanner): reword isLongRunning to eligibleForLongRunning in sessionHandle * feat(spanner): update TrackSessionHandles logic to turn off stack trace by default for long running sessions * feat(spanner): fix test * feat(spanner): change action on inactive transactions option to enum * feat(spanner): fix lint * feat(spanner): fix lint * feat(spanner): fix lint * feat(spanner): fix doc * feat(spanner): disable the feature by default * feat(spanner): revert commit - disable the feature by default * fix: lint * docs: add Readme * feat(spanner): make WARN as default for action on inactive transactions * docs: address review comments * docs: move README.md changes in a different PR * feat: disable feature --- spanner/client.go | 26 +- spanner/client_test.go | 217 +++++++++++++++++ spanner/pdml.go | 4 + spanner/session.go | 160 ++++++++++++- spanner/session_test.go | 518 ++++++++++++++++++++++++++++++++++++++++ spanner/transaction.go | 21 ++ 6 files changed, 928 insertions(+), 18 deletions(-) diff --git a/spanner/client.go b/spanner/client.go index c2e8447c155e..2a26e1d4c481 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -447,10 +447,11 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound t := &BatchReadOnlyTransaction{ ReadOnlyTransaction: ReadOnlyTransaction{ - tx: tx, - txReadyOrClosed: make(chan struct{}), - state: txActive, - rts: rts, + tx: tx, + txReadyOrClosed: make(chan struct{}), + state: txActive, + rts: rts, + isLongRunningTransaction: true, }, ID: BatchReadOnlyTransactionID{ tid: tx, @@ -481,10 +482,11 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) t := &BatchReadOnlyTransaction{ ReadOnlyTransaction: ReadOnlyTransaction{ - tx: tid.tid, - txReadyOrClosed: make(chan struct{}), - state: txActive, - rts: tid.rts, + tx: tid.tid, + txReadyOrClosed: make(chan struct{}), + state: txActive, + rts: tid.rts, + isLongRunningTransaction: true, }, ID: tid, } @@ -566,6 +568,14 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea if sh == nil || sh.getID() == "" || sh.getClient() == nil { // Session handle hasn't been allocated or has been destroyed. sh, err = c.idleSessions.take(ctx) + if t != nil { + // Some operations (for ex BatchUpdate) can be long-running. 
For such operations set the isLongRunningTransaction flag to be true + sh.mu.Lock() + t.mu.Lock() + sh.eligibleForLongRunning = t.isLongRunningTransaction + t.mu.Unlock() + sh.mu.Unlock() + } if err != nil { // If session retrieval fails, just fail the transaction. return err diff --git a/spanner/client_test.go b/spanner/client_test.go index da3309adccb5..4594811f903f 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -209,6 +209,73 @@ func TestClient_Single_Read_SessionNotFound(t *testing.T) { } } +func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_RemoveSessionFromPool(t *testing.T) { + t.Parallel() + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + }, + }, + }) + defer teardown() + server.TestSpanner.PutExecutionTime( + MethodExecuteStreamingSql, + SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, + ) + ctx := context.Background() + single := client.Single() + iter := single.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) + p := client.idleSessions + sh := single.sh + // simulate session to be checked out for more than 60mins + sh.mu.Lock() + sh.checkoutTime = time.Now().Add(-time.Hour) + sh.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + p.removeLongRunningSessions() + rowCount := int64(0) + for { + // Backend throws SessionNotFoundError. Session gets replaced with new session + _, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + t.Fatal(err) + } + rowCount++ + } + // New session returns back to pool + iter.Stop() + + p.mu.Lock() + defer p.mu.Unlock() + if g, w := p.idleList.Len(), 1; g != w { + t.Fatalf("Idle Sessions in pool, count mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := p.numInUse, uint64(0); g != w { + t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := p.numOpened, uint64(1); g != w { + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } + + sh.mu.Lock() + defer sh.mu.Unlock() + if g, w := sh.eligibleForLongRunning, false; g != w { + t.Fatalf("isLongRunningTransaction mismatch\nGot: %v\nWant: %v\n", g, w) + } + p.InactiveTransactionRemovalOptions.mu.Lock() + defer p.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w { + t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + func TestClient_Single_ReadRow_SessionNotFound(t *testing.T) { t.Parallel() @@ -1369,6 +1436,59 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteUpdate(t *testing.T } } +func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionShouldFail(t *testing.T) { + t.Parallel() + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + }, + }, + }) + defer teardown() + ctx := context.Background() + p := client.idleSessions + msg := "session is already recycled / destroyed" + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + rowCount, err := tx.Update(ctx, 
NewStatement(UpdateBarSetFoo)) + if err != nil { + return err + } + if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w { + return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w) + } + + // Simulate the session to be checked out for more than 60 mins. + // The background task cleans up this long-running session. + tx.sh.mu.Lock() + tx.sh.checkoutTime = time.Now().Add(-time.Hour) + if g, w := tx.sh.eligibleForLongRunning, false; g != w { + tx.sh.mu.Unlock() + return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w) + } + tx.sh.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + p.removeLongRunningSessions() + + // The session associated with this transaction tx has been destroyed. So the below call should fail. + // Eventually this means the entire transaction should not succeed. + _, err = tx.Update(ctx, NewStatement("UPDATE FOO SET BAR='value' WHERE ID=1")) + if err != nil { + return err + } + return nil + }) + if err == nil { + t.Fatalf("Missing expected exception") + } + if status.Code(err) != codes.FailedPrecondition || !strings.Contains(err.Error(), msg) { + t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg) + } +} + func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) { t.Parallel() @@ -1402,6 +1522,65 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *test } } +func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAction(t *testing.T) { + t.Parallel() + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + }, + }, + }) + defer teardown() + server.TestSpanner.PutExecutionTime( + MethodExecuteBatchDml, + SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}}, + ) + ctx := context.Background() + p := client.idleSessions + var attempts int + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + attempts++ + if attempts == 2 { + // Simulate the session to be long-running. The background task should not clean up this long-running session. 
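+			// The BatchUpdate call in the first attempt marked the transaction as long-running,
+			// and that flag is copied onto the session handle taken for this retry, so the
+			// forced cleanup below must leave this session untouched.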
+ tx.sh.mu.Lock() + tx.sh.checkoutTime = time.Now().Add(-time.Hour) + if g, w := tx.sh.eligibleForLongRunning, true; g != w { + tx.sh.mu.Unlock() + return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w) + } + tx.sh.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + p.removeLongRunningSessions() + } + rowCounts, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)}) + if err != nil { + return err + } + if g, w := len(rowCounts), 1; g != w { + return status.Errorf(codes.FailedPrecondition, "Row counts length mismatch\nGot: %v\nWant: %v", g, w) + } + if g, w := rowCounts[0], int64(UpdateBarSetFooRowCount); g != w { + return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + if g, w := attempts, 2; g != w { + t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w) + } + p.InactiveTransactionRemovalOptions.mu.Lock() + defer p.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w { + t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + func TestClient_ReadWriteTransaction_Query_QueryOptions(t *testing.T) { for _, tt := range queryOptionsTestCases() { t.Run(tt.name, func(t *testing.T) { @@ -3913,6 +4092,44 @@ func TestClient_PDML_Priority(t *testing.T) { } } +func TestClient_WhenLongRunningPartitionedUpdateRequest_TakeNoAction(t *testing.T) { + t.Parallel() + ctx := context.Background() + server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + healthCheckSampleInterval: 10 * time.Millisecond, // maintainer runs every 10ms + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + executionFrequency: 15 * time.Millisecond, // check long-running sessions every 15ms + }, + }, + }) + defer teardown() + // delay the rpc by 30ms. The background task runs to clean long-running sessions. + server.TestSpanner.PutExecutionTime(MethodExecuteSql, + SimulatedExecutionTime{ + MinimumExecutionTime: 30 * time.Millisecond, + }) + + stmt := NewStatement(UpdateBarSetFoo) + // This transaction is eligible to be long-running, so the background task should not clean its session. 
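+	// (partitionedUpdate in pdml.go marks its session handle as eligibleForLongRunning,
+	// which is what exempts it from the cleanup task.)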
+ rowCount, err := client.PartitionedUpdate(ctx, stmt) + if err != nil { + t.Fatal(err) + } + if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w { + t.Errorf("Row count mismatch\nGot: %v\nWant: %v", g, w) + } + p := client.idleSessions + p.InactiveTransactionRemovalOptions.mu.Lock() + defer p.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w { + t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + func TestClient_Apply_Priority(t *testing.T) { t.Parallel() diff --git a/spanner/pdml.go b/spanner/pdml.go index 3d9be7c041d4..4dfde7efb370 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -53,6 +53,10 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt } sh, err := c.idleSessions.take(ctx) + // Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running + sh.mu.Lock() + sh.eligibleForLongRunning = true + sh.mu.Unlock() if err != nil { return 0, ToSpannerError(err) } diff --git a/spanner/session.go b/spanner/session.go index 2fc4809ff34a..7a58379a0cc7 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -42,6 +42,43 @@ import ( const healthCheckIntervalMins = 50 +// ActionOnInactiveTransactionKind describes the kind of action taken when there are inactive transactions. +type ActionOnInactiveTransactionKind int + +const ( + actionUnspecified ActionOnInactiveTransactionKind = iota + // NoAction action does not perform any action on inactive transactions. + NoAction + // Warn action logs inactive transactions. Any inactive transaction gets logged only once. + Warn + // Close action closes inactive transactions without logging. + Close + // WarnAndClose action logs and closes the inactive transactions. + WarnAndClose +) + +// InactiveTransactionRemovalOptions has configurations for action on long-running transactions. +type InactiveTransactionRemovalOptions struct { + mu sync.Mutex + // actionOnInactiveTransaction is the configuration to choose action for inactive transactions. + // It can be one of Warn, Close, WarnAndClose. + actionOnInactiveTransaction ActionOnInactiveTransactionKind + // long-running transactions will be cleaned up if utilisation is + // greater than the below value. + usedSessionsRatioThreshold float64 + // A transaction is considered to be idle if it has not been used for + // a duration greater than the below value. + idleTimeThreshold time.Duration + // frequency for closing inactive transactions + executionFrequency time.Duration + // variable that keeps track of the last execution time when inactive transactions + // were removed by the maintainer task. + lastExecutionTime time.Time + // indicates the number of leaked sessions removed from the session pool. + // This is valid only when ActionOnInactiveTransaction is WarnAndClose or ActionOnInactiveTransaction is Close. + numOfLeakedSessionsRemoved uint64 +} + // sessionHandle is an interface for transactions to access Cloud Spanner // sessions safely. It is generated by sessionPool.take(). type sessionHandle struct { @@ -60,6 +97,10 @@ type sessionHandle struct { // stack is the call stack of the goroutine that checked out the session // from the pool. This can be used to track down session leak problems. stack []byte + // eligibleForLongRunning tells if the inner session is eligible to be long-running. + eligibleForLongRunning bool + // if the inner session object is long-running then the stack gets logged once. 
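+	// isSessionLeakLogged is set once the leak has been logged so that each leaked
+	// session is warned about at most once.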
+ isSessionLeakLogged bool } // recycle gives the inner session object back to its home session pool. It is @@ -440,6 +481,8 @@ type SessionPoolConfig struct { // sessionLabels for the sessions created in the session pool. sessionLabels map[string]string + + InactiveTransactionRemovalOptions } // DefaultSessionPoolConfig is the default configuration for the session pool @@ -453,6 +496,12 @@ var DefaultSessionPoolConfig = SessionPoolConfig{ WriteSessions: 0.2, HealthCheckWorkers: 10, HealthCheckInterval: healthCheckIntervalMins * time.Minute, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: NoAction, // Make default to Warn from NoAction later + executionFrequency: 2 * time.Minute, + idleTimeThreshold: 60 * time.Minute, + usedSessionsRatioThreshold: 0.95, + }, } // errMinOpenedGTMapOpened returns error for SessionPoolConfig.MaxOpened < SessionPoolConfig.MinOpened when SessionPoolConfig.MaxOpened is set. @@ -559,14 +608,6 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, if err := config.validate(); err != nil { return nil, err } - pool := &sessionPool{ - sc: sc, - valid: true, - mayGetSession: make(chan struct{}), - SessionPoolConfig: config, - mw: newMaintenanceWindow(config.MaxOpened), - rand: rand.New(rand.NewSource(time.Now().UnixNano())), - } if config.HealthCheckWorkers == 0 { // With 10 workers and assuming average latency of 5ms for // BeginTransaction, we will be able to prepare 2000 tx/sec in advance. @@ -582,6 +623,27 @@ func newSessionPool(sc *sessionClient, config SessionPoolConfig) (*sessionPool, if config.healthCheckSampleInterval == 0 { config.healthCheckSampleInterval = time.Minute } + if config.actionOnInactiveTransaction == actionUnspecified { + config.actionOnInactiveTransaction = DefaultSessionPoolConfig.actionOnInactiveTransaction + } + if config.idleTimeThreshold == 0 { + config.idleTimeThreshold = DefaultSessionPoolConfig.idleTimeThreshold + } + if config.executionFrequency == 0 { + config.executionFrequency = DefaultSessionPoolConfig.executionFrequency + } + if config.usedSessionsRatioThreshold == 0 { + config.usedSessionsRatioThreshold = DefaultSessionPoolConfig.usedSessionsRatioThreshold + } + + pool := &sessionPool{ + sc: sc, + valid: true, + mayGetSession: make(chan struct{}), + SessionPoolConfig: config, + mw: newMaintenanceWindow(config.MaxOpened), + rand: rand.New(rand.NewSource(time.Now().UnixNano())), + } _, instance, database, err := parseDatabaseName(sc.database) if err != nil { @@ -633,6 +695,71 @@ func (p *sessionPool) recordStat(ctx context.Context, m *stats.Int64Measure, n i recordStat(ctx, m, n) } +func (p *sessionPool) getRatioOfSessionsInUseLocked() float64 { + maxSessions := p.MaxOpened + if maxSessions == 0 { + return 0 + } + return float64(p.numInUse) / float64(maxSessions) +} + +// gets sessions which are unexpectedly long-running. 
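+// The caller must hold p.mu. Sessions qualify only when pool utilisation exceeds
+// usedSessionsRatioThreshold, the handle has been checked out for at least
+// idleTimeThreshold, and it is not marked eligibleForLongRunning. Warn only logs
+// the leak; Close and WarnAndClose also return the handles for removal.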
+func (p *sessionPool) getLongRunningSessionsLocked() []*sessionHandle { + usedSessionsRatio := p.getRatioOfSessionsInUseLocked() + var longRunningSessions []*sessionHandle + if usedSessionsRatio > p.usedSessionsRatioThreshold { + element := p.trackedSessionHandles.Front() + for element != nil { + sh := element.Value.(*sessionHandle) + sh.mu.Lock() + diff := time.Now().Sub(sh.checkoutTime) + if !sh.eligibleForLongRunning && diff.Seconds() >= p.idleTimeThreshold.Seconds() { + if (p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose) && !sh.isSessionLeakLogged { + if p.actionOnInactiveTransaction == Warn { + if sh.stack != nil { + logf(p.sc.logger, "session %s checked out of pool at %s is long running due to possible session leak for goroutine: \n%s", sh.session.getID(), sh.checkoutTime.Format(time.RFC3339), sh.stack) + } else { + logf(p.sc.logger, "session %s checked out of pool at %s is long running due to possible session leak for goroutine: \nEnable SessionPoolConfig.TrackSessionHandles to get stack trace associated with the session", sh.session.getID(), sh.checkoutTime.Format(time.RFC3339)) + } + sh.isSessionLeakLogged = true + } else if p.actionOnInactiveTransaction == WarnAndClose { + if sh.stack != nil { + logf(p.sc.logger, "session %s checked out of pool at %s is long running and will be removed due to possible session leak for goroutine: \n%s", sh.session.getID(), sh.checkoutTime.Format(time.RFC3339), sh.stack) + } else { + logf(p.sc.logger, "session %s checked out of pool at %s is long running and will be removed due to possible session leak for goroutine: \nEnable SessionPoolConfig.TrackSessionHandles to get stack trace associated with the session", sh.session.getID(), sh.checkoutTime.Format(time.RFC3339)) + } + } + } + if p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close { + longRunningSessions = append(longRunningSessions, sh) + } + } + sh.mu.Unlock() + element = element.Next() + } + } + return longRunningSessions +} + +// removes or logs sessions that are unexpectedly long-running. +func (p *sessionPool) removeLongRunningSessions() { + p.mu.Lock() + longRunningSessions := p.getLongRunningSessionsLocked() + p.mu.Unlock() + + // destroy long-running sessions + if p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close { + for _, sh := range longRunningSessions { + // removes inner session out of the pool to reduce the probability of two processes trying + // to use the same session at the same time. + sh.destroy() + p.InactiveTransactionRemovalOptions.mu.Lock() + p.InactiveTransactionRemovalOptions.numOfLeakedSessionsRemoved++ + p.InactiveTransactionRemovalOptions.mu.Unlock() + } + } +} + func (p *sessionPool) initPool(numSessions uint64) error { p.mu.Lock() defer p.mu.Unlock() @@ -754,11 +881,13 @@ var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canc // sessions being checked out of the pool. 
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) { sh = &sessionHandle{session: s, checkoutTime: time.Now()} - if p.TrackSessionHandles { + if p.TrackSessionHandles || p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close { p.mu.Lock() sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh) p.mu.Unlock() - sh.stack = debug.Stack() + if p.TrackSessionHandles { + sh.stack = debug.Stack() + } } return sh } @@ -978,6 +1107,8 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool { if s.invalidate() { // Decrease the number of opened sessions. p.numOpened-- + // Decrease the number of sessions in use. + p.decNumInUseLocked(ctx) p.recordStat(ctx, OpenSessionCount, int64(p.numOpened)) // Broadcast that a session has been destroyed. close(p.mayGetSession) @@ -1365,6 +1496,15 @@ func (hc *healthChecker) maintainer() { hc.pool.lastResetTime = now } hc.pool.mu.Unlock() + + // task to remove or log sessions which are unexpectedly long-running + if now.After(hc.pool.InactiveTransactionRemovalOptions.lastExecutionTime.Add(hc.pool.executionFrequency)) { + if hc.pool.actionOnInactiveTransaction == Warn || hc.pool.actionOnInactiveTransaction == WarnAndClose || hc.pool.actionOnInactiveTransaction == Close { + hc.pool.removeLongRunningSessions() + } + hc.pool.InactiveTransactionRemovalOptions.lastExecutionTime = now + } + // Get the maximum number of sessions in use during the current // maintenance window. maxSessionsInUseDuringWindow := hc.pool.mw.maxSessionsCheckedOutDuringWindow() diff --git a/spanner/session_test.go b/spanner/session_test.go index c90d2a423374..964cf2ff2ac1 100644 --- a/spanner/session_test.go +++ b/spanner/session_test.go @@ -408,6 +408,524 @@ func TestSessionLeak(t *testing.T) { iter.Stop() } +func TestSessionLeak_WhenInactiveTransactions_RemoveSessionsFromPool(t *testing.T) { + t.Parallel() + ctx := context.Background() + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 0, + MaxOpened: 1, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + }, + TrackSessionHandles: true, + }, + }) + defer teardown() + + // Execute a query without calling rowIterator.Stop. This will cause the + // session not to be returned to the pool. + single := client.Single() + iter := single.Query(ctx, NewStatement(SelectFooFromBar)) + for { + _, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + t.Fatalf("Got unexpected error while iterating results: %v\n", err) + } + } + // The session should not have been returned to the pool. + p := client.idleSessions + p.mu.Lock() + if g, w := p.idleList.Len(), 0; g != w { // No of sessions in the pool must be 0 + p.mu.Unlock() + t.Fatalf("Idle sessions count mismatch\nGot: %d\nWant: %d\n", g, w) + } + p.mu.Unlock() + // The checked out session should contain a stack trace as Logging is true. 
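+	// (The stack is captured because TrackSessionHandles is enabled in the
+	// SessionPoolConfig above.)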
+ single.sh.mu.Lock() + if single.sh.stack == nil { + single.sh.mu.Unlock() + t.Fatalf("Missing stacktrace from session handle") + } + if g, w := single.sh.eligibleForLongRunning, false; g != w { + single.sh.mu.Unlock() + t.Fatalf("isLongRunningTransaction mismatch\nGot: %v\nWant: %v\n", g, w) + } + + // Mock the session checkout time to be greater than 60 mins + single.sh.checkoutTime = time.Now().Add(-time.Hour) + single.sh.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + p.removeLongRunningSessions() + + // The session should have been removed from pool. + p.mu.Lock() + defer p.mu.Unlock() + if g, w := p.idleList.Len(), 0; g != w { + t.Fatalf("Idle Sessions in pool, count mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := p.numInUse, uint64(0); g != w { + t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := p.numOpened, uint64(0); g != w { + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } + p.InactiveTransactionRemovalOptions.mu.Lock() + defer p.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w { + t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } + iter.Stop() +} + +func TestMaintainer_LongRunningTransactionsCleanup_IfClose_VerifyInactiveSessionsClosed(t *testing.T) { + t.Parallel() + ctx := context.Background() + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 3, + healthCheckSampleInterval: 10 * time.Millisecond, // maintainer runs every 10ms + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + executionFrequency: 15 * time.Millisecond, // check long-running sessions every 20ms + }, + }, + }) + defer teardown() + sp := client.idleSessions + + // get session-1 from pool + s1, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-2 from pool + s2, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-3 from pool + s3, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + sp.mu.Lock() + if g, w := sp.numOpened, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("No of sessions opened mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numInUse, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("No of sessions in use mismatch\nGot: %d\nWant: %d\n", g, w) + } + sp.mu.Unlock() + s1.mu.Lock() + s1.eligibleForLongRunning = false + s1.checkoutTime = time.Now().Add(-time.Hour) + s1.mu.Unlock() + + s2.mu.Lock() + s2.eligibleForLongRunning = false + s2.checkoutTime = time.Now().Add(-time.Hour) + s2.mu.Unlock() + + s3.mu.Lock() + s3.eligibleForLongRunning = true + s3.checkoutTime = time.Now().Add(-time.Hour) + s3.mu.Unlock() + + // Sleep for maintainer to run long-running cleanup task + time.Sleep(30 * time.Millisecond) + + sp.mu.Lock() + defer sp.mu.Unlock() + sp.InactiveTransactionRemovalOptions.mu.Lock() + defer sp.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := sp.numOfLeakedSessionsRemoved, uint64(2); g != w { + t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(1); g != w { + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + +func TestLongRunningTransactionsCleanup_IfClose_VerifyInactiveSessionsClosed(t 
*testing.T) { + t.Parallel() + ctx := context.Background() + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 3, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + }, + }, + }) + defer teardown() + sp := client.idleSessions + + // get session-1 from pool + s1, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-2 from pool + s2, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-3 from pool + s3, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + sp.mu.Lock() + if g, w := sp.numOpened, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("No of sessions opened mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numInUse, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("No of sessions in use mismatch\nGot: %d\nWant: %d\n", g, w) + } + sp.mu.Unlock() + s1.mu.Lock() + s1.eligibleForLongRunning = false + s1.checkoutTime = time.Now().Add(-time.Hour) + s1.mu.Unlock() + + s2.mu.Lock() + s2.eligibleForLongRunning = false + s2.checkoutTime = time.Now().Add(-time.Hour) + s2.mu.Unlock() + + s3.mu.Lock() + s3.eligibleForLongRunning = true + s3.checkoutTime = time.Now().Add(-time.Hour) + s3.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + sp.removeLongRunningSessions() + + sp.mu.Lock() + defer sp.mu.Unlock() + sp.InactiveTransactionRemovalOptions.mu.Lock() + defer sp.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := sp.numOfLeakedSessionsRemoved, uint64(2); g != w { + t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(1); g != w { + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + +func TestLongRunningTransactionsCleanup_IfLog_VerifyInactiveSessionsOpen(t *testing.T) { + t.Parallel() + ctx := context.Background() + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 3, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: Warn, + }, + }, + }) + defer teardown() + sp := client.idleSessions + + // get session-1 from pool + s1, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-2 from pool + s2, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-3 from pool + s3, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + sp.mu.Lock() + if g, w := sp.numInUse, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } + sp.mu.Unlock() + s1.mu.Lock() + s1.eligibleForLongRunning = false + s1.checkoutTime = time.Now().Add(-time.Hour) + s1.mu.Unlock() + + s2.mu.Lock() + s2.eligibleForLongRunning = false + s2.checkoutTime = time.Now().Add(-time.Hour) + s2.mu.Unlock() + + s3.mu.Lock() + s3.eligibleForLongRunning = true + s3.checkoutTime = time.Now().Add(-time.Hour) + s3.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + sp.removeLongRunningSessions() + + s1.mu.Lock() + if 
!s1.isSessionLeakLogged { + t.Fatalf("Expect session leak logged for session %v", s1.session.id) + } + s1.mu.Unlock() + + s2.mu.Lock() + if !s2.isSessionLeakLogged { + t.Fatalf("Expect session leak logged for session %v", s2.session.id) + } + s2.mu.Unlock() + + s3.mu.Lock() + if s3.isSessionLeakLogged { + t.Fatalf("Incorrect session leak log as transaction is long running for session: %v", s3.session.id) + } + s3.mu.Unlock() + + sp.mu.Lock() + defer sp.mu.Unlock() + sp.InactiveTransactionRemovalOptions.mu.Lock() + defer sp.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w { + t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(3); g != w { + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + +func TestLongRunningTransactionsCleanup_UtilisationBelowThreshold_VerifyInactiveSessionsOpen(t *testing.T) { + t.Parallel() + ctx := context.Background() + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 3, + incStep: 1, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: WarnAndClose, + }, + }, + }) + defer teardown() + sp := client.idleSessions + + // get session-1 from pool + s1, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-2 from pool + s2, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + sp.mu.Lock() + if g, w := sp.numInUse, uint64(2); g != w { + sp.mu.Unlock() + t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(2); g != w { + sp.mu.Unlock() + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } + sp.mu.Unlock() + s1.mu.Lock() + s1.eligibleForLongRunning = false + s1.checkoutTime = time.Now().Add(-time.Hour) + s1.mu.Unlock() + + s2.mu.Lock() + s2.eligibleForLongRunning = false + s2.checkoutTime = time.Now().Add(-time.Hour) + s2.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + sp.removeLongRunningSessions() + + sp.mu.Lock() + defer sp.mu.Unlock() + sp.InactiveTransactionRemovalOptions.mu.Lock() + defer sp.InactiveTransactionRemovalOptions.mu.Unlock() + // 2/3 sessions are used. Hence utilisation < 95%. 
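+	// Utilisation is below the default usedSessionsRatioThreshold of 0.95, so the
+	// cleanup task must not have removed anything.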
+ if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w { + t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(2); g != w { + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + +func TestLongRunningTransactions_WhenAllExpectedlyLongRunning_VerifyInactiveSessionsOpen(t *testing.T) { + t.Parallel() + ctx := context.Background() + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 3, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: Warn, + }, + }, + }) + defer teardown() + sp := client.idleSessions + + // get session-1 from pool + s1, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-2 from pool + s2, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-3 from pool + s3, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + sp.mu.Lock() + if g, w := sp.numInUse, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } + sp.mu.Unlock() + s1.mu.Lock() + s1.eligibleForLongRunning = true + s1.checkoutTime = time.Now().Add(-time.Hour) + s1.mu.Unlock() + + s2.mu.Lock() + s2.eligibleForLongRunning = true + s2.checkoutTime = time.Now().Add(-time.Hour) + s2.mu.Unlock() + + s3.mu.Lock() + s3.eligibleForLongRunning = true + s3.checkoutTime = time.Now().Add(-time.Hour) + s3.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + sp.removeLongRunningSessions() + + sp.mu.Lock() + defer sp.mu.Unlock() + sp.InactiveTransactionRemovalOptions.mu.Lock() + defer sp.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w { + t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(3); g != w { + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + +func TestLongRunningTransactions_WhenDurationBelowThreshold_VerifyInactiveSessionsOpen(t *testing.T) { + t.Parallel() + ctx := context.Background() + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 3, + InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{ + actionOnInactiveTransaction: Warn, + }, + }, + }) + defer teardown() + sp := client.idleSessions + + // get session-1 from pool + s1, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-2 from pool + s2, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + // get session-3 from pool + s3, err := sp.take(ctx) + if err != nil { + t.Fatalf("cannot get the session: %v", err) + } + if g, w := sp.numInUse, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w) + } + sp.mu.Lock() + if g, w := sp.numOpened, uint64(3); g != w { + sp.mu.Unlock() + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } + sp.mu.Unlock() + s1.mu.Lock() + s1.eligibleForLongRunning = false + s1.checkoutTime = time.Now().Add(-50 * 
time.Minute) + s1.mu.Unlock() + + s2.mu.Lock() + s2.eligibleForLongRunning = false + s2.checkoutTime = time.Now().Add(-50 * time.Minute) + s2.mu.Unlock() + + s3.mu.Lock() + s3.eligibleForLongRunning = true + s3.checkoutTime = time.Now().Add(-50 * time.Minute) + s3.mu.Unlock() + + // force run task to clean up unexpected long-running sessions + sp.removeLongRunningSessions() + + s1.mu.Lock() + if s1.isSessionLeakLogged { + t.Fatalf("Session leak should not be logged for session %v as checkout duration is <60 mins", s1.session.id) + } + s1.mu.Unlock() + + s2.mu.Lock() + if s2.isSessionLeakLogged { + t.Fatalf("Session leak should not be logged for session %v as checkout duration is <60 mins", s2.session.id) + } + s2.mu.Unlock() + + sp.mu.Lock() + defer sp.mu.Unlock() + sp.InactiveTransactionRemovalOptions.mu.Lock() + defer sp.InactiveTransactionRemovalOptions.mu.Unlock() + if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w { + t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w) + } + if g, w := sp.numOpened, uint64(3); g != w { + t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w) + } +} + // TestMaxOpenedSessions tests max open sessions constraint. func TestMaxOpenedSessions(t *testing.T) { t.Parallel() diff --git a/spanner/transaction.go b/spanner/transaction.go index 81d7e036521c..6bbd6bd4d018 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -617,6 +617,8 @@ type ReadOnlyTransaction struct { rts time.Time // tb is the read staleness bound specification for transactional reads. tb TimestampBound + // isLongRunningTransaction indicates whether the transaction is long-running or not. + isLongRunningTransaction bool } // errTxInitTimeout returns error for timeout in waiting for initialization of @@ -669,6 +671,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { // Retry the BeginTransaction call if a 'Session not found' is returned. for { sh, err = t.sp.take(ctx) + sh.eligibleForLongRunning = t.isLongRunningTransaction if err != nil { return err } @@ -1009,6 +1012,8 @@ type ReadWriteTransaction struct { state txState // wb is the set of buffered mutations waiting to be committed. wb []*Mutation + // isLongRunningTransaction indicates whether the transaction is long-running or not. + isLongRunningTransaction bool } // BufferWrite adds a list of mutations to the set of updates that will be @@ -1124,6 +1129,14 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts if err != nil { return nil, err } + // mark transaction and session to be eligible for long-running + t.mu.Lock() + t.isLongRunningTransaction = true + t.mu.Unlock() + t.sh.mu.Lock() + t.sh.eligibleForLongRunning = true + t.sh.mu.Unlock() + // Cloud Spanner will return "Session not found" on bad sessions. sid := sh.getID() if sid == "" { @@ -1387,6 +1400,14 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error { if err != nil { return err } + + sh.mu.Lock() + t.mu.Lock() + // for batch update operations, isLongRunningTransaction will be true + sh.eligibleForLongRunning = t.isLongRunningTransaction + t.mu.Unlock() + sh.mu.Unlock() + continue } else { err = ToSpannerError(err)
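
Illustrative usage (editor's sketch, not part of the patch): the snippet below shows roughly how the options introduced in session.go fit together. Because actionOnInactiveTransaction is unexported in this change, such a configuration only compiles from within the spanner package (the tests above configure it the same way); the database name and pool sizes are placeholders.

	// A minimal sketch, assuming the spanner package as modified by this patch.
	// The maintainer task periodically warns about and closes sessions that have been
	// checked out for longer than idleTimeThreshold without being marked as eligible
	// for long-running work (e.g. PartitionedUpdate or BatchUpdate).
	cfg := ClientConfig{
		SessionPoolConfig: SessionPoolConfig{
			MinOpened: 100,
			MaxOpened: 400,
			InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
				actionOnInactiveTransaction: WarnAndClose, // or Warn, Close, NoAction (the default in this PR)
			},
		},
	}
	client, err := NewClientWithConfig(ctx, "projects/p/instances/i/databases/d", cfg)
	if err != nil {
		// handle error
	}
	defer client.Close()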