Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): Add lastUseTime property to session #8942

Merged
merged 8 commits into from
Nov 3, 2023
3 changes: 3 additions & 0 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
return nil, err
}
var md metadata.MD
sh.updateLastUseTime()
resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), &sppb.PartitionReadRequest{
Session: sid,
Transaction: ts,
Expand Down Expand Up @@ -203,6 +204,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
Params: params,
ParamTypes: paramTypes,
}
sh.updateLastUseTime()
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader), req, gax.WithGRPCOptions(grpc.Header(&md)))

if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
Expand Down Expand Up @@ -306,6 +308,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
// Might happen if transaction is closed in the middle of a API call.
return &RowIterator{err: errSessionClosed(sh)}
}
sh.updateLastUseTime()
// Read or query partition.
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
Expand Down
2 changes: 2 additions & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
return nil, err
}
sh = &sessionHandle{session: s}
sh.updateLastUseTime()

// Begin transaction.
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), true), &sppb.BeginTransactionRequest{
Expand Down Expand Up @@ -854,6 +855,7 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup

rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) {
var md metadata.MD
sh.updateLastUseTime()
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
Session: sh.getID(),
MutationGroups: mgsPb,
Expand Down
143 changes: 138 additions & 5 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ func TestClient_Single_WhenInactiveTransactionsAndSessionIsNotFoundOnBackend_Rem
iter := single.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
p := client.idleSessions
sh := single.sh
// simulate session to be checked out for more than 60mins
// simulate session to be last used before 60 mins
sh.mu.Lock()
sh.checkoutTime = time.Now().Add(-time.Hour)
sh.lastUseTime = time.Now().Add(-time.Hour)
sh.mu.Unlock()

// force run task to clean up unexpected long-running sessions
Expand Down Expand Up @@ -1038,6 +1038,72 @@ func TestClient_ReadOnlyTransaction_ReadOptions(t *testing.T) {
}
}

func TestClient_ReadOnlyTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
idleTimeThreshold: 30 * time.Millisecond,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
MinimumExecutionTime: 20 * time.Millisecond,
})
server.TestSpanner.PutExecutionTime(MethodStreamingRead,
SimulatedExecutionTime{
MinimumExecutionTime: 20 * time.Millisecond,
})
ctx := context.Background()
p := client.idleSessions

roTxn := client.ReadOnlyTransaction()
defer roTxn.Close()
iter := roTxn.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
iter.Next()
iter.Stop()

// Get the session last use time.
roTxn.sh.mu.Lock()
sessionPrevLastUseTime := roTxn.sh.lastUseTime
roTxn.sh.mu.Unlock()

iter = roTxn.Read(ctx, "FOO", AllKeys(), []string{"BAR"})
iter.Next()
iter.Stop()

// Get the latest session last use time
roTxn.sh.mu.Lock()
sessionLatestLastUseTime := roTxn.sh.lastUseTime
sessionCheckoutTime := roTxn.sh.checkoutTime
roTxn.sh.mu.Unlock()

// sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime.
// This is because session lastUse time should be updated whenever a new operation is being executed on the transaction.
if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Milliseconds() <= 0 {
t.Fatalf("Session lastUseTime times should not be equal")
}

if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
}
if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
}
// force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec.
// The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime.
p.removeLongRunningSessions()
if p.numOfLeakedSessionsRemoved > 0 {
t.Fatalf("Expected session to not get cleaned by background maintainer")
}
}

func setQueryOptionsEnvVars(opts *sppb.ExecuteSqlRequest_QueryOptions) func() {
os.Setenv("SPANNER_OPTIMIZER_VERSION", opts.OptimizerVersion)
os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", opts.OptimizerStatisticsPackage)
Expand Down Expand Up @@ -1460,10 +1526,10 @@ func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionSh
return status.Errorf(codes.FailedPrecondition, "Row count mismatch\nGot: %v\nWant: %v", g, w)
}

// Simulate the session to be checked out for more than 60 mins.
// Simulate the session to be last used before 60 mins.
// The background task cleans up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
tx.sh.lastUseTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, false; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w)
Expand All @@ -1489,6 +1555,73 @@ func TestClient_ReadWriteTransaction_WhenLongRunningSessionCleaned_TransactionSh
}
}

func TestClient_ReadWriteTransaction_WhenMultipleOperations_SessionLastUseTimeShouldBeUpdated(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
InactiveTransactionRemovalOptions: InactiveTransactionRemovalOptions{
actionOnInactiveTransaction: WarnAndClose,
idleTimeThreshold: 30 * time.Millisecond,
},
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(MethodExecuteSql,
SimulatedExecutionTime{
MinimumExecutionTime: 20 * time.Millisecond,
})
ctx := context.Background()
p := client.idleSessions
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Execute first operation on the transaction
_, err := tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}

// Get the session last use time.
tx.sh.mu.Lock()
sessionPrevLastUseTime := tx.sh.lastUseTime
tx.sh.mu.Unlock()

// Execute second operation on the transaction
_, err = tx.Update(ctx, NewStatement(UpdateBarSetFoo))
if err != nil {
return err
}
// Get the latest session last use time
tx.sh.mu.Lock()
sessionLatestLastUseTime := tx.sh.lastUseTime
sessionCheckoutTime := tx.sh.checkoutTime
tx.sh.mu.Unlock()

