Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: add metrics for optimistic shard DDL lock #831

Merged
merged 2 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions dm/master/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,18 @@ func GetMetricsHandler() http.Handler {
return promhttp.Handler()
}

// ReportWorkerStageToMetrics is a setter for workerState, this name is easy to understand to caller
func ReportWorkerStageToMetrics(name string, state float64) {
// ReportWorkerStage is a setter for workerState
func ReportWorkerStage(name string, state float64) {
workerState.WithLabelValues(name).Set(state)
}

// RemoveWorkerStateInMetrics cleans state of deleted worker
func RemoveWorkerStateInMetrics(name string) {
// RemoveWorkerState cleans state of deleted worker
func RemoveWorkerState(name string) {
workerState.DeleteAllAboutLabels(prometheus.Labels{"worker": name})
}

// ReportDDLPendingToMetrics inc/dec by 1 to ddlPendingCounter
func ReportDDLPendingToMetrics(task, old, new string) {
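// ReportDDLPending adjusts ddlPendingCounter by 1: it decrements the counter for the old state (skipped when old is DDLPendingNone) and increments it for the new state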
func ReportDDLPending(task, old, new string) {
if old != DDLPendingNone {
ddlPendingCounter.WithLabelValues(task, old).Dec()
}
Expand All @@ -136,8 +136,8 @@ func ReportDDLPendingToMetrics(task, old, new string) {
}
}

// ReportDDLErrorToMetrics is a setter for ddlErrCounter
func ReportDDLErrorToMetrics(task, errType string) {
// ReportDDLError increments ddlErrCounter by 1 for the given task and error type
func ReportDDLError(task, errType string) {
ddlErrCounter.WithLabelValues(task, errType).Inc()
}

Expand Down
2 changes: 1 addition & 1 deletion dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,7 @@ func (s *Scheduler) deleteWorker(name string) {
}
w.Close()
delete(s.workers, name)
metrics.RemoveWorkerStateInMetrics(w.baseInfo.Name)
metrics.RemoveWorkerState(w.baseInfo.Name)
}

// updateStatusForBound updates the in-memory status for bound, including:
Expand Down
2 changes: 1 addition & 1 deletion dm/master/scheduler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,5 @@ func (w *Worker) reportMetrics() {
if n, ok := workerStage2Num[w.stage]; ok {
s = n
}
metrics.ReportWorkerStageToMetrics(w.baseInfo.Name, s)
metrics.ReportWorkerStage(w.baseInfo.Name, s)
}
5 changes: 3 additions & 2 deletions dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info)
err := o.handleLock(info, tts, false)
if err != nil {
o.logger.Error("fail to handle the shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
metrics.ReportDDLErrorToMetrics(info.Task, metrics.InfoErrHandleLock)
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
continue
}
}
Expand Down Expand Up @@ -467,7 +467,7 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.
err := o.removeLock(lock)
if err != nil {
o.logger.Error("fail to delete the shard DDL infos and lock operations", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLErrorToMetrics(op.Task, metrics.OpErrRemoveLock)
metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock)
}
o.logger.Info("the shard DDL infos and lock operations have been cleared", zap.Stringer("operation", op))
}
Expand Down Expand Up @@ -533,6 +533,7 @@ func (o *Optimist) removeLock(lock *optimism.Lock) error {
return err
}
o.lk.RemoveLock(lock.ID)
metrics.ReportDDLPending(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone)
return nil
}

Expand Down
12 changes: 6 additions & 6 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
// then the DM-worker should not put any new DDL info until the old operation has been deleted.
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
// currently, only DDL mismatch will cause error
metrics.ReportDDLErrorToMetrics(info.Task, metrics.InfoErrSyncLock)
metrics.ReportDDLError(info.Task, metrics.InfoErrSyncLock)
continue
} else if !synced {
p.logger.Info("the shard DDL lock has not synced", zap.String("lock", lockID), zap.Int("remain", remain))
Expand All @@ -479,7 +479,7 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
err = p.handleLock(lockID, info.Source)
if err != nil {
p.logger.Error("fail to handle the shard DDL lock", zap.String("lock", lockID), log.ShortError(err))
metrics.ReportDDLErrorToMetrics(info.Task, metrics.InfoErrHandleLock)
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
continue
}
}
Expand Down Expand Up @@ -509,7 +509,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
} else if synced, _ := lock.IsSynced(); !synced {
// this should not happen in normal case.
p.logger.Warn("the lock for the shard DDL lock operation has not synced", zap.Stringer("operation", op))
metrics.ReportDDLErrorToMetrics(op.Task, metrics.OpErrLockUnSynced)
metrics.ReportDDLError(op.Task, metrics.OpErrLockUnSynced)
continue
}

Expand All @@ -521,7 +521,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
err := p.removeLock(lock)
if err != nil {
p.logger.Error("fail to delete the shard DDL lock operations", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLErrorToMetrics(op.Task, metrics.OpErrRemoveLock)
metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock)
}
p.logger.Info("the lock info for the shard DDL lock operation has been cleared", zap.Stringer("operation", op))
continue
Expand All @@ -539,7 +539,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
err := p.putOpsForNonOwner(lock, "", false)
if err != nil {
p.logger.Error("fail to put skip shard DDL lock operations for non-owner", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLErrorToMetrics(op.Task, metrics.OpErrPutNonOwnerOp)
metrics.ReportDDLError(op.Task, metrics.OpErrPutNonOwnerOp)
}
}
}
Expand Down Expand Up @@ -629,7 +629,7 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
return err
}
p.lk.RemoveLock(lock.ID)
metrics.ReportDDLPendingToMetrics(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone)
metrics.ReportDDLPending(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone)
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/shardddl/optimism/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb-tools/pkg/schemacmp"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/master/metrics"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)
Expand Down Expand Up @@ -66,6 +67,8 @@ func NewLock(ID, task, downSchema, downTable string, ti *model.TableInfo, tts []
synced: true,
}
l.addTables(tts)
metrics.ReportDDLPending(task, metrics.DDLPendingNone, metrics.DDLPendingSynced)

return l
}

Expand Down Expand Up @@ -118,9 +121,17 @@ func (l *Lock) TrySync(callerSource, callerSchema, callerTable string,
log.L().Info("update table info", zap.String("lock", l.ID), zap.String("source", callerSource), zap.String("schema", callerSchema), zap.String("table", callerTable),
zap.Stringer("from", oldTable), zap.Stringer("to", newTable), zap.Strings("ddls", ddls))

oldSynced := l.synced
defer func() {
_, remain := l.syncStatus()
l.synced = remain == 0
if oldSynced != l.synced {
if oldSynced {
metrics.ReportDDLPending(l.Task, metrics.DDLPendingSynced, metrics.DDLPendingUnSynced)
} else {
metrics.ReportDDLPending(l.Task, metrics.DDLPendingUnSynced, metrics.DDLPendingSynced)
}
}
}()

// special case: if the DDL does not affect the schema at all, assume it is
Expand Down
8 changes: 4 additions & 4 deletions pkg/shardddl/pessimism/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewLock(ID, task, owner string, DDLs, sources []string) *Lock {
l.ready[s] = false
l.done[s] = false
}
metrics.ReportDDLPendingToMetrics(task, metrics.DDLPendingNone, metrics.DDLPendingUnSynced)
metrics.ReportDDLPending(task, metrics.DDLPendingNone, metrics.DDLPendingUnSynced)

return l
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (l *Lock) TrySync(caller string, DDLs, sources []string) (bool, int, error)
l.remain--
l.ready[caller] = true
if l.remain == 0 {
metrics.ReportDDLPendingToMetrics(l.Task, metrics.DDLPendingUnSynced, metrics.DDLPendingSynced)
metrics.ReportDDLPending(l.Task, metrics.DDLPendingUnSynced, metrics.DDLPendingSynced)
}
}

Expand All @@ -100,7 +100,7 @@ func (l *Lock) ForceSynced() {
defer l.mu.Unlock()

if l.remain > 0 {
metrics.ReportDDLPendingToMetrics(l.Task, metrics.DDLPendingUnSynced, metrics.DDLPendingSynced)
metrics.ReportDDLPending(l.Task, metrics.DDLPendingUnSynced, metrics.DDLPendingSynced)
}
for source := range l.ready {
l.ready[source] = true
Expand All @@ -114,7 +114,7 @@ func (l *Lock) RevertSynced(sources []string) {
defer l.mu.Unlock()

if l.remain == 0 && len(sources) > 0 {
metrics.ReportDDLPendingToMetrics(l.Task, metrics.DDLPendingSynced, metrics.DDLPendingUnSynced)
metrics.ReportDDLPending(l.Task, metrics.DDLPendingSynced, metrics.DDLPendingUnSynced)
}
for _, source := range sources {
if synced, ok := l.ready[source]; ok && synced {
Expand Down