From 271565552cfeea0571fb3a6dcfce692d8a44e60e Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 28 Apr 2023 15:04:57 +0200 Subject: [PATCH] sql,persistedsqlstats: prevent a deadlock during shutdown Prior to this change, the coordination between the stats flusher task (an async stopper task) and the activity flusher job was performed using a three-part process: - the stats persistence task offered to call a callback _function_ every time a flush completed. - the job would _reconfigure the callback function_ on each iteration. - the function wrote to a channel that was subsequently read by the job iteration body. This approach was defective in 3 ways: 1. if the job iteration body exited (e.g. due to a server drain) *after* it installed the callback fn, but *before* the stats flusher read and called the callback fn, a window of time existed where a deadlock could occur: - the stats flusher retrieves the pointer to the callback fn but doesn't call it yet. - the job loop exits. From then on it will not read from the channel any more. - the stats flusher attempts to write to the channel. A deadlock occurs. (This was seen during testing. See #102574) The fix here is to always jointly `select` the write to the channel and also a read from the drain/stopper signals, to abort the channel operation if a shutdown is requested. 2. the stats flusher task was holding the mutex locked while performing the channel write. This is generally bad code hygiene as it forces the code maintainer to double-check that the lock and channel operations cannot deadlock each other. The fix is to use the mutex to retrieve the channel reference, and then write to the channel while the mutex is not held any more. 3. the API between the two defined a *callback function* where really just a notification channel was needed. The fix here is to simplify the API. 
Release note: None --- pkg/sql/sql_activity_update_job.go | 6 ++-- .../sqlstats/persistedsqlstats/provider.go | 36 ++++++++++++------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/pkg/sql/sql_activity_update_job.go b/pkg/sql/sql_activity_update_job.go index a3ef9c59e8eb..c53630d32b87 100644 --- a/pkg/sql/sql_activity_update_job.go +++ b/pkg/sql/sql_activity_update_job.go @@ -92,14 +92,12 @@ func (j *sqlActivityUpdateJob) Resume(ctx context.Context, execCtxI interface{}) flushDoneSignal := make(chan struct{}) defer func() { - statsFlush.SetFlushDoneCallback(nil) + statsFlush.SetFlushDoneSignalCh(nil) close(flushDoneSignal) }() + statsFlush.SetFlushDoneSignalCh(flushDoneSignal) for { - statsFlush.SetFlushDoneCallback(func() { - flushDoneSignal <- struct{}{} - }) select { case <-flushDoneSignal: // A flush was done. Set the timer and wait for it to complete. diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index d77c5502f098..574fdc275f12 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -69,8 +69,10 @@ type PersistedSQLStats struct { memoryPressureSignal chan struct{} // Used to signal the flush completed. 
- flushDoneCallback func() - flushMutex syncutil.Mutex + flushDoneMu struct { + syncutil.Mutex + signalCh chan<- struct{} + } lastFlushStarted time.Time jobMonitor jobMonitor @@ -94,7 +96,6 @@ func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats { cfg: cfg, memoryPressureSignal: make(chan struct{}), drain: make(chan struct{}), - flushDoneCallback: nil, } p.jobMonitor = jobMonitor{ @@ -134,10 +135,11 @@ func (s *PersistedSQLStats) Stop(ctx context.Context) { s.tasksDoneWG.Wait() } -func (s *PersistedSQLStats) SetFlushDoneCallback(callBackFunc func()) { - s.flushMutex.Lock() - defer s.flushMutex.Unlock() - s.flushDoneCallback = callBackFunc +// SetFlushDoneSignalCh sets the channel to signal each time a flush has been completed. +func (s *PersistedSQLStats) SetFlushDoneSignalCh(sigCh chan<- struct{}) { + s.flushDoneMu.Lock() + defer s.flushDoneMu.Unlock() + s.flushDoneMu.signalCh = sigCh } // GetController returns the controller of the PersistedSQLStats. @@ -186,13 +188,21 @@ func (s *PersistedSQLStats) startSQLStatsFlushLoop(ctx context.Context, stopper s.Flush(ctx) - func() { - s.flushMutex.Lock() - defer s.flushMutex.Unlock() - if s.flushDoneCallback != nil { - s.flushDoneCallback() + // Tell the local activity translator job, if any, that we've + // performed a round of flush. + if sigCh := func() chan<- struct{} { + s.flushDoneMu.Lock() + defer s.flushDoneMu.Unlock() + return s.flushDoneMu.signalCh + }(); sigCh != nil { + select { + case sigCh <- struct{}{}: + case <-stopper.ShouldQuiesce(): + return + case <-s.drain: + return } - }() + } } }) if err != nil {