Skip to content

Commit

Permalink
feat(spanner): long running transaction clean up - disabled (#8177)
Browse files Browse the repository at this point in the history
* feat(spanner): long running transaction clean up to prevent session leaks

* feat(spanner): code refactoring in session.go file

* fix(spanner): fix vet

* feat(spanner): refactor client.go file

* feat(spanner): add lock when updating session handle

* feat(spanner): code refactoring in transaction.go file

* test(spanner): remove test

* feat(spanner): code refactor

* feat(spanner): refactor nit comments

* feat(spanner): reduce sleep timing to milli seconds for unit tests

* feat(spanner): update idleTimeThresholdSecs field to time.Duration

* feat(spanner): make the log messages conditional based on type of action for inactive transactions

* feat(spanner): combine get and remove long running sessions in a single function to avoid sleep statements during unit tests

* feat(spanner): modify presubmit condition to run tests for changed modules. Currently tests are not getting run as part of github presubmits.

* feat(spanner): revert presubmit.sh fix

* feat(spanner): update doc

* feat(spanner): reword isLongRunning to eligibleForLongRunning in sessionHandle

* feat(spanner): update TrackSessionHandles logic to turn off stack trace by default for long running sessions

* feat(spanner): fix test

* feat(spanner): change action on inactive transactions option to enum

* feat(spanner): fix lint

* feat(spanner): fix lint

* feat(spanner): fix lint

* feat(spanner): fix doc

* feat(spanner): disable the feature by default

* feat(spanner): revert commit - disable the feature by default

* fix: lint

* docs: add Readme

* feat(spanner): make WARN as default for action on inactive transactions

* docs: address review comments

* docs: move README.md changes in a different PR

* feat: disable feature
  • Loading branch information
harshachinta authored and bhshkh committed Nov 3, 2023
1 parent e3fbe6e commit c37a735
Show file tree
Hide file tree
Showing 6 changed files with 928 additions and 18 deletions.
26 changes: 18 additions & 8 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,11 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound

t := &BatchReadOnlyTransaction{
ReadOnlyTransaction: ReadOnlyTransaction{
tx: tx,
txReadyOrClosed: make(chan struct{}),
state: txActive,
rts: rts,
tx: tx,
txReadyOrClosed: make(chan struct{}),
state: txActive,
rts: rts,
isLongRunningTransaction: true,
},
ID: BatchReadOnlyTransactionID{
tid: tx,
Expand Down Expand Up @@ -481,10 +482,11 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)

t := &BatchReadOnlyTransaction{
ReadOnlyTransaction: ReadOnlyTransaction{
tx: tid.tid,
txReadyOrClosed: make(chan struct{}),
state: txActive,
rts: tid.rts,
tx: tid.tid,
txReadyOrClosed: make(chan struct{}),
state: txActive,
rts: tid.rts,
isLongRunningTransaction: true,
},
ID: tid,
}
Expand Down Expand Up @@ -566,6 +568,14 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
// Session handle hasn't been allocated or has been destroyed.
sh, err = c.idleSessions.take(ctx)
if t != nil {
// Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true
sh.mu.Lock()
t.mu.Lock()
sh.eligibleForLongRunning = t.isLongRunningTransaction
t.mu.Unlock()
sh.mu.Unlock()
}
if err != nil {
// If session retrieval fails, just fail the transaction.
return err
Expand Down
217 changes: 217 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,73 @@ func TestClient_Single_Read_SessionNotFound(t *testing.T) {
}
}

func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_RemoveSessionFromPool(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
single := client.Single()
iter := single.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
p := client.idleSessions
sh := single.sh
// simulate session to be checked out for more than 60mins
sh.mu.Lock()
sh.checkoutTime = time.Now().Add(-time.Hour)
sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
p.removeLongRunningSessions()
rowCount := int64(0)
for {
// Backend throws SessionNotFoundError. Session gets replaced with new session
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Fatal(err)
}
rowCount++
}
// New session returns back to pool
iter.Stop()

p.mu.Lock()
defer p.mu.Unlock()
if g, w := p.idleList.Len(), 1; g != w {
t.Fatalf("Idle Sessions in pool, count mismatch\nGot: %d\nWant: %d\n", g, w)
}
if g, w := p.numInUse, uint64(0); g != w {
t.Fatalf("Number of sessions currently in use mismatch\nGot: %d\nWant: %d\n", g, w)
}
if g, w := p.numOpened, uint64(1); g != w {
t.Fatalf("Session pool size mismatch\nGot: %d\nWant: %d\n", g, w)
}

sh.mu.Lock()
defer sh.mu.Unlock()
if g, w := sh.eligibleForLongRunning, false; g != w {
t.Fatalf("isLongRunningTransaction mismatch\nGot: %v\nWant: %v\n", g, w)
}
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(1); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
}

func TestClient_Single_ReadRow_SessionNotFound(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1369,6 +1436,59 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteUpdate(t *testing.T
}
}

func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionShouldFail(t *testing.T) {
t.Parallel()
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
},
},
})
defer teardown()
ctx := context.Background()
p := client.idleSessions
msg := "session is already recycled / destroyed"
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
rowCount, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w {
return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
}

