Skip to content

Commit

Permalink
stop: rip out expensive task tracking
Browse files Browse the repository at this point in the history
We are likely going to invest more in the stopper-conferred
observability in the near future as part of initiatives such as cockroachdb#58164,
but the task tracking that has been a part of the stopper since near
its conception has not proven to be useful in practice, while at the
same time raising concern about stopper use in hot paths.

When shutting down a running server, we don't particularly care about
leaking goroutines (as the process will end anyway). In tests, we want
to ensure goroutine hygiene, but if a test hangs during `Stop`, it is easier to
look at the stacks to find out why than to consult the task map.

Together, this left little reason to do anything more complicated than
what's left after this commit: we keep track of the running number of
tasks, and wait until this drops to zero.

With this change in, we should feel comfortable using the stopper
extensively and, for example, ensuring that any CRDB goroutine is
anchored in a Stopper task; this is the right approach for test flakes
such as in cockroachdb#51544 and makes sense for all of the reasons mentioned in
issue cockroachdb#58164 as well.

In a future change, we should make the Stopper more configurable and,
through this configurability, we could in principle bring a version of
the task map back (in debug builds) without baking it into the stopper,
though I don't anticipate that we'll want to.

Closes cockroachdb#52894.

Release note: None
  • Loading branch information
tbg committed Feb 3, 2021
1 parent 4736b8d commit 05c5a74
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 145 deletions.
1 change: 0 additions & 1 deletion pkg/util/stop/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
deps = [
"//pkg/roachpb",
"//pkg/settings",
"//pkg/util/caller",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/logcrash",
Expand Down
198 changes: 80 additions & 118 deletions pkg/util/stop/stopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ import (
"fmt"
"net/http"
"runtime/debug"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/caller"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
Expand Down Expand Up @@ -84,9 +81,7 @@ func HandleDebug(w http.ResponseWriter, r *http.Request) {
defer trackedStoppers.Unlock()
for _, ss := range trackedStoppers.stoppers {
s := ss.s
s.mu.Lock()
fmt.Fprintf(w, "%p: %d tasks\n%s", s, s.mu.numTasks, s.runningTasksLocked())
s.mu.Unlock()
fmt.Fprintf(w, "%p: %d tasks", s, s.NumTasks())
}
}

Expand Down Expand Up @@ -166,15 +161,20 @@ type Stopper struct {
onPanic func(interface{}) // called with recover() on panic on any goroutine

mu struct {
syncutil.Mutex
quiesce *sync.Cond // Conditional variable to wait for outstanding tasks
quiescing bool // true when Stop() has been called
numTasks int // number of outstanding tasks
tasks TaskMap
closers []Closer
idAlloc int // allocates index into qCancels
qCancels map[int]func() // ctx cancels to be called on Quiesce
stopCalled bool // turns all but first call to Stop into noop
syncutil.RWMutex
// _numTasks is the number of active tasks. It is incremented atomically via
// addTask() under the read lock for task acquisition. We need the read lock
// to ensure task creation is prohibited atomically with the quiescing or
// stopping bools set below. When simply reading or decrementing the number
// of tasks, the lock is not necessary.
_numTasks int32
// quiescing and stopping are set in Quiesce and Stop (which calls
// Quiesce). When either is set, no new tasks are allowed and closers
// should execute immediately.
quiescing, stopping bool
closers []Closer
idAlloc int // allocates index into qCancels
qCancels map[int]func() // ctx cancels to be called on Quiesce
}
}

Expand Down Expand Up @@ -205,14 +205,12 @@ func NewStopper(options ...Option) *Stopper {
stopped: make(chan struct{}),
}

s.mu.tasks = TaskMap{}
s.mu.qCancels = map[int]func(){}

for _, opt := range options {
opt.apply(s)
}

s.mu.quiesce = sync.NewCond(&s.mu)
register(s)
return s
}
Expand All @@ -234,6 +232,16 @@ func (s *Stopper) Recover(ctx context.Context) {
}
}

// addTask atomically adjusts the stopper's active-task counter by delta (a
// negative delta decrements it) and returns the counter's resulting value.
func (s *Stopper) addTask(delta int32) (updated int32) {
	updated = atomic.AddInt32(&s.mu._numTasks, delta)
	return updated
}

