diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go
index 5dead67dce..051339ab46 100644
--- a/dm/worker/server_test.go
+++ b/dm/worker/server_test.go
@@ -132,6 +132,7 @@ func (t *testServer) TestServer(c *C) {
 
 	// test worker, just make sure testing sort
 	t.testWorker(c)
+	t.testWorkerHandleTask(c)
 }
 
 func (t *testServer) testHTTPInterface(c *C, uri string) {
diff --git a/dm/worker/worker.go b/dm/worker/worker.go
index c35f7016ec..f0191ba7ab 100644
--- a/dm/worker/worker.go
+++ b/dm/worker/worker.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/siddontang/go/sync2"
 	"github.com/syndtr/goleveldb/leveldb"
 	"go.uber.org/zap"
@@ -707,7 +708,14 @@ func (w *Worker) restoreSubTask() error {
 var maxRetryCount = 10
 
 func (w *Worker) handleTask() {
-	ticker := time.NewTicker(time.Second)
+	var handleTaskInterval = time.Second
+	failpoint.Inject("handleTaskInternal", func(val failpoint.Value) {
+		if milliseconds, ok := val.(int); ok {
+			handleTaskInterval = time.Duration(milliseconds) * time.Millisecond
+			w.l.Info("set handleTaskInterval", zap.String("failpoint", "handleTaskInternal"), zap.Int("value", milliseconds))
+		}
+	})
+	ticker := time.NewTicker(handleTaskInterval)
 	defer ticker.Stop()
 
 	retryCnt := 0
@@ -826,7 +834,11 @@ Loop:
 			// fill current task config
 			if len(opLog.Task.Task) == 0 {
 				tm := w.meta.GetTask(opLog.Task.Name)
-				opLog.Task.Task = append([]byte{}, tm.Task...)
+				if tm == nil {
+					w.l.Warn("task meta not found", zap.String("task", opLog.Task.Name))
+				} else {
+					opLog.Task.Task = append([]byte{}, tm.Task...)
+				}
 			}
 
 			err = w.meta.MarkOperation(opLog)
diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go
index fb679742da..bd11b57f16 100644
--- a/dm/worker/worker_test.go
+++ b/dm/worker/worker_test.go
@@ -14,10 +14,15 @@
 package worker
 
 import (
+	"sync"
+	"time"
+
 	. "github.com/pingcap/check"
+	"github.com/pingcap/failpoint"
 
 	"github.com/pingcap/dm/dm/config"
 	"github.com/pingcap/dm/dm/pb"
+	"github.com/pingcap/dm/pkg/utils"
 )
 
 var emptyWorkerStatusInfoJSONLength = 25
@@ -65,3 +70,47 @@ func (t *testServer) testWorker(c *C) {
 	_, err = w.OperateSubTask("testSubTask", pb.TaskOp_Stop)
 	c.Assert(err, ErrorMatches, "worker already closed.*")
 }
+
+func (t *testServer) testWorkerHandleTask(c *C) {
+	var (
+		wg       sync.WaitGroup
+		taskName = "test"
+	)
+
+	NewRelayHolder = NewDummyRelayHolder
+	dir := c.MkDir()
+	cfg := NewConfig()
+	c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
+	cfg.RelayDir = dir
+	cfg.MetaDir = dir
+	w, err := NewWorker(cfg)
+	c.Assert(err, IsNil)
+
+	tasks := []*pb.TaskMeta{
+		{Op: pb.TaskOp_Stop, Name: taskName, Stage: pb.Stage_New},
+		{Op: pb.TaskOp_Pause, Name: taskName, Stage: pb.Stage_New},
+		{Op: pb.TaskOp_Resume, Name: taskName, Stage: pb.Stage_New},
+	}
+	for _, task := range tasks {
+		_, err := w.meta.AppendOperation(task)
+		c.Assert(err, IsNil)
+	}
+	c.Assert(len(w.meta.logs), Equals, len(tasks))
+
+	c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/worker/handleTaskInternal", `return(10)`), IsNil)
+	defer failpoint.Disable("github.com/pingcap/dm/dm/worker/handleTaskInternal")
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		w.handleTask()
+	}()
+
+	c.Assert(utils.WaitSomething(5, 10*time.Millisecond, func() bool {
+		w.meta.Lock()
+		defer w.meta.Unlock()
+		return len(w.meta.logs) == 0
+	}), IsTrue)
+
+	w.Close()
+	wg.Wait()
+}