diff --git a/checker/checker.go b/checker/checker.go
index 1469578707..8fb083c58b 100644
--- a/checker/checker.go
+++ b/checker/checker.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/dm/unit"
+	"github.com/pingcap/dm/pkg/binlog"
 	"github.com/pingcap/dm/pkg/conn"
 	fr "github.com/pingcap/dm/pkg/func-rollback"
 	"github.com/pingcap/dm/pkg/log"
@@ -420,7 +421,7 @@ func (c *Checker) IsFreshTask() (bool, error) {
 }
 
 // Status implements Unit interface.
-func (c *Checker) Status() interface{} {
+func (c *Checker) Status(_ *binlog.SourceStatus) interface{} {
 	c.result.RLock()
 	res := c.result.detail
 	c.result.RUnlock()
diff --git a/dm/unit/unit.go b/dm/unit/unit.go
index 647fa2b533..d5821644e9 100644
--- a/dm/unit/unit.go
+++ b/dm/unit/unit.go
@@ -20,6 +20,7 @@ import (
 
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
+	"github.com/pingcap/dm/pkg/binlog"
 	"github.com/pingcap/dm/pkg/terror"
 )
@@ -55,8 +56,9 @@ type Unit interface {
 	// Update updates the configuration
 	Update(cfg *config.SubTaskConfig) error
-	// Status returns the unit's current status
-	Status() interface{}
+	// Status returns the unit's current status. The result may need to be calculated together with the source status,
+	// e.g. the estimated time to catch up. If sourceStatus is nil, such calculation should be skipped.
+	Status(sourceStatus *binlog.SourceStatus) interface{}
 	// Type returns the unit's type
 	Type() pb.UnitType
 	// IsFreshTask returns whether this is a fresh task (not processed before)
diff --git a/dm/worker/hub.go b/dm/worker/hub.go
index b3b32726d5..2c67797433 100644
--- a/dm/worker/hub.go
+++ b/dm/worker/hub.go
@@ -17,11 +17,11 @@ var conditionHub *ConditionHub
 
 // ConditionHub holds a DM-worker and it is used for wait condition detection.
 type ConditionHub struct {
-	w *Worker
+	w *SourceWorker
 }
 
 // InitConditionHub inits the singleton instance of ConditionHub.
-func InitConditionHub(w *Worker) {
+func InitConditionHub(w *SourceWorker) {
 	conditionHub = &ConditionHub{
 		w: w,
 	}
diff --git a/dm/worker/relay.go b/dm/worker/relay.go
index e058e775aa..1ef228b099 100644
--- a/dm/worker/relay.go
+++ b/dm/worker/relay.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
+	"github.com/pingcap/dm/pkg/binlog"
 	"github.com/pingcap/dm/pkg/log"
 	"github.com/pingcap/dm/pkg/streamer"
 	"github.com/pingcap/dm/pkg/terror"
@@ -39,7 +40,7 @@ type RelayHolder interface {
 	// Close closes the holder
 	Close()
 	// Status returns relay unit's status
-	Status(ctx context.Context) *pb.RelayStatus
+	Status(sourceStatus *binlog.SourceStatus) *pb.RelayStatus
 	// Stage returns the stage of the relay
 	Stage() pb.Stage
 	// Error returns relay unit's error
@@ -148,14 +149,14 @@ func (h *realRelayHolder) run() {
 }
 
 // Status returns relay unit's status.
-func (h *realRelayHolder) Status(ctx context.Context) *pb.RelayStatus {
+func (h *realRelayHolder) Status(sourceStatus *binlog.SourceStatus) *pb.RelayStatus {
 	if h.closed.Load() || h.relay.IsClosed() {
 		return &pb.RelayStatus{
 			Stage: pb.Stage_Stopped,
 		}
 	}
 
-	s := h.relay.Status(ctx).(*pb.RelayStatus)
+	s := h.relay.Status(sourceStatus).(*pb.RelayStatus)
 	s.Stage = h.Stage()
 	s.Result = h.Result()
@@ -374,7 +375,7 @@ func (d *dummyRelayHolder) Close() {
 }
 
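With the new signature, call sites that only need the stage and result can pass nil and skip all master-binlog comparison fields. A minimal sketch of that calling convention (the helper name relayStage is illustrative, not part of this change):

func relayStage(holder RelayHolder) pb.Stage {
	// a nil *binlog.SourceStatus means "don't compare against the master";
	// Stage and Result are still filled in by the holder itself.
	return holder.Status(nil).Stage
}

 // Status implements interface of RelayHolder.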
-func (d *dummyRelayHolder) Status(ctx context.Context) *pb.RelayStatus { +func (d *dummyRelayHolder) Status(sourceStatus *binlog.SourceStatus) *pb.RelayStatus { d.Lock() defer d.Unlock() return &pb.RelayStatus{ diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index 4bbec4a6b9..cccc9649be 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/gtid" pkgstreamer "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/utils" @@ -104,7 +105,7 @@ func (d *DummyRelay) Error() interface{} { } // Status implements Process interface. -func (d *DummyRelay) Status(ctx context.Context) interface{} { +func (d *DummyRelay) Status(sourceStatus *binlog.SourceStatus) interface{} { return &pb.RelayStatus{ Stage: pb.Stage_New, } @@ -185,7 +186,7 @@ func (t *testRelay) testStart(c *C, holder *realRelayHolder) { c.Assert(holder.closed.Load(), IsFalse) // test status - status := holder.Status(context.Background()) + status := holder.Status(nil) c.Assert(status.Stage, Equals, pb.Stage_Running) c.Assert(status.Result, IsNil) @@ -226,7 +227,7 @@ func (t *testRelay) testClose(c *C, holder *realRelayHolder) { c.Assert(holder.closed.Load(), IsTrue) // todo: very strange, and can't resume - status := holder.Status(context.Background()) + status := holder.Status(nil) c.Assert(status.Stage, Equals, pb.Stage_Stopped) c.Assert(status.Result, IsNil) @@ -244,7 +245,7 @@ func (t *testRelay) testPauseAndResume(c *C, holder *realRelayHolder) { c.Assert(err, ErrorMatches, ".*current stage is Paused.*") // test status - status := holder.Status(context.Background()) + status := holder.Status(nil) c.Assert(status.Stage, Equals, pb.Stage_Paused) // test update @@ -260,7 +261,7 @@ func (t *testRelay) testPauseAndResume(c *C, holder *realRelayHolder) { c.Assert(err, ErrorMatches, ".*current stage is Running.*") // test status - status = holder.Status(context.Background()) + status = holder.Status(nil) c.Assert(status.Stage, Equals, pb.Stage_Running) c.Assert(status.Result, IsNil) diff --git a/dm/worker/server.go b/dm/worker/server.go index 8cd8ca6851..714edd9baa 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -72,7 +72,7 @@ type Server struct { rootLis net.Listener svr *grpc.Server - worker *Worker + worker *SourceWorker etcdClient *clientv3.Client // relay status will never be put in server.sourceStatus @@ -497,7 +497,7 @@ func (s *Server) Close() { } // if needLock is false, we should make sure Server has been locked in caller. -func (s *Server) getWorker(needLock bool) *Worker { +func (s *Server) getWorker(needLock bool) *SourceWorker { if needLock { s.Lock() defer s.Unlock() @@ -506,7 +506,7 @@ func (s *Server) getWorker(needLock bool) *Worker { } // if needLock is false, we should make sure Server has been locked in caller. 
-func (s *Server) setWorker(worker *Worker, needLock bool) {
+func (s *Server) setWorker(worker *SourceWorker, needLock bool) {
 	if needLock {
 		s.Lock()
 		defer s.Unlock()
@@ -819,7 +819,7 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
 	}, nil
 }
 
-func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Worker, error) {
+func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*SourceWorker, error) {
 	if needLock {
 		s.Lock()
 		defer s.Unlock()
@@ -834,7 +834,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Wor
 	}
 
 	log.L().Info("will start a new worker", zap.String("sourceID", cfg.SourceID))
-	w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name)
+	w, err := NewSourceWorker(cfg, s.etcdClient, s.cfg.Name)
 	if err != nil {
 		return nil, err
 	}
diff --git a/dm/worker/worker.go b/dm/worker/source_worker.go
similarity index 82%
rename from dm/worker/worker.go
rename to dm/worker/source_worker.go
index c16a2584d1..3ab4ee3cf0 100644
--- a/dm/worker/worker.go
+++ b/dm/worker/source_worker.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/golang/protobuf/proto"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
 	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/atomic"
@@ -39,8 +40,8 @@ import (
 	"github.com/pingcap/dm/relay/purger"
 )
 
-// Worker manages sub tasks and process units for data migration.
-type Worker struct {
+// SourceWorker manages a source (upstream) which is mainly related to subtasks and relay.
+type SourceWorker struct {
 	// ensure no other operation can be done when closing (we can use `WaitGroup`/`Context` to achieve this)
 	// TODO: check what it guards. Now it's used to guard relayHolder and relayPurger (maybe subTaskHolder?) since
 	// query-status may access them when closing/disabling functionalities
@@ -49,14 +50,17 @@ type Worker struct {
 	wg     sync.WaitGroup
 	closed atomic.Bool
 
-	// context created when Worker created, and canceled when closing
+	// context created when SourceWorker is created, and canceled when closing
 	ctx    context.Context
 	cancel context.CancelFunc
 
-	cfg     *config.SourceConfig
-	db      *conn.BaseDB
-	dbMutex sync.Mutex
-	l       log.Logger
+	cfg        *config.SourceConfig
+	sourceDB   *conn.BaseDB
+	sourceDBMu sync.Mutex // if sourceDB can't be connected at start time, we try to reconnect before using it.
+
+	l log.Logger
+
+	sourceStatus atomic.Value // stores a pointer to SourceStatus
 
 	// subtask functionality
 	subTaskEnabled atomic.Bool
@@ -81,10 +85,10 @@ type Worker struct {
 	name string
 }
 
-// NewWorker creates a new Worker. The functionality of relay and subtask is disabled by default, need call EnableRelay
+// NewSourceWorker creates a new SourceWorker. The functionality of relay and subtask is disabled by default; call EnableRelay
 // and EnableSubtask later.
-func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name string) (w *Worker, err error) {
-	w = &Worker{
+func NewSourceWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name string) (w *SourceWorker, err error) {
+	w = &SourceWorker{
 		cfg:           cfg,
 		subTaskHolder: newSubTaskHolder(),
 		l:             log.With(zap.String("component", "worker controller")),
@@ -97,7 +101,7 @@ func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name strin
 	w.subTaskEnabled.Store(false)
 	w.relayEnabled.Store(false)
 
-	defer func(w2 *Worker) {
+	defer func(w2 *SourceWorker) {
 		if err != nil { // when err != nil, `w` will become nil in this func, so we pass `w` in defer.
 			// release resources, NOTE: we need to refactor New/Init/Start/Close for components later.
 			w2.cancel()
@@ -122,19 +126,35 @@
 	return w, nil
 }
 
-// Start starts working.
-func (w *Worker) Start() {
+// Start starts working, but the functionalities should be turned on separately.
+func (w *SourceWorker) Start() {
 	// start task status checker
 	if w.cfg.Checker.CheckEnable {
 		w.taskStatusChecker.Start()
 	}
 
+	var err error
+	w.sourceDB, err = conn.DefaultDBProvider.Apply(w.cfg.DecryptPassword().From)
+	if err != nil {
+		w.l.Error("can't connect to upstream", zap.Error(err))
+	}
+
 	w.wg.Add(1)
 	defer w.wg.Done()
 
 	w.l.Info("start running")
 
-	ticker := time.NewTicker(5 * time.Second)
+	printTaskInterval := 30 * time.Second
+	failpoint.Inject("PrintStatusCheckSeconds", func(val failpoint.Value) {
+		if seconds, ok := val.(int); ok {
+			printTaskInterval = time.Duration(seconds) * time.Second
+			log.L().Info("set printStatusInterval",
+				zap.String("failpoint", "PrintStatusCheckSeconds"),
+				zap.Int("value", seconds))
+		}
+	})
+
+	ticker := time.NewTicker(printTaskInterval)
 	w.closed.Store(false)
 	defer ticker.Stop()
 	for {
@@ -143,13 +163,30 @@
 			w.l.Info("status print process exits!")
 			return
 		case <-ticker.C:
-			w.l.Debug("runtime status", zap.String("status", w.StatusJSON("")))
+			old := w.sourceStatus.Load()
+			if old != nil {
+				status := old.(*binlog.SourceStatus)
+				if time.Since(status.UpdateTime) < printTaskInterval/2 {
+					w.l.Info("we just updated the source status, skip once",
+						zap.Time("last update time", status.UpdateTime))
+					continue
+				}
+			}
+			if err2 := w.updateSourceStatus(w.ctx); err2 != nil {
+				w.l.Error("failed to update source status", zap.Error(err2))
+				continue
+			}
+
+			sourceStatus := w.sourceStatus.Load().(*binlog.SourceStatus)
+			if w.l.Core().Enabled(zap.DebugLevel) {
+				w.l.Debug("runtime status", zap.String("status", w.GetUnitAndSourceStatusJSON("", sourceStatus)))
+			}
 		}
 	}
 }
 
 // Close stops working and releases resources.
-func (w *Worker) Close() {
+func (w *SourceWorker) Close() {
 	if w.closed.Load() {
 		w.l.Warn("already closed")
 		return
@@ -164,11 +201,6 @@
 	w.Lock()
 	defer w.Unlock()
 
-	w.dbMutex.Lock()
-	w.db.Close()
-	w.db = nil
-	w.dbMutex.Unlock()
-
 	// close all sub tasks
 	w.subTaskHolder.closeAllSubTasks()
@@ -185,12 +217,55 @@
 		w.taskStatusChecker.Close()
 	}
 
+	w.sourceDB.Close()
+	w.sourceDB = nil
+
 	w.closed.Store(true)
+
 	w.l.Info("Stop worker")
 }
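The loop above refreshes the cached *binlog.SourceStatus at most once per half of the print interval, and QueryStatus refreshes it on demand. A sketch of the freshness check, pulled out into a hypothetical helper for clarity:

func (w *SourceWorker) sourceStatusFresh(maxAge time.Duration) bool {
	old := w.sourceStatus.Load() // the atomic.Value holds a *binlog.SourceStatus
	if old == nil {
		return false // never fetched from upstream yet
	}
	return time.Since(old.(*binlog.SourceStatus).UpdateTime) < maxAge
}

+// updateSourceStatus updates w.sourceStatus.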
+func (w *SourceWorker) updateSourceStatus(ctx context.Context) error {
+	w.sourceDBMu.Lock()
+	if w.sourceDB == nil {
+		var err error
+		w.sourceDB, err = conn.DefaultDBProvider.Apply(w.cfg.DecryptPassword().From)
+		if err != nil {
+			w.sourceDBMu.Unlock()
+			return err
+		}
+	}
+	w.sourceDBMu.Unlock()
+
+	var status binlog.SourceStatus
+	ctx, cancel := context.WithTimeout(ctx, utils.DefaultDBTimeout)
+	defer cancel()
+	pos, gtidSet, err := utils.GetMasterStatus(ctx, w.sourceDB.DB, w.cfg.Flavor)
+	if err != nil {
+		return err
+	}
+	status.Location.Position = pos
+	if err2 := status.Location.SetGTID(gtidSet.Origin()); err2 != nil {
+		return err2
+	}
+
+	ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout)
+	defer cancel2()
+	binlogs, err := binlog.GetBinaryLogs(ctx2, w.sourceDB.DB)
+	if err != nil {
+		return err
+	}
+	status.Binlogs = binlogs
+
+	status.UpdateTime = time.Now()
+
+	w.sourceStatus.Store(&status)
+	return nil
+}
+
 // EnableRelay enables the functionality of start/watch/handle relay.
-func (w *Worker) EnableRelay() (err error) {
+func (w *SourceWorker) EnableRelay() (err error) {
 	w.l.Info("enter EnableRelay")
 	w.Lock()
 	defer w.Unlock()
@@ -270,7 +345,7 @@
 }
 
 // DisableRelay disables the functionality of start/watch/handle relay.
-func (w *Worker) DisableRelay() {
+func (w *SourceWorker) DisableRelay() {
 	w.l.Info("enter DisableRelay")
 	w.Lock()
 	defer w.Unlock()
@@ -306,7 +381,7 @@
 }
 
 // EnableHandleSubtasks enables the functionality of start/watch/handle subtasks.
-func (w *Worker) EnableHandleSubtasks() error {
+func (w *SourceWorker) EnableHandleSubtasks() error {
 	w.l.Info("enter EnableHandleSubtasks")
 	w.Lock()
 	defer w.Unlock()
@@ -353,7 +428,7 @@
 }
 
 // DisableHandleSubtasks disables the functionality of start/watch/handle subtasks.
-func (w *Worker) DisableHandleSubtasks() {
+func (w *SourceWorker) DisableHandleSubtasks() {
 	w.l.Info("enter DisableHandleSubtasks")
 	if !w.subTaskEnabled.CAS(true, false) {
 		w.l.Warn("already disabled handling subtasks")
@@ -374,7 +449,7 @@
 
 // fetchSubTasksAndAdjust gets source's subtask stages and configs, adjusts some values by worker's config and status
 // source **must not be empty**
 // return map{task name -> subtask stage}, map{task name -> subtask config}, revision, error.
-func (w *Worker) fetchSubTasksAndAdjust() (map[string]ha.Stage, map[string]config.SubTaskConfig, int64, error) {
+func (w *SourceWorker) fetchSubTasksAndAdjust() (map[string]ha.Stage, map[string]config.SubTaskConfig, int64, error) {
 	// we get the newest subtask stages directly which will omit the subtask stage PUT/DELETE event
 	// because triggering these events is useless now
 	subTaskStages, subTaskCfgM, revSubTask, err := ha.GetSubTaskStageConfig(w.etcdClient, w.cfg.SourceID)
@@ -389,7 +464,7 @@
 }
 
 // StartSubTask creates a subtask and runs it.
-func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage, needLock bool) error {
+func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage, needLock bool) error {
 	if needLock {
 		w.Lock()
 		defer w.Unlock()
@@ -431,7 +506,7 @@ func (w *Worker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.Stage, n
 }
 
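Each subtask forwards this (possibly nil) source status to its current processing unit, so every Unit implementation is expected to honor the nil contract documented in dm/unit/unit.go. A hedged sketch of such an implementation, where exampleUnit and lastLocation are illustrative names only:

func (u *exampleUnit) Status(sourceStatus *binlog.SourceStatus) interface{} {
	st := &pb.SyncStatus{SyncerBinlog: u.lastLocation.Position.String()}
	if sourceStatus == nil {
		return st // skip any calculation that needs upstream information
	}
	st.MasterBinlog = sourceStatus.Location.Position.String()
	st.Synced = sourceStatus.Location.Position.Compare(u.lastLocation.Position) == 0
	return st
}

 // UpdateSubTask updates the config of a subtask.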
-func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) error { +func (w *SourceWorker) UpdateSubTask(cfg *config.SubTaskConfig) error { w.Lock() defer w.Unlock() @@ -449,7 +524,7 @@ func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) error { } // OperateSubTask stop/resume/pause sub task. -func (w *Worker) OperateSubTask(name string, op pb.TaskOp) error { +func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error { w.Lock() defer w.Unlock() @@ -485,7 +560,7 @@ func (w *Worker) OperateSubTask(name string, op pb.TaskOp) error { } // QueryStatus query worker's sub tasks' status. If relay enabled, also return source status. -func (w *Worker) QueryStatus(ctx context.Context, name string) ([]*pb.SubTaskStatus, *pb.RelayStatus, error) { +func (w *SourceWorker) QueryStatus(ctx context.Context, name string) ([]*pb.SubTaskStatus, *pb.RelayStatus, error) { w.RLock() defer w.RUnlock() @@ -494,81 +569,25 @@ func (w *Worker) QueryStatus(ctx context.Context, name string) ([]*pb.SubTaskSta return nil, nil, nil } - ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout) - defer cancel2() var ( - subtaskStatus = w.Status(name) - relayStatus *pb.RelayStatus + sourceStatus *binlog.SourceStatus + relayStatus *pb.RelayStatus ) - if w.relayEnabled.Load() { - relayStatus = w.relayHolder.Status(ctx2) - w.postProcessStatus(subtaskStatus, relayStatus.MasterBinlog, relayStatus.MasterBinlogGtid) + + if err := w.updateSourceStatus(ctx); err != nil { + w.l.Error("failed to update source status", zap.Error(err)) } else { - // fetch master status if relay is not enabled - w.dbMutex.Lock() - if w.db == nil { - var err error - w.l.Info("will open a connection to get master status", zap.Any("upstream config", w.cfg.From)) - w.db, err = conn.DefaultDBProvider.Apply(w.cfg.DecryptPassword().From) - if err != nil { - w.l.Error("can't open a connection to get master status", zap.Error(err)) - w.dbMutex.Unlock() - return subtaskStatus, relayStatus, err - } - } - pos, gset, err := utils.GetMasterStatus(ctx2, w.db.DB, w.cfg.Flavor) - w.dbMutex.Unlock() - if err != nil { - return subtaskStatus, relayStatus, err - } - w.postProcessStatus(subtaskStatus, pos.String(), gset.String()) + sourceStatus = w.sourceStatus.Load().(*binlog.SourceStatus) } - return subtaskStatus, relayStatus, nil -} -// postProcessStatus fills the status of sync unit with master binlog location and other related fields. 
-func (w *Worker) postProcessStatus( - subtaskStatus []*pb.SubTaskStatus, - masterBinlogPos string, - masterBinlogGtid string, -) { - for _, status := range subtaskStatus { - syncStatus := status.GetSync() - if syncStatus == nil { - // not a Sync unit - continue - } - - syncStatus.MasterBinlog = masterBinlogPos - syncStatus.MasterBinlogGtid = masterBinlogGtid - if w.cfg.EnableGTID { - // rely on sorted GTID set when String() - if masterBinlogGtid == syncStatus.SyncerBinlogGtid { - syncStatus.Synced = true - } - } else { - syncPos, err := binlog.PositionFromPosStr(syncStatus.SyncerBinlog) - if err != nil { - w.l.Debug("fail to parse mysql position", zap.String("position", syncStatus.SyncerBinlog), log.ShortError(err)) - continue - } - masterPos, err := binlog.PositionFromPosStr(masterBinlogPos) - if err != nil { - w.l.Debug("fail to parse mysql position", zap.String("position", syncStatus.SyncerBinlog), log.ShortError(err)) - continue - } - - syncRealPos, err := binlog.RealMySQLPos(syncPos) - if err != nil { - w.l.Debug("fail to parse real mysql position", zap.String("position", syncStatus.SyncerBinlog), log.ShortError(err)) - continue - } - syncStatus.Synced = syncRealPos.Compare(masterPos) == 0 - } + subtaskStatus := w.Status(name, sourceStatus) + if w.relayEnabled.Load() { + relayStatus = w.relayHolder.Status(sourceStatus) } + return subtaskStatus, relayStatus, nil } -func (w *Worker) resetSubtaskStage() (int64, error) { +func (w *SourceWorker) resetSubtaskStage() (int64, error) { subTaskStages, subTaskCfgm, revSubTask, err := w.fetchSubTasksAndAdjust() if err != nil { return 0, err @@ -601,7 +620,7 @@ func (w *Worker) resetSubtaskStage() (int64, error) { return revSubTask, nil } -func (w *Worker) observeSubtaskStage(ctx context.Context, etcdCli *clientv3.Client, rev int64) error { +func (w *SourceWorker) observeSubtaskStage(ctx context.Context, etcdCli *clientv3.Client, rev int64) error { var wg sync.WaitGroup for { @@ -648,7 +667,7 @@ func (w *Worker) observeSubtaskStage(ctx context.Context, etcdCli *clientv3.Clie } } -func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, errCh chan error) error { +func (w *SourceWorker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, errCh chan error) error { closed := false for { select { @@ -687,7 +706,7 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, } // operateSubTaskStage returns TaskOp.String() additionally to record metrics. -func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (string, error) { +func (w *SourceWorker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (string, error) { var op pb.TaskOp switch { case stage.Expect == pb.Stage_Running, stage.Expect == pb.Stage_Paused: @@ -709,7 +728,7 @@ func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskCo } // operateSubTaskStageWithoutConfig returns TaskOp additionally to record metrics. 
-func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) (string, error) {
+func (w *SourceWorker) operateSubTaskStageWithoutConfig(stage ha.Stage) (string, error) {
 	var subTaskCfg config.SubTaskConfig
 	if stage.Expect == pb.Stage_Running {
 		if st := w.subTaskHolder.findSubTask(stage.Task); st == nil {
@@ -727,7 +746,7 @@
 	return w.operateSubTaskStage(stage, subTaskCfg)
 }
 
-func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client, rev int64) error {
+func (w *SourceWorker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client, rev int64) error {
 	var wg sync.WaitGroup
 	for {
 		relayStageCh := make(chan ha.Stage, 10)
@@ -783,7 +802,7 @@
 	}
 }
 
-func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, errCh chan error) error {
+func (w *SourceWorker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, errCh chan error) error {
 OUTER:
 	for {
 		select {
@@ -815,7 +834,7 @@ OUTER:
 
 // operateRelayStage returns RelayOp.String() additionally to record metrics
 // *RelayOp is nil only when error is nil, so record on error will not meet nil-pointer dereference.
-func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (string, error) {
+func (w *SourceWorker) operateRelayStage(ctx context.Context, stage ha.Stage) (string, error) {
 	var op pb.RelayOp
 	switch {
 	case stage.Expect == pb.Stage_Running:
@@ -834,7 +853,7 @@
 }
 
 // operateRelay operates relay unit.
-func (w *Worker) operateRelay(ctx context.Context, op pb.RelayOp) error {
+func (w *SourceWorker) operateRelay(ctx context.Context, op pb.RelayOp) error {
 	if w.closed.Load() {
 		return terror.ErrWorkerAlreadyClosed.Generate()
 	}
@@ -849,7 +868,7 @@
 }
 
 // PurgeRelay purges relay log files.
-func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) error {
+func (w *SourceWorker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) error {
 	if w.closed.Load() {
 		return terror.ErrWorkerAlreadyClosed.Generate()
 	}
@@ -862,7 +881,7 @@ func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) erro
 	if !w.subTaskEnabled.Load() {
 		w.l.Info("worker received purge-relay but isn't handling subtasks, read global checkpoint to decide the active relay log")
 
-		uuid := w.relayHolder.Status(ctx).RelaySubDir
+		uuid := w.relayHolder.Status(nil).RelaySubDir
 
 		_, subTaskCfgs, _, err := w.fetchSubTasksAndAdjust()
 		if err != nil {
@@ -886,7 +905,7 @@
 }
 
 // ForbidPurge implements PurgeInterceptor.ForbidPurge.
-func (w *Worker) ForbidPurge() (bool, string) {
+func (w *SourceWorker) ForbidPurge() (bool, string) {
 	if w.closed.Load() {
 		return false, ""
 	}
@@ -904,7 +923,7 @@
 }
 
 // OperateSchema operates schema for an upstream table.
-func (w *Worker) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (schema string, err error) { +func (w *SourceWorker) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (schema string, err error) { w.Lock() defer w.Unlock() @@ -973,7 +992,7 @@ func copyConfigFromSourceForEach( // getAllSubTaskStatus returns all subtask status of this worker, note the field // in subtask status is not completed, only includes `Name`, `Stage` and `Result` now. -func (w *Worker) getAllSubTaskStatus() map[string]*pb.SubTaskStatus { +func (w *SourceWorker) getAllSubTaskStatus() map[string]*pb.SubTaskStatus { sts := w.subTaskHolder.getAllSubTasks() result := make(map[string]*pb.SubTaskStatus, len(sts)) for name, st := range sts { @@ -989,7 +1008,7 @@ func (w *Worker) getAllSubTaskStatus() map[string]*pb.SubTaskStatus { } // HandleError handle worker error. -func (w *Worker) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) error { +func (w *SourceWorker) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) error { w.Lock() defer w.Unlock() diff --git a/dm/worker/worker_test.go b/dm/worker/source_worker_test.go similarity index 97% rename from dm/worker/worker_test.go rename to dm/worker/source_worker_test.go index 2f59836b97..413d4efc30 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/source_worker_test.go @@ -78,14 +78,14 @@ func (t *testServer) testWorker(c *C) { defer func() { NewRelayHolder = NewRealRelayHolder }() - w, err := NewWorker(cfg, etcdCli, "") + w, err := NewSourceWorker(cfg, etcdCli, "") c.Assert(err, IsNil) c.Assert(w.EnableRelay(), ErrorMatches, "init error") NewRelayHolder = NewDummyRelayHolder - w, err = NewWorker(cfg, etcdCli, "") + w, err = NewSourceWorker(cfg, etcdCli, "") c.Assert(err, IsNil) - c.Assert(w.StatusJSON(""), HasLen, emptyWorkerStatusInfoJSONLength) + c.Assert(w.GetUnitAndSourceStatusJSON("", nil), HasLen, emptyWorkerStatusInfoJSONLength) // close twice w.Close() @@ -286,7 +286,7 @@ func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) { c.Assert(err, IsNil) // start worker - w, err := NewWorker(sourceCfg, etcdCli, "") + w, err := NewSourceWorker(sourceCfg, etcdCli, "") c.Assert(err, IsNil) defer w.Close() go func() { @@ -355,7 +355,7 @@ func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) { c.Assert(w.subTaskEnabled.Load(), IsFalse) } -func (t *testWorkerFunctionalities) testEnableRelay(c *C, w *Worker, etcdCli *clientv3.Client, +func (t *testWorkerFunctionalities) testEnableRelay(c *C, w *SourceWorker, etcdCli *clientv3.Client, sourceCfg *config.SourceConfig, cfg *Config) { c.Assert(w.EnableRelay(), IsNil) @@ -378,14 +378,14 @@ func (t *testWorkerFunctionalities) testEnableRelay(c *C, w *Worker, etcdCli *cl }), IsTrue) } -func (t *testWorkerFunctionalities) testDisableRelay(c *C, w *Worker) { +func (t *testWorkerFunctionalities) testDisableRelay(c *C, w *SourceWorker) { w.DisableRelay() c.Assert(w.relayEnabled.Load(), IsFalse) c.Assert(w.relayHolder, IsNil) } -func (t *testWorkerFunctionalities) testEnableHandleSubtasks(c *C, w *Worker, etcdCli *clientv3.Client, +func (t *testWorkerFunctionalities) testEnableHandleSubtasks(c *C, w *SourceWorker, etcdCli *clientv3.Client, subtaskCfg config.SubTaskConfig, sourceCfg *config.SourceConfig) { c.Assert(w.EnableHandleSubtasks(), IsNil) c.Assert(w.subTaskEnabled.Load(), IsTrue) @@ -455,7 +455,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { sourceCfg.EnableRelay = false // step 1: start worker - w, 
err := NewWorker(sourceCfg, etcdCli, "") + w, err := NewSourceWorker(sourceCfg, etcdCli, "") c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -574,7 +574,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) { sourceCfg.MetaDir = c.MkDir() // step 1: start worker - w, err := NewWorker(sourceCfg, etcdCli, "") + w, err := NewSourceWorker(sourceCfg, etcdCli, "") c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/dm/worker/status.go b/dm/worker/status.go index 8f7a90d229..beb6e2b138 100644 --- a/dm/worker/status.go +++ b/dm/worker/status.go @@ -22,13 +22,13 @@ import ( "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/dm/pb" - "github.com/pingcap/dm/syncer" + "github.com/pingcap/dm/pkg/binlog" ) // Status returns the status of the current sub task. func (st *SubTask) Status() interface{} { if cu := st.CurrUnit(); cu != nil { - return cu.Status() + return cu.Status(nil) } return nil } @@ -46,7 +46,7 @@ func (st *SubTask) StatusJSON() string { // Status returns the status of the worker (and sub tasks) // if stName is empty, all sub task's status will be returned. -func (w *Worker) Status(stName string) []*pb.SubTaskStatus { +func (w *SourceWorker) Status(stName string, sourceStatus *binlog.SourceStatus) []*pb.SubTaskStatus { sts := w.subTaskHolder.getAllSubTasks() if len(sts) == 0 { @@ -92,7 +92,7 @@ func (w *Worker) Status(stName string) []*pb.SubTaskStatus { if cu != nil { stStatus.Unit = cu.Type() // oneof status - us := cu.Status() + us := cu.Status(sourceStatus) switch stStatus.Unit { case pb.UnitType_Check: stStatus.Status = &pb.SubTaskStatus_Check{Check: us.(*pb.CheckStatus)} @@ -101,10 +101,7 @@ func (w *Worker) Status(stName string) []*pb.SubTaskStatus { case pb.UnitType_Load: stStatus.Status = &pb.SubTaskStatus_Load{Load: us.(*pb.LoadStatus)} case pb.UnitType_Sync: - cus := cu.(*syncer.Syncer) // ss must be *syncer.Syncer - ss := us.(*pb.SyncStatus) - ss.SecondsBehindMaster = cus.GetSecondsBehindMaster() - stStatus.Status = &pb.SubTaskStatus_Sync{Sync: ss} + stStatus.Status = &pb.SubTaskStatus_Sync{Sync: us.(*pb.SyncStatus)} } } } @@ -114,13 +111,14 @@ func (w *Worker) Status(stName string) []*pb.SubTaskStatus { return status } -// StatusJSON returns the status of the worker as json string. -func (w *Worker) StatusJSON(stName string) string { - sl := &pb.SubTaskStatusList{Status: w.Status(stName)} +// GetUnitAndSourceStatusJSON returns the status of the worker and its unit as json string. +// This function will also cause every unit to print its status to log. 
+func (w *SourceWorker) GetUnitAndSourceStatusJSON(stName string, sourceStatus *binlog.SourceStatus) string { + sl := &pb.SubTaskStatusList{Status: w.Status(stName, sourceStatus)} mar := jsonpb.Marshaler{EmitDefaults: true, Indent: " "} s, err := mar.MarshalToString(sl) if err != nil { - w.l.Error("fail to marshal status", zap.Reflect("status", sl), zap.Error(err)) + w.l.Error("fail to marshal status", zap.Any("status", sl), zap.Error(err)) return "" } return s diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 1e9430afcd..037435cbb0 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -626,7 +626,7 @@ func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error { ctxWait, cancelWait := context.WithTimeout(hub.w.ctx, waitRelayCatchupTimeout) defer cancelWait() - loadStatus := pu.Status().(*pb.LoadStatus) + loadStatus := pu.Status(nil).(*pb.LoadStatus) if st.cfg.EnableGTID { gset1, err = gtid.ParserGTID(st.cfg.Flavor, loadStatus.MetaBinlogGTID) @@ -641,9 +641,7 @@ func (st *SubTask) unitTransWaitCondition(subTaskCtx context.Context) error { } for { - ctxStatus, cancelStatus := context.WithTimeout(ctxWait, utils.DefaultDBTimeout) - relayStatus := hub.w.relayHolder.Status(ctxStatus) - cancelStatus() + relayStatus := hub.w.relayHolder.Status(nil) if st.cfg.EnableGTID { gset2, err = gtid.ParserGTID(st.cfg.Flavor, relayStatus.RelayBinlogGtid) diff --git a/dm/worker/subtask_test.go b/dm/worker/subtask_test.go index 308620575c..a2f94a18c8 100644 --- a/dm/worker/subtask_test.go +++ b/dm/worker/subtask_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/dumpling" "github.com/pingcap/dm/loader" + "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/syncer" @@ -124,7 +125,7 @@ func (m *MockUnit) Update(_ *config.SubTaskConfig) error { return m.errUpdate } -func (m *MockUnit) Status() interface{} { +func (m *MockUnit) Status(_ *binlog.SourceStatus) interface{} { switch m.typ { case pb.UnitType_Check: return &pb.CheckStatus{} @@ -464,7 +465,7 @@ func (t *testSubTask) TestSubtaskFastQuit(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w := &Worker{ + w := &SourceWorker{ ctx: ctx, // loadStatus relay MetaBinlog must be greater relayHolder: NewDummyRelayHolderWithRelayBinlog(config.NewSourceConfig(), relayHolderBinlog), diff --git a/dm/worker/task_checker.go b/dm/worker/task_checker.go index b7b42041a0..17533f7f03 100644 --- a/dm/worker/task_checker.go +++ b/dm/worker/task_checker.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/retry" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" ) // Backoff related constants @@ -145,12 +144,12 @@ type realTaskStatusChecker struct { cfg config.CheckerConfig l log.Logger - w *Worker + w *SourceWorker bc *backoffController } // NewRealTaskStatusChecker creates a new realTaskStatusChecker instance. 
-func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *Worker) TaskStatusChecker { +func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *SourceWorker) TaskStatusChecker { tsc := &realTaskStatusChecker{ cfg: cfg, l: log.With(zap.String("component", "task checker")), @@ -300,10 +299,7 @@ func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelaySt } func (tsc *realTaskStatusChecker) checkRelayStatus() { - ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultDBTimeout) - defer cancel() - - relayStatus := tsc.w.relayHolder.Status(ctx) + relayStatus := tsc.w.relayHolder.Status(nil) if tsc.bc.relayBackoff == nil { tsc.bc.relayBackoff, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration) tsc.bc.latestRelayPausedTime = time.Now() diff --git a/dm/worker/task_checker_test.go b/dm/worker/task_checker_test.go index ac3db40e38..cf4dabdfe9 100644 --- a/dm/worker/task_checker_test.go +++ b/dm/worker/task_checker_test.go @@ -89,7 +89,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { cfg := loadSourceConfigWithoutPassword(c) cfg.RelayDir = dir cfg.MetaDir = dir - w, err := NewWorker(cfg, nil, "") + w, err := NewSourceWorker(cfg, nil, "") c.Assert(err, check.IsNil) w.closed.Store(false) @@ -204,7 +204,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { cfg := loadSourceConfigWithoutPassword(c) cfg.RelayDir = dir cfg.MetaDir = dir - w, err := NewWorker(cfg, nil, "") + w, err := NewSourceWorker(cfg, nil, "") c.Assert(err, check.IsNil) w.closed.Store(false) diff --git a/dumpling/dumpling.go b/dumpling/dumpling.go index 1d5d2ebe7d..887dc2010b 100644 --- a/dumpling/dumpling.go +++ b/dumpling/dumpling.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/conn" dutils "github.com/pingcap/dm/pkg/dumpling" "github.com/pingcap/dm/pkg/log" @@ -195,7 +196,7 @@ func (m *Dumpling) Update(cfg *config.SubTaskConfig) error { } // Status implements Unit.Status. -func (m *Dumpling) Status() interface{} { +func (m *Dumpling) Status(_ *binlog.SourceStatus) interface{} { // NOTE: try to add some status, like dumped file count return &pb.DumpStatus{} } diff --git a/loader/loader.go b/loader/loader.go index 680c4eb4e2..176c978b8b 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -445,6 +445,7 @@ type Loader struct { dbTableDataTotalSize map[string]map[string]*atomic.Int64 dbTableDataFinishedSize map[string]map[string]*atomic.Int64 dbTableDataLastFinishedSize map[string]map[string]int64 + dbTableDataLastUpdatedTime time.Time metaBinlog atomic.String metaBinlogGTID atomic.String @@ -725,12 +726,6 @@ func (l *Loader) Restore(ctx context.Context) error { return err2 } - l.wg.Add(1) - go func() { - defer l.wg.Done() - l.PrintStatus(ctx) - }() - begin := time.Now() err = l.restoreData(ctx) diff --git a/loader/status.go b/loader/status.go index a5f07c0349..764c3dbe93 100644 --- a/loader/status.go +++ b/loader/status.go @@ -14,19 +14,16 @@ package loader import ( - "context" "time" - "github.com/pingcap/failpoint" "go.uber.org/zap" "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/binlog" ) -var printStatusInterval = time.Second * 5 - // Status implements Unit.Status. 
-func (l *Loader) Status() interface{} { +func (l *Loader) Status(_ *binlog.SourceStatus) interface{} { finishedSize := l.finishedDataSize.Load() totalSize := l.totalDataSize.Load() progress := percent(finishedSize, totalSize, l.finish.Load()) @@ -37,56 +34,34 @@ func (l *Loader) Status() interface{} { MetaBinlog: l.metaBinlog.Load(), MetaBinlogGTID: l.metaBinlogGTID.Load(), } + go l.printStatus() return s } -// PrintStatus prints status like progress percentage. -func (l *Loader) PrintStatus(ctx context.Context) { - failpoint.Inject("PrintStatusCheckSeconds", func(val failpoint.Value) { - if seconds, ok := val.(int); ok { - printStatusInterval = time.Duration(seconds) * time.Second - l.logger.Info("set printStatusInterval", zap.String("failpoint", "PrintStatusCheckSeconds"), zap.Int("value", seconds)) - } - }) - - ticker := time.NewTicker(printStatusInterval) - defer ticker.Stop() - - newCtx, cancel := context.WithCancel(ctx) - defer cancel() - - var done bool - for { - select { - case <-newCtx.Done(): - done = true - case <-ticker.C: - } - - finishedSize := l.finishedDataSize.Load() - totalSize := l.totalDataSize.Load() - totalFileCount := l.totalFileCount.Load() +// printStatus prints status like progress percentage. +func (l *Loader) printStatus() { + finishedSize := l.finishedDataSize.Load() + totalSize := l.totalDataSize.Load() + totalFileCount := l.totalFileCount.Load() - for db, tables := range l.dbTableDataFinishedSize { - for table, size := range tables { - curFinished := size.Load() - speed := float64(curFinished-l.dbTableDataLastFinishedSize[db][table]) / printStatusInterval.Seconds() - l.dbTableDataLastFinishedSize[db][table] = curFinished - if speed > 0 { - remainingSeconds := float64(l.dbTableDataTotalSize[db][table].Load()-curFinished) / speed - remainingTimeGauge.WithLabelValues(l.cfg.Name, l.cfg.WorkerName, l.cfg.SourceID, db, table).Set(remainingSeconds) - } + interval := time.Since(l.dbTableDataLastUpdatedTime) + for db, tables := range l.dbTableDataFinishedSize { + for table, size := range tables { + curFinished := size.Load() + speed := float64(curFinished-l.dbTableDataLastFinishedSize[db][table]) / interval.Seconds() + l.dbTableDataLastFinishedSize[db][table] = curFinished + if speed > 0 { + remainingSeconds := float64(l.dbTableDataTotalSize[db][table].Load()-curFinished) / speed + remainingTimeGauge.WithLabelValues(l.cfg.Name, l.cfg.WorkerName, l.cfg.SourceID, db, table).Set(remainingSeconds) } } - - l.logger.Info("progress status of load", - zap.Int64("finished_bytes", finishedSize), - zap.Int64("total_bytes", totalSize), - zap.Int64("total_file_count", totalFileCount), - zap.String("progress", percent(finishedSize, totalSize, l.finish.Load()))) - progressGauge.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Set(progress(finishedSize, totalSize, l.finish.Load())) - if done { - return - } } + l.dbTableDataLastUpdatedTime = time.Now() + + l.logger.Info("progress status of load", + zap.Int64("finished_bytes", finishedSize), + zap.Int64("total_bytes", totalSize), + zap.Int64("total_file_count", totalFileCount), + zap.String("progress", percent(finishedSize, totalSize, l.finish.Load()))) + progressGauge.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Set(progress(finishedSize, totalSize, l.finish.Load())) } diff --git a/pkg/binlog/status.go b/pkg/binlog/status.go new file mode 100644 index 0000000000..fee715216e --- /dev/null +++ b/pkg/binlog/status.go @@ -0,0 +1,88 @@ +package binlog + +import ( + "context" + "database/sql" + "time" + + gmysql 
"github.com/go-mysql-org/go-mysql/mysql" + + "github.com/pingcap/dm/pkg/terror" +) + +// in MySQL, we can set `max_binlog_size` to control the max size of a binlog file. +// but this is not absolute: +// > A transaction is written in one chunk to the binary log, so it is never split between several binary logs. +// > Therefore, if you have big transactions, you might see binary log files larger than max_binlog_size. +// ref: https://dev.mysql.com/doc/refman/5.7/en/replication-options-binary-log.html#sysvar_max_binlog_size +// The max value of `max_binlog_size` is 1073741824 (1GB) +// but the actual file size still can be larger, and it may exceed the range of an uint32 +// so, if we use go-mysql.Position(with uint32 Pos) to store the binlog size, it may become out of range. +// ps, use go-mysql.Position to store a position of binlog event (position of the next event) is enough. +type binlogSize struct { + name string + size int64 +} + +// FileSizes is a list of binlog filename and size. +type FileSizes []binlogSize + +// GetBinaryLogs returns binlog filename and size of upstream. +func GetBinaryLogs(ctx context.Context, db *sql.DB) (FileSizes, error) { + query := "SHOW BINARY LOGS" + rows, err := db.QueryContext(ctx, query) + if err != nil { + return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + defer rows.Close() + + rowColumns, err := rows.Columns() + if err != nil { + return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + files := make([]binlogSize, 0, 10) + for rows.Next() { + var file string + var pos int64 + var nullPtr interface{} + if len(rowColumns) == 2 { + err = rows.Scan(&file, &pos) + } else { + err = rows.Scan(&file, &pos, &nullPtr) + } + if err != nil { + return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + files = append(files, binlogSize{name: file, size: pos}) + } + if rows.Err() != nil { + return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) + } + return files, nil +} + +// After returns the total size of binlog after `fromFile` in FileSizes. +func (b FileSizes) After(fromFile gmysql.Position) int64 { + var total int64 + for _, file := range b { + switch gmysql.CompareBinlogFileName(file.name, fromFile.Name) { + case -1: + continue + case 1: + total += file.size + case 0: + if file.size > int64(fromFile.Pos) { + total += file.size - int64(fromFile.Pos) + } + } + } + + return total +} + +// SourceStatus collects all information of upstream. +type SourceStatus struct { + Location Location + Binlogs FileSizes + UpdateTime time.Time +} diff --git a/pkg/binlog/status_test.go b/pkg/binlog/status_test.go new file mode 100644 index 0000000000..df85ade1a3 --- /dev/null +++ b/pkg/binlog/status_test.go @@ -0,0 +1,98 @@ +package binlog + +import ( + "context" + + "github.com/DATA-DOG/go-sqlmock" + gmysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-sql-driver/mysql" + . "github.com/pingcap/check" +) + +var _ = Suite(&testStatusSuite{}) + +type testStatusSuite struct{} + +func (t *testStatusSuite) TestGetBinaryLogs(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + ctx := context.Background() + + cases := []struct { + rows *sqlmock.Rows + sizes FileSizes + }{ + { + sqlmock.NewRows([]string{"Log_name", "File_size"}). + AddRow("mysql-bin.000001", 52119). + AddRow("mysql-bin.000002", 114), + []binlogSize{ + { + "mysql-bin.000001", 52119, + }, + { + "mysql-bin.000002", 114, + }, + }, + }, + { + sqlmock.NewRows([]string{"Log_name", "File_size", "Encrypted"}). + AddRow("mysql-bin.000001", 52119, "No"). 
+ AddRow("mysql-bin.000002", 114, "No"), + []binlogSize{ + { + "mysql-bin.000001", 52119, + }, + { + "mysql-bin.000002", 114, + }, + }, + }, + } + + for _, ca := range cases { + mock.ExpectQuery("SHOW BINARY LOGS").WillReturnRows(ca.rows) + sizes, err2 := GetBinaryLogs(ctx, db) + c.Assert(err2, IsNil) + c.Assert(sizes, DeepEquals, ca.sizes) + c.Assert(mock.ExpectationsWereMet(), IsNil) + } + + mock.ExpectQuery("SHOW BINARY LOGS").WillReturnError(&mysql.MySQLError{ + Number: 1227, + Message: "Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation", + }) + _, err2 := GetBinaryLogs(ctx, db) + c.Assert(err2, NotNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + +func (t *testStatusSuite) TestBinlogSizesAfter(c *C) { + sizes := FileSizes{ + {name: "mysql-bin.999999", size: 1}, + {name: "mysql-bin.1000000", size: 2}, + {name: "mysql-bin.1000001", size: 4}, + } + + cases := []struct { + position gmysql.Position + expected int64 + }{ + { + gmysql.Position{Name: "mysql-bin.999999", Pos: 0}, + 7, + }, + { + gmysql.Position{Name: "mysql-bin.1000000", Pos: 1}, + 5, + }, + { + gmysql.Position{Name: "mysql-bin.1000001", Pos: 3}, + 1, + }, + } + + for _, ca := range cases { + c.Assert(sizes.After(ca.position), Equals, ca.expected) + } +} diff --git a/relay/relay.go b/relay/relay.go index 3a6430b8ab..a632485db5 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -87,8 +87,8 @@ type Process interface { Pause() // Error returns error message if having one Error() interface{} - // Status returns status of relay log process unit - Status(ctx context.Context) interface{} + // Status returns status of relay log process unit. + Status(sourceStatus *binlog.SourceStatus) interface{} // Close does some clean works Close() // IsClosed returns whether relay log process unit was closed @@ -894,32 +894,32 @@ func (r *Relay) Close() { } // Status implements the dm.Unit interface. 
-func (r *Relay) Status(ctx context.Context) interface{} {
-	masterPos, masterGTID, err := utils.GetMasterStatus(ctx, r.db.DB, r.cfg.Flavor)
-	if err != nil {
-		r.logger.Warn("get master status", zap.Error(err))
-	}
-
+func (r *Relay) Status(sourceStatus *binlog.SourceStatus) interface{} {
 	uuid, relayPos := r.meta.Pos()
-	_, relayGTIDSet := r.meta.GTID()
+
 	rs := &pb.RelayStatus{
-		MasterBinlog: masterPos.String(),
-		RelaySubDir:  uuid,
-		RelayBinlog:  relayPos.String(),
+		RelaySubDir: uuid,
+		RelayBinlog: relayPos.String(),
 	}
-	if masterGTID != nil { // masterGTID maybe a nil interface
-		rs.MasterBinlogGtid = masterGTID.String()
-	}
-	if relayGTIDSet != nil {
+	if _, relayGTIDSet := r.meta.GTID(); relayGTIDSet != nil {
 		rs.RelayBinlogGtid = relayGTIDSet.String()
 	}
-	if r.cfg.EnableGTID {
-		if masterGTID != nil && relayGTIDSet != nil && relayGTIDSet.Equal(masterGTID) {
-			rs.RelayCatchUpMaster = true
+
+	if sourceStatus != nil {
+		masterPos, masterGTID := sourceStatus.Location.Position, sourceStatus.Location.GetGTID()
+		rs.MasterBinlog = masterPos.String()
+		if masterGTID != nil { // masterGTID may be a nil interface
+			rs.MasterBinlogGtid = masterGTID.String()
+		}
+
+		if r.cfg.EnableGTID {
+			// rely on sorted GTID set when String()
+			rs.RelayCatchUpMaster = rs.MasterBinlogGtid == rs.RelayBinlogGtid
+		} else {
+			rs.RelayCatchUpMaster = masterPos.Compare(relayPos) == 0
 		}
-	} else {
-		rs.RelayCatchUpMaster = masterPos.Compare(relayPos) == 0
 	}
+
 	return rs
 }
diff --git a/syncer/dbconn/upstream_db.go b/syncer/dbconn/upstream_db.go
index f6d343e602..982815ca64 100644
--- a/syncer/dbconn/upstream_db.go
+++ b/syncer/dbconn/upstream_db.go
@@ -15,7 +15,6 @@ package dbconn
 import (
 	"context"
-	"database/sql"
 
 	"github.com/go-mysql-org/go-mysql/mysql"
 	"github.com/pingcap/failpoint"
@@ -33,20 +32,6 @@ import (
 	"github.com/pingcap/dm/pkg/utils"
 )
 
-// in MySQL, we can set `max_binlog_size` to control the max size of a binlog file.
-// but this is not absolute:
-// > A transaction is written in one chunk to the binary log, so it is never split between several binary logs.
-// > Therefore, if you have big transactions, you might see binary log files larger than max_binlog_size.
-// ref: https://dev.mysql.com/doc/refman/5.7/en/replication-options-binary-log.html#sysvar_max_binlog_size
-// The max value of `max_binlog_size` is 1073741824 (1GB)
-// but the actual file size still can be larger, and it may exceed the range of an uint32
-// so, if we use go-mysql.Position(with uint32 Pos) to store the binlog size, it may become out of range.
-// ps, use go-mysql.Position to store a position of binlog event (position of the next event) is enough.
-type binlogSize struct {
-	name string
-	size int64
-}
-
 // UpStreamConn connects to upstream DB
 // Normally, we need to get some upstream information through some helper functions
 // these helper functions are all easy query functions, so we use a pool of connections here
@@ -101,70 +86,9 @@ func (conn *UpStreamConn) FetchAllDoTables(ctx context.Context, bw *filter.Filte
 	return utils.FetchAllDoTables(ctx, conn.BaseDB.DB, bw)
 }
 
-// CountBinaryLogsSize returns the remaining size after given position.
-func (conn *UpStreamConn) CountBinaryLogsSize(ctx context.Context, pos mysql.Position) (int64, error) {
-	return countBinaryLogsSize(ctx, pos, conn.BaseDB.DB)
-}
-
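The deleted CountBinaryLogsSize can now be expressed with the helpers that moved to pkg/binlog; a sketch under that assumption (remainingBinlogSize is a hypothetical name; imports assumed: context, database/sql, gmysql "github.com/go-mysql-org/go-mysql/mysql", and "github.com/pingcap/dm/pkg/binlog"):

func remainingBinlogSize(ctx context.Context, db *sql.DB, pos gmysql.Position) (int64, error) {
	files, err := binlog.GetBinaryLogs(ctx, db) // a binlog.FileSizes
	if err != nil {
		return 0, err
	}
	// total bytes in the binlog files at or after `pos`
	return files.After(pos), nil
}

 // CloseUpstreamConn closes the UpStreamConn.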
func CloseUpstreamConn(tctx *tcontext.Context, conn *UpStreamConn) { if conn != nil { CloseBaseDB(tctx, conn.BaseDB) } } - -func countBinaryLogsSize(ctx context.Context, fromFile mysql.Position, db *sql.DB) (int64, error) { - files, err := getBinaryLogs(ctx, db) - if err != nil { - return 0, err - } - - var total int64 - for _, file := range files { - switch { - case file.name < fromFile.Name: - continue - case file.name > fromFile.Name: - total += file.size - case file.name == fromFile.Name: - if file.size > int64(fromFile.Pos) { - total += file.size - int64(fromFile.Pos) - } - } - } - - return total, nil -} - -func getBinaryLogs(ctx context.Context, db *sql.DB) ([]binlogSize, error) { - query := "SHOW BINARY LOGS" - rows, err := db.QueryContext(ctx, query) - if err != nil { - return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - defer rows.Close() - - rowColumns, err := rows.Columns() - if err != nil { - return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - files := make([]binlogSize, 0, 10) - for rows.Next() { - var file string - var pos int64 - var nullPtr interface{} - if len(rowColumns) == 2 { - err = rows.Scan(&file, &pos) - } else { - err = rows.Scan(&file, &pos, &nullPtr) - } - if err != nil { - return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - files = append(files, binlogSize{name: file, size: pos}) - } - if rows.Err() != nil { - return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) - } - return files, nil -} diff --git a/syncer/dbconn/upstream_db_test.go b/syncer/dbconn/upstream_db_test.go index 6b84fc66b2..acd4717c25 100644 --- a/syncer/dbconn/upstream_db_test.go +++ b/syncer/dbconn/upstream_db_test.go @@ -19,7 +19,6 @@ import ( "fmt" "time" - "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/google/uuid" . "github.com/pingcap/check" @@ -106,35 +105,3 @@ func (s *testDBSuite) TestGetServerUnixTS(c *C) { c.Assert(err, IsNil) c.Assert(id, Greater, int64(0)) } - -func (s *testDBSuite) TestBinaryLogs(c *C) { - ctx := context.Background() - files, err := getBinaryLogs(ctx, s.db) - c.Assert(err, IsNil) - c.Assert(files, Not(HasLen), 0) - - fileNum := len(files) - pos := mysql.Position{ - Name: files[fileNum-1].name, - Pos: 0, - } - - remainingSize, err := countBinaryLogsSize(ctx, pos, s.db) - c.Assert(err, IsNil) - c.Assert(remainingSize, Equals, files[fileNum-1].size) - - _, err = s.db.Exec("FLUSH BINARY LOGS") - c.Assert(err, IsNil) - files, err = getBinaryLogs(ctx, s.db) - c.Assert(err, IsNil) - c.Assert(files, HasLen, fileNum+1) - - pos = mysql.Position{ - Name: files[fileNum].name, - Pos: 0, - } - - remainingSize, err = countBinaryLogsSize(ctx, pos, s.db) - c.Assert(err, IsNil) - c.Assert(remainingSize, Equals, files[fileNum].size) -} diff --git a/syncer/status.go b/syncer/status.go index 5068f321ea..72c6de2e4a 100644 --- a/syncer/status.go +++ b/syncer/status.go @@ -22,27 +22,45 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/syncer/metrics" ) // Status implements Unit.Status // it returns status, but does not calc status. 
-func (s *Syncer) Status() interface{} {
-	total := s.count.Load()
-	totalTps := s.totalTps.Load()
-	tps := s.tps.Load()
-
+func (s *Syncer) Status(sourceStatus *binlog.SourceStatus) interface{} {
 	syncerLocation := s.checkpoint.FlushedGlobalPoint()
 	st := &pb.SyncStatus{
-		TotalEvents:  total,
-		TotalTps:     totalTps,
-		RecentTps:    tps,
-		SyncerBinlog: syncerLocation.Position.String(),
+		TotalEvents:         s.count.Load(),
+		TotalTps:            s.totalTps.Load(),
+		RecentTps:           s.tps.Load(),
+		SyncerBinlog:        syncerLocation.Position.String(),
+		SecondsBehindMaster: s.secondsBehindMaster.Load(),
 	}
 	if syncerLocation.GetGTID() != nil {
 		st.SyncerBinlogGtid = syncerLocation.GetGTID().String()
 	}
+
+	if sourceStatus != nil {
+		st.MasterBinlog = sourceStatus.Location.Position.String()
+		st.MasterBinlogGtid = sourceStatus.Location.GTIDSetStr()
+
+		if s.cfg.EnableGTID {
+			// rely on sorted GTID set when String()
+			st.Synced = st.MasterBinlogGtid == st.SyncerBinlogGtid
+		} else {
+			syncRealPos, err := binlog.RealMySQLPos(syncerLocation.Position)
+			if err != nil {
+				s.tctx.L().Error("fail to parse real mysql position",
+					zap.Any("position", syncerLocation.Position),
+					log.ShortError(err))
+			}
+			st.Synced = syncRealPos.Compare(sourceStatus.Location.Position) == 0
+		}
+	}
+
 	st.BinlogType = "unknown"
 	if s.streamerController != nil {
 		st.BinlogType = binlogTypeToString(s.streamerController.GetBinlogType())
@@ -67,5 +85,73 @@
 			time.Sleep(interval)
 		}
 	})
+
+	go s.printStatus(sourceStatus)
+
 	return st
 }
+
+func (s *Syncer) printStatus(sourceStatus *binlog.SourceStatus) {
+	if sourceStatus == nil {
+		// this often happens when the caller doesn't need the source status, such as in a unit test
+		return
+	}
+	now := time.Now()
+	s.lastTime.RLock()
+	seconds := now.Unix() - s.lastTime.t.Unix()
+	s.lastTime.RUnlock()
+	totalSeconds := now.Unix() - s.start.Unix()
+	last := s.lastCount.Load()
+	total := s.count.Load()
+
+	totalBinlogSize := s.binlogSizeCount.Load()
+	lastBinlogSize := s.lastBinlogSizeCount.Load()
+
+	tps, totalTps := int64(0), int64(0)
+	if seconds > 0 {
+		tps = (total - last) / seconds
+		totalTps = total / totalSeconds
+
+		s.currentLocationMu.RLock()
+		currentLocation := s.currentLocationMu.currentLocation
+		s.currentLocationMu.RUnlock()
+
+		remainingSize := sourceStatus.Binlogs.After(currentLocation.Position)
+		bytesPerSec := (totalBinlogSize - lastBinlogSize) / seconds
+		if bytesPerSec > 0 {
+			remainingSeconds := remainingSize / bytesPerSec
+			s.tctx.L().Info("binlog replication progress",
+				zap.Int64("total binlog size", totalBinlogSize),
+				zap.Int64("last binlog size", lastBinlogSize),
+				zap.Int64("cost time", seconds),
+				zap.Int64("bytes/Second", bytesPerSec),
+				zap.Int64("unsynced binlog size", remainingSize),
+				zap.Int64("estimate time to catch up", remainingSeconds))
+			metrics.RemainingTimeGauge.WithLabelValues(s.cfg.Name, s.cfg.SourceID, s.cfg.WorkerName).Set(float64(remainingSeconds))
+		}
+	}
+
+	latestMasterPos := sourceStatus.Location.Position
+	latestMasterGTIDSet := sourceStatus.Location.GetGTID()
+	metrics.BinlogPosGauge.WithLabelValues("master", s.cfg.Name, s.cfg.SourceID).Set(float64(latestMasterPos.Pos))
+	index, err := binlog.GetFilenameIndex(latestMasterPos.Name)
+	if err != nil {
+		s.tctx.L().Error("fail to parse binlog file", log.ShortError(err))
+	} else {
+		metrics.BinlogFileGauge.WithLabelValues("master", s.cfg.Name, s.cfg.SourceID).Set(float64(index))
+	}
+
+	s.tctx.L().Info("binlog replication status",
+		zap.Int64("total_events", total),
+		zap.Int64("total_tps", totalTps),
+
zap.Int64("tps", tps), + zap.Stringer("master_position", latestMasterPos), + log.WrapStringerField("master_gtid", latestMasterGTIDSet), + zap.Stringer("checkpoint", s.checkpoint)) + + s.lastCount.Store(total) + s.lastBinlogSizeCount.Store(totalBinlogSize) + s.lastTime.Lock() + s.lastTime.t = time.Now() + s.lastTime.Unlock() + s.totalTps.Store(totalTps) + s.tps.Store(tps) +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 61f00b8cf5..b178498a07 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -55,7 +55,6 @@ import ( "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" fr "github.com/pingcap/dm/pkg/func-rollback" - "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/ha" "github.com/pingcap/dm/pkg/log" parserpkg "github.com/pingcap/dm/pkg/parser" @@ -77,7 +76,6 @@ var ( retryTimeout = 3 * time.Second waitTime = 10 * time.Millisecond - statusTime = 30 * time.Second // MaxDDLConnectionTimeoutMinute also used by SubTask.ExecuteDDL. MaxDDLConnectionTimeoutMinute = 5 @@ -273,11 +271,6 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { return syncer } -// GetSecondsBehindMaster returns secondsBehindMaster. -func (s *Syncer) GetSecondsBehindMaster() int64 { - return s.secondsBehindMaster.Load() -} - func (s *Syncer) newJobChans(count int) { s.closeJobChans() s.jobs = make([]chan *job, 0, count) @@ -707,10 +700,6 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { } } -func (s *Syncer) getMasterStatus(ctx context.Context) (mysql.Position, gtid.Set, error) { - return s.fromDB.GetMasterStatus(ctx, s.cfg.Flavor) -} - func (s *Syncer) getTable(tctx *tcontext.Context, origSchema, origTable, renamedSchema, renamedTable string) (*model.TableInfo, error) { ti, err := s.schemaTracker.GetTable(origSchema, origTable) if err == nil { @@ -1646,11 +1635,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.syncDDL(tctx, adminQueueName, s.ddlDBConn, s.jobs[s.cfg.WorkerCount]) }() - s.wg.Add(1) - go func() { - s.printStatus(runCtx) - }() - s.wg.Add(1) go func() { defer s.wg.Done() @@ -3035,107 +3019,6 @@ func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error { return firstErr } -func (s *Syncer) printStatus(ctx context.Context) { - defer s.wg.Done() - - failpoint.Inject("PrintStatusCheckSeconds", func(val failpoint.Value) { - if seconds, ok := val.(int); ok { - statusTime = time.Duration(seconds) * time.Second - s.tctx.L().Info("set printStatusInterval", zap.Int("value", seconds), zap.String("failpoint", "PrintStatusCheckSeconds")) - } - }) - - timer := time.NewTicker(statusTime) - defer timer.Stop() - - var ( - err error - latestMasterPos mysql.Position - latestmasterGTIDSet gtid.Set - ) - - for { - select { - case <-ctx.Done(): - s.tctx.L().Info("print status routine exits", log.ShortError(ctx.Err())) - return - case <-timer.C: - now := time.Now() - s.lastTime.RLock() - seconds := now.Unix() - s.lastTime.t.Unix() - s.lastTime.RUnlock() - totalSeconds := now.Unix() - s.start.Unix() - last := s.lastCount.Load() - total := s.count.Load() - - totalBinlogSize := s.binlogSizeCount.Load() - lastBinlogSize := s.lastBinlogSizeCount.Load() - - tps, totalTps := int64(0), int64(0) - if seconds > 0 { - tps = (total - last) / seconds - totalTps = total / totalSeconds - - s.currentLocationMu.RLock() - currentLocation := s.currentLocationMu.currentLocation - s.currentLocationMu.RUnlock() - - ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout) - remainingSize, err2 := 
s.fromDB.CountBinaryLogsSize(ctx2, currentLocation.Position) - cancel2() - if err2 != nil { - // log the error, but still handle the rest operation - s.tctx.L().Error("fail to estimate unreplicated binlog size", zap.Error(err2)) - } else { - bytesPerSec := (totalBinlogSize - lastBinlogSize) / seconds - if bytesPerSec > 0 { - remainingSeconds := remainingSize / bytesPerSec - s.tctx.L().Info("binlog replication progress", - zap.Int64("total binlog size", totalBinlogSize), - zap.Int64("last binlog size", lastBinlogSize), - zap.Int64("cost time", seconds), - zap.Int64("bytes/Second", bytesPerSec), - zap.Int64("unsynced binlog size", remainingSize), - zap.Int64("estimate time to catch up", remainingSeconds)) - metrics.RemainingTimeGauge.WithLabelValues(s.cfg.Name, s.cfg.SourceID, s.cfg.WorkerName).Set(float64(remainingSeconds)) - } - } - } - - ctx2, cancel2 := context.WithTimeout(ctx, utils.DefaultDBTimeout) - latestMasterPos, latestmasterGTIDSet, err = s.getMasterStatus(ctx2) - cancel2() - if err != nil { - s.tctx.L().Error("fail to get master status", log.ShortError(err)) - } else { - metrics.BinlogPosGauge.WithLabelValues("master", s.cfg.Name, s.cfg.SourceID).Set(float64(latestMasterPos.Pos)) - index, err := binlog.GetFilenameIndex(latestMasterPos.Name) - if err != nil { - s.tctx.L().Error("fail to parse binlog file", log.ShortError(err)) - } else { - metrics.BinlogFileGauge.WithLabelValues("master", s.cfg.Name, s.cfg.SourceID).Set(float64(index)) - } - } - - s.tctx.L().Info("binlog replication status", - zap.Int64("total_events", total), - zap.Int64("total_tps", totalTps), - zap.Int64("tps", tps), - zap.Stringer("master_position", latestMasterPos), - log.WrapStringerField("master_gtid", latestmasterGTIDSet), - zap.Stringer("checkpoint", s.checkpoint)) - - s.lastCount.Store(total) - s.lastBinlogSizeCount.Store(totalBinlogSize) - s.lastTime.Lock() - s.lastTime.t = time.Now() - s.lastTime.Unlock() - s.totalTps.Store(totalTps) - s.tps.Store(tps) - } - } -} - func (s *Syncer) createDBs(ctx context.Context) error { var err error dbCfg := s.cfg.From diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index ea9aaf700c..c3b49a80d2 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1216,7 +1216,7 @@ func (s *testSyncerSuite) TestRun(c *C) { } executeSQLAndWait(len(expectJobs1)) - c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(0)) + c.Assert(syncer.Status(nil).(*pb.SyncStatus).TotalEvents, Equals, int64(0)) syncer.mockFinishJob(expectJobs1) testJobs.Lock() @@ -1276,9 +1276,9 @@ func (s *testSyncerSuite) TestRun(c *C) { } executeSQLAndWait(len(expectJobs2)) - c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(len(expectJobs1))) + c.Assert(syncer.Status(nil).(*pb.SyncStatus).TotalEvents, Equals, int64(len(expectJobs1))) syncer.mockFinishJob(expectJobs2) - c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(len(expectJobs1)+len(expectJobs2))) + c.Assert(syncer.Status(nil).(*pb.SyncStatus).TotalEvents, Equals, int64(len(expectJobs1)+len(expectJobs2))) testJobs.RLock() checkJobs(c, testJobs.jobs, expectJobs2) @@ -1434,7 +1434,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { } executeSQLAndWait(len(expectJobs)) - c.Assert(syncer.Status().(*pb.SyncStatus).TotalEvents, Equals, int64(0)) + c.Assert(syncer.Status(nil).(*pb.SyncStatus).TotalEvents, Equals, int64(0)) syncer.mockFinishJob(expectJobs) testJobs.Lock() diff --git a/tests/print_status/run.sh b/tests/print_status/run.sh index ceeeabdcca..0b2aa3f96d 100755 --- 
a/tests/print_status/run.sh +++ b/tests/print_status/run.sh @@ -14,8 +14,7 @@ function run() { # TableMapEvent, QueryEvent, GTIDEvent, and a specific Event in each group. # so we slow down 460 * 4 ms. Besides the log may be not flushed to disk asap, # we need to add some retry mechanism - inject_points=("github.com/pingcap/dm/loader/PrintStatusCheckSeconds=return(1)" - "github.com/pingcap/dm/syncer/PrintStatusCheckSeconds=return(1)" + inject_points=("github.com/pingcap/dm/dm/worker/PrintStatusCheckSeconds=return(1)" "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(100)" "github.com/pingcap/dm/syncer/ProcessBinlogSlowDown=sleep(4)") export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
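Both the loader's and the syncer's progress reporting now share the same estimation arithmetic: divide the unsynced byte count by the recent throughput. A standalone sketch of that calculation (names are illustrative; the inputs are counters each unit already tracks):

// estimateCatchUpSeconds returns the estimated time to catch up, or -1 when
// throughput is unknown. deltaBytes are the bytes replicated since the last
// tick, elapsedSeconds is the tick length, and remainingBytes is e.g. the
// result of binlog.FileSizes.After(currentPosition).
func estimateCatchUpSeconds(deltaBytes, elapsedSeconds, remainingBytes int64) int64 {
	if elapsedSeconds <= 0 {
		return -1
	}
	bytesPerSec := deltaBytes / elapsedSeconds
	if bytesPerSec <= 0 {
		return -1
	}
	return remainingBytes / bytesPerSec
}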