sql,persistedsqlstats: prevent a deadlock during shutdown
Prior to this change, the coordination between the stats flusher
task (an async stopper task) and the activity flusher job relied
on three cooperating pieces:

- the stats persistence task called a callback _function_, if one
  was installed, every time a flush completed.
- the job _reinstalled the callback function_ on each iteration.
- the function wrote to a channel that was subsequently read by
  the job iteration body.

This approach was defective in 3 ways:

1. if the job iteration body exited (e.g. due to a server drain)
   *after* it installed the callback fn, but *before* the stats flusher
   read and called the callback fn, a window of time existed
   where a deadlock could occur:
   - the stats flusher retrieves the pointer to the callback fn
     but doesn't call it yet.
   - the job loop exits. From then on it no longer reads from the
     channel.
   - the stats flusher calls the callback fn, which attempts to
     write to the channel. The channel is unbuffered and has no
     reader left, so the write never completes: a deadlock.

   (This was seen during testing. See #102574)

   The fix here is to always `select` jointly on the write to the
   channel and on a read from the drain/stopper signals, so that the
   channel operation is abandoned if a shutdown is requested.

2. the stats flusher task was holding the mutex locked while
   performing the channel write. This is generally bad code hygiene,
   as it forces the code maintainer to double-check that the lock
   and channel operations cannot block each other.

   The fix is to use the mutex only to retrieve the channel reference,
   and then write to the channel once the mutex is no longer held.

3. the API between the two defined a *callback function* where
   just a notification channel was needed.

   The fix here is to simplify the API: the job now registers a
   send-only channel via `SetFlushDoneSignalCh`.

Release note: None
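
The two fixes described in items 1 and 2 can be sketched in isolation as follows. This is a minimal, standalone illustration and not the CockroachDB code: the `notifier` type, its methods, and the `quit` channel are hypothetical names standing in for the stats flusher state and the drain/stopper signals.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// notifier is a hypothetical stand-in for the flusher's signalling state.
type notifier struct {
	mu struct {
		sync.Mutex
		signalCh chan<- struct{}
	}
}

// setSignalCh installs the channel to notify, or clears it when ch is nil.
func (n *notifier) setSignalCh(ch chan<- struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.mu.signalCh = ch
}

// notify reports whether the signal was delivered. It copies the channel
// reference under the mutex, releases the mutex, and only then sends,
// giving up if quit is closed before a reader shows up.
func (n *notifier) notify(quit <-chan struct{}) bool {
	n.mu.Lock()
	ch := n.mu.signalCh
	n.mu.Unlock()
	if ch == nil {
		return false // nobody registered a channel
	}
	select {
	case ch <- struct{}{}:
		return true
	case <-quit:
		return false
	}
}

func main() {
	var n notifier
	quit := make(chan struct{})
	sig := make(chan struct{}) // unbuffered, like flushDoneSignal in the diff

	// With a live reader, the send goes through.
	go func() { <-sig }()
	n.setSignalCh(sig)
	fmt.Println("delivered with reader:", n.notify(quit))

	// The reader is gone, so a bare send would block forever.
	// Closing quit lets notify return instead.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(quit)
	}()
	fmt.Println("delivered after reader exit:", n.notify(quit))
}
```

Because the mutex is released before the `select`, a concurrent `setSignalCh(nil)` from the exiting reader cannot get stuck behind a blocked send, which is the interlock item 2 warns about.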
knz committed Apr 28, 2023
1 parent 4183fc2 commit d83ba78
Showing 2 changed files with 25 additions and 17 deletions.
6 changes: 2 additions & 4 deletions pkg/sql/sql_activity_update_job.go
@@ -92,14 +92,12 @@ func (j *sqlActivityUpdateJob) Resume(ctx context.Context, execCtxI interface{})

 	flushDoneSignal := make(chan struct{})
 	defer func() {
-		statsFlush.SetFlushDoneCallback(nil)
+		statsFlush.SetFlushDoneSignalCh(nil)
 		close(flushDoneSignal)
 	}()

+	statsFlush.SetFlushDoneSignalCh(flushDoneSignal)
 	for {
-		statsFlush.SetFlushDoneCallback(func() {
-			flushDoneSignal <- struct{}{}
-		})
 		select {
 		case <-flushDoneSignal:
 			// A flush was done. Set the timer and wait for it to complete.
36 changes: 23 additions & 13 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
@@ -69,8 +69,10 @@ type PersistedSQLStats struct {
 	memoryPressureSignal chan struct{}

 	// Used to signal the flush completed.
-	flushDoneCallback func()
-	flushMutex syncutil.Mutex
+	flushDoneMu struct {
+		syncutil.Mutex
+		signalCh chan<- struct{}
+	}

 	lastFlushStarted time.Time
 	jobMonitor jobMonitor
@@ -94,7 +96,6 @@ func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats {
 		cfg: cfg,
 		memoryPressureSignal: make(chan struct{}),
 		drain: make(chan struct{}),
-		flushDoneCallback: nil,
 	}

 	p.jobMonitor = jobMonitor{
@@ -134,10 +135,11 @@ func (s *PersistedSQLStats) Stop(ctx context.Context) {
 	s.tasksDoneWG.Wait()
 }

-func (s *PersistedSQLStats) SetFlushDoneCallback(callBackFunc func()) {
-	s.flushMutex.Lock()
-	defer s.flushMutex.Unlock()
-	s.flushDoneCallback = callBackFunc
+// SetFlushDoneSignalCh sets the channel to signal each time a flush has been completed.
+func (s *PersistedSQLStats) SetFlushDoneSignalCh(sigCh chan<- struct{}) {
+	s.flushDoneMu.Lock()
+	defer s.flushDoneMu.Unlock()
+	s.flushDoneMu.signalCh = sigCh
 }

 // GetController returns the controller of the PersistedSQLStats.
@@ -186,13 +188,21 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper

 			s.Flush(ctx)

-			func() {
-				s.flushMutex.Lock()
-				defer s.flushMutex.Unlock()
-				if s.flushDoneCallback != nil {
-					s.flushDoneCallback()
+			// Tell the local activity translator job, if any, that we've
+			// performed a round of flush.
+			if sigCh := func() chan<- struct{} {
+				s.flushDoneMu.Lock()
+				defer s.flushDoneMu.Unlock()
+				return s.flushDoneMu.signalCh
+			}(); sigCh != nil {
+				select {
+				case sigCh <- struct{}{}:
+				case <-stopper.ShouldQuiesce():
+					return
+				case <-s.drain:
+					return
 				}
-			}()
+			}
 		}
 	})
 	if err != nil {