Skip to content

Commit

Permalink
Merge #69290
Browse files Browse the repository at this point in the history
69290: sqlliveness/slinstance: fix data race r=ajwerner a=ajwerner

The Session implementation did not use a mutex around its expiration. That
meant that when the expiration was updated, a data race occurred.

Fixes #69266.

Release justification: bug fixes and low-risk updates to new functionality

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Aug 24, 2021
2 parents 6c05f99 + ee331cd commit ec0fa4e
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ type Writer interface {
}

type session struct {
id sqlliveness.SessionID
exp hlc.Timestamp
mu struct {
id sqlliveness.SessionID
mu struct {
syncutil.RWMutex
exp hlc.Timestamp
sessionExpiryCallbacks []func(ctx context.Context)
}
}
Expand All @@ -70,7 +70,11 @@ type session struct {
func (s *session) ID() sqlliveness.SessionID { return s.id }

// Expiration implements the Session interface method Expiration.
func (s *session) Expiration() hlc.Timestamp { return s.exp }
func (s *session) Expiration() hlc.Timestamp {
s.mu.RLock()
defer s.mu.RUnlock()
return s.mu.exp
}

func (s *session) RegisterCallbackForSessionExpiry(sExp func(context.Context)) {
s.mu.Lock()
Expand All @@ -86,6 +90,12 @@ func (s *session) invokeSessionExpiryCallbacks(ctx context.Context) {
}
}

func (s *session) setExpiration(exp hlc.Timestamp) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.exp = exp
}

// Instance implements the sqlliveness.Instance interface by storing the
// liveness sessions in table system.sqlliveness and relying on a heart beat
// loop to extend the existing sessions' expirations or creating a new session
Expand Down Expand Up @@ -141,7 +151,8 @@ func (l *Instance) clearSession(ctx context.Context) {
func (l *Instance) createSession(ctx context.Context) (*session, error) {
id := sqlliveness.SessionID(uuid.MakeV4().GetBytes())
exp := l.clock.Now().Add(l.ttl().Nanoseconds(), 0)
s := &session{id: id, exp: exp}
s := &session{id: id}
s.mu.exp = exp

opts := retry.Options{
InitialBackoff: 10 * time.Millisecond,
Expand All @@ -152,7 +163,7 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) {
var err error
for i, r := 0, retry.StartWithCtx(ctx, opts); r.Next(); {
i++
if err = l.storage.Insert(ctx, s.id, s.exp); err != nil {
if err = l.storage.Insert(ctx, s.id, s.Expiration()); err != nil {
if ctx.Err() != nil {
break
}
Expand All @@ -170,7 +181,7 @@ func (l *Instance) createSession(ctx context.Context) (*session, error) {
return s, nil
}

func (l *Instance) extendSession(ctx context.Context, s sqlliveness.Session) (bool, error) {
func (l *Instance) extendSession(ctx context.Context, s *session) (bool, error) {
exp := l.clock.Now().Add(l.ttl().Nanoseconds(), 0)

opts := retry.Options{
Expand All @@ -197,9 +208,7 @@ func (l *Instance) extendSession(ctx context.Context, s sqlliveness.Session) (bo
return false, nil
}

l.mu.Lock()
l.mu.s.exp = exp
l.mu.Unlock()
s.setExpiration(exp)
return true, nil
}

Expand Down

0 comments on commit ec0fa4e

Please sign in to comment.