diff --git a/pkg/sql/sqlliveness/slinstance/slinstance.go b/pkg/sql/sqlliveness/slinstance/slinstance.go index 9c707748faf2..344024b74d6c 100644 --- a/pkg/sql/sqlliveness/slinstance/slinstance.go +++ b/pkg/sql/sqlliveness/slinstance/slinstance.go @@ -58,10 +58,10 @@ type Writer interface { } type session struct { - id sqlliveness.SessionID - exp hlc.Timestamp - mu struct { + id sqlliveness.SessionID + mu struct { syncutil.RWMutex + exp hlc.Timestamp sessionExpiryCallbacks []func(ctx context.Context) } } @@ -70,7 +70,11 @@ type session struct { func (s *session) ID() sqlliveness.SessionID { return s.id } // Expiration implements the Session interface method Expiration. -func (s *session) Expiration() hlc.Timestamp { return s.exp } +func (s *session) Expiration() hlc.Timestamp { + s.mu.RLock() + defer s.mu.RUnlock() + return s.mu.exp +} func (s *session) RegisterCallbackForSessionExpiry(sExp func(context.Context)) { s.mu.Lock() @@ -86,6 +90,12 @@ func (s *session) invokeSessionExpiryCallbacks(ctx context.Context) { } } +func (s *session) setExpiration(exp hlc.Timestamp) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.exp = exp +} + // Instance implements the sqlliveness.Instance interface by storing the // liveness sessions in table system.sqlliveness and relying on a heart beat // loop to extend the existing sessions' expirations or creating a new session @@ -141,7 +151,8 @@ func (l *Instance) clearSession(ctx context.Context) { func (l *Instance) createSession(ctx context.Context) (*session, error) { id := sqlliveness.SessionID(uuid.MakeV4().GetBytes()) exp := l.clock.Now().Add(l.ttl().Nanoseconds(), 0) - s := &session{id: id, exp: exp} + s := &session{id: id} + s.mu.exp = exp opts := retry.Options{ InitialBackoff: 10 * time.Millisecond, @@ -152,7 +163,7 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) { var err error for i, r := 0, retry.StartWithCtx(ctx, opts); r.Next(); { i++ - if err = l.storage.Insert(ctx, s.id, s.exp); err != nil { + if err = l.storage.Insert(ctx, s.id, s.Expiration()); err != nil { if ctx.Err() != nil { break } @@ -170,7 +181,7 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) { return s, nil } -func (l *Instance) extendSession(ctx context.Context, s sqlliveness.Session) (bool, error) { +func (l *Instance) extendSession(ctx context.Context, s *session) (bool, error) { exp := l.clock.Now().Add(l.ttl().Nanoseconds(), 0) opts := retry.Options{ @@ -197,9 +208,7 @@ func (l *Instance) extendSession(ctx context.Context, s sqlliveness.Session) (bo return false, nil } - l.mu.Lock() - l.mu.s.exp = exp - l.mu.Unlock() + s.setExpiration(exp) return true, nil }