From 97f8db1af851bf1d6bf3fd9e2d010c574b7464ee Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 1 Jul 2020 13:42:08 +0800 Subject: [PATCH 1/5] worker: add metrics --- dm/worker/metrics.go | 16 ++++++++- dm/worker/server.go | 4 +-- dm/worker/server_test.go | 2 +- dm/worker/subtask.go | 8 ++--- dm/worker/task_checker_test.go | 4 +-- dm/worker/worker.go | 62 +++++++++++++++++++++++----------- dm/worker/worker_test.go | 8 ++--- 7 files changed, 71 insertions(+), 33 deletions(-) diff --git a/dm/worker/metrics.go b/dm/worker/metrics.go index 532a61e913..51cf424bd9 100644 --- a/dm/worker/metrics.go +++ b/dm/worker/metrics.go @@ -34,6 +34,11 @@ import ( "github.com/pingcap/dm/syncer" ) +const ( + opErrTypeBeforeOp = "BeforeAnyOp" + opErrTypeSourceBound = "SourceBound" +) + var ( taskState = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ @@ -41,7 +46,15 @@ var ( Subsystem: "worker", Name: "task_state", Help: "state of task, 0 - invalidStage, 1 - New, 2 - Running, 3 - Paused, 4 - Stopped, 5 - Finished", - }, []string{"task"}) + }, []string{"task", "source_id"}) + + opErrCounter = metricsproxy.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dm", + Subsystem: "worker", + Name: "operate_error", + Help: "number of different oprate error", + }, []string{"worker", "type"}) cpuUsageGauge = prometheus.NewGauge( prometheus.GaugeOpts{ @@ -93,6 +106,7 @@ func RegistryMetrics() { registry.MustRegister(prometheus.NewGoCollector()) registry.MustRegister(taskState) + registry.MustRegister(opErrCounter) registry.MustRegister(cpuUsageGauge) relay.RegisterMetrics(registry) diff --git a/dm/worker/server.go b/dm/worker/server.go index 73afe8b9ec..1f97488505 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -348,7 +348,7 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo s.setSourceStatus(bound.Source, err, true) if err != nil { // record the reason for operating source bound - // TODO: add better metrics + 
opErrCounter.WithLabelValues(s.cfg.Name, opErrTypeSourceBound).Inc() log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name), zap.Stringer("bound", bound), zap.Error(err)) if etcdutil.IsRetryableError(err) { @@ -743,7 +743,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error { log.L().Info("start worker", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs)) - w, err := NewWorker(cfg, s.etcdClient) + w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name) if err != nil { return err } diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 65f8a89e6a..9565056167 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -427,7 +427,7 @@ func (t *testServer) TestQueryError(c *C) { sourceCfg := loadSourceConfigWithoutPassword(c) sourceCfg.EnableRelay = false - w, err := NewWorker(&sourceCfg, nil) + w, err := NewWorker(&sourceCfg, nil, "") c.Assert(err, IsNil) w.closed.Set(closedFalse) diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 1c099f89c0..555529ae7c 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -111,7 +111,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient * cancel: cancel, etcdClient: etcdClient, } - taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) + taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage)) return &st } @@ -371,7 +371,7 @@ func (st *SubTask) setStage(stage pb.Stage) { st.Lock() defer st.Unlock() st.stage = stage - taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) + taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage)) } // stageCAS sets stage to newStage if its current value is oldStage @@ -381,7 +381,7 @@ func (st *SubTask) stageCAS(oldStage, newStage pb.Stage) bool { if st.stage == oldStage { st.stage = newStage - taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) + taskState.WithLabelValues(st.cfg.Name, 
st.cfg.SourceID).Set(float64(st.stage)) return true } return false @@ -393,7 +393,7 @@ func (st *SubTask) setStageIfNot(oldStage, newStage pb.Stage) bool { defer st.Unlock() if st.stage != oldStage { st.stage = newStage - taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage)) + taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage)) return true } return false diff --git a/dm/worker/task_checker_test.go b/dm/worker/task_checker_test.go index 835af6edda..c65274d996 100644 --- a/dm/worker/task_checker_test.go +++ b/dm/worker/task_checker_test.go @@ -90,7 +90,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { cfg := loadSourceConfigWithoutPassword(c) cfg.RelayDir = dir cfg.MetaDir = dir - w, err := NewWorker(&cfg, nil) + w, err := NewWorker(&cfg, nil, "") c.Assert(err, check.IsNil) w.closed.Set(closedFalse) @@ -207,7 +207,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { cfg := loadSourceConfigWithoutPassword(c) cfg.RelayDir = dir cfg.MetaDir = dir - w, err := NewWorker(&cfg, nil) + w, err := NewWorker(&cfg, nil, "") c.Assert(err, check.IsNil) w.closed.Set(closedFalse) diff --git a/dm/worker/worker.go b/dm/worker/worker.go index b032660093..51844357eb 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -66,16 +66,19 @@ type Worker struct { configFile string etcdClient *clientv3.Client + + name string } // NewWorker creates a new Worker -func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client) (w *Worker, err error) { +func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name string) (w *Worker, err error) { w = &Worker{ cfg: cfg, tracer: tracing.InitTracerHub(cfg.Tracer), subTaskHolder: newSubTaskHolder(), l: log.With(zap.String("component", "worker controller")), etcdClient: etcdClient, + name: name, } w.ctx, w.cancel = context.WithCancel(context.Background()) w.closed.Set(closedTrue) @@ -330,11 +333,16 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) 
(int64, error) { // TODO: right operation sequences may get error when we get etcdErrCompact, need to handle it later // For example, Expect: Running -(pause)-> Paused -(resume)-> Running // we get an etcd compact error at the first running. If we try to "resume" it now, we will get an error - err = w.operateSubTaskStage(stage, subtaskCfg) + op, err := w.operateSubTaskStage(stage, subtaskCfg) if err != nil { - // TODO: add better metrics + opType := opErrTypeBeforeOp + if op != nil { + opType = op.String() + } + opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.String("task", subtaskCfg.Name), zap.Error(err)) + } delete(sts, name) } @@ -343,7 +351,7 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) { for name := range sts { err = w.OperateSubTask(name, pb.TaskOp_Stop) if err != nil { - // TODO: add better metrics + opErrCounter.WithLabelValues(w.name, pb.TaskOp_Stop.String()).Inc() log.L().Error("fail to stop subtask", zap.String("task", name), zap.Error(err)) } } @@ -404,9 +412,13 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, log.L().Info("worker is closed, handleSubTaskStage will quit now") return nil case stage := <-stageCh: - err := w.operateSubTaskStageWithoutConfig(stage) + op, err := w.operateSubTaskStageWithoutConfig(stage) if err != nil { - // TODO: add better metrics + opType := opErrTypeBeforeOp + if op != nil { + opType = op.String() + } + opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.Error(err)) if etcdutil.IsRetryableError(err) { return err @@ -422,14 +434,15 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, } } -func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) error { +// operateSubTaskStage returns TaskOp additionally to record metrics +func (w 
*Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (*pb.TaskOp, error) { var op pb.TaskOp switch { case stage.Expect == pb.Stage_Running: if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { w.StartSubTask(&subTaskCfg) log.L().Info("load subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) - return nil + return nil, nil } op = pb.TaskOp_Resume case stage.Expect == pb.Stage_Paused: @@ -437,21 +450,22 @@ func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskCo case stage.IsDeleted: op = pb.TaskOp_Stop } - return w.OperateSubTask(stage.Task, op) + return &op, w.OperateSubTask(stage.Task, op) } -func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) error { +// operateSubTaskStageWithoutConfig returns TaskOp additionally to record metrics +func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) (*pb.TaskOp, error) { var subTaskCfg config.SubTaskConfig if stage.Expect == pb.Stage_Running { if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { tsm, _, err := ha.GetSubTaskCfg(w.etcdClient, stage.Source, stage.Task, stage.Revision) if err != nil { // TODO: need retry - return terror.Annotate(err, "fail to get subtask config from etcd") + return nil, terror.Annotate(err, "fail to get subtask config from etcd") } var ok bool if subTaskCfg, ok = tsm[stage.Task]; !ok { - return terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task) + return nil, terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task) } } } @@ -495,9 +509,13 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client if stage.IsEmpty() { stage.IsDeleted = true } - err1 = w.operateRelayStage(ctx, stage) + op, err1 := w.operateRelayStage(ctx, stage) if err1 != nil { - // TODO: add better metrics + opType := opErrTypeBeforeOp + if op != nil { + opType = op.String() + } + opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail 
to operate relay", zap.Stringer("stage", stage), zap.Error(err1)) } } @@ -521,9 +539,13 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er log.L().Info("worker is closed, handleRelayStage will quit now") return nil case stage := <-stageCh: - err := w.operateRelayStage(ctx, stage) + op, err := w.operateRelayStage(ctx, stage) if err != nil { - // TODO: add better metrics + opType := opErrTypeBeforeOp + if op != nil { + opType = op.String() + } + opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err)) } case err := <-errCh: @@ -535,14 +557,16 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er } } -func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) error { +// operateRelayStage returns RelayOp additionally to record metrics +// *RelayOp is nil only when error is nil, so record on error will not meet nil-pointer deference +func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (*pb.RelayOp, error) { var op pb.RelayOp switch { case stage.Expect == pb.Stage_Running: if w.relayHolder.Stage() == pb.Stage_New { w.relayHolder.Start() w.relayPurger.Start() - return nil + return nil, nil } op = pb.RelayOp_ResumeRelay case stage.Expect == pb.Stage_Paused: @@ -550,7 +574,7 @@ func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) error { case stage.IsDeleted: op = pb.RelayOp_StopRelay } - return w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op}) + return &op, w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op}) } // HandleSQLs implements Handler.HandleSQLs. 
diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index a7a9d88e2b..e0a77daaee 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -48,11 +48,11 @@ func (t *testServer) testWorker(c *C) { NewRelayHolder = NewRealRelayHolder }() - _, err := NewWorker(&cfg, nil) + _, err := NewWorker(&cfg, nil, "") c.Assert(err, ErrorMatches, "init error") NewRelayHolder = NewDummyRelayHolder - w, err := NewWorker(&cfg, nil) + w, err := NewWorker(&cfg, nil, "") c.Assert(err, IsNil) c.Assert(w.StatusJSON(""), HasLen, emptyWorkerStatusInfoJSONLength) //c.Assert(w.closed.Get(), Equals, closedFalse) @@ -226,7 +226,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { sourceCfg.EnableRelay = false // step 1: start worker - w, err := NewWorker(&sourceCfg, etcdCli) + w, err := NewWorker(&sourceCfg, etcdCli, "") c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -339,7 +339,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) { sourceCfg.MetaDir = c.MkDir() // step 1: start worker - w, err := NewWorker(&sourceCfg, etcdCli) + w, err := NewWorker(&sourceCfg, etcdCli, "") c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 9d4059bf36b00ad2b8c5c8796d3306dab0632534 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 1 Jul 2020 13:55:04 +0800 Subject: [PATCH 2/5] worker: fix typo --- dm/worker/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/worker/metrics.go b/dm/worker/metrics.go index 51cf424bd9..6eea6a71d7 100644 --- a/dm/worker/metrics.go +++ b/dm/worker/metrics.go @@ -53,7 +53,7 @@ var ( Namespace: "dm", Subsystem: "worker", Name: "operate_error", - Help: "number of different oprate error", + Help: "number of different operate error", }, []string{"worker", "type"}) cpuUsageGauge = prometheus.NewGauge( From b5e1e37fa0b8b5c7eaae6f5c4c511b22014b159e Mon Sep 17 00:00:00 2001 From: lance6716 Date: 
Wed, 1 Jul 2020 14:00:55 +0800 Subject: [PATCH 3/5] worker: pass make check --- dm/worker/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/worker/worker.go b/dm/worker/worker.go index 51844357eb..a17473ca91 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -78,7 +78,7 @@ func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name strin subTaskHolder: newSubTaskHolder(), l: log.With(zap.String("component", "worker controller")), etcdClient: etcdClient, - name: name, + name: name, } w.ctx, w.cancel = context.WithCancel(context.Background()) w.closed.Set(closedTrue) From d1c750218630693d58469788553f7e9ea6da99ac Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 2 Jul 2020 12:09:22 +0800 Subject: [PATCH 4/5] worker: fix broken code --- dm/worker/metrics.go | 6 ++++-- dm/worker/subtask.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dm/worker/metrics.go b/dm/worker/metrics.go index 6eea6a71d7..f7f48eb53e 100644 --- a/dm/worker/metrics.go +++ b/dm/worker/metrics.go @@ -137,6 +137,8 @@ func InitStatus(lis net.Listener) { } } -func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string) { - taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task}) +func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string, source string) { + taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task, "source_id": source}) } + +// opErrCounter cleans on worker close, which is the same time dm-worker exits, so no explicit clean diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 555529ae7c..1104a042ec 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -429,7 +429,7 @@ func (st *SubTask) Close() { st.cancel() st.closeUnits() // close all un-closed units - st.removeLabelValuesWithTaskInMetrics(st.cfg.Name) + st.removeLabelValuesWithTaskInMetrics(st.cfg.Name, st.cfg.SourceID) st.wg.Wait() st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped) } From 
e67770f3a9418070743352c742a5082726690482 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 2 Jul 2020 19:39:34 +0800 Subject: [PATCH 5/5] worker: address comments --- dm/worker/metrics.go | 3 +-- dm/worker/worker.go | 47 +++++++++++++++----------------------------- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/dm/worker/metrics.go b/dm/worker/metrics.go index f7f48eb53e..33d80a9a0d 100644 --- a/dm/worker/metrics.go +++ b/dm/worker/metrics.go @@ -48,6 +48,7 @@ var ( Help: "state of task, 0 - invalidStage, 1 - New, 2 - Running, 3 - Paused, 4 - Stopped, 5 - Finished", }, []string{"task", "source_id"}) + // opErrCounter cleans on worker close, which is the same time dm-worker exits, so no explicit clean opErrCounter = metricsproxy.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", @@ -140,5 +141,3 @@ func InitStatus(lis net.Listener) { func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string, source string) { taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task, "source_id": source}) } - -// opErrCounter cleans on worker close, which is the same time dm-worker exits, so no explicit clean diff --git a/dm/worker/worker.go b/dm/worker/worker.go index a17473ca91..bcb5ee0031 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -333,12 +333,8 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) { // TODO: right operation sequences may get error when we get etcdErrCompact, need to handle it later // For example, Expect: Running -(pause)-> Paused -(resume)-> Running // we get an etcd compact error at the first running. 
If we try to "resume" it now, we will get an error - op, err := w.operateSubTaskStage(stage, subtaskCfg) + opType, err := w.operateSubTaskStage(stage, subtaskCfg) if err != nil { - opType := opErrTypeBeforeOp - if op != nil { - opType = op.String() - } opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.String("task", subtaskCfg.Name), zap.Error(err)) @@ -412,12 +408,8 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, log.L().Info("worker is closed, handleSubTaskStage will quit now") return nil case stage := <-stageCh: - op, err := w.operateSubTaskStageWithoutConfig(stage) + opType, err := w.operateSubTaskStageWithoutConfig(stage) if err != nil { - opType := opErrTypeBeforeOp - if op != nil { - opType = op.String() - } opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.Error(err)) if etcdutil.IsRetryableError(err) { @@ -434,15 +426,16 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage, } } -// operateSubTaskStage returns TaskOp additionally to record metrics -func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (*pb.TaskOp, error) { +// operateSubTaskStage returns TaskOp.String() additionally to record metrics +func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (string, error) { var op pb.TaskOp switch { case stage.Expect == pb.Stage_Running: if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { w.StartSubTask(&subTaskCfg) log.L().Info("load subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) - return nil, nil + // error is nil, opErrTypeBeforeOp will be ignored + return opErrTypeBeforeOp, nil } op = pb.TaskOp_Resume case stage.Expect == pb.Stage_Paused: @@ -450,22 +443,22 @@ func (w *Worker) operateSubTaskStage(stage ha.Stage, 
subTaskCfg config.SubTaskCo case stage.IsDeleted: op = pb.TaskOp_Stop } - return &op, w.OperateSubTask(stage.Task, op) + return op.String(), w.OperateSubTask(stage.Task, op) } -// operateSubTaskStageWithoutConfig returns TaskOp additionally to record metrics -func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) (*pb.TaskOp, error) { +// operateSubTaskStageWithoutConfig returns the operate type as a string additionally to record metrics, +// which is opErrTypeBeforeOp when it fails before any operation is issued +func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) (string, error) { var subTaskCfg config.SubTaskConfig if stage.Expect == pb.Stage_Running { if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { tsm, _, err := ha.GetSubTaskCfg(w.etcdClient, stage.Source, stage.Task, stage.Revision) if err != nil { // TODO: need retry - return nil, terror.Annotate(err, "fail to get subtask config from etcd") + return opErrTypeBeforeOp, terror.Annotate(err, "fail to get subtask config from etcd") } var ok bool if subTaskCfg, ok = tsm[stage.Task]; !ok { - return nil, terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task) + return opErrTypeBeforeOp, terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task) } } } @@ -509,12 +502,8 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client if stage.IsEmpty() { stage.IsDeleted = true } - op, err1 := w.operateRelayStage(ctx, stage) + opType, err1 := w.operateRelayStage(ctx, stage) if err1 != nil { - opType := opErrTypeBeforeOp - if op != nil { - opType = op.String() - } opErrCounter.WithLabelValues(w.name, opType).Inc() log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err1)) } @@ -539,12 +528,8 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er log.L().Info("worker is closed, handleRelayStage will quit now") return nil case stage := <-stageCh: - op, err := w.operateRelayStage(ctx, stage) + opType, err := w.operateRelayStage(ctx, stage) if err != nil { - opType := opErrTypeBeforeOp - if op != nil { - opType = op.String() - } opErrCounter.WithLabelValues(w.name, 
opType).Inc() log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err)) } @@ -557,16 +542,16 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er } } -// operateRelayStage returns RelayOp additionally to record metrics -// *RelayOp is nil only when error is nil, so record on error will not meet nil-pointer deference -func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (*pb.RelayOp, error) { +// operateRelayStage returns RelayOp.String() additionally to record metrics +// when relay is started without issuing an operation, opErrTypeBeforeOp is returned with a nil error and callers ignore it +func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (string, error) { var op pb.RelayOp switch { case stage.Expect == pb.Stage_Running: if w.relayHolder.Stage() == pb.Stage_New { w.relayHolder.Start() w.relayPurger.Start() - return nil, nil + return opErrTypeBeforeOp, nil } op = pb.RelayOp_ResumeRelay case stage.Expect == pb.Stage_Paused: @@ -574,7 +559,7 @@ func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (*pb.Rel case stage.IsDeleted: op = pb.RelayOp_StopRelay } - return &op, w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op}) + return op.String(), w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op}) } // HandleSQLs implements Handler.HandleSQLs.