diff --git a/dm/worker/metrics.go b/dm/worker/metrics.go index 532a61e913..33d80a9a0d 100644 --- a/dm/worker/metrics.go +++ b/dm/worker/metrics.go @@ -34,6 +34,11 @@ import ( "github.com/pingcap/dm/syncer" ) +const ( + opErrTypeBeforeOp = "BeforeAnyOp" + opErrTypeSourceBound = "SourceBound" +) + var ( taskState = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ @@ -41,7 +46,16 @@ var ( Subsystem: "worker", Name: "task_state", Help: "state of task, 0 - invalidStage, 1 - New, 2 - Running, 3 - Paused, 4 - Stopped, 5 - Finished", - }, []string{"task"}) + }, []string{"task", "source_id"}) + + // opErrCounter cleans on worker close, which is the same time dm-worker exits, so no explicit clean + opErrCounter = metricsproxy.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dm", + Subsystem: "worker", + Name: "operate_error", + Help: "number of different operate error", + }, []string{"worker", "type"}) cpuUsageGauge = prometheus.NewGauge( prometheus.GaugeOpts{ @@ -93,6 +107,7 @@ func RegistryMetrics() { registry.MustRegister(prometheus.NewGoCollector()) registry.MustRegister(taskState) + registry.MustRegister(opErrCounter) registry.MustRegister(cpuUsageGauge) relay.RegisterMetrics(registry) @@ -123,6 +138,6 @@ func InitStatus(lis net.Listener) { } } -func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string) { - taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task}) +func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string, source string) { + taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task, "source_id": source}) } diff --git a/dm/worker/server.go b/dm/worker/server.go index 73afe8b9ec..1f97488505 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -348,7 +348,7 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo s.setSourceStatus(bound.Source, err, true) if err != nil { // record the reason for operating source bound - // TODO: add better metrics + opErrCounter.WithLabelValues(s.cfg.Name, opErrTypeSourceBound).Inc() log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name), zap.Stringer("bound", bound), zap.Error(err)) if etcdutil.IsRetryableError(err) { @@ -743,7 +743,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error { log.L().Info("start worker", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs)) - w, err := NewWorker(cfg, s.etcdClient) + w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name) if err != nil { return err } diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 65f8a89e6a..9565056167 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -427,7 +427,7 @@ func (t *testServer) TestQueryError(c *C) { sourceCfg := loadSourceConfigWithoutPassword(c) sourceCfg.EnableRelay = false - w, err := NewWorker(&sourceCfg, nil) + w, err := NewWorker(&sourceCfg, nil, "") c.Assert(err, IsNil) w.closed.Set(closedFalse) diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 1c099f89c0..1104a042ec 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -111,7 +111,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient * cancel: cancel, etcdClient: etcdClient, } - taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) + taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage)) return &st } @@ -371,7 +371,7 @@ func (st *SubTask) setStage(stage pb.Stage) { st.Lock() defer st.Unlock() st.stage = stage - taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) + taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage)) } // stageCAS sets stage to newStage if its current value is oldStage @@ -381,7 +381,7 @@ func (st *SubTask) stageCAS(oldStage, newStage pb.Stage) bool { if st.stage == oldStage { st.stage = newStage - taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) + taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage)) return true } return false @@ -393,7 +393,7 @@ func (st *SubTask) setStageIfNot(oldStage, newStage pb.Stage) bool { defer st.Unlock() if st.stage != oldStage { st.stage = newStage - taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) + taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage)) return true } return false @@ -429,7 +429,7 @@ func (st *SubTask) Close() { st.cancel() st.closeUnits() // close all un-closed units - st.removeLabelValuesWithTaskInMetrics(st.cfg.Name) + st.removeLabelValuesWithTaskInMetrics(st.cfg.Name, st.cfg.SourceID) st.wg.Wait() st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped) } diff --git a/dm/worker/task_checker_test.go b/dm/worker/task_checker_test.go index 835af6edda..c65274d996 100644 --- a/dm/worker/task_checker_test.go +++ b/dm/worker/task_checker_test.go @@ -90,7 +90,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { cfg := loadSourceConfigWithoutPassword(c) cfg.RelayDir = dir cfg.MetaDir = dir - w, err := NewWorker(&cfg, nil) + w, err := NewWorker(&cfg, nil, "") c.Assert(err, check.IsNil) w.closed.Set(closedFalse) @@ -207,7 +207,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { cfg := loadSourceConfigWithoutPassword(c) cfg.RelayDir = dir cfg.MetaDir = dir - w, err := NewWorker(&cfg, nil) + w, err := NewWorker(&cfg, nil, "") c.Assert(err, check.IsNil) w.closed.Set(closedFalse) diff --git a/dm/worker/worker.go b/dm/worker/worker.go index b032660093..bcb5ee0031 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -66,16 +66,19 @@ type Worker struct { configFile string etcdClient *clientv3.Client + + name string } // NewWorker creates a new Worker -func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client) (w *Worker, err error) { +func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name string) (w *Worker, err error) { w = &Worker{ cfg: cfg, tracer: tracing.InitTracerHub(cfg.Tracer), subTaskHolder: newSubTaskHolder(), l: log.With(zap.String("component", "worker controller")), etcdClient: etcdClient, + name: name, } w.ctx, w.cancel = context.WithCancel(context.Background()) w.closed.Set(closedTrue) @@ -330,11 +333,12 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) { // TODO: right operation sequences may get error when we get etcdErrCompact, need to handle it later // For example, Expect: Running -(pause)-> Paused -(resume)-> Running // we get an etcd compact error at the first running. If we try to "resume" it now, we will get an error - err = w.operateSubTaskStage(stage, subtaskCfg) + opType, err := w.operateSubTaskStage(stage, subtaskCfg) if err != nil { - // TODO: add better metrics + opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.String("task", subtaskCfg.Name), zap.Error(err)) + } delete(sts, name) } @@ -343,7 +347,7 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) { for name := range sts { err = w.OperateSubTask(name, pb.TaskOp_Stop) if err != nil { - // TODO: add better metrics + opErrCounter.WithLabelValues(w.name, pb.TaskOp_Stop.String()).Inc() log.L().Error("fail to stop subtask", zap.String("task", name), zap.Error(err)) } } @@ -404,9 +408,9 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, log.L().Info("worker is closed, handleSubTaskStage will quit now") return nil case stage := <-stageCh: - err := w.operateSubTaskStageWithoutConfig(stage) + opType, err := w.operateSubTaskStageWithoutConfig(stage) if err != nil { - // TODO: add better metrics + opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.Error(err)) if etcdutil.IsRetryableError(err) { return err @@ -422,14 +426,16 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, } } -func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) error { +// operateSubTaskStage returns TaskOp.String() additionally to record metrics +func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (string, error) { var op pb.TaskOp switch { case stage.Expect == pb.Stage_Running: if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { w.StartSubTask(&subTaskCfg) log.L().Info("load subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) - return nil + // error is nil, opErrTypeBeforeOp will be ignored + return opErrTypeBeforeOp, nil } op = pb.TaskOp_Resume case stage.Expect == pb.Stage_Paused: @@ -437,21 +443,22 @@ func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskCo case stage.IsDeleted: op = pb.TaskOp_Stop } - return w.OperateSubTask(stage.Task, op) + return op.String(), w.OperateSubTask(stage.Task, op) } -func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) error { +// operateSubTaskStageWithoutConfig returns TaskOp additionally to record metrics +func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) (string, error) { var subTaskCfg config.SubTaskConfig if stage.Expect == pb.Stage_Running { if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { tsm, _, err := ha.GetSubTaskCfg(w.etcdClient, stage.Source, stage.Task, stage.Revision) if err != nil { // TODO: need retry - return terror.Annotate(err, "fail to get subtask config from etcd") + return opErrTypeBeforeOp, terror.Annotate(err, "fail to get subtask config from etcd") } var ok bool if subTaskCfg, ok = tsm[stage.Task]; !ok { - return terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task) + return opErrTypeBeforeOp, terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task) } } } @@ -495,9 +502,9 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client if stage.IsEmpty() { stage.IsDeleted = true } - err1 = w.operateRelayStage(ctx, stage) + opType, err1 := w.operateRelayStage(ctx, stage) if err1 != nil { - // TODO: add better metrics + opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err1)) } } @@ -521,9 +528,9 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er log.L().Info("worker is closed, handleRelayStage will quit now") return nil case stage := <-stageCh: - err := w.operateRelayStage(ctx, stage) + opType, err := w.operateRelayStage(ctx, stage) if err != nil { - // TODO: add better metrics + opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err)) } case err := <-errCh: @@ -535,14 +542,16 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er } } -func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) error { +// operateRelayStage returns RelayOp.String() additionally to record metrics +// *RelayOp is nil only when error is nil, so record on error will not meet nil-pointer deference +func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (string, error) { var op pb.RelayOp switch { case stage.Expect == pb.Stage_Running: if w.relayHolder.Stage() == pb.Stage_New { w.relayHolder.Start() w.relayPurger.Start() - return nil + return opErrTypeBeforeOp, nil } op = pb.RelayOp_ResumeRelay case stage.Expect == pb.Stage_Paused: @@ -550,7 +559,7 @@ func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) error { case stage.IsDeleted: op = pb.RelayOp_StopRelay } - return w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op}) + return op.String(), w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op}) } // HandleSQLs implements Handler.HandleSQLs. diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index a7a9d88e2b..e0a77daaee 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -48,11 +48,11 @@ func (t *testServer) testWorker(c *C) { NewRelayHolder = NewRealRelayHolder }() - _, err := NewWorker(&cfg, nil) + _, err := NewWorker(&cfg, nil, "") c.Assert(err, ErrorMatches, "init error") NewRelayHolder = NewDummyRelayHolder - w, err := NewWorker(&cfg, nil) + w, err := NewWorker(&cfg, nil, "") c.Assert(err, IsNil) c.Assert(w.StatusJSON(""), HasLen, emptyWorkerStatusInfoJSONLength) //c.Assert(w.closed.Get(), Equals, closedFalse) @@ -226,7 +226,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { sourceCfg.EnableRelay = false // step 1: start worker - w, err := NewWorker(&sourceCfg, etcdCli) + w, err := NewWorker(&sourceCfg, etcdCli, "") c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -339,7 +339,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) { sourceCfg.MetaDir = c.MkDir() // step 1: start worker - w, err := NewWorker(&sourceCfg, etcdCli) + w, err := NewWorker(&sourceCfg, etcdCli, "") c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel()