Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: add metrics for sharding merge #96

Merged
merged 4 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions dm/dm-ansible/scripts/dm.json
Original file line number Diff line number Diff line change
Expand Up @@ -2529,6 +2529,162 @@
"show": true
}
]
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "number of unsynced tables in the subtask",
"fill": 1,
"id": 45,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"span": 6,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "dm_syncer_unsynced_table_number{instance=\"$instance\", task=\"$task\"}",
"format": "time_series",
"instant": false,
"intervalFactor": 2,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "unsynced tables",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"transparent": true,
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": 0,
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
]
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "waiting shard DDL lock to be resolved, >0 means waiting",
"fill": 1,
"id": 46,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"span": 6,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "dm_syncer_shard_lock_resolving{instance=\"$instance\", task=\"$task\"}",
"format": "time_series",
"instant": false,
"intervalFactor": 2,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeShift": null,
"title": "shard lock resolving",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"transparent": true,
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": 0,
"format": "short",
"label": "",
"logBase": 1,
"max": "1",
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": false
}
]
}
],
"repeat": null,
Expand Down
18 changes: 18 additions & 0 deletions syncer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ var (
Name: "remaining_time",
Help: "the remaining time in second to catch up master",
}, []string{"task"})

unsyncedTableGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "unsynced_table_number",
Help: "number of unsynced tables in the subtask",
}, []string{"task", "table"})

shardLockResolving = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "shard_lock_resolving",
Help: "waiting shard DDL lock to be resolved",
}, []string{"task"})
)

// RegisterMetrics registers metrics
Expand All @@ -142,6 +158,8 @@ func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(syncerExitWithErrorCounter)
registry.MustRegister(replicationLagGauge)
registry.MustRegister(remainingTimeGauge)
registry.MustRegister(unsyncedTableGauge)
registry.MustRegister(shardLockResolving)
}

func (s *Syncer) runBackgroundJob(ctx context.Context) {
Expand Down
4 changes: 4 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

if needShardingHandle {
target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
unsyncedTableGauge.WithLabelValues(s.cfg.Name, target).Set(float64(remain))
log.Infof("[syncer] query event %v for source %v is in sharding, synced: %v, remain: %d", startPos, source, synced, remain)
err = safeMode.IncrForTable(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group
if err != nil {
Expand Down Expand Up @@ -1628,8 +1630,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
s.ddlInfoCh <- ddlInfo1 // save DDLInfo, and dm-worker will fetch it

// block and wait DDL lock to be synced
shardLockResolving.WithLabelValues(s.cfg.Name).Set(1)
var ok bool
ddlExecItem, ok = <-s.ddlExecInfo.Chan(needHandleDDLs)
shardLockResolving.WithLabelValues(s.cfg.Name).Set(0)
if !ok {
// chan closed
log.Info("[syncer] cancel to add DDL to job because of canceled from external")
Expand Down