Skip to content

Commit

Permalink
feat(spanner): update code for session leaks cleanup (#8978)
Browse files Browse the repository at this point in the history
* feat(spanner): fix nil pointer and move to a function

* feat(spanner): remove mutex

* feat(spanner): move variable parameter to sessionpool struct

* feat(spanner): fix test flakiness by force running clean up task

* feat(spanner): fix flakiness
  • Loading branch information
harshachinta authored Nov 8, 2023
1 parent ca76671 commit cc83515
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 61 deletions.
11 changes: 3 additions & 8 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,18 +569,13 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// Session handle hasn't been allocated or has been destroyed.
sh, err = c.idleSessions.take(ctx)
if t != nil {
// Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true
sh.mu.Lock()
t.mu.Lock()
sh.eligibleForLongRunning = t.isLongRunningTransaction
t.mu.Unlock()
sh.mu.Unlock()
}
if err != nil {
// If session retrieval fails, just fail the transaction.
return err
}

// Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true
t.setSessionEligibilityForLongRunning(sh)
}
if t.shouldExplicitBegin(attempt) {
// Make sure we set the current session handle before calling BeginTransaction.
Expand Down
22 changes: 10 additions & 12 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,6 @@ func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_Rem
if g, w := sh.eligibleForLongRunning, false; g != w {
t.Fatalf("isLongRunningTransaction mismatch\nGot: %v\nWant: %v\n", g, w)
}
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down Expand Up @@ -1563,14 +1561,14 @@ func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeSh
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
idleTimeThreshold: 30 * time.Millisecond,
idleTimeThreshold: 300 * time.Millisecond,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
MinimumExecutionTime: 20 * time.Millisecond,
MinimumExecutionTime: 200 * time.Millisecond,
})
ctx := context.Background()
p := client.idleSessions
Expand Down Expand Up @@ -1603,11 +1601,11 @@ func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeSh
t.Fatalf("Session lastUseTime times should not be equal")
}

if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 400 {
t.Fatalf("Expected session to be checkedout for more than 400 milliseconds")
}
if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 400 {
t.Fatalf("Expected session to be checkedout for more than 400 milliseconds")
}
// force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec.
// The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime.
Expand Down Expand Up @@ -1707,8 +1705,8 @@ func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAct
if g, w := attempts, 2; g != w {
t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w)
}
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down Expand Up @@ -4256,8 +4254,8 @@ func TestClient_WhenLongRunningPartitionedUpdateRequest_TakeNoAction(t *testing.
t.Errorf("Row count mismatch\nGot: %v\nWant: %v", g, w)
}
p := client.idleSessions
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down
8 changes: 4 additions & 4 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
}

sh, err := c.idleSessions.take(ctx)
// Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running
sh.mu.Lock()
sh.eligibleForLongRunning = true
sh.mu.Unlock()
if err != nil {
return 0, ToSpannerError(err)
}
if sh != nil {
defer sh.recycle()
}
// Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running
sh.mu.Lock()
sh.eligibleForLongRunning = true
sh.mu.Unlock()