// sessionLatestLastUseTime should not be equal to sessionPrevLastUseTime.
// This is because session lastUse time should be updated whenever a new operation is being executed on the transaction.
if (sessionLatestLastUseTime.Sub(sessionPrevLastUseTime)).Milliseconds() <= 0 {
t.Fatalf("Session lastUseTime times should not be equal")
}

if (time.Now().Sub(sessionPrevLastUseTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
}
if (time.Now().Sub(sessionCheckoutTime)).Milliseconds() < 40 {
t.Fatalf("Expected session to be checkedout for more than 40 milliseconds")
}
// force run task to clean up unexpected long-running sessions whose lastUseTime >= 3sec.
// The session should not be cleaned since the latest operation on the transaction has updated the lastUseTime.
p.removeLongRunningSessions()
if p.numOfLeakedSessionsRemoved > 0 {
t.Fatalf("Expected session to not get cleaned by background maintainer")
}
return nil
})
if err != nil {
t.Fatal(err)
}
}

func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteBatchUpdate(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1546,7 +1679,7 @@ func TestClient_ReadWriteTransaction_WhenLongRunningExecuteBatchUpdate_TakeNoAct
if attempts == 2 {
// Simulate the session to be long-running. The background task should not clean up this long-running session.
tx.sh.mu.Lock()
tx.sh.checkoutTime = time.Now().Add(-time.Hour)
tx.sh.lastUseTime = time.Now().Add(-time.Hour)
if g, w := tx.sh.eligibleForLongRunning, true; g != w {
tx.sh.mu.Unlock()
return status.Errorf(codes.FailedPrecondition, "isLongRunningTransaction value mismatch\nGot: %v\nWant: %v", g, w)
Expand Down
3 changes: 3 additions & 0 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
// Note that PDML transactions cannot be committed or rolled back.
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
var md metadata.MD
sh.updateLastUseTime()
// Begin transaction.
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Expand All @@ -122,6 +123,8 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
req.Transaction = &sppb.TransactionSelector{
Selector: &sppb.TransactionSelector_Id{Id: res.Id},
}

sh.updateLastUseTime()
resultSet, err := sh.getClient().ExecuteSql(ctx, req, gax.WithGRPCOptions(grpc.Header(&md)))
if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil {
err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
Expand Down
16 changes: 14 additions & 2 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type sessionHandle struct {
session *session
// checkoutTime is the time the session was checked out of the pool.
checkoutTime time.Time
// lastUseTime is the time the session was last used after checked out of the pool.
lastUseTime time.Time
// trackedSessionHandle is the linked list node which links the session to
// the list of tracked session handles. trackedSessionHandle is only set if
// TrackSessionHandles has been enabled in the session pool configuration.
Expand Down Expand Up @@ -118,6 +120,7 @@ func (sh *sessionHandle) recycle() {
sh.session = nil
sh.trackedSessionHandle = nil
sh.checkoutTime = time.Time{}
sh.lastUseTime = time.Time{}
sh.stack = nil
sh.mu.Unlock()
s.recycle()
Expand Down Expand Up @@ -187,6 +190,7 @@ func (sh *sessionHandle) destroy() {
sh.session = nil
sh.trackedSessionHandle = nil
sh.checkoutTime = time.Time{}
sh.lastUseTime = time.Time{}
sh.stack = nil
sh.mu.Unlock()

Expand All @@ -199,6 +203,14 @@ func (sh *sessionHandle) destroy() {
s.destroy(false)
}

func (sh *sessionHandle) updateLastUseTime() {
sh.mu.Lock()
defer sh.mu.Unlock()
if sh.session != nil {
sh.lastUseTime = time.Now()
}
}

// session wraps a Cloud Spanner session ID through which transactions are
// created and executed.
type session struct {
Expand Down Expand Up @@ -712,7 +724,7 @@ func (p *sessionPool) getLongRunningSessionsLocked() []*sessionHandle {
for element != nil {
sh := element.Value.(*sessionHandle)
sh.mu.Lock()
diff := time.Now().Sub(sh.checkoutTime)
diff := time.Now().Sub(sh.lastUseTime)
if !sh.eligibleForLongRunning && diff.Seconds() >= p.idleTimeThreshold.Seconds() {
if (p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose) && !sh.isSessionLeakLogged {
if p.actionOnInactiveTransaction == Warn {
Expand Down Expand Up @@ -880,7 +892,7 @@ var errGetSessionTimeout = spannerErrorf(codes.Canceled, "timeout / context canc
// stack if the session pool has been configured to track the call stacks of
// sessions being checked out of the pool.
func (p *sessionPool) newSessionHandle(s *session) (sh *sessionHandle) {
sh = &sessionHandle{session: s, checkoutTime: time.Now()}
sh = &sessionHandle{session: s, checkoutTime: time.Now(), lastUseTime: time.Now()}
if p.TrackSessionHandles || p.actionOnInactiveTransaction == Warn || p.actionOnInactiveTransaction == WarnAndClose || p.actionOnInactiveTransaction == Close {
p.mu.Lock()
sh.trackedSessionHandle = p.trackedSessionHandles.PushBack(sh)
Expand Down
Loading
Loading