Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: add metrics #772

Merged
merged 6 commits into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions dm/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,28 @@ import (
"github.com/pingcap/dm/syncer"
)

const (
opErrTypeBeforeOp = "BeforeAnyOp"
opErrTypeSourceBound = "SourceBound"
)

var (
taskState = metricsproxy.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "worker",
Name: "task_state",
Help: "state of task, 0 - invalidStage, 1 - New, 2 - Running, 3 - Paused, 4 - Stopped, 5 - Finished",
}, []string{"task"})
}, []string{"task", "source_id"})

// opErrCounter is cleaned when the worker closes, which happens at the same time dm-worker exits, so no explicit cleanup is needed
opErrCounter = metricsproxy.NewCounterVec(
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "worker",
Name: "operate_error",
Help: "number of different operate error",
}, []string{"worker", "type"})

cpuUsageGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -93,6 +107,7 @@ func RegistryMetrics() {
registry.MustRegister(prometheus.NewGoCollector())

registry.MustRegister(taskState)
registry.MustRegister(opErrCounter)
registry.MustRegister(cpuUsageGauge)

relay.RegisterMetrics(registry)
Expand Down Expand Up @@ -123,6 +138,6 @@ func InitStatus(lis net.Listener) {
}
}

func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string) {
taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task})
func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string, source string) {
taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task, "source_id": source})
}
4 changes: 2 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo
s.setSourceStatus(bound.Source, err, true)
if err != nil {
// record the reason for operating source bound
// TODO: add better metrics
opErrCounter.WithLabelValues(s.cfg.Name, opErrTypeSourceBound).Inc()
log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
zap.Stringer("bound", bound), zap.Error(err))
if etcdutil.IsRetryableError(err) {
Expand Down Expand Up @@ -743,7 +743,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {

log.L().Info("start worker", zap.String("sourceCfg", cfg.String()), zap.Reflect("subTasks", subTaskCfgs))

w, err := NewWorker(cfg, s.etcdClient)
w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (t *testServer) TestQueryError(c *C) {

sourceCfg := loadSourceConfigWithoutPassword(c)
sourceCfg.EnableRelay = false
w, err := NewWorker(&sourceCfg, nil)
w, err := NewWorker(&sourceCfg, nil, "")
c.Assert(err, IsNil)
w.closed.Set(closedFalse)

Expand Down
10 changes: 5 additions & 5 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *
cancel: cancel,
etcdClient: etcdClient,
}
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
return &st
}

Expand Down Expand Up @@ -371,7 +371,7 @@ func (st *SubTask) setStage(stage pb.Stage) {
st.Lock()
defer st.Unlock()
st.stage = stage
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
}

// stageCAS sets stage to newStage if its current value is oldStage
Expand All @@ -381,7 +381,7 @@ func (st *SubTask) stageCAS(oldStage, newStage pb.Stage) bool {

if st.stage == oldStage {
st.stage = newStage
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
return true
}
return false
Expand All @@ -393,7 +393,7 @@ func (st *SubTask) setStageIfNot(oldStage, newStage pb.Stage) bool {
defer st.Unlock()
if st.stage != oldStage {
st.stage = newStage
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
return true
}
return false
Expand Down Expand Up @@ -429,7 +429,7 @@ func (st *SubTask) Close() {

st.cancel()
st.closeUnits() // close all un-closed units
st.removeLabelValuesWithTaskInMetrics(st.cfg.Name)
st.removeLabelValuesWithTaskInMetrics(st.cfg.Name, st.cfg.SourceID)
st.wg.Wait()
st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped)
}
Expand Down
4 changes: 2 additions & 2 deletions dm/worker/task_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) {
cfg := loadSourceConfigWithoutPassword(c)
cfg.RelayDir = dir
cfg.MetaDir = dir
w, err := NewWorker(&cfg, nil)
w, err := NewWorker(&cfg, nil, "")
c.Assert(err, check.IsNil)
w.closed.Set(closedFalse)

Expand Down Expand Up @@ -207,7 +207,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) {
cfg := loadSourceConfigWithoutPassword(c)
cfg.RelayDir = dir
cfg.MetaDir = dir
w, err := NewWorker(&cfg, nil)
w, err := NewWorker(&cfg, nil, "")
c.Assert(err, check.IsNil)
w.closed.Set(closedFalse)

Expand Down
47 changes: 28 additions & 19 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,19 @@ type Worker struct {
configFile string

etcdClient *clientv3.Client

name string
}

// NewWorker creates a new Worker
func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client) (w *Worker, err error) {
func NewWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name string) (w *Worker, err error) {
w = &Worker{
cfg: cfg,
tracer: tracing.InitTracerHub(cfg.Tracer),
subTaskHolder: newSubTaskHolder(),
l: log.With(zap.String("component", "worker controller")),
etcdClient: etcdClient,
name: name,
}
w.ctx, w.cancel = context.WithCancel(context.Background())
w.closed.Set(closedTrue)
Expand Down Expand Up @@ -330,11 +333,12 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) {
// TODO: right operation sequences may get error when we get etcdErrCompact, need to handle it later
// For example, Expect: Running -(pause)-> Paused -(resume)-> Running
// we get an etcd compact error at the first running. If we try to "resume" it now, we will get an error
err = w.operateSubTaskStage(stage, subtaskCfg)
opType, err := w.operateSubTaskStage(stage, subtaskCfg)
if err != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage),
zap.String("task", subtaskCfg.Name), zap.Error(err))

}
delete(sts, name)
}
Expand All @@ -343,7 +347,7 @@ func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) {
for name := range sts {
err = w.OperateSubTask(name, pb.TaskOp_Stop)
if err != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, pb.TaskOp_Stop.String()).Inc()
log.L().Error("fail to stop subtask", zap.String("task", name), zap.Error(err))
}
}
Expand Down Expand Up @@ -404,9 +408,9 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
log.L().Info("worker is closed, handleSubTaskStage will quit now")
return nil
case stage := <-stageCh:
err := w.operateSubTaskStageWithoutConfig(stage)
opType, err := w.operateSubTaskStageWithoutConfig(stage)
if err != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
Expand All @@ -422,36 +426,39 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
}
}

