Skip to content

Commit

Permalink
tabletmanager: Fully stop shard sync loop on shutdown. (#5667)
Browse files Browse the repository at this point in the history
* tabletmanager: Fully stop shard sync loop on shutdown.

We've received reports of a panic upon vttablet shutdown that we believe
comes from the shard sync loop trying to restart its watch. This change
ensures that the shard sync loop is fully stopped before we close the
topology client.

Signed-off-by: Anthony Yeh <[email protected]>

* consultopo: Respect cancellation of Watch().

Signed-off-by: Anthony Yeh <[email protected]>
  • Loading branch information
enisoc authored Jan 9, 2020
1 parent e958cf5 commit d2d8d5a
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 9 deletions.
3 changes: 2 additions & 1 deletion go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func main() {
}

servenv.OnClose(func() {
// stop the agent so that our topo entry gets pruned properly
// Close the agent so that our topo entry gets pruned properly and any
// background goroutines that use the topo connection are stopped.
agent.Close()

// We will still use the topo server during lameduck period
Expand Down
8 changes: 8 additions & 0 deletions go/vt/topo/consultopo/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package consultopo

import (
"errors"
"net/url"

"golang.org/x/net/context"

Expand All @@ -38,11 +39,18 @@ var (
// convertError converts a context error into a topo error. All errors
// are either application-level errors, or context errors.
func convertError(err error, nodePath string) error {
// Unwrap errors from the Go HTTP client.
if urlErr, ok := err.(*url.Error); ok {
err = urlErr.Err
}

// Convert specific sentinel values.
switch err {
case context.Canceled:
return topo.NewError(topo.Interrupted, nodePath)
case context.DeadlineExceeded:
return topo.NewError(topo.Timeout, nodePath)
}

return err
}
11 changes: 6 additions & 5 deletions go/vt/topo/consultopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

var (
watchPollDuration = flag.Duration("topo_consul_watch_poll_duration", 30*time.Second, "time of the long poll for watch queries. Interrupting a watch may wait for up to that time.")
watchPollDuration = flag.Duration("topo_consul_watch_poll_duration", 30*time.Second, "time of the long poll for watch queries.")
)

// Watch is part of the topo.Conn interface.
Expand Down Expand Up @@ -65,14 +65,15 @@ func (s *Server) Watch(ctx context.Context, filePath string) (*topo.WatchData, <
// if it didn't change. So we just check for that
// and swallow the notifications when version matches.
waitIndex := pair.ModifyIndex
pair, _, err = s.kv.Get(nodePath, &api.QueryOptions{
opts := &api.QueryOptions{
WaitIndex: waitIndex,
WaitTime: *watchPollDuration,
})
}
pair, _, err = s.kv.Get(nodePath, opts.WithContext(watchCtx))
if err != nil {
// Serious error.
// Serious error or context cancelled.
notifications <- &topo.WatchData{
Err: err,
Err: convertError(err, nodePath),
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/topo/test/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func checkWatchInterrupt(t *testing.T, ts *topo.Server) {
break
}
if wd.Err != nil {
t.Fatalf("bad error returned for deletion: %v", wd.Err)
t.Fatalf("bad error returned for cancellation: %v", wd.Err)
}
// we got something, better be the right value
got := &topodatapb.SrvKeyspace{}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/tabletmanager/action_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ type ActionAgent struct {
// Call agent.notifyShardSync() instead of sending directly to this channel.
_shardSyncChan chan struct{}

// _shardSyncDone is a channel for waiting until the shard sync goroutine
// has really finished after _shardSyncCancel was called.
_shardSyncDone chan struct{}

// _shardSyncCancel is the function to stop the background shard sync goroutine.
_shardSyncCancel context.CancelFunc

Expand Down Expand Up @@ -732,6 +736,11 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlHost string, mysqlPort
// then prune the tablet topology entry of all post-init fields. This prevents
// stale identifiers from hanging around in topology.
func (agent *ActionAgent) Close() {
// Stop the shard sync loop and wait for it to exit. We do this in Close()
// rather than registering it as an OnTerm hook so the shard sync loop keeps
// running during lame duck.
agent.stopShardSync()

// cleanup initialized fields in the tablet entry
f := func(tablet *topodatapb.Tablet) error {
if err := topotools.CheckOwnership(agent.initialTablet, tablet); err != nil {
Expand All @@ -752,11 +761,16 @@ func (agent *ActionAgent) Close() {
// while taking lameduck into account. However, this may be useful for tests,
// when you want to clean up an agent immediately.
func (agent *ActionAgent) Stop() {
// Stop the shard sync loop and wait for it to exit. This needs to be done
// here in addition to in Close() because tests do not call Close().
agent.stopShardSync()

if agent.UpdateStream != nil {
agent.UpdateStream.Disable()
}

agent.VREngine.Close()

if agent.MysqlDaemon != nil {
agent.MysqlDaemon.Close()
}
Expand Down
16 changes: 15 additions & 1 deletion go/vt/vttablet/tabletmanager/shard_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ var (
// topo watch on the shard record. It gets woken up for tablet state changes by
// a notification signal from setTablet().
func (agent *ActionAgent) shardSyncLoop(ctx context.Context) {
// Make a copy of the channel so we don't race when stopShardSync() clears it.
// Make a copy of the channels so we don't race when stopShardSync() clears them.
agent.mutex.Lock()
notifyChan := agent._shardSyncChan
doneChan := agent._shardSyncDone
agent.mutex.Unlock()

defer close(doneChan)

// retryChan is how we wake up after going to sleep between retries.
// If no retry is pending, this channel will be nil, which means it's fine
// to always select on it -- a nil channel is never ready.
Expand Down Expand Up @@ -232,6 +235,7 @@ func (agent *ActionAgent) startShardSync() {
// be told it needs to recheck the state.
agent.mutex.Lock()
agent._shardSyncChan = make(chan struct{}, 1)
agent._shardSyncDone = make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
agent._shardSyncCancel = cancel
agent.mutex.Unlock()
Expand All @@ -244,13 +248,23 @@ func (agent *ActionAgent) startShardSync() {
}

func (agent *ActionAgent) stopShardSync() {
var doneChan <-chan struct{}

agent.mutex.Lock()
if agent._shardSyncCancel != nil {
agent._shardSyncCancel()
agent._shardSyncCancel = nil
agent._shardSyncChan = nil

doneChan = agent._shardSyncDone
agent._shardSyncDone = nil
}
agent.mutex.Unlock()

// If the shard sync loop was running, wait for it to fully stop.
if doneChan != nil {
<-doneChan
}
}

func (agent *ActionAgent) notifyShardSync() {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/shard_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ func (sw *shardWatcher) stop() {
return
}

log.Infof("Stopping shard watch...")
sw.watchCancel()

// Drain all remaining watch events.
log.Infof("Stopping shard watch...")
for range sw.watchChan {
}
log.Infof("Shard watch stopped.")

sw.watchChan = nil
sw.watchCancel = nil
Expand Down

0 comments on commit d2d8d5a

Please sign in to comment.