From 0ef0cc083ba8c076c9538df5de4e85eed31f1e07 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 15 Jul 2020 17:02:55 +0800 Subject: [PATCH 1/4] ansible: add metric panels --- dm/dm-ansible/scripts/dm.json | 716 +++++++++++++++++++++++++++++++++- 1 file changed, 713 insertions(+), 3 deletions(-) diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index e348d47c06..a248999ed0 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -690,6 +690,716 @@ "x": 0, "y": 15 }, + "id": 67, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of error happens before operate", + "fill": 1, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 16 + }, + "id": 71, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_worker_operate_error{type=\"BeforeAnyOp\"}", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "before any operate error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": "5", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of error happens in operate source bound", + "fill": 1, + "gridPos": { + "h": 7, + "w": 6, + "x": 6, + "y": 16 + }, + "id": 72, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_worker_operate_error{type=\"SourceBound\"}", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "source bound error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": "5", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": 
false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of error happens in operate auto-resume", + "fill": 1, + "gridPos": { + "h": 7, + "w": 6, + "x": 12, + "y": 16 + }, + "id": 69, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_worker_operate_error{type=\"AutoResume\"}", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "auto-resume error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": "5", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of error happens in operate stop", + "fill": 1, + "gridPos": { + "h": 7, + "w": 6, + "x": 18, + "y": 16 + }, + "id": 75, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_worker_operate_error{type=\"Stop\"}", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "stop error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": "5", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of error happens in operate pause", + "fill": 1, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 23 + }, + "id": 74, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": 
"dm_worker_operate_error{type=\"Pause\"}", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "pause error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": "5", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of error happens in operate resume", + "fill": 1, + "gridPos": { + "h": 7, + "w": 6, + "x": 6, + "y": 23 + }, + "id": 78, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_worker_operate_error{type=\"Resume\"}", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "resume error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": "5", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of error happens in operate start", + "fill": 1, + "gridPos": { + "h": 7, + "w": 6, + "x": 12, + "y": 23 + }, + "id": 76, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_worker_operate_error{type=\"Start\"}", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "start error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": "5", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, 
+ "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of error happens in operate update", + "fill": 1, + "gridPos": { + "h": 7, + "w": 6, + "x": 18, + "y": 23 + }, + "id": 77, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "dm_worker_operate_error{type=\"Update\"}", + "format": "time_series", + "intervalFactor": 2, + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "update error", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": "5", + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "operate error", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 30 + }, "id": 54, "panels": [ { @@ -809,7 +1519,7 @@ "h": 1, "w": 24, "x": 0, - "y": 16 + "y": 31 }, "id": 55, "panels": [ @@ -1905,7 +2615,7 @@ "h": 1, "w": 24, "x": 0, - "y": 17 + "y": 32 }, "id": 56, "panels": [ @@ -2619,7 +3329,7 @@ "h": 1, "w": 24, "x": 0, - "y": 18 + "y": 33 }, "id": 57, "panels": [ From abe974b6ce4cb7c3d2979c5ec070f686b8316f1c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 16 Jul 2020 16:12:52 +0800 Subject: [PATCH 2/4] finish syncer part changing from instance to source --- dm/dm-ansible/scripts/dm.json | 106 ++++++++++++++++++++-------------- syncer/metrics.go | 32 +++++----- syncer/syncer.go | 68 +++++++++++----------- 3 files changed, 114 insertions(+), 92 deletions(-) diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index a248999ed0..104e59356e 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -1464,7 +1464,7 @@ "tableColumn": "", "targets": [ { - "expr": "dm_worker_task_state{task=\"$task\",instance=\"$instance\"}", + "expr": "dm_worker_task_state{task=\"$task\",source_id=\"$source\"}", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -2234,14 +2234,14 @@ "steppedLine": false, "targets": [ { - "expr": "dm_relay_binlog_pos{instance=\"$instance\", node=\"master\"}", + "expr": "dm_relay_binlog_pos{source_id=\"$source\", node=\"master\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "master", "refId": "A" }, { - "expr": "dm_relay_binlog_pos{instance=\"$instance\", node=\"relay\"}", + "expr": "dm_relay_binlog_pos{source_id=\"$source\", node=\"relay\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "relay", @@ -2331,7 +2331,7 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "expr": 
"histogram_quantile(0.90, sum(rate(dm_relay_read_binlog_duration_bucket{source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "interval": "", "intervalFactor": 2, @@ -2339,14 +2339,14 @@ "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_relay_read_binlog_duration_bucket{source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_relay_read_binlog_duration_bucket{source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -3395,7 +3395,7 @@ "tableColumn": "", "targets": [ { - "expr": "dm_syncer_remaining_time{task=\"$task\", instance=\"$instance\"}", + "expr": "dm_syncer_remaining_time{task=\"$task\", source_id=\"$source\"}", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -3557,7 +3557,7 @@ "tableColumn": "", "targets": [ { - "expr": "changes(dm_syncer_exit_with_error_count{task=\"$task\", instance=\"$instance\"}[30m])", + "expr": "changes(dm_syncer_exit_with_error_count{task=\"$task\", source_id=\"$source\"}[30m])", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -3614,7 +3614,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_syncer_binlog_file{instance=\"$instance\", task=\"$task\", node=\"master\"} - ON(instance, task, job) dm_syncer_binlog_file{instance=\"$instance\", task=\"$task\", node=\"syncer\"}", + "expr": "dm_syncer_binlog_file{source_id=\"$source\", task=\"$task\", node=\"master\"} - ON(instance, task, job) dm_syncer_binlog_file{source_id=\"$source\", task=\"$task\", node=\"syncer\"}", "format": "time_series", "hide": false, "intervalFactor": 2, @@ -3702,7 +3702,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_relay_binlog_file{instance=\"$instance\", node=\"relay\"} - ON(instance, job) dm_syncer_binlog_file{instance=\"$instance\", task=\"$task\", node=\"syncer\"}", + "expr": "dm_relay_binlog_file{instance=\"$instance\", node=\"relay\"} - ON(instance, job) dm_syncer_binlog_file{source_id=\"$source\", task=\"$task\", node=\"syncer\"}", "format": "time_series", "hide": false, "intervalFactor": 2, @@ -3792,28 +3792,28 @@ "steppedLine": false, "targets": [ { - "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"write_rows\"}[1m])", + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", source_id=\"$source\", type=\"write_rows\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "insert", "refId": "A" }, { - "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"update_rows\"}[1m])", + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", source_id=\"$source\", type=\"update_rows\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "update", "refId": "B" }, { - "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"delete_rows\"}[1m])", + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", source_id=\"$source\", type=\"delete_rows\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "delete", "refId": "C" }, { - "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", 
instance=\"$instance\", type=\"query\"}[1m])", + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", source_id=\"$source\", type=\"query\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "query", @@ -3902,14 +3902,14 @@ "steppedLine": false, "targets": [ { - "expr": "rate(dm_syncer_skip_binlog_duration_count{task=\"$task\", instance=\"$instance\", type=\"rows\"}[1m])", + "expr": "rate(dm_syncer_skip_binlog_duration_count{task=\"$task\", source_id=\"$source\", type=\"rows\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "rows", "refId": "A" }, { - "expr": "rate(dm_syncer_skip_binlog_duration_count{task=\"$task\", instance=\"$instance\", type=\"query\"}[1m])", + "expr": "rate(dm_syncer_skip_binlog_duration_count{task=\"$task\", source_id=\"$source\", type=\"query\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "query", @@ -3998,21 +3998,21 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "90", "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -4102,21 +4102,21 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "90", "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -4206,21 +4206,21 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": 
"time_series", "intervalFactor": 2, "legendFormat": "90", "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -4414,21 +4414,21 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_event_size_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_event_size_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "90", "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_binlog_event_size_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_binlog_event_size_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_binlog_event_size_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_binlog_event_size_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -4518,7 +4518,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_syncer_queue_size{task=\"$task\", instance=\"$instance\"}", + "expr": "dm_syncer_queue_size{task=\"$task\", source_id=\"$source\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{queueNo}}", @@ -4607,7 +4607,7 @@ "steppedLine": false, "targets": [ { - "expr": "rate(dm_syncer_added_jobs_total{task=\"$task\", instance=\"$instance\"}[1m])", + "expr": "rate(dm_syncer_added_jobs_total{task=\"$task\", source_id=\"$source\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{queueNo}}-{{type}}", @@ -4696,7 +4696,7 @@ "steppedLine": false, "targets": [ { - "expr": "rate(dm_syncer_finished_jobs_total{task=\"$task\", instance=\"$instance\"}[1m])", + "expr": "rate(dm_syncer_finished_jobs_total{task=\"$task\", source_id=\"$source\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{queueNo}}-{{type}}", @@ -4896,21 +4896,21 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "90", "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", 
"format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -5000,21 +5000,21 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "90", "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -5104,21 +5104,21 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_skip_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_skip_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "90", "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_skip_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_skip_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_skip_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_skip_binlog_duration_bucket{task=\"$task\", source_id=\"$source\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -5206,7 +5206,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_syncer_unsynced_table_number{instance=\"$instance\", task=\"$task\"}", + "expr": "dm_syncer_unsynced_table_number{source_id=\"$source\", task=\"$task\"}", "format": "time_series", "instant": false, "intervalFactor": 2, @@ -5294,7 +5294,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_syncer_shard_lock_resolving{instance=\"$instance\", task=\"$task\"}", + "expr": "dm_syncer_shard_lock_resolving{source_id=\"$source\", task=\"$task\"}", "format": "time_series", "instant": false, "intervalFactor": 2, @@ -5378,6 +5378,28 @@ "type": "query", "useTags": false }, + { + "allValue": null, + "current": {}, + "datasource": "${DS_TEST-CLUSTER}", + "definition": "", + "hide": 0, + "includeAll": false, + "label": null, + "multi": true, + "name": 
"source", + "options": [], + "query": "label_values(dm_worker_task_state, source_id)", + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, { "allValue": null, "current": {}, diff --git a/syncer/metrics.go b/syncer/metrics.go index d1b2dde459..528e4be5ae 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -34,7 +34,7 @@ var ( Name: "read_binlog_duration", Help: "bucketed histogram of read time (s) for single binlog event from the relay log or master.", Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), - }, []string{"task"}) + }, []string{"task", "source_id"}) binlogEventSizeHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ @@ -43,7 +43,7 @@ var ( Name: "binlog_event_size", Help: "size of a binlog event", Buckets: prometheus.ExponentialBuckets(16, 2, 20), - }, []string{"task"}) + }, []string{"task", "source_id"}) binlogEvent = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ @@ -52,7 +52,7 @@ var ( Name: "binlog_transform_cost", Help: "cost of binlog event transform", Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), - }, []string{"type", "task"}) + }, []string{"type", "task", "source_id"}) conflictDetectDurationHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ @@ -61,7 +61,7 @@ var ( Name: "conflict_detect_duration", Help: "bucketed histogram of conflict detect time (s) for single DML statement", Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), - }, []string{"task"}) + }, []string{"task", "source_id"}) addJobDurationHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ @@ -70,7 +70,7 @@ var ( Name: "add_job_duration", Help: "bucketed histogram of add a job to the queue time (s)", Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), - }, []string{"type", "task", "queueNo"}) + }, []string{"type", "task", "queueNo", "source_id"}) // dispatch/add multiple jobs for one binlog event. // NOTE: only observe for DML now. @@ -81,7 +81,7 @@ var ( Name: "dispatch_binlog_duration", Help: "bucketed histogram of dispatch a binlog event time (s)", Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), - }, []string{"type", "task"}) + }, []string{"type", "task", "source_id"}) skipBinlogDurationHistogram = metricsproxy.NewHistogramVec( prometheus.HistogramOpts{ @@ -90,7 +90,7 @@ var ( Name: "skip_binlog_duration", Help: "bucketed histogram of skip a binlog event time (s)", Buckets: prometheus.ExponentialBuckets(0.0000005, 2, 25), // this should be very fast. 
- }, []string{"type", "task"}) + }, []string{"type", "task", "source_id"}) addedJobsTotal = metricsproxy.NewCounterVec( prometheus.CounterOpts{ @@ -98,7 +98,7 @@ var ( Subsystem: "syncer", Name: "added_jobs_total", Help: "total number of added jobs", - }, []string{"type", "task", "queueNo"}) + }, []string{"type", "task", "queueNo", "source_id"}) finishedJobsTotal = metricsproxy.NewCounterVec( prometheus.CounterOpts{ @@ -106,7 +106,7 @@ var ( Subsystem: "syncer", Name: "finished_jobs_total", Help: "total number of finished jobs", - }, []string{"type", "task", "queueNo"}) + }, []string{"type", "task", "queueNo", "source_id"}) queueSizeGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ @@ -114,7 +114,7 @@ var ( Subsystem: "syncer", Name: "queue_size", Help: "remain size of the DML queue", - }, []string{"task", "queueNo"}) + }, []string{"task", "queueNo", "source_id"}) binlogPosGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ @@ -122,7 +122,7 @@ var ( Subsystem: "syncer", Name: "binlog_pos", Help: "current binlog pos", - }, []string{"node", "task"}) + }, []string{"node", "task", "source_id"}) binlogFileGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ @@ -130,7 +130,7 @@ var ( Subsystem: "syncer", Name: "binlog_file", Help: "current binlog file index", - }, []string{"node", "task"}) + }, []string{"node", "task", "source_id"}) sqlRetriesTotal = metricsproxy.NewCounterVec( prometheus.CounterOpts{ @@ -174,7 +174,7 @@ var ( Subsystem: "syncer", Name: "exit_with_error_count", Help: "counter for syncer exits with error", - }, []string{"task"}) + }, []string{"task", "source_id"}) // some problems with it replicationLagGauge = metricsproxy.NewGaugeVec( @@ -191,7 +191,7 @@ var ( Subsystem: "syncer", Name: "remaining_time", Help: "the remaining time in second to catch up master", - }, []string{"task"}) + }, []string{"task", "source_id"}) unsyncedTableGauge = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ @@ -199,7 +199,7 @@ var ( Subsystem: "syncer", Name: "unsynced_table_number", Help: "number of unsynced tables in the subtask", - }, []string{"task", "table"}) + }, []string{"task", "table", "source_id"}) shardLockResolving = metricsproxy.NewGaugeVec( prometheus.GaugeOpts{ @@ -207,7 +207,7 @@ var ( Subsystem: "syncer", Name: "shard_lock_resolving", Help: "waiting shard DDL lock to be resolved", - }, []string{"task"}) + }, []string{"task", "source_id"}) ) // RegisterMetrics registers metrics diff --git a/syncer/syncer.go b/syncer/syncer.go index c25f3b3b3e..8dff600c12 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -479,7 +479,7 @@ func (s *Syncer) resetDBs(tctx *tcontext.Context) error { // Process implements the dm.Unit interface. 
func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { - syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Add(0) + syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Add(0) newCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -501,7 +501,7 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { return } cancel() // cancel s.Run - syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Inc() + syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Inc() errs = append(errs, err) } }() @@ -524,7 +524,7 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { wg.Wait() // wait for receive all fatal from s.runFatalChan if err != nil { - syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name).Inc() + syncerExitWithErrorCounter.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Inc() errs = append(errs, unit.NewProcessError(err)) } @@ -652,17 +652,17 @@ func (s *Syncer) addCount(isFinished bool, queueBucket string, tp opType, n int6 switch tp { case insert: - m.WithLabelValues("insert", s.cfg.Name, queueBucket).Add(float64(n)) + m.WithLabelValues("insert", s.cfg.Name, queueBucket, s.cfg.SourceID).Add(float64(n)) case update: - m.WithLabelValues("update", s.cfg.Name, queueBucket).Add(float64(n)) + m.WithLabelValues("update", s.cfg.Name, queueBucket, s.cfg.SourceID).Add(float64(n)) case del: - m.WithLabelValues("del", s.cfg.Name, queueBucket).Add(float64(n)) + m.WithLabelValues("del", s.cfg.Name, queueBucket, s.cfg.SourceID).Add(float64(n)) case ddl: - m.WithLabelValues("ddl", s.cfg.Name, queueBucket).Add(float64(n)) + m.WithLabelValues("ddl", s.cfg.Name, queueBucket, s.cfg.SourceID).Add(float64(n)) case xid: // ignore xid jobs case flush: - m.WithLabelValues("flush", s.cfg.Name, queueBucket).Add(float64(n)) + m.WithLabelValues("flush", s.cfg.Name, queueBucket, s.cfg.SourceID).Add(float64(n)) case skip: // ignore skip jobs default: @@ -703,26 +703,26 @@ func (s *Syncer) addJob(job *job) error { s.saveGlobalPoint(job.location) return nil case flush: - addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() + addedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName, s.cfg.SourceID).Inc() // ugly code addJob and sync, refine it later s.jobWg.Add(s.cfg.WorkerCount) for i := 0; i < s.cfg.WorkerCount; i++ { startTime := time.Now() s.jobs[i] <- job // flush for every DML queue - addJobDurationHistogram.WithLabelValues("flush", s.cfg.Name, s.queueBucketMapping[i]).Observe(time.Since(startTime).Seconds()) + addJobDurationHistogram.WithLabelValues("flush", s.cfg.Name, s.queueBucketMapping[i], s.cfg.SourceID).Observe(time.Since(startTime).Seconds()) } s.jobWg.Wait() - finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() + finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName, s.cfg.SourceID).Inc() return s.flushCheckPoints() case ddl: s.jobWg.Wait() - addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() + addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName, s.cfg.SourceID).Inc() s.jobWg.Add(1) queueBucket = s.cfg.WorkerCount startTime := time.Now() s.jobs[queueBucket] <- job - addJobDurationHistogram.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Observe(time.Since(startTime).Seconds()) + addJobDurationHistogram.WithLabelValues("ddl", s.cfg.Name, adminQueueName, s.cfg.SourceID).Observe(time.Since(startTime).Seconds()) case insert, update, del: s.jobWg.Add(1) queueBucket = int(utils.GenHashKey(job.key)) % 
s.cfg.WorkerCount @@ -730,7 +730,7 @@ func (s *Syncer) addJob(job *job) error { startTime := time.Now() s.tctx.L().Debug("queue for key", zap.Int("queue", queueBucket), zap.String("key", job.key)) s.jobs[queueBucket] <- job - addJobDurationHistogram.WithLabelValues(job.tp.String(), s.cfg.Name, s.queueBucketMapping[queueBucket]).Observe(time.Since(startTime).Seconds()) + addJobDurationHistogram.WithLabelValues(job.tp.String(), s.cfg.Name, s.queueBucketMapping[queueBucket], s.cfg.SourceID).Observe(time.Since(startTime).Seconds()) } wait := s.checkWait(job) @@ -992,7 +992,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo for { select { case sqlJob, ok := <-jobChan: - queueSizeGauge.WithLabelValues(s.cfg.Name, queueBucket).Set(float64(len(jobChan))) + queueSizeGauge.WithLabelValues(s.cfg.Name, queueBucket, s.cfg.SourceID).Set(float64(len(jobChan))) if !ok { return } @@ -1257,20 +1257,20 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } // time duration for reading an event from relay log or upstream master. - binlogReadDurationHistogram.WithLabelValues(s.cfg.Name).Observe(time.Since(startTime).Seconds()) + binlogReadDurationHistogram.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds()) startTime = time.Now() // reset start time for the next metric. // get binlog event, reset tryReSync, so we can re-sync binlog while syncer meets errors next time tryReSync = true - binlogPosGauge.WithLabelValues("syncer", s.cfg.Name).Set(float64(e.Header.LogPos)) + binlogPosGauge.WithLabelValues("syncer", s.cfg.Name, s.cfg.SourceID).Set(float64(e.Header.LogPos)) index, err := binlog.GetFilenameIndex(lastLocation.Position.Name) if err != nil { s.tctx.L().Error("fail to get index number of binlog file", log.ShortError(err)) } else { - binlogFileGauge.WithLabelValues("syncer", s.cfg.Name).Set(float64(index)) + binlogFileGauge.WithLabelValues("syncer", s.cfg.Name, s.cfg.SourceID).Set(float64(index)) } s.binlogSizeCount.Add(int64(e.Header.EventSize)) - binlogEventSizeHistogram.WithLabelValues(s.cfg.Name).Observe(float64(e.Header.EventSize)) + binlogEventSizeHistogram.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Observe(float64(e.Header.EventSize)) failpoint.Inject("ProcessBinlogSlowDown", nil) @@ -1453,7 +1453,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return err } if ignore { - skipBinlogDurationHistogram.WithLabelValues("rows", s.cfg.Name).Observe(time.Since(ec.startTime).Seconds()) + skipBinlogDurationHistogram.WithLabelValues("rows", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds()) // for RowsEvent, we should record lastLocation rather than currentLocation return s.recordSkipSQLsLocation(*ec.lastLocation) } @@ -1513,7 +1513,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return terror.Annotatef(err, "gen insert sqls failed, schema: %s, table: %s", schemaName, tableName) } } - binlogEvent.WithLabelValues("write_rows", s.cfg.Name).Observe(time.Since(ec.startTime).Seconds()) + binlogEvent.WithLabelValues("write_rows", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds()) *ec.latestOp = insert case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: @@ -1524,7 +1524,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return terror.Annotatef(err, "gen update sqls failed, schema: %s, table: %s", schemaName, tableName) } } - 
binlogEvent.WithLabelValues("update_rows", s.cfg.Name).Observe(time.Since(ec.startTime).Seconds()) + binlogEvent.WithLabelValues("update_rows", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds()) *ec.latestOp = update case replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: @@ -1534,7 +1534,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return terror.Annotatef(err, "gen delete sqls failed, schema: %s, table: %s", schemaName, tableName) } } - binlogEvent.WithLabelValues("delete_rows", s.cfg.Name).Observe(time.Since(ec.startTime).Seconds()) + binlogEvent.WithLabelValues("delete_rows", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds()) *ec.latestOp = del default: @@ -1557,7 +1557,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return err } } - dispatchBinlogDurationHistogram.WithLabelValues(ec.latestOp.String(), s.cfg.Name).Observe(time.Since(startTime).Seconds()) + dispatchBinlogDurationHistogram.WithLabelValues(ec.latestOp.String(), s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds()) return nil } @@ -1580,7 +1580,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } if parseResult.ignore { - skipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name).Observe(time.Since(ec.startTime).Seconds()) + skipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds()) s.tctx.L().Warn("skip event", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema)) *ec.lastLocation = ec.currentLocation.Clone() // before record skip location, update lastLocation return s.recordSkipSQLsLocation(*ec.lastLocation) @@ -1632,7 +1632,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return terror.ErrSyncerUnitOnlineDDLOnMultipleTable.Generate(string(ev.Query)) } - binlogEvent.WithLabelValues("query", s.cfg.Name).Observe(time.Since(ec.startTime).Seconds()) + binlogEvent.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds()) /* we construct a application transaction for ddl. 
we save checkpoint after we execute all ddls @@ -1658,7 +1658,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return handleErr } if len(sqlDDL) == 0 { - skipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name).Observe(time.Since(ec.startTime).Seconds()) + skipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds()) s.tctx.L().Warn("skip event", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", usedSchema)) continue } @@ -1863,7 +1863,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if needShardingHandle { target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) - unsyncedTableGauge.WithLabelValues(s.cfg.Name, target).Set(float64(remain)) + unsyncedTableGauge.WithLabelValues(s.cfg.Name, target, s.cfg.SourceID).Set(float64(remain)) err = ec.safeMode.IncrForTable(s.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group if err != nil { return err @@ -1917,11 +1917,11 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e if err2 != nil { return err2 } - shardLockResolving.WithLabelValues(s.cfg.Name).Set(1) // block and wait DDL lock to be synced + shardLockResolving.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Set(1) // block and wait DDL lock to be synced s.tctx.L().Info("putted shard DDL info", zap.Stringer("info", shardInfo), zap.Int64("revision", rev)) shardOp, err2 := s.pessimist.GetOperation(ec.tctx.Ctx, shardInfo, rev+1) - shardLockResolving.WithLabelValues(s.cfg.Name).Set(0) + shardLockResolving.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Set(0) if err2 != nil { return err2 } @@ -2059,7 +2059,7 @@ func (s *Syncer) commitJob(tp opType, sourceSchema, sourceTable, targetSchema, t return terror.ErrSyncerUnitResolveCasualityFail.Generate(err) } s.tctx.L().Debug("key for keys", zap.String("key", key), zap.Strings("keys", keys)) - conflictDetectDurationHistogram.WithLabelValues(s.cfg.Name).Observe(time.Since(startTime).Seconds()) + conflictDetectDurationHistogram.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds()) job := newJob(tp, sourceSchema, sourceTable, targetSchema, targetTable, sql, args, key, location, cmdLocation, traceID) return s.addJobFunc(job) @@ -2161,7 +2161,7 @@ func (s *Syncer) printStatus(ctx context.Context) { zap.Int64("bytes/Second", bytesPerSec), zap.Int64("unsynced binlog size", remainingSize), zap.Int64("estimate time to catch up", remainingSeconds)) - remainingTimeGauge.WithLabelValues(s.cfg.Name).Set(float64(remainingSeconds)) + remainingTimeGauge.WithLabelValues(s.cfg.Name, s.cfg.SourceID).Set(float64(remainingSeconds)) } } } @@ -2170,12 +2170,12 @@ func (s *Syncer) printStatus(ctx context.Context) { if err != nil { s.tctx.L().Error("fail to get master status", log.ShortError(err)) } else { - binlogPosGauge.WithLabelValues("master", s.cfg.Name).Set(float64(latestMasterPos.Pos)) + binlogPosGauge.WithLabelValues("master", s.cfg.Name, s.cfg.SourceID).Set(float64(latestMasterPos.Pos)) index, err := binlog.GetFilenameIndex(latestMasterPos.Name) if err != nil { s.tctx.L().Error("fail to parse binlog file", log.ShortError(err)) } else { - binlogFileGauge.WithLabelValues("master", s.cfg.Name).Set(float64(index)) + binlogFileGauge.WithLabelValues("master", s.cfg.Name, s.cfg.SourceID).Set(float64(index)) } } From 
6b8f4db1276e80570994e7b713c2e349c5f47645 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 17 Jul 2020 11:58:12 +0800 Subject: [PATCH 3/4] improve grafana --- dm/dm-ansible/scripts/dm.json | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index 104e59356e..37ddbf3b6d 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -2234,14 +2234,14 @@ "steppedLine": false, "targets": [ { - "expr": "dm_relay_binlog_pos{source_id=\"$source\", node=\"master\"}", + "expr": "dm_relay_binlog_pos{instance=\"$instance\", node=\"master\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "master", "refId": "A" }, { - "expr": "dm_relay_binlog_pos{source_id=\"$source\", node=\"relay\"}", + "expr": "dm_relay_binlog_pos{instance=\"$instance\", node=\"relay\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "relay", @@ -2331,7 +2331,7 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_relay_read_binlog_duration_bucket{source_id=\"$source\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "interval": "", "intervalFactor": 2, "refId": "A" }, { - "expr": "histogram_quantile(0.95, sum(rate(dm_relay_read_binlog_duration_bucket{source_id=\"$source\"}[1m])) by (le))", + "expr": "histogram_quantile(0.95, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "95", "refId": "B" }, { - "expr": "histogram_quantile(0.99, sum(rate(dm_relay_read_binlog_duration_bucket{source_id=\"$source\"}[1m])) by (le))", + "expr": "histogram_quantile(0.99, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, "legendFormat": "99", @@ -3614,7 +3614,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_syncer_binlog_file{source_id=\"$source\", task=\"$task\", node=\"master\"} - ON(instance, task, job) dm_syncer_binlog_file{source_id=\"$source\", task=\"$task\", node=\"syncer\"}", + "expr": "dm_syncer_binlog_file{source_id=\"$source\", task=\"$task\", node=\"master\"} - ON(source_id, task, job) dm_syncer_binlog_file{source_id=\"$source\", task=\"$task\", node=\"syncer\"}", "format": "time_series", "hide": false, "intervalFactor": 2, @@ -3702,7 +3702,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_relay_binlog_file{instance=\"$instance\", node=\"relay\"} - ON(instance, job) dm_syncer_binlog_file{source_id=\"$source\", task=\"$task\", node=\"syncer\"}", + "expr": "dm_relay_binlog_file{instance=\"$instance\", node=\"relay\"} - ON(instance, job) dm_syncer_binlog_file{instance=\"$instance\", task=\"$task\", node=\"syncer\"}", "format": "time_series", "hide": false, "intervalFactor": 2, @@ -5408,7 +5408,7 @@ "hide": 0, "includeAll": false, "label": null, - "multi": false, + "multi": true, "name": "instance", "options": [], "query": "label_values(dm_worker_task_state{task=\"$task\"}, instance)", From 0e4dae29bdaa1b22d6349e4769ab255a2ca413d4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 21 Jul 2020 11:10:57 +0800 Subject: [PATCH 4/4] address comments --- dm/dm-ansible/scripts/dm.json | 50 +++++++++++++++++------------------ 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/dm/dm-ansible/scripts/dm.json
b/dm/dm-ansible/scripts/dm.json index 7c6d5f0c36..8bddbfc3aa 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -872,7 +872,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of error happens in operate auto-resume", + "description": "The number of error happens in operate start", "fill": 1, "gridPos": { "h": 7, @@ -880,7 +880,7 @@ "x": 12, "y": 16 }, - "id": 69, + "id": 76, "legend": { "avg": false, "current": false, @@ -904,7 +904,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_worker_operate_error{type=\"AutoResume\"}", + "expr": "dm_worker_operate_error{type=\"Start\"}", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -914,7 +914,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "auto-resume error", + "title": "start error", "tooltip": { "shared": true, "sort": 0, @@ -959,7 +959,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of error happens in operate stop", + "description": "The number of error happens in operate pause", "fill": 1, "gridPos": { "h": 7, @@ -967,7 +967,7 @@ "x": 18, "y": 16 }, - "id": 75, + "id": 74, "legend": { "avg": false, "current": false, @@ -991,7 +991,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_worker_operate_error{type=\"Stop\"}", + "expr": "dm_worker_operate_error{type=\"Pause\"}", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -1001,7 +1001,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "stop error", + "title": "pause error", "tooltip": { "shared": true, "sort": 0, @@ -1046,7 +1046,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of error happens in operate pause", + "description": "The number of error happens in operate resume", "fill": 1, "gridPos": { "h": 7, @@ -1054,7 +1054,7 @@ "x": 0, "y": 23 }, - "id": 74, + "id": 78, "legend": { "avg": false, "current": false, @@ -1078,7 +1078,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_worker_operate_error{type=\"Pause\"}", + "expr": "dm_worker_operate_error{type=\"Resume\"}", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -1088,7 +1088,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "pause error", + "title": "resume error", "tooltip": { "shared": true, "sort": 0, @@ -1133,15 +1133,15 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of error happens in operate resume", + "description": "The number of error happens in operate auto-resume", "fill": 1, "gridPos": { "h": 7, "w": 6, "x": 6, - "y": 23 + "y": 16 }, - "id": 78, + "id": 69, "legend": { "avg": false, "current": false, @@ -1165,7 +1165,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_worker_operate_error{type=\"Resume\"}", + "expr": "dm_worker_operate_error{type=\"AutoResume\"}", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -1175,7 +1175,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "resume error", + "title": "auto-resume error", "tooltip": { "shared": true, "sort": 0, @@ -1220,7 +1220,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of error happens in operate start", + "description": "The number of error happens in operate update", "fill": 1, "gridPos": { "h": 7, @@ -1228,7 +1228,7 @@ "x": 12, "y": 23 }, - "id": 76, + "id": 77, "legend": { "avg": false, 
"current": false, @@ -1252,7 +1252,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_worker_operate_error{type=\"Start\"}", + "expr": "dm_worker_operate_error{type=\"Update\"}", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -1262,7 +1262,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "start error", + "title": "update error", "tooltip": { "shared": true, "sort": 0, @@ -1307,7 +1307,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of error happens in operate update", + "description": "The number of error happens in operate stop", "fill": 1, "gridPos": { "h": 7, @@ -1315,7 +1315,7 @@ "x": 18, "y": 23 }, - "id": 77, + "id": 75, "legend": { "avg": false, "current": false, @@ -1339,7 +1339,7 @@ "steppedLine": false, "targets": [ { - "expr": "dm_worker_operate_error{type=\"Update\"}", + "expr": "dm_worker_operate_error{type=\"Stop\"}", "format": "time_series", "intervalFactor": 2, "refId": "A" @@ -1349,7 +1349,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "update error", + "title": "stop error", "tooltip": { "shared": true, "sort": 0,