func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) error {
// operateSubTaskStage returns TaskOp.String() additionally to record metrics
func (w *Worker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (string, error) {
var op pb.TaskOp
switch {
case stage.Expect == pb.Stage_Running:
if st := w.subTaskHolder.findSubTask(stage.Task); st == nil {
w.StartSubTask(&subTaskCfg)
log.L().Info("load subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
return nil
// error is nil, so opErrTypeBeforeOp will be ignored
return opErrTypeBeforeOp, nil
}
op = pb.TaskOp_Resume
case stage.Expect == pb.Stage_Paused:
op = pb.TaskOp_Pause
case stage.IsDeleted:
op = pb.TaskOp_Stop
}
return w.OperateSubTask(stage.Task, op)
return op.String(), w.OperateSubTask(stage.Task, op)
}

func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) error {
// operateSubTaskStageWithoutConfig returns TaskOp.String() additionally to record metrics
func (w *Worker) operateSubTaskStageWithoutConfig(stage ha.Stage) (string, error) {
var subTaskCfg config.SubTaskConfig
if stage.Expect == pb.Stage_Running {
if st := w.subTaskHolder.findSubTask(stage.Task); st == nil {
tsm, _, err := ha.GetSubTaskCfg(w.etcdClient, stage.Source, stage.Task, stage.Revision)
if err != nil {
// TODO: need retry
return terror.Annotate(err, "fail to get subtask config from etcd")
return opErrTypeBeforeOp, terror.Annotate(err, "fail to get subtask config from etcd")
}
var ok bool
if subTaskCfg, ok = tsm[stage.Task]; !ok {
return terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task)
return opErrTypeBeforeOp, terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(stage.Task)
}
}
}
Expand Down Expand Up @@ -495,9 +502,9 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client
if stage.IsEmpty() {
stage.IsDeleted = true
}
err1 = w.operateRelayStage(ctx, stage)
opType, err1 := w.operateRelayStage(ctx, stage)
if err1 != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err1))
}
}
Expand All @@ -521,9 +528,9 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er
log.L().Info("worker is closed, handleRelayStage will quit now")
return nil
case stage := <-stageCh:
err := w.operateRelayStage(ctx, stage)
opType, err := w.operateRelayStage(ctx, stage)
if err != nil {
// TODO: add better metrics
opErrCounter.WithLabelValues(w.name, opType).Inc()
log.L().Error("fail to operate relay", zap.Stringer("stage", stage), zap.Error(err))
}
case err := <-errCh:
Expand All @@ -535,22 +542,24 @@ func (w *Worker) handleRelayStage(ctx context.Context, stageCh chan ha.Stage, er
}
}

func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) error {
// operateRelayStage returns RelayOp.String() additionally to record metrics
// *RelayOp is nil only when error is nil, so recording on error will not hit a nil-pointer dereference
func (w *Worker) operateRelayStage(ctx context.Context, stage ha.Stage) (string, error) {
var op pb.RelayOp
switch {
case stage.Expect == pb.Stage_Running:
if w.relayHolder.Stage() == pb.Stage_New {
w.relayHolder.Start()
w.relayPurger.Start()
return nil
return opErrTypeBeforeOp, nil
}
op = pb.RelayOp_ResumeRelay
case stage.Expect == pb.Stage_Paused:
op = pb.RelayOp_PauseRelay
case stage.IsDeleted:
op = pb.RelayOp_StopRelay
}
return w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op})
return op.String(), w.OperateRelay(ctx, &pb.OperateRelayRequest{Op: op})
}

// HandleSQLs implements Handler.HandleSQLs.
Expand Down
8 changes: 4 additions & 4 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func (t *testServer) testWorker(c *C) {
NewRelayHolder = NewRealRelayHolder
}()

_, err := NewWorker(&cfg, nil)
_, err := NewWorker(&cfg, nil, "")
c.Assert(err, ErrorMatches, "init error")

NewRelayHolder = NewDummyRelayHolder
w, err := NewWorker(&cfg, nil)
w, err := NewWorker(&cfg, nil, "")
c.Assert(err, IsNil)
c.Assert(w.StatusJSON(""), HasLen, emptyWorkerStatusInfoJSONLength)
//c.Assert(w.closed.Get(), Equals, closedFalse)
Expand Down Expand Up @@ -226,7 +226,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
sourceCfg.EnableRelay = false

// step 1: start worker
w, err := NewWorker(&sourceCfg, etcdCli)
w, err := NewWorker(&sourceCfg, etcdCli, "")
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -339,7 +339,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
sourceCfg.MetaDir = c.MkDir()

// step 1: start worker
w, err := NewWorker(&sourceCfg, etcdCli)
w, err := NewWorker(&sourceCfg, etcdCli, "")
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down