// refuseRLocked reports whether the stopper is turning away new tasks, which
// is the case as soon as either the quiescing or the stopping flag is set.
// It only reads the two flags and takes no lock itself; the visible callers
// invoke it while holding s.mu (read or write).
func (s *Stopper) refuseRLocked() bool {
	if s.mu.quiescing {
		return true
	}
	return s.mu.stopping
}

// AddCloser adds an object to close after the stopper has been stopped.
//
// WARNING: memory resources acquired by this method will stay around for
Expand All @@ -244,7 +252,7 @@ func (s *Stopper) Recover(ctx context.Context) {
func (s *Stopper) AddCloser(c Closer) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.stopCalled {
if s.refuseRLocked() {
c.Close()
return
}
Expand All @@ -258,31 +266,26 @@ func (s *Stopper) AddCloser(c Closer) {
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func (s *Stopper) WithCancelOnQuiesce(ctx context.Context) (context.Context, func()) {
return s.withCancel(ctx, s.mu.qCancels, s.quiescer)
return s.withCancel(ctx)
}

func (s *Stopper) withCancel(
ctx context.Context, cancels map[int]func(), cancelCh chan struct{},
) (context.Context, func()) {
func (s *Stopper) withCancel(ctx context.Context) (context.Context, func()) {
var cancel func()
ctx, cancel = context.WithCancel(ctx)
s.mu.Lock()
defer s.mu.Unlock()
select {
case <-cancelCh:
// Cancel immediately.
if s.refuseRLocked() {
cancel()
return ctx, func() {}
default:
id := s.mu.idAlloc
s.mu.idAlloc++
cancels[id] = cancel
return ctx, func() {
cancel()
s.mu.Lock()
defer s.mu.Unlock()
delete(cancels, id)
}
}
id := s.mu.idAlloc
s.mu.idAlloc++
s.mu.qCancels[id] = cancel
return ctx, func() {
cancel()
s.mu.Lock()
defer s.mu.Unlock()
delete(s.mu.qCancels, id)
}
}

Expand All @@ -300,13 +303,13 @@ func (s *Stopper) withCancel(
// Returns an error to indicate that the system is currently quiescing and
// function f was not called.
func (s *Stopper) RunTask(ctx context.Context, taskName string, f func(context.Context)) error {
if !s.runPrelude(taskName) {
if !s.runPrelude() {
return ErrUnavailable
}

// Call f.
defer s.Recover(ctx)
defer s.runPostlude(taskName)
defer s.runPostlude()

f(ctx)
return nil
Expand All @@ -317,13 +320,13 @@ func (s *Stopper) RunTask(ctx context.Context, taskName string, f func(context.C
func (s *Stopper) RunTaskWithErr(
ctx context.Context, taskName string, f func(context.Context) error,
) error {
if !s.runPrelude(taskName) {
if !s.runPrelude() {
return ErrUnavailable
}

// Call f.
defer s.Recover(ctx)
defer s.runPostlude(taskName)
defer s.runPostlude()

return f(ctx)
}
Expand All @@ -334,7 +337,7 @@ func (s *Stopper) RunAsyncTask(
ctx context.Context, taskName string, f func(context.Context),
) error {
taskName = asyncTaskNamePrefix + taskName
if !s.runPrelude(taskName) {
if !s.runPrelude() {
return ErrUnavailable
}

Expand All @@ -343,7 +346,7 @@ func (s *Stopper) RunAsyncTask(
// Call f.
go func() {
defer s.Recover(ctx)
defer s.runPostlude(taskName)
defer s.runPostlude()
defer span.Finish()

f(ctx)
Expand Down Expand Up @@ -393,15 +396,15 @@ func (s *Stopper) RunLimitedAsyncTask(
if ctx.Err() != nil {
return ctx.Err()
}
if !s.runPrelude(taskName) {
if !s.runPrelude() {
return ErrUnavailable
}

ctx, span := tracing.ForkCtxSpan(ctx, taskName)

go func() {
defer s.Recover(ctx)
defer s.runPostlude(taskName)
defer s.runPostlude()
defer alloc.Release()
defer span.Finish()

Expand All @@ -410,63 +413,26 @@ func (s *Stopper) RunLimitedAsyncTask(
return nil
}

func (s *Stopper) runPrelude(taskName string) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.quiescing {
func (s *Stopper) runPrelude() bool {
s.mu.RLock()
defer s.mu.RUnlock()
if s.refuseRLocked() {
return false
}
s.mu.numTasks++
s.mu.tasks[taskName]++
// NB: we run this under the read lock to ensure that `refuseRLocked()` cannot
// change until the task is registered. If we didn't do this, we'd run the
// risk of starting a task after a successful call to Stop().
s.addTask(1)
return true
}

func (s *Stopper) runPostlude(taskName string) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.numTasks--
s.mu.tasks[taskName]--
s.mu.quiesce.Broadcast()
func (s *Stopper) runPostlude() {
s.addTask(-1)
}

// NumTasks returns the number of active tasks.
func (s *Stopper) NumTasks() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.numTasks
}

// A TaskMap is returned by RunningTasks().
type TaskMap map[string]int

// String implements fmt.Stringer and returns a sorted multi-line listing of
// the TaskMap.
func (tm TaskMap) String() string {
var lines []string
for location, num := range tm {
lines = append(lines, fmt.Sprintf("%-6d %s", num, location))
}
sort.Sort(sort.Reverse(sort.StringSlice(lines)))
return strings.Join(lines, "\n")
}

// RunningTasks returns a map containing the count of running tasks keyed by
// call site.
func (s *Stopper) RunningTasks() TaskMap {
s.mu.Lock()
defer s.mu.Unlock()
return s.runningTasksLocked()
}

func (s *Stopper) runningTasksLocked() TaskMap {
m := TaskMap{}
for k := range s.mu.tasks {
if s.mu.tasks[k] == 0 {
continue
}
m[k] = s.mu.tasks[k]
}
return m
return int(atomic.LoadInt32(&s.mu._numTasks))
}

// Stop signals all live workers to stop and then waits for each to
Expand All @@ -475,8 +441,8 @@ func (s *Stopper) runningTasksLocked() TaskMap {
// Stop is idempotent; concurrent calls will block on each other.
func (s *Stopper) Stop(ctx context.Context) {
s.mu.Lock()
stopCalled := s.mu.stopCalled
s.mu.stopCalled = true
stopCalled := s.mu.stopping
s.mu.stopping = true
s.mu.Unlock()

if stopCalled {
Expand All @@ -491,11 +457,6 @@ func (s *Stopper) Stop(ctx context.Context) {
close(s.stopped)
}()

if log.V(1) {
file, line, _ := caller.Lookup(1)
log.Infof(ctx,
"stop has been called from %s:%d, stopping or quiescing all running tasks", file, line)
}
// Don't bother doing stuff cleanly if we're panicking, that would likely
// block. Instead, best effort only. This cleans up the stack traces,
// avoids stalls and helps some tests in `./cli` finish cleanly (where
Expand Down Expand Up @@ -542,25 +503,26 @@ func (s *Stopper) IsStopped() <-chan struct{} {
// Quiesce moves the stopper to state quiescing and waits until all
// tasks complete. This is used from Stop() and unittests.
func (s *Stopper) Quiesce(ctx context.Context) {
defer time.AfterFunc(5*time.Second, func() {
log.Infof(ctx, "quiescing...")
}).Stop()
defer s.Recover(ctx)
s.mu.Lock()
defer s.mu.Unlock()
for _, cancel := range s.mu.qCancels {
cancel()
}
if !s.mu.quiescing {
log.Infof(ctx, "quiescing")
s.mu.quiescing = true
close(s.quiescer)
}
for s.mu.numTasks > 0 {
t := time.AfterFunc(5*time.Second, func() {
// If we're waiting for 5+s without a task terminating, log the ones
// that remain.
log.Infof(ctx, "quiescing; tasks left:\n%s", s.RunningTasks())
})
// Unlock s.mu, wait for the signal, and lock s.mu.
s.mu.quiesce.Wait()
t.Stop()

func() {
s.mu.Lock()
defer s.mu.Unlock()
if !s.mu.quiescing {
s.mu.quiescing = true
close(s.quiescer)
}

for _, cancel := range s.mu.qCancels {
cancel()
}
s.mu.qCancels = nil
}()

for s.NumTasks() > 0 {
time.Sleep(5 * time.Millisecond)
}
}
Loading

0 comments on commit 05c5a74

Please sign in to comment.