Skip to content

Commit

Permalink
worker: add lock protection for w.cfg (#5250)
Browse files Browse the repository at this point in the history
close #5249
  • Loading branch information
lichunzhu authored Apr 25, 2022
1 parent 321e9e1 commit eb637b1
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
7 changes: 5 additions & 2 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,11 +790,14 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
log.L().Info("", zap.String("request", "OperateSchema"), zap.Stringer("payload", req))

w := s.getSourceWorker(true)
w.RLock()
sourceID := w.cfg.SourceID
w.RUnlock()
if w == nil {
log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
} else if req.Source != w.cfg.SourceID {
log.L().Error("fail to call OperateSchema, because source mismatch", zap.String("request", req.Source), zap.String("current", w.cfg.SourceID))
} else if req.Source != sourceID {
log.L().Error("fail to call OperateSchema, because source mismatch", zap.String("request", req.Source), zap.String("current", sourceID))
return makeCommonWorkerResponse(terror.ErrWorkerSourceNotMatch.Generate()), nil
}

Expand Down
15 changes: 11 additions & 4 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ import (

// SourceWorker manages a source(upstream) which is mainly related to subtasks and relay.
type SourceWorker struct {
// ensure no other operation can be done when closing (we can use `WatGroup`/`Context` to archive this)
// ensure no other operation can be done when closing (we can use `WaitGroup`/`Context` to archive this)
// TODO: check what does it guards. Now it's used to guard relayHolder and relayPurger (maybe subTaskHolder?) since
// query-status maybe access them when closing/disable functionalities
// This lock is used to guards source worker's source config and subtask holder(subtask configs)
sync.RWMutex

wg sync.WaitGroup
Expand Down Expand Up @@ -249,9 +250,12 @@ func (w *SourceWorker) Stop(graceful bool) {
// updateSourceStatus updates w.sourceStatus.
func (w *SourceWorker) updateSourceStatus(ctx context.Context) error {
w.sourceDBMu.Lock()
w.RLock()
cfg := w.cfg
w.RUnlock()
if w.sourceDB == nil {
var err error
w.sourceDB, err = conn.DefaultDBProvider.Apply(&w.cfg.DecryptPassword().From)
w.sourceDB, err = conn.DefaultDBProvider.Apply(&cfg.DecryptPassword().From)
if err != nil {
w.sourceDBMu.Unlock()
return err
Expand All @@ -262,7 +266,7 @@ func (w *SourceWorker) updateSourceStatus(ctx context.Context) error {
var status binlog.SourceStatus
ctx, cancel := context.WithTimeout(ctx, utils.DefaultDBTimeout)
defer cancel()
pos, gtidSet, err := utils.GetPosAndGs(ctx, w.sourceDB.DB, w.cfg.Flavor)
pos, gtidSet, err := utils.GetPosAndGs(ctx, w.sourceDB.DB, cfg.Flavor)
if err != nil {
return err
}
Expand Down Expand Up @@ -1160,7 +1164,10 @@ func (w *SourceWorker) observeValidatorStage(ctx context.Context, lastUsedRev in
case <-ctx.Done():
return nil
case <-time.After(500 * time.Millisecond):
startRevision, err = w.getCurrentValidatorRevision(w.cfg.SourceID)
w.RLock()
sourceID := w.cfg.SourceID
w.RUnlock()
startRevision, err = w.getCurrentValidatorRevision(sourceID)
if err != nil {
log.L().Error("reset validator stage failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
}
Expand Down
5 changes: 4 additions & 1 deletion dm/dm/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func (w *SourceWorker) GetValidateStatus(stName string, filterStatus pb.Stage) [
if st == nil {
return res
}
sourceIP := w.cfg.From.Host + ":" + strconv.Itoa(w.cfg.From.Port)
w.RLock()
cfg := w.cfg
w.RUnlock()
sourceIP := cfg.From.Host + ":" + strconv.Itoa(cfg.From.Port)
tblStats := st.GetValidatorStatus()
for _, stat := range tblStats {
if filterStatus == pb.Stage_InvalidStage || stat.ValidationStatus == filterStatus.String() {
Expand Down

0 comments on commit eb637b1

Please sign in to comment.