Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm-worker: use run ctx instead of global ctx to fix double write #7661

Merged
merged 3 commits into from
Nov 24, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ type Server struct {
wg sync.WaitGroup
kaWg sync.WaitGroup
httpWg sync.WaitGroup
runWg sync.WaitGroup

ctx context.Context
cancel context.CancelFunc

runCtx context.Context
runCancel context.CancelFunc

kaCtx context.Context
kaCancel context.CancelFunc

Expand Down Expand Up @@ -107,6 +111,8 @@ func (s *Server) Start() error {

var m cmux.CMux

s.runCtx, s.runCancel = context.WithCancel(s.ctx)

// protect members from data races. Some functions below, like GetRelayConfig and
// GetSourceBoundConfig, have a built-in timeout, so they will not be stuck for a
// long time.
Expand Down Expand Up @@ -143,10 +149,10 @@ func (s *Server) Start() error {

s.setWorker(nil, true)

s.wg.Add(1)
s.runWg.Add(1)
go func() {
s.runBackgroundJob(s.ctx)
s.wg.Done()
s.runBackgroundJob(s.runCtx)
s.runWg.Done()
}()

s.startKeepAlive()
Expand All @@ -162,13 +168,13 @@ func (s *Server) Start() error {
}
}

s.wg.Add(1)
s.runWg.Add(1)
go func(ctx context.Context) {
defer s.wg.Done()
defer s.runWg.Done()
// TODO: handle fatal error from observeRelayConfig
//nolint:errcheck
s.observeRelayConfig(ctx, revRelay)
}(s.ctx)
}(s.runCtx)

bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err != nil {
Expand All @@ -182,17 +188,17 @@ func (s *Server) Start() error {
log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String()))
}

s.wg.Add(1)
s.runWg.Add(1)
go func(ctx context.Context) {
defer s.wg.Done()
defer s.runWg.Done()
for {
err1 := s.observeSourceBound(ctx, revBound)
if err1 == nil {
return
}
s.restartKeepAlive()
}
}(s.ctx)
}(s.runCtx)

// create a cmux
m = cmux.New(s.rootLis)
Expand Down Expand Up @@ -467,8 +473,8 @@ func (s *Server) doClose() {
return
}
// stop server in advance, stop receiving source bound and relay bound
s.cancel()
s.wg.Wait()
s.runCancel()
s.runWg.Wait()

// stop worker and wait for it to return (we already lock the whole Server, so there is no need to use a lock to get the source worker)
if w := s.getSourceWorker(true); w != nil {
Expand All @@ -494,6 +500,9 @@ func (s *Server) Close() {
s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing
s.stopKeepAlive()

s.cancel()
s.wg.Wait()

if s.etcdClient != nil {
s.etcdClient.Close()
}
Expand Down