Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: add panel for start leader and heartbeat update error #853

Merged
merged 9 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions dm/dm-ansible/scripts/dm.json
Original file line number Diff line number Diff line change
Expand Up @@ -5343,6 +5343,94 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "Number of errors that occurred when updating the heartbeat timestamp",
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
"fill": 1,
"gridPos": {
"h": 7,
"w": 6,
"x": 0,
"y": 47
},
"id": 90,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "dm_syncer_heartbeat_update_error",
"format": "time_series",
"instant": false,
"intervalFactor": 2,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "heartbeat update error",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"transparent": true,
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": 0,
"format": "short",
"label": "",
"logBase": 1,
"max": "1",
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": false
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down Expand Up @@ -5798,6 +5886,94 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "number of times dm-masters try to start leader components per minute",
"fill": 1,
"gridPos": {
"h": 7,
"w": 6,
"x": 6,
"y": 55
},
"id": 86,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "dm_master_start_leader_counter",
"format": "time_series",
"instant": false,
"intervalFactor": 2,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "number of dm-masters start leader components per minute",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"transparent": true,
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": 0,
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": false
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down
2 changes: 2 additions & 0 deletions dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (s *Server) isLeaderAndNeedForward() (isLeader bool, needForward bool) {
}

func (s *Server) startLeaderComponent(ctx context.Context) bool {
metrics.ReportStartLeader()

// try to upgrade the cluster version if a member become the leader.
// so if the old leader failed when upgrading, the new leader can try again.
// NOTE: if the cluster has been upgraded, calling this method again should have no side effects.
Expand Down
14 changes: 14 additions & 0 deletions dm/master/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ var (
Name: "worker_event_error",
Help: "number of error related to worker event, during handling or watching",
}, []string{"type"})

// startLeaderCounter counts attempts by this dm-master to start its leader
// components. With the namespace/subsystem/name below it is exported as
// dm_master_start_leader_counter. It carries no labels.
startLeaderCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "master",
Name: "start_leader_counter",
Help: "number of this dm-master try to start leader components",
})
)

func collectMetrics() {
Expand Down Expand Up @@ -122,6 +130,7 @@ func RegistryMetrics() {
registry.MustRegister(ddlPendingCounter)
registry.MustRegister(ddlErrCounter)
registry.MustRegister(workerEventErrCounter)
registry.MustRegister(startLeaderCounter)

prometheus.DefaultGatherer = registry
}
Expand Down Expand Up @@ -161,6 +170,11 @@ func ReportWorkerEventErr(errType string) {
workerEventErrCounter.WithLabelValues(errType).Inc()
}

// ReportStartLeader increments startLeaderCounter by one.
// It takes no arguments and returns nothing. In this change it is invoked as
// the first statement of Server.startLeaderComponent, so every attempt to
// start the leader components is counted before any of that work runs.
func ReportStartLeader() {
startLeaderCounter.Inc()
}

// OnRetireLeader cleans some metrics when retires
func OnRetireLeader() {
workerState.Reset()
Expand Down
18 changes: 9 additions & 9 deletions pkg/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio

var (
oldLeaderID string
compaignWg sync.WaitGroup
campaignWg sync.WaitGroup
)
for {
// check context canceled/timeout
Expand All @@ -249,20 +249,20 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
e.campaignMu.Lock()
e.cancelCampaign = func() {
cancel2()
compaignWg.Wait()
campaignWg.Wait()
}
e.campaignMu.Unlock()

compaignWg.Add(1)
campaignWg.Add(1)
go func() {
defer compaignWg.Done()
defer campaignWg.Done()

if e.evictLeader.Get() == 1 {
// skip campaign
return
}

e.l.Debug("begin to compaign", zap.Stringer("current member", e.info))
e.l.Debug("begin to campaign", zap.Stringer("current member", e.info))

err2 := elec.Campaign(ctx2, e.infoStr)
if err2 != nil {
Expand Down Expand Up @@ -309,15 +309,15 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio

if leaderInfo == nil || len(leaderInfo.ID) == 0 {
cancel2()
compaignWg.Wait()
campaignWg.Wait()
continue
}

if leaderInfo.ID != e.info.ID {
e.l.Info("current member is not the leader", zap.Stringer("current member", e.info), zap.Stringer("leader", leaderInfo))
e.notifyLeader(ctx, leaderInfo)
cancel2()
compaignWg.Wait()
campaignWg.Wait()
continue
}

Expand All @@ -329,7 +329,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
oldLeaderID = ""

cancel2()
compaignWg.Wait()
campaignWg.Wait()
}
}

Expand Down Expand Up @@ -417,7 +417,7 @@ func (e *Election) EvictLeader() {

// Resign resign the leader
func (e *Election) Resign() {
// cancel campagin or current member is leader and then resign
// cancel campaign or current member is leader and then resign
e.campaignMu.Lock()
if e.cancelCampaign != nil {
e.cancelCampaign()
Expand Down
2 changes: 2 additions & 0 deletions syncer/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -254,6 +255,7 @@ func (h *Heartbeat) run(ctx context.Context) {
case <-updateTicker.C:
err := h.updateTS()
if err != nil {
heartbeatUpdateErr.WithLabelValues(strconv.Itoa(int(h.cfg.serverID))).Inc()
h.logger.Error("update heartbeat ts", zap.Error(err))
}

Expand Down
9 changes: 9 additions & 0 deletions syncer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ var (
Name: "shard_lock_resolving",
Help: "waiting shard DDL lock to be resolved",
}, []string{"task", "source_id"})

// heartbeatUpdateErr counts failures to update the heartbeat timestamp,
// partitioned by the "server_id" label. With the namespace/subsystem/name
// below it is exported as dm_syncer_heartbeat_update_error.
heartbeatUpdateErr = metricsproxy.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "heartbeat_update_error",
Help: "number of error when update heartbeat timestamp",
}, []string{"server_id"})
)

// RegisterMetrics registers metrics
Expand All @@ -233,6 +241,7 @@ func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(remainingTimeGauge)
registry.MustRegister(unsyncedTableGauge)
registry.MustRegister(shardLockResolving)
registry.MustRegister(heartbeatUpdateErr)
}

// InitStatusAndMetrics register prometheus metrics and listen for status port.
Expand Down