Skip to content

Commit

Permalink
incorporate changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul2393 committed Jun 27, 2024
1 parent 837ea3b commit 208c629
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 27 deletions.
26 changes: 13 additions & 13 deletions spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (sh *sessionHandle) destroy() {
p.trackedSessionHandles.Remove(tracked)
p.mu.Unlock()
}
s.destroy(false)
s.destroy(false, true)
}

func (sh *sessionHandle) updateLastUseTime() {
Expand Down Expand Up @@ -374,7 +374,7 @@ func (s *session) recycle() {
// s is rejected by its home session pool because it expired and the
// session pool currently has enough open sessions.
s.pool.mu.Unlock()
s.destroy(false)
s.destroy(false, true)
s.pool.mu.Lock()
}
s.pool.decNumInUseLocked(context.Background())
Expand All @@ -383,15 +383,15 @@ func (s *session) recycle() {

// destroy removes the session from its home session pool, healthcheck queue
// and Cloud Spanner service.
func (s *session) destroy(isExpire bool) bool {
func (s *session) destroy(isExpire, wasInUse bool) bool {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
return s.destroyWithContext(ctx, isExpire)
return s.destroyWithContext(ctx, isExpire, wasInUse)
}

func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
func (s *session) destroyWithContext(ctx context.Context, isExpire, wasInUse bool) bool {
// Remove s from session pool.
if !s.pool.remove(s, isExpire) {
if !s.pool.remove(s, isExpire, wasInUse) {
return false
}
// Unregister s from healthcheck queue.
Expand Down Expand Up @@ -907,7 +907,7 @@ func (p *sessionPool) close(ctx context.Context) {

func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
defer wg.Done()
s.destroyWithContext(ctx, false)
s.destroyWithContext(ctx, false, true)
}

// errInvalidSessionPool is the error for using an invalid session pool.
Expand Down Expand Up @@ -1022,7 +1022,7 @@ func (p *sessionPool) isHealthy(s *session) bool {
if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
if err := s.ping(); isSessionNotFoundError(err) {
// The session is already bad, continue to fetch/create a new one.
s.destroy(false)
s.destroy(false, true)
return false
}
p.hc.scheduledHC(s)
Expand Down Expand Up @@ -1133,7 +1133,7 @@ func (p *sessionPool) recycleLocked(s *session) bool {
// remove atomically removes session s from the session pool and invalidates s.
// If isExpire == true, the removal is triggered by session expiration and in
// such cases, only idle sessions can be removed.
func (p *sessionPool) remove(s *session, isExpire bool) bool {
func (p *sessionPool) remove(s *session, isExpire bool, wasInUse bool) bool {
p.mu.Lock()
defer p.mu.Unlock()
if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
Expand All @@ -1153,7 +1153,7 @@ func (p *sessionPool) remove(s *session, isExpire bool) bool {
// Decrease the number of opened sessions.
p.numOpened--
// Decrease the number of sessions in use, only when not from idle list.
if !isExpire {
if wasInUse {
p.decNumInUseLocked(ctx)
}
p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
Expand Down Expand Up @@ -1458,12 +1458,12 @@ func (hc *healthChecker) healthCheck(s *session) {
defer hc.markDone(s)
if !s.pool.isValid() {
// Session pool is closed, perform a garbage collection.
s.destroy(false)
s.destroy(false, true)
return
}
if err := s.ping(); isSessionNotFoundError(err) {
// Ping failed, destroy the session.
s.destroy(false)
s.destroy(false, true)
}
}

Expand Down Expand Up @@ -1645,7 +1645,7 @@ func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uin
if s != nil {
deleted++
// destroy session as expire.
s.destroy(true)
s.destroy(true, false)
} else {
break
}
Expand Down
24 changes: 10 additions & 14 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ func TestMinOpenedSessions(t *testing.T) {

// Simulate session expiration.
for _, s := range ss {
s.destroy(true)
s.destroy(true, false)
}

// Wait until the maintainer has had a chance to replenish the pool.
Expand Down Expand Up @@ -1048,28 +1048,24 @@ func TestPositiveNumInUseSessions(t *testing.T) {
for _, sh := range shs {
sh.recycle()
}

for true {
waitFor(t, func() error {
sp.mu.Lock()
if sp.idleList.Len() == 1 {
sp.mu.Unlock()
break
return errInvalidSessionPool
}
sp.mu.Unlock()
continue
}
return nil
})
sp.mu.Lock()
defer sp.mu.Unlock()
if int64(sp.numInUse) < 0 {
t.Fatal("numInUse must be >= 0")
}
// There should be still one session left in either the idle list or in one
// of the other opened states due to the min open sessions constraint.
if (sp.idleList.Len() +
int(sp.createReqs)) != 1 {
t.Fatalf(
"got %v sessions in idle lists, want 1. Opened: %d, Creation: %d",
sp.idleList.Len(), sp.numOpened, sp.createReqs)
if sp.idleList.Len() != 1 {
t.Fatalf("got %v sessions in idle lists, want 1. Opened: %d, Creation: %d", sp.idleList.Len(), sp.numOpened, sp.createReqs)
}
}

Expand Down Expand Up @@ -1196,11 +1192,11 @@ func TestSessionDestroy(t *testing.T) {
}
s := sh.session
sh.recycle()
if d := s.destroy(true); d || !s.isValid() {
// Session should be remaining because of min open sessions constraint.
if d := s.destroy(true, false); d || !s.isValid() {
// Session should be remaining because of min open session's constraint.
t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
}
if d := s.destroy(false); !d || s.isValid() {
if d := s.destroy(false, true); !d || s.isValid() {
// Session should be destroyed.
t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
}
Expand Down

0 comments on commit 208c629

Please sign in to comment.