Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): Add lastUseTime property to session #8942

Merged
merged 8 commits into from
Nov 3, 2023
143 changes: 138 additions & 5 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_Rem
iter := single.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
p := client.idleSessions
sh := single.sh
// simulate session to be checked out for more than 60mins
// simulate session to be last used before 60 mins
sh.mu.Lock()
sh.checkoutTime = time.Now().Add(-time.Hour)
sh.lastUseTime = time.Now().Add(-time.Hour)
sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down Expand Up @@ -1038,6 +1038,72 @@ func TestClient_ReadOnlyTransaction_ReadOptions(t *testing.T) {
}
}

func TestClient_ReadOnlyTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
idleTimeThreshold: 3 * time.Second,
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
MinimumExecutionTime: 2 * time.Second,
})
server.TestSpanner.PutExecutionTime(MethodStreamingRead,
SimulatedExecutionTime{
MinimumExecutionTime: 2 * time.Second,
})
ctx := context.Background()
p := client.idleSessions

roTxn := client.ReadOnlyTransaction()
defer roTxn.Close()
iter := roTxn.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
iter.Next()
iter.Stop()

// Get the session last use time.
roTxn.sh.mu.Lock()
sessionPrevLastUseTime := roTxn.sh.lastUseTime
roTxn.sh.mu.Unlock()

iter = roTxn.Read(ctx, "FOO", AllKeys(), []string{"BAR"})
iter.Next()
iter.Stop()

// Get the latest session last use time
roTxn.sh.mu.Lock()
sessionLatestLastUseTime := roTxn.sh.lastUseTime
sessionCheckoutTime := roTxn.sh.checkoutTime
roTxn.sh.mu.Unlock()

// sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime.
// This is because session lastUse time should be updated whenever a new operation is being executed on the transaction.
if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Seconds() <= 0 {
t.Fatalf("Session lastUseTime times should not be equal")
}

if (time.Now().Sub(sessionPrevLastUseTime)).Seconds() < 4 {
t.Fatalf("Expected session to be checkedout for more than 4 seconds")
}
if (time.Now().Sub(sessionCheckoutTime)).Seconds() < 4 {
t.Fatalf("Expected session to be checkedout for more than 4 seconds")
}
// force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec.
// The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime.
p.removeLongRunningSessions()
if p.numOfLeakedSessionsRemoved > 0 {
t.Fatalf("Expected session to not get cleaned by background maintainer")
}
}

func setQueryOptionsEnvVars(opts *sppb.ExecuteSqlRequest_QueryOptions) func() {
os.Setenv("SPANNER_OPTIMIZER_VERSION", opts.OptimizerVersion)
os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", opts.OptimizerStatisticsPackage)
Expand Down Expand Up @@ -1460,10 +1526,10 @@ func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionSh
return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
}

// Simulate the session to be checked out for more than 60 mins.
// Simulate the session to be last used before 60 mins.
// The background task cleans up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
tx.sh.lastUseTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, false; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w)
Expand All @@ -1489,6 +1555,73 @@ func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionSh
}
}

func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
idleTimeThreshold: 3 * time.Second,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
MinimumExecutionTime: 2 * time.Second,
})
ctx := context.Background()
p := client.idleSessions
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Execute first operation on the transaction
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}

// Get the session last use time.
tx.sh.mu.Lock()
sessionPrevLastUseTime := tx.sh.lastUseTime
tx.sh.mu.Unlock()

// Execute second operation on the transaction
_, err = tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
// Get the latest session last use time
tx.sh.mu.Lock()
sessionLatestLastUseTime := tx.sh.lastUseTime
sessionCheckoutTime := tx.sh.checkoutTime
tx.sh.mu.Unlock()

// sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime.
// This is because session lastUse time should be updated whenever a new operation is being executed on the transaction.
if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Seconds() <= 0 {
t.Fatalf("Session lastUseTime times should not be equal")
}

