stopper: remove RunWorker and ShouldStop
`Stopper` has grown organically over the years. Initially, we tried hard
to provide an ordered shutdown regimen in which a cluster would continue
to "work" at a basic level but would shed incoming load (until no more
load was left). This was ultimately abandoned (today we have
higher-level primitives for such things, like the Drain RPC), but the
corresponding unwieldy APIs were never phased out.

Roughly, we were left with two types of goroutines:

- "Workers", which were expected to be long-running and would not be blocked
from starting by the stopper (we probably initially assumed that they
would all be spun up by the time the stopper could possibly stop, which
has not been true for a long time and will likely never be again), and
- "Tasks", which are everything else and which are no longer allowed to
start once `stopper.Stop()` has been called.

In practice, this led to a lot of confusion and undesired behavior. In
particular, a common workaround in our code was to retrofit the "don't
allow starting if already shutting down" behavior for workers by
wrapping their creation in a synchronous task.
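
The pattern looked roughly like this (a sketch; the task name is made
up, but the same shape appears verbatim in `pkg/gossip/server.go`
below):

    // Retrofit "refuse to start while shutting down" onto a worker by
    // wrapping its creation in a synchronous task: RunTask fails once
    // the Stopper is shutting down, while RunWorker itself never did.
    if err := stopper.RunTask(ctx, "start-worker", func(ctx context.Context) {
        stopper.RunWorker(ctx, func(ctx context.Context) {
            // Long-running work, not tracked by the Stopper's task count.
        })
    }); err != nil {
        // The Stopper is already shutting down; the worker never started.
    }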

It also complicated the shutdown semantics, because there were four
stages:

- quiescing: i.e. no more new tasks but there are still tasks running
- stopping: i.e. no more tasks, but there could still be workers
- stopped-A: the workers are gone now, but we still have to run the
  closers!
- stopped-B: the closers have also run (signals `IsStopped`).

This commit rips off the band-aid:

1. there are no more workers. Everything is a task and can run as long
   as it likes.
2. there is no more `ShouldStop` and consequently no confusion about
   which channel to listen to (a task that waited only on `ShouldStop`
   would deadlock; that mistake is no longer possible).
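
To illustrate the foot-gun removed by item 2 (a sketch of the old
semantics, not code from this commit): Stop() first waited for all
tasks to drain and only then closed `ShouldStop`, so a task waiting
only on `ShouldStop` held up Stop() forever:

    // Old semantics, sketched: this task never returns, so Stop() never
    // finishes draining tasks, and thus ShouldStop is never closed.
    _ = stopper.RunAsyncTask(ctx, "doomed", func(ctx context.Context) {
        <-stopper.ShouldStop() // closed only after all tasks are done
    })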

Now, on Stop(), the Stopper

- invokes Quiesce, which causes the Stopper to refuse new work
  (that is, its Run* family of methods starts returning ErrUnavailable),
  closes the channel returned by ShouldQuiesce, and blocks until
  no more tasks are tracked, then
- runs all of the methods supplied to AddCloser, then
- closes the IsStopped channel.
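
In code, the new lifecycle looks roughly as follows (a sketch of the
intended usage; the task name and work channel are illustrative):

    stopper := stop.NewStopper()
    // Everything is a task now and may run as long as it likes, but it
    // should watch ShouldQuiesce to learn that shutdown has begun.
    if err := stopper.RunAsyncTask(ctx, "my-loop", func(ctx context.Context) {
        for {
            select {
            case <-stopper.ShouldQuiesce():
                return
            case w := <-workCh: // hypothetical work source
                handle(w)      // hypothetical handler
            }
        }
    }); err != nil {
        // ErrUnavailable: quiescing has begun; the task never started.
    }
    stopper.AddCloser(stop.CloserFn(func() {
        // Release resources; runs after all tasks have drained.
    }))
    stopper.Stop(ctx) // quiesce, wait for tasks, run closers, close IsStopped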

The `stop` package hasn't gotten TLC in many years, so there is a lot
more one might want to do, but that is for another day.

Release note: None
tbg committed Jan 22, 2021
1 parent 8854d95 commit 8c5253b
Showing 71 changed files with 524 additions and 499 deletions.
6 changes: 3 additions & 3 deletions pkg/acceptance/cluster/dockercluster.go
@@ -150,7 +150,7 @@ func CreateDocker(
 	ctx context.Context, cfg TestConfig, volumesDir string, stopper *stop.Stopper,
 ) *DockerCluster {
 	select {
-	case <-stopper.ShouldStop():
+	case <-stopper.ShouldQuiesce():
 		// The stopper was already closed, exit early.
 		os.Exit(1)
 	default:
@@ -563,7 +563,7 @@ func (l *DockerCluster) processEvent(ctx context.Context, event events.Message)
 
 	// An event on any other container is unexpected. Die.
 	select {
-	case <-l.stopper.ShouldStop():
+	case <-l.stopper.ShouldQuiesce():
 	case <-l.monitorCtx.Done():
 	default:
 		// There is a very tiny race here: the signal handler might be closing the
@@ -698,7 +698,7 @@ func (l *DockerCluster) AssertAndStop(ctx context.Context, t testing.TB) {
 func (l *DockerCluster) stop(ctx context.Context) {
 	if *waitOnStop {
 		log.Infof(ctx, "waiting for interrupt")
-		<-l.stopper.ShouldStop()
+		<-l.stopper.ShouldQuiesce()
 	}
 
 	log.Infof(ctx, "stopping")
8 changes: 1 addition & 7 deletions pkg/acceptance/test_acceptance.go
@@ -51,13 +51,7 @@ func RunTests(m *testing.M) int {
 		sig := make(chan os.Signal, 1)
 		signal.Notify(sig, os.Interrupt)
 		<-sig
-		select {
-		case <-stopper.ShouldStop():
-		default:
-			// There is a very tiny race here: the cluster might be closing
-			// the stopper simultaneously.
-			stopper.Stop(ctx)
-		}
+		stopper.Stop(ctx)
 	}()
 	return m.Run()
 }
2 changes: 1 addition & 1 deletion pkg/acceptance/util_cluster.go
@@ -98,7 +98,7 @@ func StartCluster(ctx context.Context, t *testing.T, cfg cluster.TestConfig) (c
 
 	testutils.SucceedsSoon(t, func() error {
 		select {
-		case <-stopper.ShouldStop():
+		case <-stopper.ShouldQuiesce():
 			t.Fatal("interrupted")
 		case <-time.After(time.Second):
 		}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
@@ -3414,7 +3414,7 @@ func TestBackupRestoreWithConcurrentWrites(t *testing.T) {
 	var allowErrors int32
 	for task := 0; task < numBackgroundTasks; task++ {
 		taskNum := task
-		tc.Stopper().RunWorker(context.Background(), func(context.Context) {
+		_ = tc.Stopper().RunAsyncTask(context.Background(), "bg-task", func(context.Context) {
 			conn := tc.Conns[taskNum%len(tc.Conns)]
 			// Use different sql gateways to make sure leasing is right.
 			if err := startBackgroundWrites(tc.Stopper(), conn, rows, bgActivity, &allowErrors); err != nil {
6 changes: 4 additions & 2 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
@@ -118,12 +118,14 @@ func (connectorFactory) NewConnector(
 // cluster's ID and set Connector.rpcContext.ClusterID.
 func (c *Connector) Start(ctx context.Context) error {
 	startupC := c.startupC
-	c.rpcContext.Stopper.RunWorker(context.Background(), func(ctx context.Context) {
+	if err := c.rpcContext.Stopper.RunAsyncTask(context.Background(), "connector", func(ctx context.Context) {
 		ctx = c.AnnotateCtx(ctx)
 		ctx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(ctx)
 		defer cancel()
 		c.runGossipSubscription(ctx)
-	})
+	}); err != nil {
+		return err
+	}
 	// Synchronously block until the first GossipSubscription event.
 	select {
 	case <-startupC:
7 changes: 3 additions & 4 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
@@ -371,11 +371,10 @@ func TestConnectorRetriesUnreachable(t *testing.T) {
 	// Decompose netutil.ListenAndServeGRPC so we can listen before serving.
 	ln, err := net.Listen(util.TestAddr.Network(), util.TestAddr.String())
 	require.NoError(t, err)
-	stopper.RunWorker(ctx, func(context.Context) {
+	stopper.AddCloser(stop.CloserFn(s.Stop))
+	_ = stopper.RunAsyncTask(ctx, "wait-quiesce", func(context.Context) {
 		<-stopper.ShouldQuiesce()
 		netutil.FatalIfUnexpected(ln.Close())
-		<-stopper.ShouldStop()
-		s.Stop()
 	})
 
 	// Add listen address into list of other bogus addresses.
@@ -401,7 +400,7 @@
 
 	// Begin serving on gRPC server. Connector should quickly connect
 	// and complete startup.
-	stopper.RunWorker(ctx, func(context.Context) {
+	_ = stopper.RunAsyncTask(ctx, "serve", func(context.Context) {
 		netutil.FatalIfUnexpected(s.Serve(ln))
 	})
 	require.NoError(t, <-startedC)
9 changes: 1 addition & 8 deletions pkg/cli/cli_test.go
@@ -187,14 +187,7 @@ func (c *cliTest) stopServer() {
 	if c.TestServer != nil {
 		log.Infof(context.Background(), "stopping server at %s / %s",
 			c.ServingRPCAddr(), c.ServingSQLAddr())
-		select {
-		case <-c.Stopper().ShouldStop():
-			// If ShouldStop() doesn't block, that means someone has already
-			// called Stop(). We just need to wait.
-			<-c.Stopper().IsStopped()
-		default:
-			c.Stopper().Stop(context.Background())
-		}
+		c.Stopper().Stop(context.Background())
 	}
 }
 
6 changes: 4 additions & 2 deletions pkg/cli/debug_synctest.go
@@ -156,7 +156,7 @@ func runSyncer(
 
 	waitFailure := time.After(time.Duration(rand.Int63n(5 * time.Second.Nanoseconds())))
 
-	stopper.RunWorker(ctx, func(ctx context.Context) {
+	if err := stopper.RunAsyncTask(ctx, "syncer", func(ctx context.Context) {
 		<-waitFailure
 		if err := nemesis.On(); err != nil {
 			panic(err)
@@ -167,7 +167,9 @@
 			}
 		}()
 		<-stopper.ShouldQuiesce()
-	})
+	}); err != nil {
+		return 0, err
+	}
 
 	ch := make(chan os.Signal, 1)
 	signal.Notify(ch, drainSignals...)
6 changes: 4 additions & 2 deletions pkg/cli/demo_cluster.go
@@ -751,7 +751,7 @@ func (c *transientCluster) runWorkload(
 				log.Warningf(ctx, "error running workload query: %+v", err)
 			}
 			select {
-			case <-c.s.Stopper().ShouldStop():
+			case <-c.s.Stopper().ShouldQuiesce():
 				return
 			default:
 			}
@@ -761,7 +761,9 @@
 		// As the SQL shell is tied to `c.s`, this means we want to tie the workload
 		// onto this as we want the workload to stop when the server dies,
 		// rather than the cluster. Otherwise, interrupts on cockroach demo hangs.
-		c.s.Stopper().RunWorker(ctx, workloadFun(workerFn))
+		if err := c.s.Stopper().RunAsyncTask(ctx, "workload", workloadFun(workerFn)); err != nil {
+			return err
+		}
 	}
 
 	return nil
4 changes: 2 additions & 2 deletions pkg/cli/start.go
@@ -715,7 +715,7 @@ If problems persist, please see %s.`
 		log.StartAlwaysFlush()
 		return err
 
-	case <-stopper.ShouldStop():
+	case <-stopper.ShouldQuiesce():
 		// Server is being stopped externally and our job is finished
 		// here since we don't know if it's a graceful shutdown or not.
 		<-stopper.IsStopped()
@@ -826,7 +826,7 @@
 		select {
 		case <-ticker.C:
 			log.Ops.Infof(context.Background(), "%d running tasks", stopper.NumTasks())
-		case <-stopper.ShouldStop():
+		case <-stopper.IsStopped():
 			return
 		case <-stopWithoutDrain:
 			return
17 changes: 12 additions & 5 deletions pkg/cmd/roachtest/test_runner.go
@@ -269,7 +269,7 @@ func (r *testRunner) Run(
 	for i := 0; i < parallelism; i++ {
 		i := i // Copy for closure.
 		wg.Add(1)
-		stopper.RunWorker(ctx, func(ctx context.Context) {
+		if err := stopper.RunAsyncTask(ctx, "worker", func(ctx context.Context) {
 			defer wg.Done()
 
 			if err := r.runWorker(
@@ -284,15 +284,22 @@
 				msg := fmt.Sprintf("Worker %d returned with error. Quiescing. Error: %+v", i, err)
 				shout(ctx, l, lopt.stdout, msg)
 				errs.AddErr(err)
-				// Quiesce the stopper. This will cause all workers to not pick up more
-				// tests after finishing the currently running one.
-				stopper.Quiesce(ctx)
+				// Stop the stopper. This will cause all workers to not pick up more
+				// tests after finishing the currently running one. We add one to the
+				// WaitGroup so that wg.Wait() will also wait for the stopper.
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					stopper.Stop(ctx)
+				}()
 				// Interrupt everybody waiting for resources.
 				if qp != nil {
 					qp.Close(msg)
 				}
 			}
-		})
+		}); err != nil {
+			wg.Done()
+		}
 	}
 
 	// Wait for all the workers to finish.
15 changes: 10 additions & 5 deletions pkg/gossip/client.go
@@ -81,7 +81,7 @@ func (c *client) startLocked(
 	g.outgoing.addPlaceholder()
 
 	ctx, cancel := context.WithCancel(c.AnnotateCtx(context.Background()))
-	stopper.RunWorker(ctx, func(ctx context.Context) {
+	if err := stopper.RunAsyncTask(ctx, "gossip-client", func(ctx context.Context) {
 		var wg sync.WaitGroup
 		defer func() {
 			// This closes the outgoing stream, causing any attempt to send or
@@ -133,7 +133,9 @@
 			g.mu.RUnlock()
 		}
 	}
-	})
+	}); err != nil {
+		disconnected <- c
+	}
 }
 
 // close stops the client gossip loop and returns immediately.
@@ -311,7 +313,7 @@ func (c *client) gossip(
 	// This wait group is used to allow the caller to wait until gossip
 	// processing is terminated.
 	wg.Add(1)
-	stopper.RunWorker(ctx, func(ctx context.Context) {
+	if err := stopper.RunAsyncTask(ctx, "client-gossip", func(ctx context.Context) {
 		defer wg.Done()
 
 		errCh <- func() error {
@@ -335,7 +337,10 @@
 				}
 			}
 		}()
-	})
+	}); err != nil {
+		wg.Done()
+		return err
+	}
 
 	// We attempt to defer registration of the callback until we've heard a
 	// response from the remote node which will contain the remote's high water
@@ -366,7 +371,7 @@
 	select {
 	case <-c.closer:
 		return nil
-	case <-stopper.ShouldStop():
+	case <-stopper.ShouldQuiesce():
 		return nil
 	case err := <-errCh:
 		return err
16 changes: 7 additions & 9 deletions pkg/gossip/gossip.go
@@ -1296,12 +1296,12 @@ func (g *Gossip) getNextBootstrapAddressLocked() net.Addr {
 // lost and requires re-bootstrapping.
 func (g *Gossip) bootstrap() {
 	ctx := g.AnnotateCtx(context.Background())
-	g.server.stopper.RunWorker(ctx, func(ctx context.Context) {
+	_ = g.server.stopper.RunAsyncTask(ctx, "gossip-bootstrap", func(ctx context.Context) {
 		ctx = logtags.AddTag(ctx, "bootstrap", nil)
 		var bootstrapTimer timeutil.Timer
 		defer bootstrapTimer.Stop()
 		for {
-			if g.server.stopper.RunTask(ctx, "gossip.Gossip: bootstrap ", func(ctx context.Context) {
+			func(ctx context.Context) {
 				g.mu.Lock()
 				defer g.mu.Unlock()
 				haveClients := g.outgoing.len() > 0
@@ -1322,9 +1322,7 @@ func (g *Gossip) bootstrap() {
 					g.maybeSignalStatusChangeLocked()
 				}
 			}
-			}) != nil {
-				return
-			}
+			}(ctx)
 
 			// Pause an interval before next possible bootstrap.
 			bootstrapTimer.Reset(g.bootstrapInterval)
@@ -1333,7 +1331,7 @@ func (g *Gossip) bootstrap() {
 			case <-bootstrapTimer.C:
 				bootstrapTimer.Read = true
 				// continue
-			case <-g.server.stopper.ShouldStop():
+			case <-g.server.stopper.ShouldQuiesce():
 				return
 			}
 			log.Eventf(ctx, "idling until bootstrap required")
@@ -1342,7 +1340,7 @@
 			case <-g.stalledCh:
 				log.Eventf(ctx, "detected stall; commencing bootstrap")
 				// continue
-			case <-g.server.stopper.ShouldStop():
+			case <-g.server.stopper.ShouldQuiesce():
 				return
 			}
 		}
@@ -1361,7 +1359,7 @@ func (g *Gossip) manage() {
 // is notified via the stalled conditional variable.
 func (g *Gossip) manage() {
 	ctx := g.AnnotateCtx(context.Background())
-	g.server.stopper.RunWorker(ctx, func(ctx context.Context) {
+	_ = g.server.stopper.RunAsyncTask(ctx, "gossip-manage", func(ctx context.Context) {
 		clientsTimer := timeutil.NewTimer()
 		cullTimer := timeutil.NewTimer()
 		stallTimer := timeutil.NewTimer()
@@ -1374,7 +1372,7 @@
 		stallTimer.Reset(jitteredInterval(g.stallInterval))
 		for {
 			select {
-			case <-g.server.stopper.ShouldStop():
+			case <-g.server.stopper.ShouldQuiesce():
 				return
 			case c := <-g.disconnected:
 				g.doDisconnected(c)
2 changes: 1 addition & 1 deletion pkg/gossip/infostore.go
@@ -171,7 +171,7 @@ func newInfoStore(
 		callbackCh: make(chan struct{}, 1),
 	}
 
-	is.stopper.RunWorker(context.Background(), func(ctx context.Context) {
+	_ = is.stopper.RunAsyncTask(context.Background(), "infostore", func(ctx context.Context) {
 		for {
 			for {
 				is.callbackWorkMu.Lock()
14 changes: 7 additions & 7 deletions pkg/gossip/server.go
@@ -135,11 +135,8 @@ func (s *server) Gossip(stream Gossip_GossipServer) error {
 
 	errCh := make(chan error, 1)
 
-	// Starting workers in a task prevents data races during shutdown.
-	if err := s.stopper.RunTask(ctx, "gossip.server: receiver", func(ctx context.Context) {
-		s.stopper.RunWorker(ctx, func(ctx context.Context) {
-			errCh <- s.gossipReceiver(ctx, &args, send, stream.Recv)
-		})
+	if err := s.stopper.RunAsyncTask(ctx, "gossip receiver", func(ctx context.Context) {
+		errCh <- s.gossipReceiver(ctx, &args, send, stream.Recv)
 	}); err != nil {
 		return err
 	}
@@ -379,15 +376,18 @@ func (s *server) start(addr net.Addr) {
 		broadcast()
 	}, Redundant)
 
-	s.stopper.RunWorker(context.TODO(), func(context.Context) {
+	waitQuiesce := func(context.Context) {
 		<-s.stopper.ShouldQuiesce()
 
 		s.mu.Lock()
 		unregister()
 		s.mu.Unlock()
 
 		broadcast()
-	})
+	}
+	if err := s.stopper.RunAsyncTask(context.Background(), "gossip-wait-quiesce", waitQuiesce); err != nil {
+		waitQuiesce(context.Background())
+	}
 }
 
 func (s *server) status() ServerStatus {
8 changes: 3 additions & 5 deletions pkg/gossip/simulation/network.go
@@ -116,11 +116,10 @@ func (n *Network) CreateNode(defaultZoneConfig *zonepb.ZoneConfig) (*Node, error
 	}
 	node := &Node{Server: server, Listener: ln, Registry: metric.NewRegistry()}
 	node.Gossip = gossip.NewTest(0, n.RPCContext, server, n.Stopper, node.Registry, defaultZoneConfig)
-	n.Stopper.RunWorker(context.TODO(), func(context.Context) {
+	n.Stopper.AddCloser(stop.CloserFn(server.Stop))
+	_ = n.Stopper.RunAsyncTask(context.TODO(), "node-wait-quiesce", func(context.Context) {
 		<-n.Stopper.ShouldQuiesce()
 		netutil.FatalIfUnexpected(ln.Close())
-		<-n.Stopper.ShouldStop()
-		server.Stop()
 		node.Gossip.EnableSimulationCycler(false)
 	})
 	n.Nodes = append(n.Nodes, node)
@@ -144,10 +143,9 @@ func (n *Network) StartNode(node *Node) error {
 		encoding.EncodeUint64Ascending(nil, 0), time.Hour); err != nil {
 		return err
 	}
-	n.Stopper.RunWorker(context.TODO(), func(context.Context) {
+	return n.Stopper.RunAsyncTask(context.TODO(), "start-node", func(context.Context) {
 		netutil.FatalIfUnexpected(node.Server.Serve(node.Listener))
 	})
-	return nil
 }
 
 // GetNodeFromID returns the simulation node associated with