// Simulate the session to be checked out for more than 60 mins.
// The background task cleans up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, false; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w)
}
tx.sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
p.removeLongRunningSessions()

// The session associated with this transaction tx has been destroyed. So the below call should fail.
// Eventually this means the entire transaction should not succeed.
_, err = tx.Update(ctx, NewStatement("UPDATE FOO SET BAR='value' WHERE ID=1"))
if err != nil {
return err
}
return nil
})
if err == nil {
t.Fatalf("Missing expected exception")
}
if status.Code(err) != codes.FailedPrecondition || !strings.Contains(err.Error(), msg) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1402,6 +1522,65 @@ func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *test
}
}

func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAction(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteBatchDml,
SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
p := client.idleSessions
var attempts int
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
attempts++
if attempts == 2 {
// Simulate the session to be long-running. The background task should not clean up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, true; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w)
}
tx.sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
p.removeLongRunningSessions()
}
rowCounts, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
if err != nil {
return err
}
if g, w := len(rowCounts), 1; g != w {
return status.Errorf(codes.FailedPrecondition, "Row counts length mismatch\nGot: %v\nWant: %v", g, w)
}
if g, w := rowCounts[0], int64(UpdateBarSetFooRowCount); g != w {
return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
}
return nil
})
if err != nil {
t.Fatal(err)
}
if g, w := attempts, 2; g != w {
t.Fatalf("number of attempts mismatch:\nGot%d\nWant:%d", g, w)
}
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
}

func TestClient_ReadWriteTransaction_Query_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -3913,6 +4092,44 @@ func TestClient_PDML_Priority(t *testing.T) {
}
}

func TestClient_WhenLongRunningPartitionedUpdateRequest_TakeNoAction(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
healthCheckSampleInterval: 10 * time.Millisecond, // maintainer runs every 10ms
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
executionFrequency: 15 * time.Millisecond, // check long-running sessions every 15ms
},
},
})
defer teardown()
// delay the rpc by 30ms. The background task runs to clean long-running sessions.
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
MinimumExecutionTime: 30 * time.Millisecond,
})

stmt := NewStatement(UpdateBarSetFoo)
// This transaction is eligible to be long-running, so the background task should not clean its session.
rowCount, err := client.PartitionedUpdate(ctx, stmt)
if err != nil {
t.Fatal(err)
}
if g, w := rowCount, int64(UpdateBarSetFooRowCount); g != w {
t.Errorf("Row count mismatch\nGot: %v\nWant: %v", g, w)
}
p := client.idleSessions
p.InactiveTransactionRemovalOptions.mu.Lock()
defer p.InactiveTransactionRemovalOptions.mu.Unlock()
if g, w := p.numOfLeakedSessionsRemoved, uint64(0); g != w {
t.Fatalf("Number of leaked sessions removed mismatch\nGot: %d\nWant: %d\n", g, w)
}
}

func TestClient_Apply_Priority(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
}

sh, err := c.idleSessions.take(ctx)
// Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running
sh.mu.Lock()
sh.eligibleForLongRunning = true
sh.mu.Unlock()
if err != nil {
return 0, ToSpannerError(err)
}
Expand Down
Loading

0 comments on commit c37a735

Please sign in to comment.