diff --git a/spanner/session.go b/spanner/session.go index 8e1fee89ffd1..a31524c56a5b 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -197,7 +197,7 @@ func (sh *sessionHandle) destroy() { p.trackedSessionHandles.Remove(tracked) p.mu.Unlock() } - s.destroy(false) + s.destroy(false, true) } func (sh *sessionHandle) updateLastUseTime() { @@ -374,7 +374,7 @@ func (s *session) recycle() { // s is rejected by its home session pool because it expired and the // session pool currently has enough open sessions. s.pool.mu.Unlock() - s.destroy(false) + s.destroy(false, true) s.pool.mu.Lock() } s.pool.decNumInUseLocked(context.Background()) @@ -383,15 +383,15 @@ func (s *session) recycle() { // destroy removes the session from its home session pool, healthcheck queue // and Cloud Spanner service. -func (s *session) destroy(isExpire bool) bool { +func (s *session) destroy(isExpire, wasInUse bool) bool { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - return s.destroyWithContext(ctx, isExpire) + return s.destroyWithContext(ctx, isExpire, wasInUse) } -func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool { +func (s *session) destroyWithContext(ctx context.Context, isExpire, wasInUse bool) bool { // Remove s from session pool. - if !s.pool.remove(s, isExpire) { + if !s.pool.remove(s, isExpire, wasInUse) { return false } // Unregister s from healthcheck queue. @@ -907,7 +907,7 @@ func (p *sessionPool) close(ctx context.Context) { func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) { defer wg.Done() - s.destroyWithContext(ctx, false) + s.destroyWithContext(ctx, false, true) } // errInvalidSessionPool is the error for using an invalid session pool. 
@@ -1022,7 +1022,7 @@ func (p *sessionPool) isHealthy(s *session) bool { if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) { if err := s.ping(); isSessionNotFoundError(err) { // The session is already bad, continue to fetch/create a new one. - s.destroy(false) + s.destroy(false, true) return false } p.hc.scheduledHC(s) @@ -1133,7 +1133,7 @@ func (p *sessionPool) recycleLocked(s *session) bool { // remove atomically removes session s from the session pool and invalidates s. // If isExpire == true, the removal is triggered by session expiration and in // such cases, only idle sessions can be removed. -func (p *sessionPool) remove(s *session, isExpire bool) bool { +func (p *sessionPool) remove(s *session, isExpire bool, wasInUse bool) bool { p.mu.Lock() defer p.mu.Unlock() if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) { @@ -1153,7 +1153,7 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool { // Decrease the number of opened sessions. p.numOpened-- // Decrease the number of sessions in use, only when not from idle list. - if !isExpire { + if wasInUse { p.decNumInUseLocked(ctx) } p.recordStat(ctx, OpenSessionCount, int64(p.numOpened)) @@ -1458,12 +1458,12 @@ func (hc *healthChecker) healthCheck(s *session) { defer hc.markDone(s) if !s.pool.isValid() { // Session pool is closed, perform a garbage collection. - s.destroy(false) + s.destroy(false, true) return } if err := s.ping(); isSessionNotFoundError(err) { // Ping failed, destroy the session. - s.destroy(false) + s.destroy(false, true) } } @@ -1645,7 +1645,7 @@ func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uin if s != nil { deleted++ // destroy session as expire. 
- s.destroy(true) + s.destroy(true, false) } else { break } diff --git a/spanner/session_test.go b/spanner/session_test.go index 3743c2eb4ca2..849c7adb8549 100644 --- a/spanner/session_test.go +++ b/spanner/session_test.go @@ -997,7 +997,7 @@ func TestMinOpenedSessions(t *testing.T) { // Simulate session expiration. for _, s := range ss { - s.destroy(true) + s.destroy(true, false) } // Wait until the maintainer has had a chance to replenish the pool. @@ -1048,16 +1048,15 @@ func TestPositiveNumInUseSessions(t *testing.T) { for _, sh := range shs { sh.recycle() } - - for true { + waitFor(t, func() error { sp.mu.Lock() if sp.idleList.Len() == 1 { sp.mu.Unlock() - break + return nil } sp.mu.Unlock() - continue - } + return errInvalidSessionPool + }) sp.mu.Lock() defer sp.mu.Unlock() if int64(sp.numInUse) < 0 { @@ -1065,11 +1064,8 @@ func TestPositiveNumInUseSessions(t *testing.T) { } // There should be still one session left in either the idle list or in one // of the other opened states due to the min open sessions constraint. - if (sp.idleList.Len() + - int(sp.createReqs)) != 1 { - t.Fatalf( - "got %v sessions in idle lists, want 1. Opened: %d, Creation: %d", - sp.idleList.Len(), sp.numOpened, sp.createReqs) + if sp.idleList.Len() != 1 { + t.Fatalf("got %v sessions in idle lists, want 1. Opened: %d, Creation: %d", sp.idleList.Len(), sp.numOpened, sp.createReqs) } } @@ -1196,11 +1192,11 @@ func TestSessionDestroy(t *testing.T) { } s := sh.session sh.recycle() - if d := s.destroy(true); d || !s.isValid() { - // Session should be remaining because of min open sessions constraint. + if d := s.destroy(true, false); d || !s.isValid() { + // Session should be remaining because of min open sessions constraint. t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d) } - if d := s.destroy(false); !d || s.isValid() { + if d := s.destroy(false, true); !d || s.isValid() { // Session should be destroyed.
t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d) }