diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index 15b94ceeba..5d4d9730e0 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -2529,6 +2529,162 @@ "show": true } ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "number of unsynced tables in the subtask", + "fill": 1, + "id": 45, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 6, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_syncer_unsynced_table_number{instance=\"$instance\", task=\"$task\"}", + "format": "time_series", + "instant": false, + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "unsynced tables", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "waiting shard DDL lock to be resolved, >0 means waiting", + "fill": 1, + "id": 46, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 6, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_syncer_shard_lock_resolving{instance=\"$instance\", task=\"$task\"}", + "format": "time_series", + "instant": false, + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "shard lock resolving", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": "", + "logBase": 1, + "max": "1", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] } ], "repeat": null, diff --git a/syncer/metrics.go b/syncer/metrics.go index 42177d1d35..ef1c65d12d 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -126,6 +126,22 @@ var ( Name: "remaining_time", Help: "the remaining time in second to catch up master", }, []string{"task"}) + + unsyncedTableGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "unsynced_table_number", + Help: "number of unsynced tables in the subtask", + }, []string{"task", "table"}) + + shardLockResolving = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "shard_lock_resolving", + Help: "waiting shard DDL lock to be resolved", + }, []string{"task"}) ) // RegisterMetrics registers metrics @@ -142,6 +158,8 @@ func RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(syncerExitWithErrorCounter) registry.MustRegister(replicationLagGauge) registry.MustRegister(remainingTimeGauge) + registry.MustRegister(unsyncedTableGauge) + registry.MustRegister(shardLockResolving) } func (s *Syncer) runBackgroundJob(ctx context.Context) { diff --git a/syncer/syncer.go b/syncer/syncer.go index 23a65fa6a9..2b9cf6ddbe 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1578,6 +1578,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } if needShardingHandle { + target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) + unsyncedTableGauge.WithLabelValues(s.cfg.Name, target).Set(float64(remain)) log.Infof("[syncer] query event %v for source %v is in sharding, synced: %v, remain: %d", startPos, source, synced, remain) err = safeMode.IncrForTable(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group if err != nil { @@ -1628,8 +1630,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.ddlInfoCh <- ddlInfo1 // save DDLInfo, and dm-worker will fetch it // block and wait DDL lock to be synced + shardLockResolving.WithLabelValues(s.cfg.Name).Set(1) var ok bool ddlExecItem, ok = <-s.ddlExecInfo.Chan(needHandleDDLs) + shardLockResolving.WithLabelValues(s.cfg.Name).Set(0) if !ok { // chan closed log.Info("[syncer] cancel to add DDL to job because of canceled from external")