fix(spanner): fix negative values for max_in_use_sessions metrics #10449

Merged · 6 commits · Jun 28, 2024
spanner/session.go (35 changes: 19 additions & 16 deletions)

@@ -197,7 +197,8 @@ func (sh *sessionHandle) destroy() {
 		p.trackedSessionHandles.Remove(tracked)
 		p.mu.Unlock()
 	}
-	s.destroy(false)
+	// since sessionHandle is always used by Transactions we can safely destroy the session with wasInUse=true
+	s.destroy(false, true)
 }
 
 func (sh *sessionHandle) updateLastUseTime() {
Review thread on this change:

Contributor: Is destroy() only called for sessions that are in use? I would expect that it could also be called for a session that is in the list of idle sessions, and in that case it was not in use.

Contributor Author: sessionHandle is a wrapper that is created only for transactions (i.e., for sessions that are about to be used), so we can assume that any destroy() call made through a sessionHandle was for a session that was in use.

Contributor Author: @olavloite added a comment for the same.

Contributor: Ah, that makes it clearer, thanks!
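To make that invariant concrete, here is a minimal sketch (simplified names, not the actual spanner pool code) of why a handle-based destroy can always pass wasInUse=true: a handle only ever comes into existence when a session is checked out of the pool.

```go
// Sketch of the invariant discussed above; all types are simplified stand-ins.
package main

import "fmt"

type session struct{ id string }

type sessionPool struct {
	idle     []*session
	numInUse int
}

type sessionHandle struct {
	session *session
	pool    *sessionPool
}

// take is the only place a sessionHandle is created: the session leaves
// the idle list and is counted as in use.
func (p *sessionPool) take() *sessionHandle {
	n := len(p.idle)
	s := p.idle[n-1]
	p.idle = p.idle[:n-1]
	p.numInUse++
	return &sessionHandle{session: s, pool: p}
}

// destroy can hard-code wasInUse=true because no handle ever exists for a
// session that is still sitting in the idle list.
func (h *sessionHandle) destroy() {
	if h.session == nil {
		return // already destroyed
	}
	h.pool.numInUse--
	h.session = nil
}

func main() {
	p := &sessionPool{idle: []*session{{id: "s1"}}}
	h := p.take()
	h.destroy()
	fmt.Println(p.numInUse) // 0: handle-based destroys never underflow the gauge
}
```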

@@ -374,7 +375,7 @@ func (s *session) recycle() {
 		// s is rejected by its home session pool because it expired and the
 		// session pool currently has enough open sessions.
 		s.pool.mu.Unlock()
-		s.destroy(false)
+		s.destroy(false, true)
 		s.pool.mu.Lock()
 	}
 	s.pool.decNumInUseLocked(context.Background())
@@ -383,15 +384,15 @@
 
 // destroy removes the session from its home session pool, healthcheck queue
 // and Cloud Spanner service.
-func (s *session) destroy(isExpire bool) bool {
+func (s *session) destroy(isExpire, wasInUse bool) bool {
 	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 	defer cancel()
-	return s.destroyWithContext(ctx, isExpire)
+	return s.destroyWithContext(ctx, isExpire, wasInUse)
 }
 
-func (s *session) destroyWithContext(ctx context.Context, isExpire bool) bool {
+func (s *session) destroyWithContext(ctx context.Context, isExpire, wasInUse bool) bool {
 	// Remove s from session pool.
-	if !s.pool.remove(s, isExpire) {
+	if !s.pool.remove(s, isExpire, wasInUse) {
 		return false
 	}
 	// Unregister s from healthcheck queue.
@@ -900,14 +901,14 @@ func (p *sessionPool) close(ctx context.Context) {
 	wg := sync.WaitGroup{}
 	for _, s := range allSessions {
 		wg.Add(1)
-		go deleteSession(ctx, s, &wg)
+		go closeSession(ctx, s, &wg)
 	}
 	wg.Wait()
 }
 
-func deleteSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
+func closeSession(ctx context.Context, s *session, wg *sync.WaitGroup) {
 	defer wg.Done()
-	s.destroyWithContext(ctx, false)
+	s.destroyWithContext(ctx, false, false)
 }
 
 // errInvalidSessionPool is the error for using an invalid session pool.
@@ -1022,7 +1023,7 @@ func (p *sessionPool) isHealthy(s *session) bool {
 	if s.getNextCheck().Add(2 * p.hc.getInterval()).Before(time.Now()) {
 		if err := s.ping(); isSessionNotFoundError(err) {
 			// The session is already bad, continue to fetch/create a new one.
-			s.destroy(false)
+			s.destroy(false, true)
 			return false
 		}
 		p.hc.scheduledHC(s)
@@ -1133,7 +1134,7 @@ func (p *sessionPool) recycleLocked(s *session) bool {
 // remove atomically removes session s from the session pool and invalidates s.
 // If isExpire == true, the removal is triggered by session expiration and in
 // such cases, only idle sessions can be removed.
-func (p *sessionPool) remove(s *session, isExpire bool) bool {
+func (p *sessionPool) remove(s *session, isExpire bool, wasInUse bool) bool {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
@@ -1152,8 +1153,10 @@
 	if s.invalidate() {
 		// Decrease the number of opened sessions.
 		p.numOpened--
-		// Decrease the number of sessions in use.
-		p.decNumInUseLocked(ctx)
+		// Decrease the number of sessions in use, only when not from idle list.
+		if wasInUse {
+			p.decNumInUseLocked(ctx)
+		}
 		p.recordStat(ctx, OpenSessionCount, int64(p.numOpened))
 		// Broadcast that a session has been destroyed.
 		close(p.mayGetSession)
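The guard above is the heart of the fix: before this change, remove() decremented the in-use counter unconditionally, so destroying sessions that came from the idle list (for example during pool shrinking or pool close) pushed the gauge below zero. A minimal sketch of the before/after accounting, using simplified stand-in types rather than the library's code:

```go
// Sketch of the negative-gauge bug and the guarded-decrement fix.
package main

import "fmt"

type pool struct {
	numOpened int64
	numInUse  int64
}

// removeBuggy models the old behavior: every removal decrements numInUse,
// even when the session came from the idle list and was never checked out.
func (p *pool) removeBuggy() {
	p.numOpened--
	p.numInUse--
}

// removeFixed models the new behavior: numInUse is decremented only when
// the destroyed session was actually in use.
func (p *pool) removeFixed(wasInUse bool) {
	p.numOpened--
	if wasInUse {
		p.numInUse--
	}
}

func main() {
	p := &pool{numOpened: 2, numInUse: 0} // two idle sessions, none in use

	p.removeBuggy()
	fmt.Println(p.numInUse) // -1: the negative metric this PR fixes

	p = &pool{numOpened: 2, numInUse: 0}
	p.removeFixed(false)    // idle-list removal, e.g. from shrinkPool
	fmt.Println(p.numInUse) // 0: the gauge stays non-negative
}
```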
@@ -1456,12 +1459,12 @@ func (hc *healthChecker) healthCheck(s *session) {
 	defer hc.markDone(s)
 	if !s.pool.isValid() {
 		// Session pool is closed, perform a garbage collection.
-		s.destroy(false)
+		s.destroy(false, true)
 		return
 	}
 	if err := s.ping(); isSessionNotFoundError(err) {
 		// Ping failed, destroy the session.
-		s.destroy(false)
+		s.destroy(false, true)
 	}
 }
@@ -1643,7 +1646,7 @@ func (hc *healthChecker) shrinkPool(ctx context.Context, shrinkToNumSessions uint64) {
 		if s != nil {
 			deleted++
 			// destroy session as expire.
-			s.destroy(true)
+			s.destroy(true, false)
 		} else {
 			break
 		}
spanner/session_test.go (54 changes: 50 additions & 4 deletions)

@@ -997,7 +997,7 @@ func TestMinOpenedSessions(t *testing.T) {
 
 	// Simulate session expiration.
 	for _, s := range ss {
-		s.destroy(true)
+		s.destroy(true, false)
 	}
 
 	// Wait until the maintainer has had a chance to replenish the pool.
@@ -1022,6 +1022,52 @@
 	}
 }

+// TestPositiveNumInUseSessions tests that num_in_use session should always be greater than 0.
+func TestPositiveNumInUseSessions(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	_, client, teardown := setupMockedTestServerWithConfig(t,
+		ClientConfig{
+			SessionPoolConfig: SessionPoolConfig{
+				MinOpened:                 1,
+				healthCheckSampleInterval: time.Millisecond,
+			},
+		})
+	defer teardown()
+	sp := client.idleSessions
+	defer sp.close(ctx)
+	// Take ten sessions from session pool and recycle them.
+	var shs []*sessionHandle
+	for i := 0; i < 10; i++ {
+		sh, err := sp.take(ctx)
+		if err != nil {
+			t.Fatalf("failed to get session(%v): %v", i, err)
+		}
+		shs = append(shs, sh)
+	}
+	for _, sh := range shs {
+		sh.recycle()
+	}
+	waitFor(t, func() error {
+		sp.mu.Lock()
+		if sp.idleList.Len() != 1 {
+			sp.mu.Unlock()
+			return errInvalidSessionPool
+		}
+		sp.mu.Unlock()
+		return nil
+	})
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+	if int64(sp.numInUse) < 0 {
+		t.Fatal("numInUse must be >= 0")
+	}
+	// There should be still one session left in the idle list.
+	if sp.idleList.Len() != 1 {
+		t.Fatalf("got %v sessions in idle lists, want 1. Opened: %d, Creation: %d", sp.idleList.Len(), sp.numOpened, sp.createReqs)
+	}
+}
+
 // TestMaxBurst tests max burst constraint.
 func TestMaxBurst(t *testing.T) {
 	t.Parallel()
@@ -1145,11 +1191,11 @@ func TestSessionDestroy(t *testing.T) {
 	}
 	s := sh.session
 	sh.recycle()
-	if d := s.destroy(true); d || !s.isValid() {
-		// Session should be remaining because of min open sessions constraint.
+	if d := s.destroy(true, false); d || !s.isValid() {
+		// Session should be remaining because of min open session's constraint.
 		t.Fatalf("session %s invalid, want it to stay alive. (destroy in expiration mode, success: %v)", s.id, d)
 	}
-	if d := s.destroy(false); !d || s.isValid() {
+	if d := s.destroy(false, true); !d || s.isValid() {
 		// Session should be destroyed.
 		t.Fatalf("failed to destroy session %s. (destroy in default mode, success: %v)", s.id, d)
 	}
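For readers who consume this pool through the public API rather than the internals patched above: the in-use gauge this PR repairs tracks sessions in the pool that every spanner.Client creates. A hedged sketch of configuring that pool; the database path and pool sizes are placeholders, while NewClientWithConfig, ClientConfig, and the MinOpened/MaxOpened fields are the package's public surface:

```go
// Illustrative client setup; values are arbitrary and the database path
// is a placeholder, not taken from this PR.
package main

import (
	"context"
	"log"

	"cloud.google.com/go/spanner"
)

func main() {
	ctx := context.Background()
	client, err := spanner.NewClientWithConfig(ctx,
		"projects/p/instances/i/databases/d",
		spanner.ClientConfig{
			SessionPoolConfig: spanner.SessionPoolConfig{
				MinOpened: 100, // maintainer replenishes the pool up to this floor
				MaxOpened: 400, // hard cap on open sessions
			},
		})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}
```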