// Create the parameters and the SQL request, but without a transaction.
// The transaction reference will be added by the executePdml method.
Expand Down
16 changes: 9 additions & 7 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ const (

// InactiveTransactionRemovalOptions has configurations for action on long-running transactions.
type InactiveTransactionRemovalOptions struct {
mu sync.Mutex
// actionOnInactiveTransaction is the configuration to choose action for inactive transactions.
// It can be one of Warn, Close, WarnAndClose.
actionOnInactiveTransaction ActionOnInactiveTransactionKind
Expand All @@ -74,9 +73,6 @@ type InactiveTransactionRemovalOptions struct {
// variable that keeps track of the last execution time when inactive transactions
// were removed by the maintainer task.
lastExecutionTime time.Time
// indicates the number of leaked sessions removed from the session pool.
// This is valid only when ActionOnInactiveTransaction is WarnAndClose or ActionOnInactiveTransaction is Close.
numOfLeakedSessionsRemoved uint64
}

// sessionHandle is an interface for transactions to access Cloud Spanner
Expand Down Expand Up @@ -613,6 +609,10 @@ type sessionPool struct {

// tagMap is a map of all tags that are associated with the emitted metrics.
tagMap *tag.Map

// indicates the number of leaked sessions removed from the session pool.
// This is valid only when ActionOnInactiveTransaction is WarnAndClose or ActionOnInactiveTransaction is Close in InactiveTransactionRemovalOptions.
numOfLeakedSessionsRemoved uint64
}

// newSessionPool creates a new session pool.
Expand Down Expand Up @@ -761,14 +761,16 @@ func (p *sessionPool) removeLongRunningSessions() {

// destroy long-running sessions
if p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close {
var leakedSessionsRemovedCount uint64
for _, sh := range longRunningSessions {
// removes inner session out of the pool to reduce the probability of two processes trying
// to use the same session at the same time.
sh.destroy()
p.InactiveTransactionRemovalOptions.mu.Lock()
p.InactiveTransactionRemovalOptions.numOfLeakedSessionsRemoved++
p.InactiveTransactionRemovalOptions.mu.Unlock()
leakedSessionsRemovedCount++
}
p.mu.Lock()
p.numOfLeakedSessionsRemoved += leakedSessionsRemovedCount
p.mu.Unlock()
}
}

Expand Down
16 changes: 2 additions & 14 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,6 @@ func TestSessionLeak_WhenInactiveTransactions_RemoveSessionsFromPool(t *testing.
if g, w := p.numOpened, uint64(0); g != w {
t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w)
}
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down Expand Up @@ -541,11 +539,11 @@ func TestMaintainer_LongRunningTransactionsCleanup_IfClose_VerifyInactiveSession

// Sleep for maintainer to run long-running cleanup task
time.Sleep(30 * time.Millisecond)
// force run task to clean up unexpected long-running sessions
sp.removeLongRunningSessions()

sp.mu.Lock()
defer sp.mu.Unlock()
sp.InactiveTransactionRemovalOptions.mu.Lock()
defer sp.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := sp.numOfLeakedSessionsRemoved, uint64(2); g != w {
t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down Expand Up @@ -614,8 +612,6 @@ func TestLongRunningTransactionsCleanup_IfClose_VerifyInactiveSessionsClosed(t *

sp.mu.Lock()
defer sp.mu.Unlock()
sp.InactiveTransactionRemovalOptions.mu.Lock()
defer sp.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := sp.numOfLeakedSessionsRemoved, uint64(2); g != w {
t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down Expand Up @@ -702,8 +698,6 @@ func TestLongRunningTransactionsCleanup_IfLog_VerifyInactiveSessionsOpen(t *test

sp.mu.Lock()
defer sp.mu.Unlock()
sp.InactiveTransactionRemovalOptions.mu.Lock()
defer sp.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down Expand Up @@ -763,8 +757,6 @@ func TestLongRunningTransactionsCleanup_UtilisationBelowThreshold_VerifyInactive

sp.mu.Lock()
defer sp.mu.Unlock()
sp.InactiveTransactionRemovalOptions.mu.Lock()
defer sp.InactiveTransactionRemovalOptions.mu.Unlock()
// 2/3 sessions are used. Hence utilisation < 95%.
if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
Expand Down Expand Up @@ -834,8 +826,6 @@ func TestLongRunningTransactions_WhenAllExpectedlyLongRunning_VerifyInactiveSess

sp.mu.Lock()
defer sp.mu.Unlock()
sp.InactiveTransactionRemovalOptions.mu.Lock()
defer sp.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down Expand Up @@ -916,8 +906,6 @@ func TestLongRunningTransactions_WhenDurationBelowThreshold_VerifyInactiveSessio

sp.mu.Lock()
defer sp.mu.Unlock()
sp.InactiveTransactionRemovalOptions.mu.Lock()
defer sp.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := sp.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("No of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
Expand Down
46 changes: 30 additions & 16 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type txReadEnv interface {
// release should be called at the end of every transactional read to deal
// with session recycling.
release(error)
setSessionEligibilityForLongRunning(sh *sessionHandle)
}

// txReadOnly contains methods for doing transactional reads.
Expand Down Expand Up @@ -676,10 +677,10 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
// Retry the BeginTransaction call if a 'Session not found' is returned.
for {
sh, err = t.sp.take(ctx)
sh.eligibleForLongRunning = t.isLongRunningTransaction
if err != nil {
return err
}
t.setSessionEligibilityForLongRunning(sh)
sh.updateLastUseTime()
var md metadata.MD
res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.BeginTransactionRequest{
Expand Down Expand Up @@ -937,6 +938,16 @@ func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTra
return t
}

func (t *ReadOnlyTransaction) setSessionEligibilityForLongRunning(sh *sessionHandle) {
if t != nil && sh != nil {
sh.mu.Lock()
t.mu.Lock()
sh.eligibleForLongRunning = t.isLongRunningTransaction
t.mu.Unlock()
sh.mu.Unlock()
}
}

// ReadWriteTransaction provides a locking read-write transaction.
//
// This type of transaction is the only way to write data into Cloud Spanner;
Expand Down Expand Up @@ -1136,13 +1147,6 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts
if err != nil {
return nil, err
}
// mark transaction and session to be eligible for long-running
t.mu.Lock()
t.isLongRunningTransaction = true
t.mu.Unlock()
t.sh.mu.Lock()
t.sh.eligibleForLongRunning = true
t.sh.mu.Unlock()

// Cloud Spanner will return "Session not found" on bad sessions.
sid := sh.getID()
Expand All @@ -1151,6 +1155,12 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts
return nil, errSessionClosed(sh)
}

// mark transaction and session to be eligible for long-running
t.mu.Lock()
t.isLongRunningTransaction = true
t.mu.Unlock()
t.setSessionEligibilityForLongRunning(sh)

var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
for _, st := range stmts {
params, paramTypes, err := st.convertParams()
Expand Down Expand Up @@ -1339,6 +1349,16 @@ func (t *ReadWriteTransaction) release(err error) {
}
}

func (t *ReadWriteTransaction) setSessionEligibilityForLongRunning(sh *sessionHandle) {
if t != nil && sh != nil {
sh.mu.Lock()
t.mu.Lock()
sh.eligibleForLongRunning = t.isLongRunningTransaction
t.mu.Unlock()
sh.mu.Unlock()
}
}

func beginTransaction(ctx context.Context, sid string, client *vkit.Client, opts TransactionOptions) (transactionID, error) {
res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sid,
Expand Down Expand Up @@ -1411,14 +1431,8 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error {
if err != nil {
return err
}

sh.mu.Lock()
t.mu.Lock()
// for batch update operations, isLongRunningTransaction will be true
sh.eligibleForLongRunning = t.isLongRunningTransaction
t.mu.Unlock()
sh.mu.Unlock()

// Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true
t.setSessionEligibilityForLongRunning(sh)
continue
} else {
err = ToSpannerError(err)
Expand Down

0 comments on commit cc83515

Please sign in to comment.