This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: fix panic in handle task #225

Merged 3 commits on Aug 6, 2019
1 change: 1 addition & 0 deletions dm/worker/server_test.go
@@ -132,6 +132,7 @@ func (t *testServer) TestServer(c *C) {

// test worker, just make sure testing sort
t.testWorker(c)
t.testWorkerHandleTask(c)
}

func (t *testServer) testHTTPInterface(c *C, uri string) {
16 changes: 14 additions & 2 deletions dm/worker/worker.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/siddontang/go/sync2"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap"
@@ -707,7 +708,14 @@ func (w *Worker) restoreSubTask() error {
var maxRetryCount = 10

func (w *Worker) handleTask() {
ticker := time.NewTicker(time.Second)
var handleTaskInterval = time.Second
failpoint.Inject("handleTaskInternal", func(val failpoint.Value) {
if milliseconds, ok := val.(int); ok {
handleTaskInterval = time.Duration(milliseconds) * time.Millisecond
w.l.Info("set handleTaskInterval", zap.String("failpoint", "handleTaskInternal"), zap.Int("value", milliseconds))
}
})
ticker := time.NewTicker(handleTaskInterval)
defer ticker.Stop()

retryCnt := 0
@@ -826,7 +834,11 @@ Loop:
// fill current task config
if len(opLog.Task.Task) == 0 {
tm := w.meta.GetTask(opLog.Task.Name)
opLog.Task.Task = append([]byte{}, tm.Task...)
if tm == nil {
w.l.Warn("task meta not found", zap.String("task", opLog.Task.Name))
} else {
opLog.Task.Task = append([]byte{}, tm.Task...)
}
}

err = w.meta.MarkOperation(opLog)
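
Note on the panic this PR fixes: in the handleTask loop above, w.meta.GetTask can return nil when no meta is stored for the operation's task name, and the old code dereferenced tm.Task unconditionally. Below is a minimal, self-contained sketch of that failure mode and of the guard added here; TaskMeta and getTask are illustrative stand-ins, not the real dm/pb types.

    package main

    import "fmt"

    // TaskMeta loosely mirrors the shape that matters here: a pointer whose
    // Task field the old code read unconditionally (stand-in, not pb.TaskMeta).
    type TaskMeta struct {
    	Task []byte
    }

    // getTask stands in for w.meta.GetTask, which can return nil when no meta
    // exists for the given task name.
    func getTask(name string) *TaskMeta {
    	return nil // unknown task
    }

    func main() {
    	tm := getTask("missing-task")

    	// Before this PR: the next line would panic with a nil pointer dereference.
    	// task := append([]byte{}, tm.Task...)

    	// After this PR: guard against missing meta and only log a warning.
    	if tm == nil {
    		fmt.Println("task meta not found, skip filling task config")
    		return
    	}
    	_ = append([]byte{}, tm.Task...)
    }
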
49 changes: 49 additions & 0 deletions dm/worker/worker_test.go
@@ -14,10 +14,15 @@
package worker

import (
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/utils"
)

var emptyWorkerStatusInfoJSONLength = 25
@@ -65,3 +70,47 @@ func (t *testServer) testWorker(c *C) {
_, err = w.OperateSubTask("testSubTask", pb.TaskOp_Stop)
c.Assert(err, ErrorMatches, "worker already closed.*")
}

func (t *testServer) testWorkerHandleTask(c *C) {
var (
wg sync.WaitGroup
taskName = "test"
)

NewRelayHolder = NewDummyRelayHolder
dir := c.MkDir()
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.RelayDir = dir
cfg.MetaDir = dir
w, err := NewWorker(cfg)
c.Assert(err, IsNil)

tasks := []*pb.TaskMeta{
{Op: pb.TaskOp_Stop, Name: taskName, Stage: pb.Stage_New},
{Op: pb.TaskOp_Pause, Name: taskName, Stage: pb.Stage_New},
{Op: pb.TaskOp_Resume, Name: taskName, Stage: pb.Stage_New},
}
for _, task := range tasks {
_, err := w.meta.AppendOperation(task)
c.Assert(err, IsNil)
}
c.Assert(len(w.meta.logs), Equals, len(tasks))

c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/worker/handleTaskInternal", `return(10)`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/dm/worker/handleTaskInternal")
wg.Add(1)
go func() {
defer wg.Done()
w.handleTask()
}()

c.Assert(utils.WaitSomething(5, 10*time.Millisecond, func() bool {
w.meta.Lock()
defer w.meta.Unlock()
return len(w.meta.logs) == 0
}), IsTrue)

w.Close()
wg.Wait()
}
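
Note on how the test drives the failpoint: failpoint.Inject calls are no-op markers until the source is rewritten by failpoint-ctl (the pingcap/failpoint code generator); once rewritten, failpoint.Enable("github.com/pingcap/dm/dm/worker/handleTaskInternal", "return(10)") activates the injected closure with val == 10, so the ticker in handleTask fires every 10ms instead of every second. The test then polls with utils.WaitSomething until the queued operations drain. The sketch below shows the polling pattern assumed from that call site; waitSomething here is a hypothetical stand-in, not the actual dm/pkg/utils implementation.

    package main

    import (
    	"fmt"
    	"time"
    )

    // waitSomething polls fn up to attempts times, sleeping interval between
    // tries, and reports whether fn ever returned true. Behavior is assumed
    // from the utils.WaitSomething call site in the test above.
    func waitSomething(attempts int, interval time.Duration, fn func() bool) bool {
    	for i := 0; i < attempts; i++ {
    		if fn() {
    			return true
    		}
    		time.Sleep(interval)
    	}
    	return false
    }

    func main() {
    	pending := 3 // pretend three queued task operations
    	drained := waitSomething(5, 10*time.Millisecond, func() bool {
    		if pending > 0 {
    			pending-- // each tick the worker handles one operation
    		}
    		return pending == 0
    	})
    	fmt.Println("operation queue drained:", drained)
    }
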