From 6ad748fd6b8781c6813eae7e52f8903b204d96dc Mon Sep 17 00:00:00 2001
From: rahul yadav
Date: Thu, 27 Jun 2024 13:41:06 +0530
Subject: [PATCH 1/4] fix(spanner): fix negative values for max_in_use_sessions
 metrics

---
 spanner/session_test.go | 45 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/spanner/session_test.go b/spanner/session_test.go
index 367242e86cb3..14553469686a 100644
--- a/spanner/session_test.go
+++ b/spanner/session_test.go
@@ -1022,6 +1022,51 @@ func TestMinOpenedSessions(t *testing.T) {
 	}
 }
 
+// TestPositiveNumInUseSessions tests that the number of sessions in use never becomes negative.
+func TestPositiveNumInUseSessions(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	_, client, teardown := setupMockedTestServerWithConfig(t,
+		ClientConfig{
+			SessionPoolConfig: SessionPoolConfig{
+				MinOpened:                 1,
+				healthCheckSampleInterval: time.Millisecond,
+			},
+		})
+	defer teardown()
+	sp := client.idleSessions
+	defer sp.close(ctx)
+	// Take ten sessions from the session pool and recycle them.
+	var shs []*sessionHandle
+	for i := 0; i < 10; i++ {
+		sh, err := sp.take(ctx)
+		if err != nil {
+			t.Fatalf("failed to get session(%v): %v", i, err)
+		}
+		shs = append(shs, sh)
+	}
+	for _, sh := range shs {
+		sh.recycle()
+	}
+
+	for sp.idleList.Len() != 1 {
+		continue
+	}
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+	if int64(sp.numInUse) < 0 {
+		t.Fatal("numInUse must be >= 0")
+	}
+	// There should be still one session left in either the idle list or in one
+	// of the other opened states due to the min open sessions constraint.
+	if (sp.idleList.Len() +
+		int(sp.createReqs)) != 1 {
+		t.Fatalf(
+			"got %v sessions in idle lists, want 1. Opened: %d, Creation: %d",
+			sp.idleList.Len(), sp.numOpened, sp.createReqs)
+	}
+}
+
 // TestMaxBurst tests max burst constraint.
 func TestMaxBurst(t *testing.T) {
 	t.Parallel()

From 837ea3bf891230a7cb0031805905c01767cbf4fc Mon Sep 17 00:00:00 2001
From: rahul yadav
Date: Thu, 27 Jun 2024 13:57:17 +0530
Subject: [PATCH 2/4] fix failing tests

---
 spanner/session.go      |  6 ++++--
 spanner/session_test.go |  8 +++++++-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/spanner/session.go b/spanner/session.go
index b6b8996790fc..8e1fee89ffd1 100644
--- a/spanner/session.go
+++ b/spanner/session.go
@@ -1152,8 +1152,10 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool {
 	if s.invalidate() {
 		// Decrease the number of opened sessions.
 		p.numOpened--
-		// Decrease the number of sessions in use.
-		p.decNumInUseLocked(ctx)
+		// Decrease the number of sessions in use, only when not from idle list.
+		if !isExpire {
+			p.decNumInUseLocked(ctx)
+		}
 		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
 		// Broadcast that a session has been destroyed.
 		close(p.mayGetSession)
diff --git a/spanner/session_test.go b/spanner/session_test.go
index 14553469686a..3743c2eb4ca2 100644
--- a/spanner/session_test.go
+++ b/spanner/session_test.go
@@ -1049,7 +1049,13 @@ func TestPositiveNumInUseSessions(t *testing.T) {
 		sh.recycle()
 	}
 
-	for sp.idleList.Len() != 1 {
+	for true {
+		sp.mu.Lock()
+		if sp.idleList.Len() == 1 {
+			sp.mu.Unlock()
+			break
+		}
+		sp.mu.Unlock()
 		continue
 	}
 	sp.mu.Lock()

From 78e81d5ac7f2e182b472efd6dfcbec5f44dfe94b Mon Sep 17 00:00:00 2001
From: rahul yadav
Date: Thu, 27 Jun 2024 15:29:23 +0530
Subject: [PATCH 3/4] incorporate changes

---
 spanner/session.go      | 26 +++++++++++++-------------
 spanner/session_test.go | 29 ++++++++++++-----------------
 2 files changed, 25 insertions(+), 30 deletions(-)

diff --git a/spanner/session.go b/spanner/session.go
index 8e1fee89ffd1..a31524c56a5b 100644
--- a/spanner/session.go
+++ b/spanner/session.go
@@ -197,7 +197,7 @@ func (sh *sessionHandle) destroy() {
 		p.trackedSessionHandles.Remove(tracked)
 		p.mu.Unlock()
 	}
-	s.destroy(false)
+	s.destroy(false, true)
 }
 
 func (sh *sessionHandle) updateLastUseTime() {
@@ -374,7 +374,7 @@ func (s *session) recycle() {
 		// s is rejected by its home session pool because it expired and the
 		// session pool currently has enough open sessions.
 		s.pool.mu.Unlock()
-		s.destroy(false)
+		s.destroy(false, true)
 		s.pool.mu.Lock()
 	}
 	s.pool.decNumInUseLocked(context.Background())
@@ -383,15 +383,15 @@
 
 // destroy removes the session from its home session pool, healthcheck queue
 // and Cloud Spanner service.
-func (s *session) destroy(isExpire bool) bool {
+func (s *session) destroy(isExpire, wasInUse bool) bool {
 	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 	defer cancel()
-	return s.destroyWithContext(ctx, isExpire)
+	return s.destroyWithContext(ctx, isExpire, wasInUse)
 }
 
-func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
+func (s *session) destroyWithContext(ctx context.Context, isExpire, wasInUse bool) bool {
 	// Remove s from session pool.
-	if !s.pool.remove(s, isExpire) {
+	if !s.pool.remove(s, isExpire, wasInUse) {
 		return false
 	}
 	// Unregister s from healthcheck queue.
@@ -907,7 +907,7 @@ func (p *sessionPool) close(ctx context.Context) {
 
 func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
 	defer wg.Done()
-	s.destroyWithContext(ctx, false)
+	s.destroyWithContext(ctx, false, true)
 }
 
 // errInvalidSessionPool is the error for using an invalid session pool.
@@ -1022,7 +1022,7 @@ func (p *sessionPool) isHealthy(s *session) bool {
 	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
 		if err := s.ping(); isSessionNotFoundError(err) {
 			// The session is already bad, continue to fetch/create a new one.
-			s.destroy(false)
+			s.destroy(false, true)
 			return false
 		}
 		p.hc.scheduledHC(s)
@@ -1133,7 +1133,7 @@ func (p *sessionPool) recycleLocked(s *session) bool {
 // remove atomically removes session s from the session pool and invalidates s.
 // If isExpire == true, the removal is triggered by session expiration and in
 // such cases, only idle sessions can be removed.
-func (p *sessionPool) remove(s *session, isExpire bool) bool {
+func (p *sessionPool) remove(s *session, isExpire bool, wasInUse bool) bool {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
@@ -1153,7 +1153,7 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool {
 		// Decrease the number of opened sessions.
 		p.numOpened--
 		// Decrease the number of sessions in use, only when not from idle list.
-		if !isExpire {
+		if wasInUse {
 			p.decNumInUseLocked(ctx)
 		}
 		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
@@ -1458,12 +1458,12 @@ func (hc *healthChecker) healthCheck(s *session) {
 	defer hc.markDone(s)
 	if !s.pool.isValid() {
 		// Session pool is closed, perform a garbage collection.
-		s.destroy(false)
+		s.destroy(false, true)
 		return
 	}
 	if err := s.ping(); isSessionNotFoundError(err) {
 		// Ping failed, destroy the session.
-		s.destroy(false)
+		s.destroy(false, true)
 	}
 }
 
@@ -1645,7 +1645,7 @@ func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uin
 		if s != nil {
 			deleted++
 			// destroy session as expire.
-			s.destroy(true)
+			s.destroy(true, false)
 		} else {
 			break
 		}
diff --git a/spanner/session_test.go b/spanner/session_test.go
index 3743c2eb4ca2..0a9181b09258 100644
--- a/spanner/session_test.go
+++ b/spanner/session_test.go
@@ -997,7 +997,7 @@ func TestMinOpenedSessions(t *testing.T) {
 
 	// Simulate session expiration.
 	for _, s := range ss {
-		s.destroy(true)
+		s.destroy(true, false)
 	}
 
 	// Wait until the maintainer has had a chance to replenish the pool.
@@ -1048,28 +1048,23 @@ func TestPositiveNumInUseSessions(t *testing.T) {
 	for _, sh := range shs {
 		sh.recycle()
 	}
-
-	for true {
+	waitFor(t, func() error {
 		sp.mu.Lock()
-		if sp.idleList.Len() == 1 {
+		if sp.idleList.Len() != 1 {
 			sp.mu.Unlock()
-			break
+			return errInvalidSessionPool
 		}
 		sp.mu.Unlock()
-		continue
-	}
+		return nil
+	})
 	sp.mu.Lock()
 	defer sp.mu.Unlock()
 	if int64(sp.numInUse) < 0 {
 		t.Fatal("numInUse must be >= 0")
 	}
-	// There should be still one session left in either the idle list or in one
-	// of the other opened states due to the min open sessions constraint.
-	if (sp.idleList.Len() +
-		int(sp.createReqs)) != 1 {
-		t.Fatalf(
-			"got %v sessions in idle lists, want 1. Opened: %d, Creation: %d",
-			sp.idleList.Len(), sp.numOpened, sp.createReqs)
+	// There should be still one session left in the idle list.
+	if sp.idleList.Len() != 1 {
+		t.Fatalf("got %v sessions in idle lists, want 1. Opened: %d, Creation: %d", sp.idleList.Len(), sp.numOpened, sp.createReqs)
 	}
 }
 
@@ -1196,11 +1191,11 @@ func TestSessionDestroy(t *testing.T) {
 	}
 	s := sh.session
 	sh.recycle()
-	if d := s.destroy(true); d || !s.isValid() {
-		// Session should be remaining because of min open sessions constraint.
+	if d := s.destroy(true, false); d || !s.isValid() {
+		// Session should be remaining because of min open sessions constraint.
 		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
 	}
-	if d := s.destroy(false); !d || s.isValid() {
+	if d := s.destroy(false, true); !d || s.isValid() {
 		// Session should be destroyed.
 		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
 	}

From 0d4cbcab8bb7ed609af75d65b37912312e23bb3c Mon Sep 17 00:00:00 2001
From: rahul yadav
Date: Thu, 27 Jun 2024 16:13:06 +0530
Subject: [PATCH 4/4] add comment

---
 spanner/session.go | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/spanner/session.go b/spanner/session.go
index a31524c56a5b..727952724d4a 100644
--- a/spanner/session.go
+++ b/spanner/session.go
@@ -197,6 +197,7 @@ func (sh *sessionHandle) destroy() {
 		p.trackedSessionHandles.Remove(tracked)
 		p.mu.Unlock()
 	}
+	// Since a sessionHandle is always used by transactions, we can safely destroy the session with wasInUse=true.
 	s.destroy(false, true)
 }
 
@@ -900,14 +901,14 @@ func (p *sessionPool) close(ctx context.Context) {
 	wg := sync.WaitGroup{}
 	for _, s := range allSessions {
 		wg.Add(1)
-		go deleteSession(ctx, s, &wg)
+		go closeSession(ctx, s, &wg)
 	}
 	wg.Wait()
 }
 
-func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
+func closeSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
 	defer wg.Done()
-	s.destroyWithContext(ctx, false, true)
+	s.destroyWithContext(ctx, false, false)
 }
 
 // errInvalidSessionPool is the error for using an invalid session pool.
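The accounting rule these four commits converge on, shown as a minimal illustrative sketch: a removed session decrements the in-use counter only when it was actually checked out (wasInUse=true), so expiring or shrinking away idle sessions can no longer push the in-use count below zero and surface as negative max_in_use_sessions values. The toy counters type below is a hypothetical stand-in written only to demonstrate that rule; it is not the spanner package's real session pool API.

package main

import "fmt"

// counters is a toy stand-in for the session pool's bookkeeping. numInUse is
// unsigned here to illustrate how one decrement too many wraps around and
// shows up as a bogus, effectively negative metric value.
type counters struct {
	numOpened uint64
	numInUse  uint64
}

// remove mirrors the rule the patches introduce: always decrement the opened
// count, but decrement the in-use count only when the removed session was
// actually checked out. Idle sessions removed by expiration or pool shrinking
// pass wasInUse=false and leave numInUse untouched.
func (c *counters) remove(wasInUse bool) {
	c.numOpened--
	if wasInUse {
		c.numInUse--
	}
}

func main() {
	c := &counters{numOpened: 2, numInUse: 1}
	c.remove(false) // an idle session expires; numInUse stays at 1
	c.remove(true)  // a checked-out session is destroyed; numInUse drops to 0
	fmt.Printf("opened=%d inUse=%d\n", c.numOpened, c.numInUse)
}

Patch 2 keyed the decision off isExpire alone; patches 3 and 4 make the caller state it explicitly via wasInUse, which also covers session destruction during pool close and health checks.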