if (time.Now().Sub(sessionPrevLastUseTime)).Seconds() < 4 {
t.Fatalf("Expected session to be checkedout for more than 4 seconds")
}
if (time.Now().Sub(sessionCheckoutTime)).Seconds() < 4 {
t.Fatalf("Expected session to be checkedout for more than 4 seconds")
}
// force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec.
// The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime.
p.removeLongRunningSessions()
if p.numOfLeakedSessionsRemoved > 0 {
t.Fatalf("Expected session to not get cleaned by background maintainer")
}
return nil
})
if err != nil {
t.Fatal(err)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1546,7 +1679,7 @@ func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAct
if attempts == 2 {
// Simulate the session to be long-running. The background task should not clean up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
tx.sh.lastUseTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, true; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w)
Expand Down
16 changes: 14 additions & 2 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type sessionHandle struct {
session *session
// checkoutTime is the time the session was checked out of the pool.
checkoutTime time.Time
// lastUseTime is the time the session was last used after checked out of the pool.
lastUseTime time.Time
// trackedSessionHandle is the linked list node which links the session to
// the list of tracked session handles. trackedSessionHandle is only set if
// TrackSessionHandles has been enabled in the session pool configuration.
Expand Down Expand Up @@ -118,6 +120,7 @@ func (sh *sessionHandle) recycle() {
sh.session = nil
sh.trackedSessionHandle = nil
sh.checkoutTime = time.Time{}
sh.lastUseTime = time.Time{}
sh.stack = nil
sh.mu.Unlock()
s.recycle()
Expand Down Expand Up @@ -187,6 +190,7 @@ func (sh *sessionHandle) destroy() {
sh.session = nil
sh.trackedSessionHandle = nil
sh.checkoutTime = time.Time{}
sh.lastUseTime = time.Time{}
sh.stack = nil
sh.mu.Unlock()

Expand All @@ -199,6 +203,14 @@ func (sh *sessionHandle) destroy() {
s.destroy(false)
}

func (sh *sessionHandle) updateLastUseTime() {
sh.mu.Lock()
defer sh.mu.Unlock()
if sh.session != nil {
sh.lastUseTime = time.Now()
}
}

// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
Expand Down Expand Up @@ -712,7 +724,7 @@ func (p *sessionPool) getLongRunningSessionsLocked() []*sessionHandle {
for element != nil {
sh := element.Value.(*sessionHandle)
sh.mu.Lock()
diff := time.Now().Sub(sh.checkoutTime)
diff := time.Now().Sub(sh.lastUseTime)
if !sh.eligibleForLongRunning && diff.Seconds() >= p.idleTimeThreshold.Seconds() {
if (p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose) && !sh.isSessionLeakLogged {
if p.actionOnInactiveTransaction == Warn {
Expand Down Expand Up @@ -880,7 +892,7 @@ var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canc
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
sh = &sessionHandle{session: s, checkoutTime: time.Now()}
sh = &sessionHandle{session: s, checkoutTime: time.Now(), lastUseTime: time.Now()}
if p.TrackSessionHandles || p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close {
p.mu.Lock()
sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
Expand Down
38 changes: 19 additions & 19 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ func TestSessionLeak_WhenInactiveTransactions_RemoveSessionsFromPool(t *testing.
t.Fatalf("isLongRunningTransaction mismatch\nGot: %v\nWant: %v\n", g, w)
}

// Mock the session checkout time to be greater than 60 mins
single.sh.checkoutTime = time.Now().Add(-time.Hour)
// Mock the session lastUseTime to be greater than 60 mins
single.sh.lastUseTime = time.Now().Add(-time.Hour)
single.sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down Expand Up @@ -526,17 +526,17 @@ func TestMaintainer_LongRunningTransactionsCleanup_IfClose_VerifyInactiveSession
sp.mu.Unlock()
s1.mu.Lock()
s1.eligibleForLongRunning = false
s1.checkoutTime = time.Now().Add(-time.Hour)
s1.lastUseTime = time.Now().Add(-time.Hour)
s1.mu.Unlock()

s2.mu.Lock()
s2.eligibleForLongRunning = false
s2.checkoutTime = time.Now().Add(-time.Hour)
s2.lastUseTime = time.Now().Add(-time.Hour)
s2.mu.Unlock()

s3.mu.Lock()
s3.eligibleForLongRunning = true
s3.checkoutTime = time.Now().Add(-time.Hour)
s3.lastUseTime = time.Now().Add(-time.Hour)
s3.mu.Unlock()

// Sleep for maintainer to run long-running cleanup task
Expand Down Expand Up @@ -596,17 +596,17 @@ func TestLongRunningTransactionsCleanup_IfClose_VerifyInactiveSessionsClosed(t *
sp.mu.Unlock()
s1.mu.Lock()
s1.eligibleForLongRunning = false
s1.checkoutTime = time.Now().Add(-time.Hour)
s1.lastUseTime = time.Now().Add(-time.Hour)
s1.mu.Unlock()

s2.mu.Lock()
s2.eligibleForLongRunning = false
s2.checkoutTime = time.Now().Add(-time.Hour)
s2.lastUseTime = time.Now().Add(-time.Hour)
s2.mu.Unlock()

s3.mu.Lock()
s3.eligibleForLongRunning = true
s3.checkoutTime = time.Now().Add(-time.Hour)
s3.lastUseTime = time.Now().Add(-time.Hour)
s3.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down Expand Up @@ -666,17 +666,17 @@ func TestLongRunningTransactionsCleanup_IfLog_VerifyInactiveSessionsOpen(t *test
sp.mu.Unlock()
s1.mu.Lock()
s1.eligibleForLongRunning = false
s1.checkoutTime = time.Now().Add(-time.Hour)
s1.lastUseTime = time.Now().Add(-time.Hour)
s1.mu.Unlock()

s2.mu.Lock()
s2.eligibleForLongRunning = false
s2.checkoutTime = time.Now().Add(-time.Hour)
s2.lastUseTime = time.Now().Add(-time.Hour)
s2.mu.Unlock()

s3.mu.Lock()
s3.eligibleForLongRunning = true
s3.checkoutTime = time.Now().Add(-time.Hour)
s3.lastUseTime = time.Now().Add(-time.Hour)
s3.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down Expand Up @@ -750,12 +750,12 @@ func TestLongRunningTransactionsCleanup_UtilisationBelowThreshold_VerifyInactive
sp.mu.Unlock()
s1.mu.Lock()
s1.eligibleForLongRunning = false
s1.checkoutTime = time.Now().Add(-time.Hour)
s1.lastUseTime = time.Now().Add(-time.Hour)
s1.mu.Unlock()

s2.mu.Lock()
s2.eligibleForLongRunning = false
s2.checkoutTime = time.Now().Add(-time.Hour)
s2.lastUseTime = time.Now().Add(-time.Hour)
s2.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down Expand Up @@ -816,17 +816,17 @@ func TestLongRunningTransactions_WhenAllExpectedlyLongRunning_VerifyInactiveSess
sp.mu.Unlock()
s1.mu.Lock()
s1.eligibleForLongRunning = true
s1.checkoutTime = time.Now().Add(-time.Hour)
s1.lastUseTime = time.Now().Add(-time.Hour)
s1.mu.Unlock()

s2.mu.Lock()
s2.eligibleForLongRunning = true
s2.checkoutTime = time.Now().Add(-time.Hour)
s2.lastUseTime = time.Now().Add(-time.Hour)
s2.mu.Unlock()

s3.mu.Lock()
s3.eligibleForLongRunning = true
s3.checkoutTime = time.Now().Add(-time.Hour)
s3.lastUseTime = time.Now().Add(-time.Hour)
s3.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down Expand Up @@ -886,17 +886,17 @@ func TestLongRunningTransactions_WhenDurationBelowThreshold_VerifyInactiveSessio
sp.mu.Unlock()
s1.mu.Lock()
s1.eligibleForLongRunning = false
s1.checkoutTime = time.Now().Add(-50 * time.Minute)
s1.lastUseTime = time.Now().Add(-50 * time.Minute)
s1.mu.Unlock()

s2.mu.Lock()
s2.eligibleForLongRunning = false
s2.checkoutTime = time.Now().Add(-50 * time.Minute)
s2.lastUseTime = time.Now().Add(-50 * time.Minute)
s2.mu.Unlock()

s3.mu.Lock()
s3.eligibleForLongRunning = true
s3.checkoutTime = time.Now().Add(-50 * time.Minute)
s3.lastUseTime = time.Now().Add(-50 * time.Minute)
s3.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down
Loading
Loading