From 05c5a749beb3f0a45775526ef808dc09fc0704df Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 1 Feb 2021 16:05:21 +0100 Subject: [PATCH] stop: rip out expensive task tracking We are likely going to invest more in the stopper-conferred observability in the near future as part of initiatives such as #58164, but the task tracking that has been a part of the stopper since near its conception has not proven to be useful in practice, while at the same time raising concern about stopper use in hot paths. When shutting down a running server, we don't particularly care about leaking goroutines (as the process will end anyway). In tests, we want to ensure goroutine hygiene, but if a test hangs during `Stop`, it is easier to look at the stacks to find out why than to consult the task map. Together, this left little reason to do anything more complicated than what's left after this commit: we keep track of the running number of tasks, and wait until this drops to zero. With this change in, we should feel comfortable using the stopper extensively and, for example, ensuring that any CRDB goroutine is anchored in a Stopper task; this is the right approach for test flakes such as in #51544 and makes sense for all of the reasons mentioned in issue #58164 as well. In a future change, we should make the Stopper more configurable and, through this configurability, we could in principle bring a version of the task map back (in debug builds) without backing it into the stopper, though I don't anticipate that we'll want to. Closes #52894. Release note: None --- pkg/util/stop/BUILD.bazel | 1 - pkg/util/stop/stopper.go | 198 ++++++++++++++-------------------- pkg/util/stop/stopper_test.go | 29 +---- 3 files changed, 83 insertions(+), 145 deletions(-) diff --git a/pkg/util/stop/BUILD.bazel b/pkg/util/stop/BUILD.bazel index b4077bb85e00..30c6ceaa5f0b 100644 --- a/pkg/util/stop/BUILD.bazel +++ b/pkg/util/stop/BUILD.bazel @@ -8,7 +8,6 @@ go_library( deps = [ "//pkg/roachpb", "//pkg/settings", - "//pkg/util/caller", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/log/logcrash", diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 5f5f61071cdf..ecdfefa83c13 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -15,15 +15,12 @@ import ( "fmt" "net/http" "runtime/debug" - "sort" - "strings" - "sync" + "sync/atomic" "testing" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/util/caller" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" @@ -84,9 +81,7 @@ func HandleDebug(w http.ResponseWriter, r *http.Request) { defer trackedStoppers.Unlock() for _, ss := range trackedStoppers.stoppers { s := ss.s - s.mu.Lock() - fmt.Fprintf(w, "%p: %d tasks\n%s", s, s.mu.numTasks, s.runningTasksLocked()) - s.mu.Unlock() + fmt.Fprintf(w, "%p: %d tasks", s, s.NumTasks()) } } @@ -166,15 +161,20 @@ type Stopper struct { onPanic func(interface{}) // called with recover() on panic on any goroutine mu struct { - syncutil.Mutex - quiesce *sync.Cond // Conditional variable to wait for outstanding tasks - quiescing bool // true when Stop() has been called - numTasks int // number of outstanding tasks - tasks TaskMap - closers []Closer - idAlloc int // allocates index into qCancels - qCancels map[int]func() // ctx cancels to be called on Quiesce - stopCalled bool // turns all but first call to Stop into noop + syncutil.RWMutex + // _numTasks is the number of active tasks. It is incremented atomically via + // addTask() under the read lock for task acquisition. We need the read lock + // to ensure task creation is prohibited atomically with the quiescing or + // stopping bools set below. When simply reading or decrementing the number + // of tasks, the lock is not necessary. + _numTasks int32 + // quiescing and stopping are set in Quiesce and Stop (which calls + // Quiesce). When either is set, no new tasks are allowed and closers + // should execute immediately. + quiescing, stopping bool + closers []Closer + idAlloc int // allocates index into qCancels + qCancels map[int]func() // ctx cancels to be called on Quiesce } } @@ -205,14 +205,12 @@ func NewStopper(options ...Option) *Stopper { stopped: make(chan struct{}), } - s.mu.tasks = TaskMap{} s.mu.qCancels = map[int]func(){} for _, opt := range options { opt.apply(s) } - s.mu.quiesce = sync.NewCond(&s.mu) register(s) return s } @@ -234,6 +232,16 @@ func (s *Stopper) Recover(ctx context.Context) { } } +func (s *Stopper) addTask(delta int32) (updated int32) { + return atomic.AddInt32(&s.mu._numTasks, delta) +} + +// refuseRLocked returns true if the stopper refuses new tasks. This +// means that the stopper is either quiescing or stopping. +func (s *Stopper) refuseRLocked() bool { + return s.mu.stopping || s.mu.quiescing +} + // AddCloser adds an object to close after the stopper has been stopped. // // WARNING: memory resources acquired by this method will stay around for @@ -244,7 +252,7 @@ func (s *Stopper) Recover(ctx context.Context) { func (s *Stopper) AddCloser(c Closer) { s.mu.Lock() defer s.mu.Unlock() - if s.mu.stopCalled { + if s.refuseRLocked() { c.Close() return } @@ -258,31 +266,26 @@ func (s *Stopper) AddCloser(c Closer) { // Canceling this context releases resources associated with it, so code should // call cancel as soon as the operations running in this Context complete. func (s *Stopper) WithCancelOnQuiesce(ctx context.Context) (context.Context, func()) { - return s.withCancel(ctx, s.mu.qCancels, s.quiescer) + return s.withCancel(ctx) } -func (s *Stopper) withCancel( - ctx context.Context, cancels map[int]func(), cancelCh chan struct{}, -) (context.Context, func()) { +func (s *Stopper) withCancel(ctx context.Context) (context.Context, func()) { var cancel func() ctx, cancel = context.WithCancel(ctx) s.mu.Lock() defer s.mu.Unlock() - select { - case <-cancelCh: - // Cancel immediately. + if s.refuseRLocked() { cancel() return ctx, func() {} - default: - id := s.mu.idAlloc - s.mu.idAlloc++ - cancels[id] = cancel - return ctx, func() { - cancel() - s.mu.Lock() - defer s.mu.Unlock() - delete(cancels, id) - } + } + id := s.mu.idAlloc + s.mu.idAlloc++ + s.mu.qCancels[id] = cancel + return ctx, func() { + cancel() + s.mu.Lock() + defer s.mu.Unlock() + delete(s.mu.qCancels, id) } } @@ -300,13 +303,13 @@ func (s *Stopper) withCancel( // Returns an error to indicate that the system is currently quiescing and // function f was not called. func (s *Stopper) RunTask(ctx context.Context, taskName string, f func(context.Context)) error { - if !s.runPrelude(taskName) { + if !s.runPrelude() { return ErrUnavailable } // Call f. defer s.Recover(ctx) - defer s.runPostlude(taskName) + defer s.runPostlude() f(ctx) return nil @@ -317,13 +320,13 @@ func (s *Stopper) RunTask(ctx context.Context, taskName string, f func(context.C func (s *Stopper) RunTaskWithErr( ctx context.Context, taskName string, f func(context.Context) error, ) error { - if !s.runPrelude(taskName) { + if !s.runPrelude() { return ErrUnavailable } // Call f. defer s.Recover(ctx) - defer s.runPostlude(taskName) + defer s.runPostlude() return f(ctx) } @@ -334,7 +337,7 @@ func (s *Stopper) RunAsyncTask( ctx context.Context, taskName string, f func(context.Context), ) error { taskName = asyncTaskNamePrefix + taskName - if !s.runPrelude(taskName) { + if !s.runPrelude() { return ErrUnavailable } @@ -343,7 +346,7 @@ func (s *Stopper) RunAsyncTask( // Call f. go func() { defer s.Recover(ctx) - defer s.runPostlude(taskName) + defer s.runPostlude() defer span.Finish() f(ctx) @@ -393,7 +396,7 @@ func (s *Stopper) RunLimitedAsyncTask( if ctx.Err() != nil { return ctx.Err() } - if !s.runPrelude(taskName) { + if !s.runPrelude() { return ErrUnavailable } @@ -401,7 +404,7 @@ func (s *Stopper) RunLimitedAsyncTask( go func() { defer s.Recover(ctx) - defer s.runPostlude(taskName) + defer s.runPostlude() defer alloc.Release() defer span.Finish() @@ -410,63 +413,26 @@ func (s *Stopper) RunLimitedAsyncTask( return nil } -func (s *Stopper) runPrelude(taskName string) bool { - s.mu.Lock() - defer s.mu.Unlock() - if s.mu.quiescing { +func (s *Stopper) runPrelude() bool { + s.mu.RLock() + defer s.mu.RUnlock() + if s.refuseRLocked() { return false } - s.mu.numTasks++ - s.mu.tasks[taskName]++ + // NB: we run this under the read lock to ensure that `refuseRLocked()` cannot + // change until the task is registered. If we didn't do this, we'd run the + // risk of starting a task after a successful call to Stop(). + s.addTask(1) return true } -func (s *Stopper) runPostlude(taskName string) { - s.mu.Lock() - defer s.mu.Unlock() - s.mu.numTasks-- - s.mu.tasks[taskName]-- - s.mu.quiesce.Broadcast() +func (s *Stopper) runPostlude() { + s.addTask(-1) } // NumTasks returns the number of active tasks. func (s *Stopper) NumTasks() int { - s.mu.Lock() - defer s.mu.Unlock() - return s.mu.numTasks -} - -// A TaskMap is returned by RunningTasks(). -type TaskMap map[string]int - -// String implements fmt.Stringer and returns a sorted multi-line listing of -// the TaskMap. -func (tm TaskMap) String() string { - var lines []string - for location, num := range tm { - lines = append(lines, fmt.Sprintf("%-6d %s", num, location)) - } - sort.Sort(sort.Reverse(sort.StringSlice(lines))) - return strings.Join(lines, "\n") -} - -// RunningTasks returns a map containing the count of running tasks keyed by -// call site. -func (s *Stopper) RunningTasks() TaskMap { - s.mu.Lock() - defer s.mu.Unlock() - return s.runningTasksLocked() -} - -func (s *Stopper) runningTasksLocked() TaskMap { - m := TaskMap{} - for k := range s.mu.tasks { - if s.mu.tasks[k] == 0 { - continue - } - m[k] = s.mu.tasks[k] - } - return m + return int(atomic.LoadInt32(&s.mu._numTasks)) } // Stop signals all live workers to stop and then waits for each to @@ -475,8 +441,8 @@ func (s *Stopper) runningTasksLocked() TaskMap { // Stop is idempotent; concurrent calls will block on each other. func (s *Stopper) Stop(ctx context.Context) { s.mu.Lock() - stopCalled := s.mu.stopCalled - s.mu.stopCalled = true + stopCalled := s.mu.stopping + s.mu.stopping = true s.mu.Unlock() if stopCalled { @@ -491,11 +457,6 @@ func (s *Stopper) Stop(ctx context.Context) { close(s.stopped) }() - if log.V(1) { - file, line, _ := caller.Lookup(1) - log.Infof(ctx, - "stop has been called from %s:%d, stopping or quiescing all running tasks", file, line) - } // Don't bother doing stuff cleanly if we're panicking, that would likely // block. Instead, best effort only. This cleans up the stack traces, // avoids stalls and helps some tests in `./cli` finish cleanly (where @@ -542,25 +503,26 @@ func (s *Stopper) IsStopped() <-chan struct{} { // Quiesce moves the stopper to state quiescing and waits until all // tasks complete. This is used from Stop() and unittests. func (s *Stopper) Quiesce(ctx context.Context) { + defer time.AfterFunc(5*time.Second, func() { + log.Infof(ctx, "quiescing...") + }).Stop() defer s.Recover(ctx) - s.mu.Lock() - defer s.mu.Unlock() - for _, cancel := range s.mu.qCancels { - cancel() - } - if !s.mu.quiescing { - log.Infof(ctx, "quiescing") - s.mu.quiescing = true - close(s.quiescer) - } - for s.mu.numTasks > 0 { - t := time.AfterFunc(5*time.Second, func() { - // If we're waiting for 5+s without a task terminating, log the ones - // that remain. - log.Infof(ctx, "quiescing; tasks left:\n%s", s.RunningTasks()) - }) - // Unlock s.mu, wait for the signal, and lock s.mu. - s.mu.quiesce.Wait() - t.Stop() + + func() { + s.mu.Lock() + defer s.mu.Unlock() + if !s.mu.quiescing { + s.mu.quiescing = true + close(s.quiescer) + } + + for _, cancel := range s.mu.qCancels { + cancel() + } + s.mu.qCancels = nil + }() + + for s.NumTasks() > 0 { + time.Sleep(5 * time.Millisecond) } } diff --git a/pkg/util/stop/stopper_test.go b/pkg/util/stop/stopper_test.go index d97aac411de1..4cb5ceeddd68 100644 --- a/pkg/util/stop/stopper_test.go +++ b/pkg/util/stop/stopper_test.go @@ -269,6 +269,7 @@ func TestStopperCloserConcurrent(t *testing.T) { func TestStopperNumTasks(t *testing.T) { defer leaktest.AfterTest(t)() s := stop.NewStopper() + defer s.Stop(context.Background()) var tasks []chan bool for i := 0; i < 3; i++ { c := make(chan bool) @@ -279,30 +280,11 @@ func TestStopperNumTasks(t *testing.T) { }); err != nil { t.Fatal(err) } - tm := s.RunningTasks() - if numTypes, numTasks := len(tm), s.NumTasks(); numTypes != 1 || numTasks != i+1 { - t.Errorf("stopper should have %d running tasks, got %d / %+v", i+1, numTasks, tm) - } - m := s.RunningTasks() - if len(m) != 1 { - t.Fatalf("expected exactly one task map entry: %+v", m) - } - for _, v := range m { - if expNum := len(tasks); v != expNum { - t.Fatalf("%d: expected %d tasks, got %d", i, expNum, v) - } + if numTasks := s.NumTasks(); numTasks != i+1 { + t.Errorf("stopper should have %d running tasks, got %d", i+1, numTasks) } } for i, c := range tasks { - m := s.RunningTasks() - if len(m) != 1 { - t.Fatalf("%d: expected exactly one task map entry: %+v", i, m) - } - for _, v := range m { - if expNum := len(tasks[i:]); v != expNum { - t.Fatalf("%d: expected %d tasks, got %d:\n%s", i, expNum, v, m) - } - } // Close the channel to let the task proceed. close(c) expNum := len(tasks[i+1:]) @@ -313,11 +295,6 @@ func TestStopperNumTasks(t *testing.T) { return nil }) } - // The taskmap should've been cleared out. - if m := s.RunningTasks(); len(m) != 0 { - t.Fatalf("task map not empty: %+v", m) - } - s.Stop(context.Background()) } // TestStopperRunTaskPanic ensures that a panic handler can recover panicking