diff --git a/dm/worker/worker.go b/dm/worker/worker.go index fe53461c9b..0b8474cc7a 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -71,14 +71,29 @@ type Worker struct { } // NewWorker creates a new Worker -func NewWorker(cfg *Config) (*Worker, error) { - w := &Worker{ +func NewWorker(cfg *Config) (w *Worker, err error) { + w = &Worker{ cfg: cfg, relayHolder: NewRelayHolder(cfg), tracer: tracing.InitTracerHub(cfg.Tracer), subTaskHolder: newSubTaskHolder(), l: log.With(zap.String("component", "worker controller")), } + w.ctx, w.cancel = context.WithCancel(context.Background()) + + defer func(w2 *Worker) { + if err != nil { // when err != nil, `w` will become nil in this func, so we pass `w` in defer. + // release resources, NOTE: we need to refactor New/Init/Start/Close for components later. + w2.cancel() + w2.subTaskHolder.closeAllSubTasks() + if w2.meta != nil { + w2.meta.Close() + } + if w2.db != nil { + w2.db.Close() + } + } + }(w) // initial relay holder purger, err := w.relayHolder.Init([]purger.PurgeInterceptor{ @@ -107,16 +122,18 @@ func NewWorker(cfg *Config) (*Worker, error) { } // open kv db - w.db, err = openDB(dbDir, defaultKVConfig) + metaDB, err := openDB(dbDir, defaultKVConfig) if err != nil { return nil, err } + w.db = metaDB // initial metadata - w.meta, err = NewMetadata(dbDir, w.db) + meta, err := NewMetadata(dbDir, w.db) if err != nil { return nil, err } + w.meta = meta InitConditionHub(w) @@ -125,6 +142,18 @@ func NewWorker(cfg *Config) (*Worker, error) { return nil, err } + w.l.Info("initialized") + + return w, nil +} + +// Start starts working +func (w *Worker) Start() { + if w.closed.Get() == closedTrue { + w.l.Warn("already closed") + return + } + // start relay w.relayHolder.Start() @@ -141,20 +170,6 @@ func NewWorker(cfg *Config) (*Worker, error) { w.tracer.Start() } - w.ctx, w.cancel = context.WithCancel(context.Background()) - - w.l.Info("initialzed") - - return w, nil -} - -// Start starts working -func (w *Worker) Start() { - if w.closed.Get() == closedTrue { - w.l.Warn("already closed") - return - } - w.wg.Add(2) defer w.wg.Done() diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index 4a27734a4b..464d6edec5 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -108,7 +108,7 @@ func (t *testServer) testWorkerHandleTask(c *C) { w.handleTask() }() - c.Assert(utils.WaitSomething(5, 10*time.Millisecond, func() bool { + c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool { w.meta.Lock() defer w.meta.Unlock() return len(w.meta.logs) == 0 @@ -154,7 +154,7 @@ func (t *testServer) TestTaskAutoResume(c *C) { defer s.Close() c.Assert(s.Start(), IsNil) }() - c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool { + c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool { return !s.closed.Get() }), IsTrue) @@ -165,7 +165,7 @@ func (t *testServer) TestTaskAutoResume(c *C) { c.Assert(err, IsNil) // check task in paused state - c.Assert(utils.WaitSomething(10, 10*time.Millisecond, func() bool { + c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool { for _, st := range s.worker.QueryStatus(taskName) { if st.Name == taskName && st.Stage == pb.Stage_Paused { return true