From d79bfc904546ab578a29a8187b528b6feb32328b Mon Sep 17 00:00:00 2001 From: Xuecheng Zhang Date: Sun, 12 Apr 2020 20:20:28 +0800 Subject: [PATCH] syncer: improve performance (#594) --- dm/config/task.go | 4 + dm/dm-ansible/scripts/dm.json | 556 ++++++++++++++++++++++++++++++---- loader/metrics.go | 6 +- relay/metrics.go | 6 +- syncer/dml.go | 217 ++++++------- syncer/dml_test.go | 26 -- syncer/metrics.go | 34 ++- syncer/syncer.go | 11 +- syncer/syncer_test.go | 65 ++-- 9 files changed, 686 insertions(+), 239 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index cfa867fdcb..ab16bdf103 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -57,6 +57,7 @@ var ( // SyncerConfig defaultWorkerCount = 16 defaultBatch = 100 + defaultQueueSize = 5120 ) // Meta represents binlog's meta pos @@ -193,6 +194,8 @@ type SyncerConfig struct { MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"` Batch int `yaml:"batch" toml:"batch" json:"batch"` + QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"` + // deprecated MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"` @@ -208,6 +211,7 @@ func defaultSyncerConfig() SyncerConfig { return SyncerConfig{ WorkerCount: defaultWorkerCount, Batch: defaultBatch, + QueueSize: defaultQueueSize, } } diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index 09fde73be1..dd6f696c68 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -451,14 +451,18 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of binlog files in Syncer that are behind the master", + "description": "The number of binlog files in binlog replication unit that are behind the master", "fill": 1, "id": 51, "legend": { + "alignAsTable": false, "avg": false, "current": false, + 
"hideEmpty": false, + "hideZero": false, "max": false, "min": false, + "rightSide": false, "show": false, "total": false, "values": false @@ -1190,11 +1194,13 @@ "fill": 1, "id": 5, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -1213,10 +1219,18 @@ "steppedLine": false, "targets": [ { - "expr": "dm_relay_binlog_file{instance=\"$instance\"}", + "expr": "dm_relay_binlog_file{instance=\"$instance\", node=\"master\"}", "format": "time_series", "intervalFactor": 2, + "legendFormat": "master", "refId": "A" + }, + { + "expr": "dm_relay_binlog_file{instance=\"$instance\", node=\"relay\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "relay", + "refId": "B" } ], "thresholds": [], @@ -1345,11 +1359,13 @@ "fill": 1, "id": 6, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -1368,10 +1384,18 @@ "steppedLine": false, "targets": [ { - "expr": "dm_relay_binlog_pos{instance=\"$instance\"}", + "expr": "dm_relay_binlog_pos{instance=\"$instance\", node=\"master\"}", "format": "time_series", "intervalFactor": 2, + "legendFormat": "master", "refId": "A" + }, + { + "expr": "dm_relay_binlog_pos{instance=\"$instance\", node=\"relay\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "relay", + "refId": "B" } ], "thresholds": [], @@ -1418,15 +1442,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The duration that the relay log reads binlog from the upstream MySQL (in seconds)", + "description": "The duration that the relay log reads binlog event from the upstream MySQL (in seconds)", "fill": 1, "id": 9, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + 
"rightSide": true, + "show": true, "total": false, "values": false }, @@ -1449,13 +1475,28 @@ "format": "time_series", "interval": "", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "read binlog duration", + "title": "read binlog event duration", "tooltip": { "shared": true, "sort": 0, @@ -1486,7 +1527,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -1496,15 +1537,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The duration that the relay log writes binlog into the disks each time (in seconds)", + "description": "The duration that the relay log writes binlog event into the disks each time (in seconds)", "fill": 1, "id": 12, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -1526,7 +1569,22 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_relay_write_duration_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_relay_write_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_relay_write_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": 
"time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], @@ -1563,7 +1621,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -1577,11 +1635,13 @@ "fill": 1, "id": 25, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -1603,13 +1663,28 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_relay_write_size_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_relay_write_size_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_relay_write_size_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "binlog size", + "title": "binlog event size", "tooltip": { "shared": true, "sort": 0, @@ -1640,7 +1715,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] } @@ -2122,11 +2197,13 @@ "fill": 1, "id": 18, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2148,13 +2225,28 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + 
"legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "latency of execute transaction", + "title": "transaction execution latency", "tooltip": { "shared": true, "sort": 0, @@ -2184,7 +2276,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2194,15 +2286,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The duration that Loader executes a query (in seconds)", + "description": "The time it takes loader to execute every statement to the downstream (in seconds)", "fill": 1, "id": 34, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2221,16 +2315,38 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_loader_query_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_loader_stmt_duration_time_bucket{task=\"$task\", type=\"begin\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "begin", "refId": "A" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(dm_loader_stmt_duration_time_bucket{task=\"$task\", type=\"stmt\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "stmt", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(dm_loader_stmt_duration_time_bucket{task=\"$task\", type=\"commit\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "commit", + "refId": "C" + }, + { 
+ "expr": "histogram_quantile(0.90, sum(rate(dm_loader_stmt_duration_time_bucket{task=\"$task\", type=\"rollback\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "rollback", + "refId": "D" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "latency of query", + "title": "statement execution latency - 90", "tooltip": { "shared": true, "sort": 0, @@ -2260,7 +2376,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] } @@ -2510,7 +2626,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of binlog files in Syncer that are behind the master", + "description": "The number of binlog files in binlog replication unit that are behind the master", "fill": 1, "id": 43, "legend": { @@ -2588,7 +2704,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of binlog files in Syncer that are behind relay", + "description": "The number of binlog files in binlog replication unit that are behind relay", "fill": 1, "id": 44, "legend": { @@ -2670,11 +2786,13 @@ "fill": 1, "id": 4, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2693,16 +2811,38 @@ "steppedLine": false, "targets": [ { - "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\"}[1m])", + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"write_rows\"}[1m])", "format": "time_series", "intervalFactor": 2, + "legendFormat": "insert", "refId": "A" + }, + { + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"update_rows\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "update", + "refId": "B" + }, + { + "expr": 
"rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"delete_rows\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "delete", + "refId": "C" + }, + { + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"query\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "query", + "refId": "D" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "binlog event qps", + "title": "binlog event QPS", "tooltip": { "shared": true, "sort": 0, @@ -2732,7 +2872,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2778,7 +2918,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "skipped binlog event qps", + "title": "skipped binlog event QPS", "tooltip": { "shared": true, "sort": 0, @@ -2808,7 +2948,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2818,15 +2958,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The duration that the binlog replication unit reads binlog from the relay log or upstream MySQL (in seconds)", + "description": "The duration that the binlog replication unit reads binlog event from the relay log or upstream MySQL (in seconds)", "fill": 1, "id": 53, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2848,13 +2990,29 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "hide": false, + 
"intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "read binlog duration", + "title": "read binlog event duration", "tooltip": { "shared": true, "sort": 0, @@ -2885,7 +3043,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2895,15 +3053,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The time it takes Syncer to parse and transform the binlog into SQL statements (in seconds)", + "description": "The time it takes binlog replication unit to parse and transform the binlog into SQL statements (in seconds)", "fill": 1, "id": 36, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2925,13 +3085,28 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "cost of binlog event transform", + "title": "transform binlog event duration", 
"tooltip": { "shared": true, "sort": 0, @@ -2962,7 +3137,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2972,15 +3147,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The time it takes binlog replication unit to detect conflicts between DMLs (in seconds)", + "description": "The time it takes binlog replication unit to dispatch a binlog event (in seconds)", "fill": 1, - "id": 54, + "id": 59, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2999,16 +3176,31 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "cost of conflict detect", + "title": "dispatch binlog event duration", "tooltip": { "shared": true, "sort": 0, @@ -3039,7 +3231,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3053,11 +3245,13 @@ "fill": 1, "id": 1, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": 
false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3079,7 +3273,22 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], @@ -3116,7 +3325,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3126,15 +3335,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The size of a single binlog event that the binlog replication reads from relay log or upstream master", + "description": "The size of a single binlog event that the binlog replication unit reads from relay log or upstream master", "fill": 1, "id": 55, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3156,7 +3367,22 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_event_size_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_binlog_event_size_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, 
sum(rate(dm_syncer_binlog_event_size_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], @@ -3193,7 +3419,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3207,11 +3433,13 @@ "fill": 1, "id": 56, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3230,9 +3458,10 @@ "steppedLine": false, "targets": [ { - "expr": "rate(dm_syncer_query_duration_time_count{task=\"$task\", instance=\"$instance\"}[1m])", + "expr": "dm_syncer_queue_size{task=\"$task\", instance=\"$instance\"}", "format": "time_series", "intervalFactor": 2, + "legendFormat": "{{queueNo}}", "refId": "A" } ], @@ -3256,6 +3485,7 @@ }, "yaxes": [ { + "decimals": 0, "format": "short", "label": null, "logBase": 1, @@ -3269,7 +3499,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3283,11 +3513,13 @@ "fill": 1, "id": 3, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3309,13 +3541,14 @@ "expr": "rate(dm_syncer_added_jobs_total{task=\"$task\", instance=\"$instance\"}[1m])", "format": "time_series", "intervalFactor": 2, + "legendFormat": "{{queueNo}}-{{type}}", "refId": "A" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "total sql jobs", + "title": "total SQL jobs", "tooltip": { "shared": true, "sort": 0, @@ -3345,7 +3578,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3359,11 +3592,13 @@ "fill": 1, "id": 2, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ 
-3385,13 +3620,14 @@ "expr": "rate(dm_syncer_finished_jobs_total{task=\"$task\", instance=\"$instance\"}[1m])", "format": "time_series", "intervalFactor": 2, + "legendFormat": "{{queueNo}}-{{type}}", "refId": "A" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "finished sql jobs", + "title": "finished SQL jobs", "tooltip": { "shared": true, "sort": 0, @@ -3421,7 +3657,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3435,11 +3671,13 @@ "fill": 1, "id": 57, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3489,7 +3727,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "statement execution latency", + "title": "statement execution latency - 90", "tooltip": { "shared": true, "sort": 0, @@ -3520,7 +3758,195 @@ "logBase": 1, "max": null, "min": null, + "show": false + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time it takes binlog replication unit to add a job to the queue (in seconds)", + "fill": 1, + "id": 58, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 4, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "90", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, 
sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "add job duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 2, + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": 0, "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time it takes binlog replication unit to detect conflicts between DMLs (in seconds)", + "fill": 1, + "id": 54, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 4, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "90", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, 
sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "DML conflict detect duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 2, + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false } ] }, @@ -3552,7 +3978,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 4, + "span": 6, "stack": false, "steppedLine": false, "targets": [ @@ -3598,7 +4024,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3630,7 +4056,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 4, + "span": 6, "stack": false, "steppedLine": false, "targets": [ @@ -3767,5 +4193,5 @@ }, "timezone": "", "title": "DM-task", - "version": 29 + "version": 31 } \ No newline at end of file diff --git a/loader/metrics.go b/loader/metrics.go index e95071756c..959e6fd5d8 100644 --- a/loader/metrics.go +++ b/loader/metrics.go @@ -33,7 +33,7 @@ var ( Subsystem: "loader", Name: "query_duration_time", Help: "Bucketed histogram of query time (s) of a txn.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) txnHistogram = 
prometheus.NewHistogramVec( @@ -42,7 +42,7 @@ var ( Subsystem: "loader", Name: "txn_duration_time", Help: "Bucketed histogram of processing time (s) of a txn.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) stmtHistogram = prometheus.NewHistogramVec( @@ -51,7 +51,7 @@ var ( Subsystem: "loader", Name: "stmt_duration_time", Help: "Bucketed histogram of every statement query time (s).", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"type", "task"}) dataFileGauge = prometheus.NewGaugeVec( diff --git a/relay/metrics.go b/relay/metrics.go index 34c173ed22..1009164c7a 100644 --- a/relay/metrics.go +++ b/relay/metrics.go @@ -83,7 +83,7 @@ var ( Subsystem: "relay", Name: "write_duration", Help: "bucketed histogram of write time (s) of single relay log event", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }) // should alert @@ -110,7 +110,7 @@ var ( Subsystem: "relay", Name: "read_binlog_duration", Help: "bucketed histogram of read time (s) of single binlog event from the master.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }) binlogTransformDurationHistogram = prometheus.NewHistogram( @@ -119,7 +119,7 @@ var ( Subsystem: "relay", Name: "read_transform_duration", Help: "bucketed histogram of transform time (s) of single binlog event.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }) // should alert diff --git a/syncer/dml.go b/syncer/dml.go index e8dcc60ce8..99667e1659 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -14,9 +14,9 @@ package syncer import ( - "bytes" "encoding/binary" "fmt" + "io" "strconv" "strings" @@ -102,42 +102,35 @@ func extractValueFromData(data []interface{}, columns 
[]*column) []interface{} { func genInsertSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, error) { var ( - schema = param.schema - table = param.table + fullname = dbutil.TableName(param.schema, param.table) dataSeq = param.data originalDataSeq = param.originalData columns = param.columns originalColumns = param.originalColumns originalIndexColumns = param.originalIndexColumns + sqls = make([]string, 0, len(dataSeq)) + keys = make([][]string, 0, len(dataSeq)) + values = make([][]interface{}, 0, len(dataSeq)) ) - sqls := make([]string, 0, len(dataSeq)) - keys := make([][]string, 0, len(dataSeq)) - values := make([][]interface{}, 0, len(dataSeq)) - columnList := genColumnList(columns) - columnPlaceholders := genColumnPlaceholders(len(columns)) + + var insertOrReplace = "INSERT INTO" + if param.safeMode { + insertOrReplace = "REPLACE INTO" + } + sql := genInsertReplace(insertOrReplace, fullname, columns) + for dataIdx, data := range dataSeq { if len(data) != len(columns) { return nil, nil, nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(data)) } value := extractValueFromData(data, columns) - originalData := originalDataSeq[dataIdx] - var originalValue []interface{} - if len(columns) == len(originalColumns) { - originalValue = value - } else { - originalValue = extractValueFromData(originalData, originalColumns) - } - - var insertOrReplace string - if param.safeMode { - insertOrReplace = "REPLACE" - } else { - insertOrReplace = "INSERT" + var originalValue = value + if len(columns) != len(originalColumns) { + originalValue = extractValueFromData(originalDataSeq[dataIdx], originalColumns) } - sql := fmt.Sprintf("%s INTO `%s`.`%s` (%s) VALUES (%s);", insertOrReplace, schema, table, columnList, columnPlaceholders) - ks := genMultipleKeys(originalColumns, originalValue, originalIndexColumns, dbutil.TableName(schema, table)) + ks := genMultipleKeys(originalValue, originalIndexColumns, fullname) sqls = append(sqls, sql) values = 
append(values, value) keys = append(keys, ks) @@ -148,21 +141,22 @@ func genInsertSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, error) { var ( - schema = param.schema - table = param.table - safeMode = param.safeMode + fullname = dbutil.TableName(param.schema, param.table) data = param.data originalData = param.originalData columns = param.columns originalColumns = param.originalColumns originalIndexColumns = param.originalIndexColumns + defaultIndexColumns = findFitIndex(originalIndexColumns) + replaceSQL string // `REPLACE INTO` SQL + sqls = make([]string, 0, len(data)/2) + keys = make([][]string, 0, len(data)/2) + values = make([][]interface{}, 0, len(data)/2) ) - sqls := make([]string, 0, len(data)/2) - keys := make([][]string, 0, len(data)/2) - values := make([][]interface{}, 0, len(data)/2) - columnList := genColumnList(columns) - columnPlaceholders := genColumnPlaceholders(len(columns)) - defaultIndexColumns := findFitIndex(originalIndexColumns) + + if param.safeMode { + replaceSQL = genInsertReplace("REPLACE INTO", fullname, columns) + } for i := 0; i < len(data); i += 2 { oldData := data[i] @@ -194,23 +188,23 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e defaultIndexColumns = getAvailableIndexColumn(originalIndexColumns, oriOldValues) } - ks := genMultipleKeys(originalColumns, oriOldValues, originalIndexColumns, dbutil.TableName(schema, table)) - ks = append(ks, genMultipleKeys(originalColumns, oriChangedValues, originalIndexColumns, dbutil.TableName(schema, table))...) + ks := genMultipleKeys(oriOldValues, originalIndexColumns, fullname) + ks = append(ks, genMultipleKeys(oriChangedValues, originalIndexColumns, fullname)...) 
- if safeMode { + if param.safeMode { // generate delete sql from old data - sql, value := genDeleteSQL(schema, table, oriOldValues, originalColumns, defaultIndexColumns) + sql, value := genDeleteSQL(fullname, oriOldValues, originalColumns, defaultIndexColumns) sqls = append(sqls, sql) values = append(values, value) keys = append(keys, ks) // generate replace sql from new data - sql = fmt.Sprintf("REPLACE INTO `%s`.`%s` (%s) VALUES (%s);", schema, table, columnList, columnPlaceholders) - sqls = append(sqls, sql) + sqls = append(sqls, replaceSQL) values = append(values, changedValues) keys = append(keys, ks) continue } + // NOTE: move these variables outer of `for` if needed (to reuse). updateColumns := make([]*column, 0, len(defaultIndexColumns)) updateValues := make([]interface{}, 0, len(defaultIndexColumns)) for j := range oldValues { @@ -224,18 +218,17 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e } value := make([]interface{}, 0, len(oldData)) - kvs := genKVs(updateColumns) value = append(value, updateValues...) whereColumns, whereValues := originalColumns, oriOldValues if len(defaultIndexColumns) > 0 { - whereColumns, whereValues = getColumnData(originalColumns, defaultIndexColumns, oriOldValues) + whereColumns = defaultIndexColumns + whereValues = getColumnData(defaultIndexColumns, oriOldValues) } - where := genWhere(whereColumns, whereValues) value = append(value, whereValues...) 
- sql := fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s LIMIT 1;", schema, table, kvs, where) + sql := genUpdateSQL(fullname, updateColumns, whereColumns, whereValues) sqls = append(sqls, sql) values = append(values, value) keys = append(keys, ks) @@ -246,16 +239,15 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e func genDeleteSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, error) { var ( - schema = param.schema - table = param.table - dataSeq = param.originalData - columns = param.originalColumns - indexColumns = param.originalIndexColumns + fullname = dbutil.TableName(param.schema, param.table) + dataSeq = param.originalData + columns = param.originalColumns + indexColumns = param.originalIndexColumns + defaultIndexColumns = findFitIndex(indexColumns) + sqls = make([]string, 0, len(dataSeq)) + keys = make([][]string, 0, len(dataSeq)) + values = make([][]interface{}, 0, len(dataSeq)) ) - sqls := make([]string, 0, len(dataSeq)) - keys := make([][]string, 0, len(dataSeq)) - values := make([][]interface{}, 0, len(dataSeq)) - defaultIndexColumns := findFitIndex(indexColumns) for _, data := range dataSeq { if len(data) != len(columns) { @@ -267,9 +259,9 @@ func genDeleteSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e if len(defaultIndexColumns) == 0 { defaultIndexColumns = getAvailableIndexColumn(indexColumns, value) } - ks := genMultipleKeys(columns, value, indexColumns, dbutil.TableName(schema, table)) + ks := genMultipleKeys(value, indexColumns, fullname) - sql, value := genDeleteSQL(schema, table, value, columns, defaultIndexColumns) + sql, value := genDeleteSQL(fullname, value, columns, defaultIndexColumns) sqls = append(sqls, sql) values = append(values, value) keys = append(keys, ks) @@ -278,37 +270,73 @@ func genDeleteSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e return sqls, keys, values, nil } -func genDeleteSQL(schema string, table string, value []interface{}, columns 
[]*column, indexColumns []*column) (string, []interface{}) {
-	whereColumns, whereValues := columns, value
-	if len(indexColumns) > 0 {
-		whereColumns, whereValues = getColumnData(columns, indexColumns, value)
+// genInsertReplace generates a DML for `INSERT INTO` or `REPLACE INTO`.
+// the returned SQL contains placeholders for `VALUES`.
+func genInsertReplace(op, table string, columns []*column) string {
+	// NOTE: use sync.Pool to hold the builder if needed later.
+	var buf strings.Builder
+	buf.Grow(256)
+	buf.WriteString(op)
+	buf.WriteString(" " + table + " (")
+	for i, column := range columns {
+		if i != len(columns)-1 {
+			buf.WriteString("`" + column.name + "`,")
+		} else {
+			buf.WriteString("`" + column.name + "`)")
+		}
 	}
+	buf.WriteString(" VALUES (")
 
-	where := genWhere(whereColumns, whereValues)
-	sql := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s LIMIT 1;", schema, table, where)
-
-	return sql, whereValues
+	// placeholders
+	for i := range columns {
+		if i != len(columns)-1 {
+			buf.WriteString("?,")
+		} else {
+			buf.WriteString("?)")
+		}
+	}
+	return buf.String()
 }
 
-func genColumnList(columns []*column) string {
+// genUpdateSQL generates an `UPDATE` SQL with `SET` and `WHERE`.
+func genUpdateSQL(table string, updateColumns, whereColumns []*column, whereValues []interface{}) string {
 	var buf strings.Builder
-	for i, column := range columns {
-		if i != len(columns)-1 {
-			buf.WriteString("`" + column.name + "`,")
+	buf.Grow(2048)
+	buf.WriteString("UPDATE ")
+	buf.WriteString(table)
+	buf.WriteString(" SET ")
+
+	for i, column := range updateColumns {
+		if i == len(updateColumns)-1 {
+			fmt.Fprintf(&buf, "`%s` = ?", column.name) // TODO: update `tidb-tools` to use `dbutil.ColumnName`.
} else { - buf.WriteString("`" + column.name + "`") + fmt.Fprintf(&buf, "`%s` = ?, ", column.name) } } + buf.WriteString(" WHERE ") + genWhere(&buf, whereColumns, whereValues) + buf.WriteString(" LIMIT 1") return buf.String() } -func genColumnPlaceholders(length int) string { - values := make([]string, length) - for i := 0; i < length; i++ { - values[i] = "?" +// genDeleteSQL generates a `DELETE FROM` SQL with `WHERE`. +func genDeleteSQL(table string, value []interface{}, columns []*column, indexColumns []*column) (string, []interface{}) { + whereColumns, whereValues := columns, value + if len(indexColumns) > 0 { + whereColumns = indexColumns + whereValues = getColumnData(indexColumns, value) } - return strings.Join(values, ",") + + var buf strings.Builder + buf.Grow(1024) + buf.WriteString("DELETE FROM ") + buf.WriteString(table) + buf.WriteString(" WHERE ") + genWhere(&buf, whereColumns, whereValues) + buf.WriteString(" LIMIT 1") + + return buf.String(), whereValues } func castUnsigned(data interface{}, unsigned bool, tp string) interface{} { @@ -413,20 +441,21 @@ func findColumns(columns []*column, indexColumns map[string][]string) map[string return result } -func genKeyList(columns []*column, dataSeq []interface{}) string { - values := make([]string, 0, len(dataSeq)) +func genKeyList(table string, columns []*column, dataSeq []interface{}) string { + var buf strings.Builder + buf.WriteString(table) for i, data := range dataSeq { - values = append(values, columnValue(data, columns[i].unsigned, columns[i].tp)) + // for key, I think no need to add the `,` separator. 
+ buf.WriteString(columnValue(data, columns[i].unsigned, columns[i].tp)) } - - return strings.Join(values, ",") + return buf.String() } -func genMultipleKeys(columns []*column, value []interface{}, indexColumns map[string][]*column, table string) []string { +func genMultipleKeys(value []interface{}, indexColumns map[string][]*column, table string) []string { multipleKeys := make([]string, 0, len(indexColumns)) for _, indexCols := range indexColumns { - cols, vals := getColumnData(columns, indexCols, value) - multipleKeys = append(multipleKeys, table+genKeyList(cols, vals)) + vals := getColumnData(indexCols, value) + multipleKeys = append(multipleKeys, genKeyList(table, indexCols, vals)) } return multipleKeys } @@ -479,19 +508,16 @@ func getSpecifiedIndexColumn(indexColumns map[string][]*column, fn func(col *col return nil } -func getColumnData(columns []*column, indexColumns []*column, data []interface{}) ([]*column, []interface{}) { - cols := make([]*column, 0, len(columns)) - values := make([]interface{}, 0, len(columns)) +func getColumnData(indexColumns []*column, data []interface{}) []interface{} { + values := make([]interface{}, 0, len(indexColumns)) for _, column := range indexColumns { - cols = append(cols, column) values = append(values, data[column.idx]) } - return cols, values + return values } -func genWhere(columns []*column, data []interface{}) string { - var kvs bytes.Buffer +func genWhere(w io.Writer, columns []*column, data []interface{}) { for i := range columns { kvSplit := "=" if data[i] == nil { @@ -499,26 +525,11 @@ func genWhere(columns []*column, data []interface{}) string { } if i == len(columns)-1 { - fmt.Fprintf(&kvs, "`%s` %s ?", columns[i].name, kvSplit) + fmt.Fprintf(w, "`%s` %s ?", columns[i].name, kvSplit) } else { - fmt.Fprintf(&kvs, "`%s` %s ? AND ", columns[i].name, kvSplit) + fmt.Fprintf(w, "`%s` %s ? 
AND ", columns[i].name, kvSplit) } } - - return kvs.String() -} - -func genKVs(columns []*column) string { - var kvs bytes.Buffer - for i := range columns { - if i == len(columns)-1 { - fmt.Fprintf(&kvs, "`%s` = ?", columns[i].name) - } else { - fmt.Fprintf(&kvs, "`%s` = ?, ", columns[i].name) - } - } - - return kvs.String() } func (s *Syncer) mappingDML(schema, table string, columns []string, data [][]interface{}) ([][]interface{}, error) { diff --git a/syncer/dml_test.go b/syncer/dml_test.go index ba365a9a92..4c52988f5d 100644 --- a/syncer/dml_test.go +++ b/syncer/dml_test.go @@ -45,32 +45,6 @@ func (s *testSyncerSuite) TestCastUnsigned(c *C) { } } -func (s *testSyncerSuite) TestGenColumnPlaceholders(c *C) { - placeholderStr := genColumnPlaceholders(1) - c.Assert(placeholderStr, Equals, "?") - - placeholderStr = genColumnPlaceholders(3) - c.Assert(placeholderStr, Equals, "?,?,?") -} - -func (s *testSyncerSuite) TestGenColumnList(c *C) { - columns := []*column{ - { - name: "a", - }, { - name: "b", - }, { - name: "c", - }, - } - - columnList := genColumnList(columns[:1]) - c.Assert(columnList, Equals, "`a`") - - columnList = genColumnList(columns) - c.Assert(columnList, Equals, "`a`,`b`,`c`") -} - func (s *testSyncerSuite) TestFindFitIndex(c *C) { pkColumns := []*column{ { diff --git a/syncer/metrics.go b/syncer/metrics.go index 4d9c1bb675..6268ee276e 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -34,7 +34,7 @@ var ( Subsystem: "syncer", Name: "read_binlog_duration", Help: "bucketed histogram of read time (s) for single binlog event from the relay log or master.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) binlogEventSizeHistogram = prometheus.NewHistogramVec( @@ -52,7 +52,7 @@ var ( Subsystem: "syncer", Name: "binlog_transform_cost", Help: "cost of binlog event transform", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: 
prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"type", "task"}) conflictDetectDurationHistogram = prometheus.NewHistogramVec( @@ -61,9 +61,29 @@ var ( Subsystem: "syncer", Name: "conflict_detect_duration", Help: "bucketed histogram of conflict detect time (s) for single DML statement", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) + addJobDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "add_job_duration", + Help: "bucketed histogram of add a job to the queue time (s)", + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), + }, []string{"type", "task", "queueNo"}) + + // dispatch/add multiple jobs for one binlog event. + // NOTE: only observe for DML now. + dispatchBinlogDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "dispatch_binlog_duration", + Help: "bucketed histogram of dispatch a binlog event time (s)", + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), + }, []string{"type", "task"}) + binlogSkippedEventsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", @@ -126,7 +146,7 @@ var ( Subsystem: "syncer", Name: "txn_duration_time", Help: "Bucketed histogram of processing time (s) of a txn.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) queryHistogram = prometheus.NewHistogramVec( @@ -135,7 +155,7 @@ var ( Subsystem: "syncer", Name: "query_duration_time", Help: "Bucketed histogram of query time (s).", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) stmtHistogram = prometheus.NewHistogramVec( @@ -144,7 +164,7 @@ var ( Subsystem: "syncer", Name: "stmt_duration_time", Help: "Bucketed histogram of 
every statement query time (s).", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"type", "task"}) // FIXME: should I move it to dm-worker? @@ -205,6 +225,8 @@ func RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(binlogEventSizeHistogram) registry.MustRegister(binlogEvent) registry.MustRegister(conflictDetectDurationHistogram) + registry.MustRegister(addJobDurationHistogram) + registry.MustRegister(dispatchBinlogDurationHistogram) registry.MustRegister(binlogSkippedEventsTotal) registry.MustRegister(addedJobsTotal) registry.MustRegister(finishedJobsTotal) diff --git a/syncer/syncer.go b/syncer/syncer.go index a4fc437812..fc0d4ab64f 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -275,7 +275,7 @@ func (s *Syncer) newJobChans(count int) { s.closeJobChans() s.jobs = make([]chan *job, 0, count) for i := 0; i < count; i++ { - s.jobs = append(s.jobs, make(chan *job, 1000)) + s.jobs = append(s.jobs, make(chan *job, s.cfg.QueueSize)) } s.jobsClosed.Set(false) } @@ -757,7 +757,10 @@ func (s *Syncer) addJob(job *job) error { // ugly code addJob and sync, refine it later s.jobWg.Add(s.cfg.WorkerCount) for i := 0; i < s.cfg.WorkerCount; i++ { + startTime := time.Now() s.jobs[i] <- job + // flush for every DML queue + addJobDurationHistogram.WithLabelValues("flush", s.cfg.Name, s.queueBucketMapping[i]).Observe(time.Since(startTime).Seconds()) } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() @@ -767,7 +770,9 @@ func (s *Syncer) addJob(job *job) error { addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() s.jobWg.Add(1) queueBucket = s.cfg.WorkerCount + startTime := time.Now() s.jobs[queueBucket] <- job + addJobDurationHistogram.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Observe(time.Since(startTime).Seconds()) if job.ddlExecItem != nil { execDDLReq = job.ddlExecItem.req } @@ -775,7 +780,9 @@ func (s 
*Syncer) addJob(job *job) error { s.jobWg.Add(1) queueBucket = int(utils.GenHashKey(job.key)) % s.cfg.WorkerCount s.addCount(false, s.queueBucketMapping[queueBucket], job.tp, 1) + startTime := time.Now() s.jobs[queueBucket] <- job + addJobDurationHistogram.WithLabelValues(job.tp.String(), s.cfg.Name, s.queueBucketMapping[queueBucket]).Observe(time.Since(startTime).Seconds()) } if s.tracer.Enable() { @@ -1545,6 +1552,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err *ec.traceID = traceEvent.Base.TraceID } + startTime := time.Now() for i := range sqls { var arg []interface{} var key []string @@ -1559,6 +1567,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return err } } + dispatchBinlogDurationHistogram.WithLabelValues(ec.latestOp.String(), s.cfg.Name).Observe(time.Since(startTime).Seconds()) return nil } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index c631f8db18..9f86ac5c1c 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -842,15 +842,15 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { "insert into gctest_1.t_3(id, cfg) values (3, '{\"id\": 3}')", }, []string{ - "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);", - "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);", - "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);", + "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?)", + 
"INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?)", + "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?)", + "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?)", }, [][]interface{}{ {int32(1), int32(18), "{}"}, @@ -876,14 +876,14 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { "update gctest_1.t_3 set cfg = '{\"id\": 12, \"old_id\": 2}' where gen_id = 2", }, []string{ - "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` IS ? AND `cfg_json` IS ? LIMIT 1;", - "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_3` SET `id` = ?, `cfg` = ? WHERE `gen_id` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_3` SET `id` = ?, `cfg` = ? WHERE `gen_id` = ? LIMIT 1;", + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` IS ? AND `cfg_json` IS ? LIMIT 1", + "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? 
WHERE `id` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_3` SET `id` = ?, `cfg` = ? WHERE `gen_id` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_3` SET `id` = ?, `cfg` = ? WHERE `gen_id` = ? LIMIT 1", }, [][]interface{}{ {int32(1), int32(21), "{\"a\": 12}", int32(1), int32(18), "{}", []uint8("{}")}, @@ -908,14 +908,14 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { "delete from gctest_1.t_3 where gen_id = 12", }, []string{ - "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` IS ? AND `cfg_json` IS ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_3` WHERE `gen_id` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_3` WHERE `gen_id` = ? LIMIT 1;", + "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` IS ? AND `cfg_json` IS ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_3` WHERE `gen_id` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_3` WHERE `gen_id` = ? 
LIMIT 1", }, [][]interface{}{ {int32(1), int32(21), "{\"a\": 12}", []uint8("{\"a\":12}")}, @@ -1036,6 +1036,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg) syncer.jobs = []chan *job{make(chan *job, 1)} + syncer.queueBucketMapping = []string{"queue_0", adminQueueName} wg.Add(1) go func() { @@ -1474,7 +1475,7 @@ func (s *testSyncerSuite) TestRun(c *C) { nil, }, { insert, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);", + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int64(580981944116838401), "a"}, }, { ddl, @@ -1482,11 +1483,11 @@ func (s *testSyncerSuite) TestRun(c *C) { nil, }, { insert, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);", + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int64(580981944116838402), "b"}, }, { del, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1;", + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", []interface{}{int64(580981944116838401)}, }, { flush, @@ -1495,12 +1496,12 @@ func (s *testSyncerSuite) TestRun(c *C) { }, { // in first 5 minutes, safe mode is true, will split update to delete + replace update, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1;", + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", []interface{}{int64(580981944116838402)}, }, { // in first 5 minutes, , safe mode is true, will split update to delete + replace update, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);", + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int64(580981944116838401), "b"}, }, } @@ -1556,11 +1557,11 @@ func (s *testSyncerSuite) TestRun(c *C) { expectJobs2 := []*expectJob{ { insert, - "REPLACE INTO `test_1`.`t_2` (`id`,`name`) VALUES (?,?);", + "REPLACE INTO `test_1`.`t_2` (`id`,`name`) VALUES (?,?)", []interface{}{int32(3), "c"}, }, { del, - "DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1;", + "DELETE FROM `test_1`.`t_2` WHERE `id` = ? 
LIMIT 1", []interface{}{int32(3)}, }, }