Skip to content

Commit

Permalink
dm-worker: use run ctx instead global ctx to fix double write (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 30, 2022
1 parent 784dd7a commit c164be5
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ type Server struct {
wg sync.WaitGroup
kaWg sync.WaitGroup
httpWg sync.WaitGroup
runWg sync.WaitGroup

ctx context.Context
cancel context.CancelFunc

runCtx context.Context
runCancel context.CancelFunc

kaCtx context.Context
kaCancel context.CancelFunc

Expand Down Expand Up @@ -105,6 +109,8 @@ func (s *Server) Start() error {

var m cmux.CMux

s.runCtx, s.runCancel = context.WithCancel(s.ctx)

// protect member from data race. some functions below like GetRelayConfig,
// GetSourceBoundConfig has a built-in timeout so it will not be stuck for a
// long time.
Expand Down Expand Up @@ -141,10 +147,10 @@ func (s *Server) Start() error {

s.setWorker(nil, true)

s.wg.Add(1)
s.runWg.Add(1)
go func() {
s.runBackgroundJob(s.ctx)
s.wg.Done()
s.runBackgroundJob(s.runCtx)
s.runWg.Done()
}()

s.startKeepAlive()
Expand All @@ -160,13 +166,13 @@ func (s *Server) Start() error {
}
}

s.wg.Add(1)
s.runWg.Add(1)
go func(ctx context.Context) {
defer s.wg.Done()
defer s.runWg.Done()
// TODO: handle fatal error from observeRelayConfig
//nolint:errcheck
s.observeRelayConfig(ctx, revRelay)
}(s.ctx)
}(s.runCtx)

bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err != nil {
Expand All @@ -180,17 +186,17 @@ func (s *Server) Start() error {
log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String()))
}

s.wg.Add(1)
s.runWg.Add(1)
go func(ctx context.Context) {
defer s.wg.Done()
defer s.runWg.Done()
for {
err1 := s.observeSourceBound(ctx, revBound)
if err1 == nil {
return
}
s.restartKeepAlive()
}
}(s.ctx)
}(s.runCtx)

// create a cmux
m = cmux.New(s.rootLis)
Expand Down Expand Up @@ -468,8 +474,8 @@ func (s *Server) doClose() {
return
}
// stop server in advance, stop receiving source bound and relay bound
s.cancel()
s.wg.Wait()
s.runCancel()
s.runWg.Wait()

// stop worker and wait for return(we already lock the whole Sever, so no need use lock to get source worker)
if w := s.getSourceWorker(true); w != nil {
Expand All @@ -493,6 +499,10 @@ func (s *Server) doClose() {
func (s *Server) Close() {
s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing
s.stopKeepAlive()

s.cancel()
s.wg.Wait()

if s.etcdClient != nil {
s.etcdClient.Close()
}
Expand Down

0 comments on commit c164be5

Please sign in to comment.