From a803a6f519440ba6f2e6f6d849cfe767081bce57 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 10 Apr 2020 18:30:28 +0800 Subject: [PATCH 01/14] *: add metric for addJob; increase job queue length and make it configurable; make flush checkpoint interval configurable --- dm/config/task.go | 17 +++++++++++++---- syncer/checkpoint.go | 4 +--- syncer/metrics.go | 10 ++++++++++ syncer/syncer.go | 9 ++++++++- 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index cfa867fdcb..9de462c1e5 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -55,8 +55,10 @@ var ( defaultPoolSize = 16 defaultDir = "./dumped_data" // SyncerConfig - defaultWorkerCount = 16 - defaultBatch = 100 + defaultWorkerCount = 16 + defaultBatch = 100 + defaultQueueSize = 5120 + defaultCheckpointFlushInterval = 30 ) // Meta represents binlog's meta pos @@ -193,6 +195,11 @@ type SyncerConfig struct { MetaFile string `yaml:"meta-file" toml:"meta-file" json:"meta-file"` // meta filename, used only when load SubConfig directly WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"` Batch int `yaml:"batch" toml:"batch" json:"batch"` + QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"` + + // checkpoint flush interval in seconds. + CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"` + // deprecated MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"` @@ -206,8 +213,10 @@ type SyncerConfig struct { func defaultSyncerConfig() SyncerConfig { return SyncerConfig{ - WorkerCount: defaultWorkerCount, - Batch: defaultBatch, + WorkerCount: defaultWorkerCount, + Batch: defaultBatch, + QueueSize: defaultQueueSize, + CheckpointFlushInterval: defaultCheckpointFlushInterval, } } diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 031108a650..d4efd7b4da 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -46,8 +46,6 @@ var ( globalCpTable = "" // global checkpoint's cp_table maxCheckPointTimeout = "1m" minCheckpoint = mysql.Position{Pos: 4} - - maxCheckPointSaveTime = 30 * time.Second ) // NOTE: now we sync from relay log, so not add GTID support yet @@ -453,7 +451,7 @@ func (cp *RemoteCheckPoint) String() string { func (cp *RemoteCheckPoint) CheckGlobalPoint() bool { cp.RLock() defer cp.RUnlock() - return time.Since(cp.globalPointSaveTime) >= maxCheckPointSaveTime + return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second } // Rollback implements CheckPoint.Rollback diff --git a/syncer/metrics.go b/syncer/metrics.go index 4d9c1bb675..d3c109043a 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -64,6 +64,15 @@ var ( Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), }, []string{"task"}) + addJobDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "dm", + Subsystem: "syncer", + Name: "add_job_duration", + Help: "bucketed histogram of add a job to the queue time (s)", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + }, []string{"type", "task", "queueNo"}) + binlogSkippedEventsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "dm", @@ -205,6 +214,7 @@ func RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(binlogEventSizeHistogram) registry.MustRegister(binlogEvent) registry.MustRegister(conflictDetectDurationHistogram) + registry.MustRegister(addJobDurationHistogram) 
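// addJobDurationHistogram is labeled by job type, task and queue number, so a
// send that blocks on a full job queue shows up per queue (see addJob below).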
registry.MustRegister(binlogSkippedEventsTotal) registry.MustRegister(addedJobsTotal) registry.MustRegister(finishedJobsTotal) diff --git a/syncer/syncer.go b/syncer/syncer.go index a4fc437812..cebb977e45 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -275,7 +275,7 @@ func (s *Syncer) newJobChans(count int) { s.closeJobChans() s.jobs = make([]chan *job, 0, count) for i := 0; i < count; i++ { - s.jobs = append(s.jobs, make(chan *job, 1000)) + s.jobs = append(s.jobs, make(chan *job, s.cfg.QueueSize)) } s.jobsClosed.Set(false) } @@ -757,7 +757,10 @@ func (s *Syncer) addJob(job *job) error { // ugly code addJob and sync, refine it later s.jobWg.Add(s.cfg.WorkerCount) for i := 0; i < s.cfg.WorkerCount; i++ { + startTime := time.Now() s.jobs[i] <- job + // flush for every DML queue + addJobDurationHistogram.WithLabelValues("flush", s.cfg.Name, s.queueBucketMapping[i]).Observe(time.Since(startTime).Seconds()) } s.jobWg.Wait() finishedJobsTotal.WithLabelValues("flush", s.cfg.Name, adminQueueName).Inc() @@ -767,7 +770,9 @@ func (s *Syncer) addJob(job *job) error { addedJobsTotal.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Inc() s.jobWg.Add(1) queueBucket = s.cfg.WorkerCount + startTime := time.Now() s.jobs[queueBucket] <- job + addJobDurationHistogram.WithLabelValues("ddl", s.cfg.Name, adminQueueName).Observe(time.Since(startTime).Seconds()) if job.ddlExecItem != nil { execDDLReq = job.ddlExecItem.req } @@ -775,7 +780,9 @@ func (s *Syncer) addJob(job *job) error { s.jobWg.Add(1) queueBucket = int(utils.GenHashKey(job.key)) % s.cfg.WorkerCount s.addCount(false, s.queueBucketMapping[queueBucket], job.tp, 1) + startTime := time.Now() s.jobs[queueBucket] <- job + addJobDurationHistogram.WithLabelValues(job.tp.String(), s.cfg.Name, s.queueBucketMapping[queueBucket]).Observe(time.Since(startTime).Seconds()) } if s.tracer.Enable() { From 7de3a37454277726178c6b3dc2b1958d8973ef6a Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sat, 11 Apr 2020 09:06:11 +0800 Subject: [PATCH 02/14] *: copy dashboard from #591 --- dm/dm-ansible/scripts/dm.json | 332 ++++++++++++++++++++++++++++------ 1 file changed, 273 insertions(+), 59 deletions(-) diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index 09fde73be1..ebc369842a 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -451,14 +451,18 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of binlog files in Syncer that are behind the master", + "description": "The number of binlog files in binlog replication unit that are behind the master", "fill": 1, "id": 51, "legend": { + "alignAsTable": false, "avg": false, "current": false, + "hideEmpty": false, + "hideZero": false, "max": false, "min": false, + "rightSide": false, "show": false, "total": false, "values": false @@ -1190,11 +1194,13 @@ "fill": 1, "id": 5, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -1213,10 +1219,18 @@ "steppedLine": false, "targets": [ { - "expr": "dm_relay_binlog_file{instance=\"$instance\"}", + "expr": "dm_relay_binlog_file{instance=\"$instance\", node=\"master\"}", "format": "time_series", "intervalFactor": 2, + "legendFormat": "master", "refId": "A" + }, + { + "expr": "dm_relay_binlog_file{instance=\"$instance\", node=\"relay\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "relay", + "refId": "B" } ], 
"thresholds": [], @@ -1345,11 +1359,13 @@ "fill": 1, "id": 6, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -1368,10 +1384,18 @@ "steppedLine": false, "targets": [ { - "expr": "dm_relay_binlog_pos{instance=\"$instance\"}", + "expr": "dm_relay_binlog_pos{instance=\"$instance\", node=\"master\"}", "format": "time_series", "intervalFactor": 2, + "legendFormat": "master", "refId": "A" + }, + { + "expr": "dm_relay_binlog_pos{instance=\"$instance\", node=\"relay\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "relay", + "refId": "B" } ], "thresholds": [], @@ -1418,15 +1442,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The duration that the relay log reads binlog from the upstream MySQL (in seconds)", + "description": "The duration that the relay log reads binlog event from the upstream MySQL (in seconds)", "fill": 1, "id": 9, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -1449,13 +1475,28 @@ "format": "time_series", "interval": "", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_relay_read_binlog_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "read binlog duration", + "title": "read binlog event duration", "tooltip": { "shared": true, "sort": 0, @@ -1486,7 +1527,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -1496,15 +1537,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The duration that the relay log writes binlog into the disks each time (in seconds)", + "description": "The duration that the relay log writes binlog event into the disks each time (in seconds)", "fill": 1, "id": 12, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -1526,7 +1569,22 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_relay_write_duration_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_relay_write_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_relay_write_duration_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], @@ -1563,7 +1621,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -1577,11 +1635,13 @@ "fill": 1, "id": 25, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": 
true, "total": false, "values": false }, @@ -1603,13 +1663,28 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_relay_write_size_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_relay_write_size_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_relay_write_size_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "binlog size", + "title": "binlog event size", "tooltip": { "shared": true, "sort": 0, @@ -1640,7 +1715,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] } @@ -2122,11 +2197,13 @@ "fill": 1, "id": 18, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2148,13 +2225,28 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_loader_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "latency of execute transaction", + "title": "transaction execution latency", "tooltip": { "shared": true, "sort": 0, @@ -2184,7 +2276,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2194,15 +2286,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The duration that Loader executes a query (in seconds)", + "description": "The time it takes loader to execute every statement to the downstream (in seconds)", "fill": 1, "id": 34, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2221,16 +2315,38 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(dm_loader_query_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "expr": "histogram_quantile(0.90, sum(rate(dm_loader_stmt_duration_time_bucket{task=\"$task\", type=\"begin\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "begin", "refId": "A" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(dm_loader_stmt_duration_time_bucket{task=\"$task\", type=\"stmt\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "stmt", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(dm_loader_stmt_duration_time_bucket{task=\"$task\", type=\"commit\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, 
+ "legendFormat": "commit", + "refId": "C" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(dm_loader_stmt_duration_time_bucket{task=\"$task\", type=\"rollback\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "rollback", + "refId": "D" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "latency of query", + "title": "statement execution latency - 90", "tooltip": { "shared": true, "sort": 0, @@ -2260,7 +2376,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] } @@ -2510,7 +2626,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of binlog files in Syncer that are behind the master", + "description": "The number of binlog files in binlog replication unit that are behind the master", "fill": 1, "id": 43, "legend": { @@ -2588,7 +2704,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of binlog files in Syncer that are behind relay", + "description": "The number of binlog files in binlog replication unit that are behind relay", "fill": 1, "id": 44, "legend": { @@ -2702,7 +2818,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "binlog event qps", + "title": "binlog event QPS", "tooltip": { "shared": true, "sort": 0, @@ -2732,7 +2848,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2778,7 +2894,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "skipped binlog event qps", + "title": "skipped binlog event QPS", "tooltip": { "shared": true, "sort": 0, @@ -2808,7 +2924,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2818,15 +2934,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The duration that the binlog replication unit reads binlog from the relay log or upstream MySQL (in seconds)", + "description": "The duration that the binlog replication unit reads binlog event from the relay log or upstream MySQL (in seconds)", "fill": 1, "id": 53, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2848,13 +2966,29 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_read_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "read binlog duration", + "title": "read binlog event duration", "tooltip": { "shared": true, "sort": 0, @@ -2885,7 +3019,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2895,15 +3029,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The time it takes Syncer to parse and transform the binlog into SQL statements (in seconds)", + 
"description": "The time it takes binlog replication unit to parse and transform the binlog into SQL statements (in seconds)", "fill": 1, "id": 36, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -2925,13 +3061,28 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_binlog_transform_cost_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "cost of binlog event transform", + "title": "transform binlog event duration", "tooltip": { "shared": true, "sort": 0, @@ -2962,7 +3113,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -2976,11 +3127,13 @@ "fill": 1, "id": 54, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3002,13 +3155,28 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "cost of conflict detect", + "title": "DML conflict detect duration", "tooltip": { "shared": true, "sort": 0, @@ -3039,7 +3207,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3053,11 +3221,13 @@ "fill": 1, "id": 1, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3079,7 +3249,22 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_txn_duration_time_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], 
@@ -3116,7 +3301,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3126,15 +3311,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The size of a single binlog event that the binlog replication reads from relay log or upstream master", + "description": "The size of a single binlog event that the binlog replication unit reads from relay log or upstream master", "fill": 1, "id": 55, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3156,7 +3343,22 @@ "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_binlog_event_size_bucket{instance=\"$instance\"}[1m])) by (le))", "format": "time_series", "intervalFactor": 2, + "legendFormat": "90", "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_binlog_event_size_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_binlog_event_size_bucket{instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" } ], "thresholds": [], @@ -3193,7 +3395,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3207,11 +3409,13 @@ "fill": 1, "id": 56, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3230,9 +3434,10 @@ "steppedLine": false, "targets": [ { - "expr": "rate(dm_syncer_query_duration_time_count{task=\"$task\", instance=\"$instance\"}[1m])", + "expr": "dm_syncer_queue_size{task=\"$task\", instance=\"$instance\"}", "format": "time_series", "intervalFactor": 2, + "legendFormat": "{{queueNo}}", "refId": "A" } ], @@ -3256,6 +3461,7 @@ }, "yaxes": [ { + "decimals": 0, "format": "short", "label": null, "logBase": 1, @@ -3269,7 +3475,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3283,11 +3489,13 @@ "fill": 1, "id": 3, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3309,13 +3517,14 @@ "expr": "rate(dm_syncer_added_jobs_total{task=\"$task\", instance=\"$instance\"}[1m])", "format": "time_series", "intervalFactor": 2, + "legendFormat": "{{queueNo}}-{{type}}", "refId": "A" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "total sql jobs", + "title": "total SQL jobs", "tooltip": { "shared": true, "sort": 0, @@ -3345,7 +3554,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3359,11 +3568,13 @@ "fill": 1, "id": 2, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3385,13 +3596,14 @@ "expr": "rate(dm_syncer_finished_jobs_total{task=\"$task\", instance=\"$instance\"}[1m])", "format": "time_series", "intervalFactor": 2, + "legendFormat": "{{queueNo}}-{{type}}", "refId": "A" } ], "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "finished sql jobs", + "title": "finished SQL jobs", "tooltip": { "shared": true, "sort": 0, @@ -3421,7 +3633,7 @@ "logBase": 1, "max": 
null, "min": null, - "show": true + "show": false } ] }, @@ -3435,11 +3647,13 @@ "fill": 1, "id": 57, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, "values": false }, @@ -3489,7 +3703,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "statement execution latency", + "title": "statement execution latency - 90", "tooltip": { "shared": true, "sort": 0, @@ -3520,7 +3734,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3598,7 +3812,7 @@ "logBase": 1, "max": null, "min": null, - "show": true + "show": false } ] }, @@ -3767,5 +3981,5 @@ }, "timezone": "", "title": "DM-task", - "version": 29 + "version": 30 } \ No newline at end of file From a47d9bbfe783adff2f42aed77ccee69b2510dbd0 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sat, 11 Apr 2020 09:24:14 +0800 Subject: [PATCH 03/14] *: revert checkpoint flush interval; fix UT --- dm/config/task.go | 17 ++++++----------- syncer/checkpoint.go | 4 +++- syncer/syncer_test.go | 1 + 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index 9de462c1e5..ab16bdf103 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -55,10 +55,9 @@ var ( defaultPoolSize = 16 defaultDir = "./dumped_data" // SyncerConfig - defaultWorkerCount = 16 - defaultBatch = 100 - defaultQueueSize = 5120 - defaultCheckpointFlushInterval = 30 + defaultWorkerCount = 16 + defaultBatch = 100 + defaultQueueSize = 5120 ) // Meta represents binlog's meta pos @@ -197,9 +196,6 @@ type SyncerConfig struct { Batch int `yaml:"batch" toml:"batch" json:"batch"` QueueSize int `yaml:"queue-size" toml:"queue-size" json:"queue-size"` - // checkpoint flush interval in seconds. 
- CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"` - // deprecated MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"` @@ -213,10 +209,9 @@ type SyncerConfig struct { func defaultSyncerConfig() SyncerConfig { return SyncerConfig{ - WorkerCount: defaultWorkerCount, - Batch: defaultBatch, - QueueSize: defaultQueueSize, - CheckpointFlushInterval: defaultCheckpointFlushInterval, + WorkerCount: defaultWorkerCount, + Batch: defaultBatch, + QueueSize: defaultQueueSize, } } diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index d4efd7b4da..031108a650 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -46,6 +46,8 @@ var ( globalCpTable = "" // global checkpoint's cp_table maxCheckPointTimeout = "1m" minCheckpoint = mysql.Position{Pos: 4} + + maxCheckPointSaveTime = 30 * time.Second ) // NOTE: now we sync from relay log, so not add GTID support yet @@ -451,7 +453,7 @@ func (cp *RemoteCheckPoint) String() string { func (cp *RemoteCheckPoint) CheckGlobalPoint() bool { cp.RLock() defer cp.RUnlock() - return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second + return time.Since(cp.globalPointSaveTime) >= maxCheckPointSaveTime } // Rollback implements CheckPoint.Rollback diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index c631f8db18..c8dfb08932 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1036,6 +1036,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { s.cfg.WorkerCount = 1 syncer := NewSyncer(s.cfg) syncer.jobs = []chan *job{make(chan *job, 1)} + syncer.queueBucketMapping = []string{"queue_0", adminQueueName} wg.Add(1) go func() { From 26b7e397e2dc580b468956e5a652a20072c2a66c Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sat, 11 Apr 2020 12:08:08 +0800 Subject: [PATCH 04/14] *: add plane for add job duration --- dm/dm-ansible/scripts/dm.json | 102 ++++++++++++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 4 deletions(-) diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index ebc369842a..a8bd513ae0 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -3667,7 +3667,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 4, + "span": 3, "stack": false, "steppedLine": false, "targets": [ @@ -3738,6 +3738,100 @@ } ] }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time it takes binlog replication unit to add a job to the queue (in seconds)", + "fill": 1, + "id": 58, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 3, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "90", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + 
"legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_add_job_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "add job duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 2, + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + }, { "aliasColors": {}, "bars": false, @@ -3766,7 +3860,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 4, + "span": 3, "stack": false, "steppedLine": false, "targets": [ @@ -3844,7 +3938,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 4, + "span": 3, "stack": false, "steppedLine": false, "targets": [ @@ -3981,5 +4075,5 @@ }, "timezone": "", "title": "DM-task", - "version": 30 + "version": 31 } \ No newline at end of file From 1b68cd3e0ae2bd082313eaf7d57bbf6f0f81659d Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sat, 11 Apr 2020 15:45:34 +0800 Subject: [PATCH 05/14] syncer: use `strings.Builder` to replace `fmt.Sprintf`; 470 -> 457 --- syncer/dml.go | 188 ++++++++++++++++++++++++--------------------- syncer/dml_test.go | 26 ------- 2 files changed, 99 insertions(+), 115 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index e8dcc60ce8..f9bd047594 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -14,9 +14,9 @@ package syncer import ( - "bytes" "encoding/binary" "fmt" + "io" "strconv" "strings" @@ -102,42 +102,35 @@ func extractValueFromData(data []interface{}, columns []*column) []interface{} { func genInsertSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, error) { var ( - schema = param.schema - table = param.table + fullname = dbutil.TableName(param.schema, param.table) dataSeq = param.data originalDataSeq = param.originalData columns = param.columns originalColumns = param.originalColumns originalIndexColumns = param.originalIndexColumns + sqls = make([]string, 0, len(dataSeq)) + keys = make([][]string, 0, len(dataSeq)) + values = make([][]interface{}, 0, len(dataSeq)) ) - sqls := make([]string, 0, len(dataSeq)) - keys := make([][]string, 0, len(dataSeq)) - values := make([][]interface{}, 0, len(dataSeq)) - columnList := genColumnList(columns) - columnPlaceholders := genColumnPlaceholders(len(columns)) + + var insertOrReplace = "INSERT INTO" + if param.safeMode { + insertOrReplace = "REPLACE INTO" + } + sql := genInsertReplace(insertOrReplace, fullname, columns) + for dataIdx, data := range dataSeq { if len(data) != len(columns) { return nil, nil, nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(data)) } value := extractValueFromData(data, columns) - originalData := originalDataSeq[dataIdx] - var originalValue []interface{} - if len(columns) == len(originalColumns) { - originalValue = value - } else { - originalValue = extractValueFromData(originalData, originalColumns) + var originalValue = value + if len(columns) != len(originalColumns) { + originalValue = extractValueFromData(originalDataSeq[dataIdx], originalColumns) } - var insertOrReplace string - if 
param.safeMode { - insertOrReplace = "REPLACE" - } else { - insertOrReplace = "INSERT" - } - - sql := fmt.Sprintf("%s INTO `%s`.`%s` (%s) VALUES (%s);", insertOrReplace, schema, table, columnList, columnPlaceholders) - ks := genMultipleKeys(originalColumns, originalValue, originalIndexColumns, dbutil.TableName(schema, table)) + ks := genMultipleKeys(originalColumns, originalValue, originalIndexColumns, fullname) sqls = append(sqls, sql) values = append(values, value) keys = append(keys, ks) @@ -148,21 +141,22 @@ func genInsertSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, error) { var ( - schema = param.schema - table = param.table - safeMode = param.safeMode + fullname = dbutil.TableName(param.schema, param.table) data = param.data originalData = param.originalData columns = param.columns originalColumns = param.originalColumns originalIndexColumns = param.originalIndexColumns + defaultIndexColumns = findFitIndex(originalIndexColumns) + replaceSQL string // `REPLACE INTO` SQL + sqls = make([]string, 0, len(data)/2) + keys = make([][]string, 0, len(data)/2) + values = make([][]interface{}, 0, len(data)/2) ) - sqls := make([]string, 0, len(data)/2) - keys := make([][]string, 0, len(data)/2) - values := make([][]interface{}, 0, len(data)/2) - columnList := genColumnList(columns) - columnPlaceholders := genColumnPlaceholders(len(columns)) - defaultIndexColumns := findFitIndex(originalIndexColumns) + + if param.safeMode { + replaceSQL = genInsertReplace("REPLACE INTO", fullname, columns) + } for i := 0; i < len(data); i += 2 { oldData := data[i] @@ -194,23 +188,23 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e defaultIndexColumns = getAvailableIndexColumn(originalIndexColumns, oriOldValues) } - ks := genMultipleKeys(originalColumns, oriOldValues, originalIndexColumns, dbutil.TableName(schema, table)) - ks = append(ks, genMultipleKeys(originalColumns, oriChangedValues, originalIndexColumns, dbutil.TableName(schema, table))...) + ks := genMultipleKeys(originalColumns, oriOldValues, originalIndexColumns, fullname) + ks = append(ks, genMultipleKeys(originalColumns, oriChangedValues, originalIndexColumns, fullname)...) - if safeMode { + if param.safeMode { // generate delete sql from old data - sql, value := genDeleteSQL(schema, table, oriOldValues, originalColumns, defaultIndexColumns) + sql, value := genDeleteSQL(fullname, oriOldValues, originalColumns, defaultIndexColumns) sqls = append(sqls, sql) values = append(values, value) keys = append(keys, ks) // generate replace sql from new data - sql = fmt.Sprintf("REPLACE INTO `%s`.`%s` (%s) VALUES (%s);", schema, table, columnList, columnPlaceholders) - sqls = append(sqls, sql) + sqls = append(sqls, replaceSQL) values = append(values, changedValues) keys = append(keys, ks) continue } + // NOTE: move these variables outer of `for` if needed (to reuse). updateColumns := make([]*column, 0, len(defaultIndexColumns)) updateValues := make([]interface{}, 0, len(defaultIndexColumns)) for j := range oldValues { @@ -224,7 +218,6 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e } value := make([]interface{}, 0, len(oldData)) - kvs := genKVs(updateColumns) value = append(value, updateValues...) 
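// `value` holds the SET arguments first; the WHERE arguments are appended
// after the where-columns are chosen just below.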
whereColumns, whereValues := originalColumns, oriOldValues @@ -232,10 +225,9 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e whereColumns, whereValues = getColumnData(originalColumns, defaultIndexColumns, oriOldValues) } - where := genWhere(whereColumns, whereValues) value = append(value, whereValues...) - sql := fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s LIMIT 1;", schema, table, kvs, where) + sql := genUpdateSQL(fullname, updateColumns, whereColumns, whereValues) sqls = append(sqls, sql) values = append(values, value) keys = append(keys, ks) @@ -246,16 +238,15 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e func genDeleteSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, error) { var ( - schema = param.schema - table = param.table - dataSeq = param.originalData - columns = param.originalColumns - indexColumns = param.originalIndexColumns + fullname = dbutil.TableName(param.schema, param.table) + dataSeq = param.originalData + columns = param.originalColumns + indexColumns = param.originalIndexColumns + defaultIndexColumns = findFitIndex(indexColumns) + sqls = make([]string, 0, len(dataSeq)) + keys = make([][]string, 0, len(dataSeq)) + values = make([][]interface{}, 0, len(dataSeq)) ) - sqls := make([]string, 0, len(dataSeq)) - keys := make([][]string, 0, len(dataSeq)) - values := make([][]interface{}, 0, len(dataSeq)) - defaultIndexColumns := findFitIndex(indexColumns) for _, data := range dataSeq { if len(data) != len(columns) { @@ -267,9 +258,9 @@ func genDeleteSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e if len(defaultIndexColumns) == 0 { defaultIndexColumns = getAvailableIndexColumn(indexColumns, value) } - ks := genMultipleKeys(columns, value, indexColumns, dbutil.TableName(schema, table)) + ks := genMultipleKeys(columns, value, indexColumns, fullname) - sql, value := genDeleteSQL(schema, table, value, columns, defaultIndexColumns) + sql, value := genDeleteSQL(fullname, value, columns, defaultIndexColumns) sqls = append(sqls, sql) values = append(values, value) keys = append(keys, ks) @@ -278,20 +269,14 @@ func genDeleteSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e return sqls, keys, values, nil } -func genDeleteSQL(schema string, table string, value []interface{}, columns []*column, indexColumns []*column) (string, []interface{}) { - whereColumns, whereValues := columns, value - if len(indexColumns) > 0 { - whereColumns, whereValues = getColumnData(columns, indexColumns, value) - } - - where := genWhere(whereColumns, whereValues) - sql := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s LIMIT 1;", schema, table, where) - - return sql, whereValues -} - -func genColumnList(columns []*column) string { +// genInsertReplace generates a DML for `INSERT INTO` or `REPLCATE INTO`. +// the returned SQL with placeholders for `VALUES`. +func genInsertReplace(op, table string, columns []*column) string { + // NOTE: use sync.Pool to hold the builder if needed later. 
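// Target shape, e.g.: REPLACE INTO `db`.`tbl` (`a`,`b`) VALUES (?,?);
// note that this version emits the column list and the placeholders without
// parentheses, which PATCH 06 below fixes.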
var buf strings.Builder + buf.Grow(256) + buf.WriteString(op) + buf.WriteString(" " + table + " ") for i, column := range columns { if i != len(columns)-1 { buf.WriteString("`" + column.name + "`,") @@ -299,16 +284,57 @@ func genColumnList(columns []*column) string { buf.WriteString("`" + column.name + "`") } } + buf.WriteString(" VALUES ") + // placeholders + for i := range columns { + if i != len(columns)-1 { + buf.WriteString("?,") + } else { + buf.WriteString("?") + } + } return buf.String() } -func genColumnPlaceholders(length int) string { - values := make([]string, length) - for i := 0; i < length; i++ { - values[i] = "?" +// genUpdateSQL generates a `UPDATE` SQL with `SET` and `WHERE`. +func genUpdateSQL(table string, updateColumns, whereColumns []*column, whereValues []interface{}) string { + var buf strings.Builder + buf.Grow(2048) + buf.WriteString("UPDATE ") + buf.WriteString(table) + buf.WriteString(" SET ") + + for i, column := range updateColumns { + if i == len(updateColumns)-1 { + fmt.Fprintf(&buf, "`%s` = ?", column.name) // TODO: update `tidb-tools` to use `dbutil.ColumnName`. + } else { + fmt.Fprintf(&buf, "`%s` = ?, ", column.name) + } } - return strings.Join(values, ",") + + buf.WriteString(" WHERE ") + genWhere(&buf, whereColumns, whereValues) + buf.WriteString(" LIMIT 1") + return buf.String() +} + +// genDeleteSQL generates a `DELETE FROM` SQL with `WHERE`. +func genDeleteSQL(table string, value []interface{}, columns []*column, indexColumns []*column) (string, []interface{}) { + whereColumns, whereValues := columns, value + if len(indexColumns) > 0 { + whereColumns, whereValues = getColumnData(columns, indexColumns, value) + } + + var buf strings.Builder + buf.Grow(1024) + buf.WriteString("DELETE FROM ") + buf.WriteString(table) + buf.WriteString(" WHERE ") + genWhere(&buf, whereColumns, whereValues) + buf.WriteString(" LIMIT 1") + + return buf.String(), whereValues } func castUnsigned(data interface{}, unsigned bool, tp string) interface{} { @@ -490,8 +516,7 @@ func getColumnData(columns []*column, indexColumns []*column, data []interface{} return cols, values } -func genWhere(columns []*column, data []interface{}) string { - var kvs bytes.Buffer +func genWhere(w io.Writer, columns []*column, data []interface{}) { for i := range columns { kvSplit := "=" if data[i] == nil { @@ -499,26 +524,11 @@ func genWhere(columns []*column, data []interface{}) string { } if i == len(columns)-1 { - fmt.Fprintf(&kvs, "`%s` %s ?", columns[i].name, kvSplit) - } else { - fmt.Fprintf(&kvs, "`%s` %s ? AND ", columns[i].name, kvSplit) - } - } - - return kvs.String() -} - -func genKVs(columns []*column) string { - var kvs bytes.Buffer - for i := range columns { - if i == len(columns)-1 { - fmt.Fprintf(&kvs, "`%s` = ?", columns[i].name) + fmt.Fprintf(w, "`%s` %s ?", columns[i].name, kvSplit) } else { - fmt.Fprintf(&kvs, "`%s` = ?, ", columns[i].name) + fmt.Fprintf(w, "`%s` %s ? 
AND ", columns[i].name, kvSplit) } } - - return kvs.String() } func (s *Syncer) mappingDML(schema, table string, columns []string, data [][]interface{}) ([][]interface{}, error) { diff --git a/syncer/dml_test.go b/syncer/dml_test.go index ba365a9a92..4c52988f5d 100644 --- a/syncer/dml_test.go +++ b/syncer/dml_test.go @@ -45,32 +45,6 @@ func (s *testSyncerSuite) TestCastUnsigned(c *C) { } } -func (s *testSyncerSuite) TestGenColumnPlaceholders(c *C) { - placeholderStr := genColumnPlaceholders(1) - c.Assert(placeholderStr, Equals, "?") - - placeholderStr = genColumnPlaceholders(3) - c.Assert(placeholderStr, Equals, "?,?,?") -} - -func (s *testSyncerSuite) TestGenColumnList(c *C) { - columns := []*column{ - { - name: "a", - }, { - name: "b", - }, { - name: "c", - }, - } - - columnList := genColumnList(columns[:1]) - c.Assert(columnList, Equals, "`a`") - - columnList = genColumnList(columns) - c.Assert(columnList, Equals, "`a`,`b`,`c`") -} - func (s *testSyncerSuite) TestFindFitIndex(c *C) { pkColumns := []*column{ { From 5fdeb8930046ae351498e75900626f17f23ac7e8 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sat, 11 Apr 2020 15:58:34 +0800 Subject: [PATCH 06/14] syncer: fix `genInsertReplace` --- syncer/dml.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index f9bd047594..0f577c3cf8 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -276,22 +276,22 @@ func genInsertReplace(op, table string, columns []*column) string { var buf strings.Builder buf.Grow(256) buf.WriteString(op) - buf.WriteString(" " + table + " ") + buf.WriteString(" " + table + " (") for i, column := range columns { if i != len(columns)-1 { buf.WriteString("`" + column.name + "`,") } else { - buf.WriteString("`" + column.name + "`") + buf.WriteString("`" + column.name + "`)") } } - buf.WriteString(" VALUES ") + buf.WriteString(" VALUES (") // placeholders for i := range columns { if i != len(columns)-1 { buf.WriteString("?,") } else { - buf.WriteString("?") + buf.WriteString("?)") } } return buf.String() From 2e85c5fe769af05edda866d74ebf5904fb13c63f Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sat, 11 Apr 2020 16:12:56 +0800 Subject: [PATCH 07/14] syncer: fix UT --- syncer/syncer_test.go | 64 +++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index c8dfb08932..9f86ac5c1c 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -842,15 +842,15 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { "insert into gctest_1.t_3(id, cfg) values (3, '{\"id\": 3}')", }, []string{ - "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);", - "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);", - "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);", - "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);", + "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_2` 
(`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?)", + "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?)", + "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?)", + "INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?)", }, [][]interface{}{ {int32(1), int32(18), "{}"}, @@ -876,14 +876,14 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { "update gctest_1.t_3 set cfg = '{\"id\": 12, \"old_id\": 2}' where gen_id = 2", }, []string{ - "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` IS ? AND `cfg_json` IS ? LIMIT 1;", - "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_3` SET `id` = ?, `cfg` = ? WHERE `gen_id` = ? LIMIT 1;", - "UPDATE `gctest_1`.`t_3` SET `id` = ?, `cfg` = ? WHERE `gen_id` = ? LIMIT 1;", + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_1` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? AND `age` = ? AND `cfg` IS ? AND `cfg_json` IS ? LIMIT 1", + "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_2` SET `id` = ?, `age` = ?, `cfg` = ? WHERE `id` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_3` SET `id` = ?, `cfg` = ? WHERE `gen_id` = ? LIMIT 1", + "UPDATE `gctest_1`.`t_3` SET `id` = ?, `cfg` = ? WHERE `gen_id` = ? LIMIT 1", }, [][]interface{}{ {int32(1), int32(21), "{\"a\": 12}", int32(1), int32(18), "{}", []uint8("{}")}, @@ -908,14 +908,14 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { "delete from gctest_1.t_3 where gen_id = 12", }, []string{ - "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` IS ? AND `cfg_json` IS ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_3` WHERE `gen_id` = ? LIMIT 1;", - "DELETE FROM `gctest_1`.`t_3` WHERE `gen_id` = ? LIMIT 1;", + "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` = ? AND `cfg_json` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_1` WHERE `id` = ? AND `age` = ? AND `cfg` IS ? AND `cfg_json` IS ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_2` WHERE `id` = ? LIMIT 1", + "DELETE FROM `gctest_1`.`t_3` WHERE `gen_id` = ? 
LIMIT 1", + "DELETE FROM `gctest_1`.`t_3` WHERE `gen_id` = ? LIMIT 1", }, [][]interface{}{ {int32(1), int32(21), "{\"a\": 12}", []uint8("{\"a\":12}")}, @@ -1475,7 +1475,7 @@ func (s *testSyncerSuite) TestRun(c *C) { nil, }, { insert, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);", + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int64(580981944116838401), "a"}, }, { ddl, @@ -1483,11 +1483,11 @@ func (s *testSyncerSuite) TestRun(c *C) { nil, }, { insert, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);", + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int64(580981944116838402), "b"}, }, { del, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1;", + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", []interface{}{int64(580981944116838401)}, }, { flush, @@ -1496,12 +1496,12 @@ func (s *testSyncerSuite) TestRun(c *C) { }, { // in first 5 minutes, safe mode is true, will split update to delete + replace update, - "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1;", + "DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", []interface{}{int64(580981944116838402)}, }, { // in first 5 minutes, , safe mode is true, will split update to delete + replace update, - "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);", + "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int64(580981944116838401), "b"}, }, } @@ -1557,11 +1557,11 @@ func (s *testSyncerSuite) TestRun(c *C) { expectJobs2 := []*expectJob{ { insert, - "REPLACE INTO `test_1`.`t_2` (`id`,`name`) VALUES (?,?);", + "REPLACE INTO `test_1`.`t_2` (`id`,`name`) VALUES (?,?)", []interface{}{int32(3), "c"}, }, { del, - "DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1;", + "DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1", []interface{}{int32(3)}, }, } From ff3eb9ccfa70964a8ebb825129170b1e862e0770 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sat, 11 Apr 2020 16:46:19 +0800 Subject: [PATCH 08/14] syncer: do not create new column name for `getColumnData`; 457 -> 453 --- syncer/dml.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index 0f577c3cf8..33e7b66e2e 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -130,7 +130,7 @@ func genInsertSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e originalValue = extractValueFromData(originalDataSeq[dataIdx], originalColumns) } - ks := genMultipleKeys(originalColumns, originalValue, originalIndexColumns, fullname) + ks := genMultipleKeys(originalValue, originalIndexColumns, fullname) sqls = append(sqls, sql) values = append(values, value) keys = append(keys, ks) @@ -188,8 +188,8 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e defaultIndexColumns = getAvailableIndexColumn(originalIndexColumns, oriOldValues) } - ks := genMultipleKeys(originalColumns, oriOldValues, originalIndexColumns, fullname) - ks = append(ks, genMultipleKeys(originalColumns, oriChangedValues, originalIndexColumns, fullname)...) + ks := genMultipleKeys(oriOldValues, originalIndexColumns, fullname) + ks = append(ks, genMultipleKeys(oriChangedValues, originalIndexColumns, fullname)...) 
if param.safeMode { // generate delete sql from old data @@ -222,7 +222,8 @@ func genUpdateSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e whereColumns, whereValues := originalColumns, oriOldValues if len(defaultIndexColumns) > 0 { - whereColumns, whereValues = getColumnData(originalColumns, defaultIndexColumns, oriOldValues) + whereColumns = defaultIndexColumns + whereValues = getColumnData(defaultIndexColumns, oriOldValues) } value = append(value, whereValues...) @@ -258,7 +259,7 @@ func genDeleteSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e if len(defaultIndexColumns) == 0 { defaultIndexColumns = getAvailableIndexColumn(indexColumns, value) } - ks := genMultipleKeys(columns, value, indexColumns, fullname) + ks := genMultipleKeys(value, indexColumns, fullname) sql, value := genDeleteSQL(fullname, value, columns, defaultIndexColumns) sqls = append(sqls, sql) @@ -323,7 +324,8 @@ func genUpdateSQL(table string, updateColumns, whereColumns []*column, whereValu func genDeleteSQL(table string, value []interface{}, columns []*column, indexColumns []*column) (string, []interface{}) { whereColumns, whereValues := columns, value if len(indexColumns) > 0 { - whereColumns, whereValues = getColumnData(columns, indexColumns, value) + whereColumns = indexColumns + whereValues = getColumnData(indexColumns, value) } var buf strings.Builder @@ -448,11 +450,11 @@ func genKeyList(columns []*column, dataSeq []interface{}) string { return strings.Join(values, ",") } -func genMultipleKeys(columns []*column, value []interface{}, indexColumns map[string][]*column, table string) []string { +func genMultipleKeys(value []interface{}, indexColumns map[string][]*column, table string) []string { multipleKeys := make([]string, 0, len(indexColumns)) for _, indexCols := range indexColumns { - cols, vals := getColumnData(columns, indexCols, value) - multipleKeys = append(multipleKeys, table+genKeyList(cols, vals)) + vals := getColumnData(indexCols, value) + multipleKeys = append(multipleKeys, table+genKeyList(indexCols, vals)) } return multipleKeys } @@ -505,15 +507,13 @@ func getSpecifiedIndexColumn(indexColumns map[string][]*column, fn func(col *col return nil } -func getColumnData(columns []*column, indexColumns []*column, data []interface{}) ([]*column, []interface{}) { - cols := make([]*column, 0, len(columns)) - values := make([]interface{}, 0, len(columns)) +func getColumnData(indexColumns []*column, data []interface{}) []interface{} { + values := make([]interface{}, 0, len(indexColumns)) for _, column := range indexColumns { - cols = append(cols, column) values = append(values, data[column.idx]) } - return cols, values + return values } func genWhere(w io.Writer, columns []*column, data []interface{}) { From 1be19f998eb1f86aaa235c8eb6bd50ef181c3b75 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sat, 11 Apr 2020 18:52:14 +0800 Subject: [PATCH 09/14] syncer: use `strings.Builder` for `genKeyList`; 453 --- syncer/dml.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/syncer/dml.go b/syncer/dml.go index 33e7b66e2e..99667e1659 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -441,20 +441,21 @@ func findColumns(columns []*column, indexColumns map[string][]string) map[string return result } -func genKeyList(columns []*column, dataSeq []interface{}) string { - values := make([]string, 0, len(dataSeq)) +func genKeyList(table string, columns []*column, dataSeq []interface{}) string { + var buf strings.Builder + buf.WriteString(table) for i, data := 
range dataSeq {
-		values = append(values, columnValue(data, columns[i].unsigned, columns[i].tp))
+		// no `,` separator should be needed: the result is only used as a conflict-detection key.
+		buf.WriteString(columnValue(data, columns[i].unsigned, columns[i].tp))
 	}
-
-	return strings.Join(values, ",")
+	return buf.String()
 }
 
 func genMultipleKeys(value []interface{}, indexColumns map[string][]*column, table string) []string {
 	multipleKeys := make([]string, 0, len(indexColumns))
 	for _, indexCols := range indexColumns {
 		vals := getColumnData(indexCols, value)
-		multipleKeys = append(multipleKeys, table+genKeyList(indexCols, vals))
+		multipleKeys = append(multipleKeys, genKeyList(table, indexCols, vals))
 	}
 	return multipleKeys
 }

From 107ef78d3d8a73308ec22635d73e492d227ad12e Mon Sep 17 00:00:00 2001
From: csuzhangxc
Date: Sat, 11 Apr 2020 23:15:58 +0800
Subject: [PATCH 10/14] *: lower the first histogram bucket's upper bound (fix
 metrics)

---
 loader/metrics.go |  6 +++---
 syncer/metrics.go | 14 +++++++-------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/loader/metrics.go b/loader/metrics.go
index e95071756c..959e6fd5d8 100644
--- a/loader/metrics.go
+++ b/loader/metrics.go
@@ -33,7 +33,7 @@ var (
 			Subsystem: "loader",
 			Name:      "query_duration_time",
 			Help:      "Bucketed histogram of query time (s) of a txn.",
-			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 18),
+			Buckets:   prometheus.ExponentialBuckets(0.000005, 2, 25),
 		}, []string{"task"})
 
 	txnHistogram = prometheus.NewHistogramVec(
@@ -42,7 +42,7 @@ var (
 			Subsystem: "loader",
 			Name:      "txn_duration_time",
 			Help:      "Bucketed histogram of processing time (s) of a txn.",
-			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 18),
+			Buckets:   prometheus.ExponentialBuckets(0.000005, 2, 25),
 		}, []string{"task"})
 
 	stmtHistogram = prometheus.NewHistogramVec(
@@ -51,7 +51,7 @@ var (
 			Subsystem: "loader",
 			Name:      "stmt_duration_time",
 			Help:      "Bucketed histogram of every statement query time (s).",
-			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 18),
+			Buckets:   prometheus.ExponentialBuckets(0.000005, 2, 25),
 		}, []string{"type", "task"})
 
 	dataFileGauge = prometheus.NewGaugeVec(
diff --git a/syncer/metrics.go b/syncer/metrics.go
index d3c109043a..2579ff4a69 100644
--- a/syncer/metrics.go
+++ b/syncer/metrics.go
@@ -34,7 +34,7 @@ var (
 			Subsystem: "syncer",
 			Name:      "read_binlog_duration",
 			Help:      "bucketed histogram of read time (s) for single binlog event from the relay log or master.",
-			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 21),
+			Buckets:   prometheus.ExponentialBuckets(0.000005, 2, 25),
 		}, []string{"task"})
 
 	binlogEventSizeHistogram = prometheus.NewHistogramVec(
@@ -52,7 +52,7 @@ var (
 			Subsystem: "syncer",
 			Name:      "binlog_transform_cost",
 			Help:      "cost of binlog event transform",
-			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 18),
+			Buckets:   prometheus.ExponentialBuckets(0.000005, 2, 25),
 		}, []string{"type", "task"})
 
 	conflictDetectDurationHistogram = prometheus.NewHistogramVec(
@@ -61,7 +61,7 @@ var (
 			Subsystem: "syncer",
 			Name:      "conflict_detect_duration",
 			Help:      "bucketed histogram of conflict detect time (s) for single DML statement",
-			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 21),
+			Buckets:   prometheus.ExponentialBuckets(0.000005, 2, 25),
 		}, []string{"task"})
 
 	addJobDurationHistogram = prometheus.NewHistogramVec(
@@ -70,7 +70,7 @@ var (
 			Subsystem: "syncer",
 			Name:      "add_job_duration",
 			Help:      "bucketed histogram of add a job to the queue time (s)",
-			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 21),
+			Buckets:   prometheus.ExponentialBuckets(0.000005, 2, 25),
 		}, []string{"type", "task",
"queueNo"}) binlogSkippedEventsTotal = prometheus.NewCounterVec( @@ -135,7 +135,7 @@ var ( Subsystem: "syncer", Name: "txn_duration_time", Help: "Bucketed histogram of processing time (s) of a txn.", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) queryHistogram = prometheus.NewHistogramVec( @@ -144,7 +144,7 @@ var ( Subsystem: "syncer", Name: "query_duration_time", Help: "Bucketed histogram of query time (s).", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"task"}) stmtHistogram = prometheus.NewHistogramVec( @@ -153,7 +153,7 @@ var ( Subsystem: "syncer", Name: "stmt_duration_time", Help: "Bucketed histogram of every statement query time (s).", - Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"type", "task"}) // FIXME: should I move it to dm-worker? From 42c931daa212348dabff8513b85b0f36c1fd2c53 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sun, 12 Apr 2020 08:13:16 +0800 Subject: [PATCH 11/14] *: update relay metrics bucket --- relay/metrics.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/relay/metrics.go b/relay/metrics.go index 34c173ed22..1009164c7a 100644 --- a/relay/metrics.go +++ b/relay/metrics.go @@ -83,7 +83,7 @@ var ( Subsystem: "relay", Name: "write_duration", Help: "bucketed histogram of write time (s) of single relay log event", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }) // should alert @@ -110,7 +110,7 @@ var ( Subsystem: "relay", Name: "read_binlog_duration", Help: "bucketed histogram of read time (s) of single binlog event from the master.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }) binlogTransformDurationHistogram = prometheus.NewHistogram( @@ -119,7 +119,7 @@ var ( Subsystem: "relay", Name: "read_transform_duration", Help: "bucketed histogram of transform time (s) of single binlog event.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }) // should alert From 272aa311d296ab4e9d69c2dd96428672d9e597d6 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sun, 12 Apr 2020 08:55:04 +0800 Subject: [PATCH 12/14] syncer: add a metric for dispatch a binlog event --- syncer/metrics.go | 12 ++++++++++++ syncer/syncer.go | 2 ++ 2 files changed, 14 insertions(+) diff --git a/syncer/metrics.go b/syncer/metrics.go index 2579ff4a69..6268ee276e 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -73,6 +73,17 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000005, 2, 25), }, []string{"type", "task", "queueNo"}) + // dispatch/add multiple jobs for one binlog event. + // NOTE: only observe for DML now. 
+	dispatchBinlogDurationHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "dm",
+			Subsystem: "syncer",
+			Name:      "dispatch_binlog_duration",
+			Help:      "bucketed histogram of the time (s) taken to dispatch a binlog event",
+			Buckets:   prometheus.ExponentialBuckets(0.000005, 2, 25),
+		}, []string{"type", "task"})
+
 	binlogSkippedEventsTotal = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "dm",
@@ -215,6 +226,7 @@ func RegisterMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(binlogEvent)
 	registry.MustRegister(conflictDetectDurationHistogram)
 	registry.MustRegister(addJobDurationHistogram)
+	registry.MustRegister(dispatchBinlogDurationHistogram)
 	registry.MustRegister(binlogSkippedEventsTotal)
 	registry.MustRegister(addedJobsTotal)
 	registry.MustRegister(finishedJobsTotal)
diff --git a/syncer/syncer.go b/syncer/syncer.go
index cebb977e45..fc0d4ab64f 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -1552,6 +1552,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 		*ec.traceID = traceEvent.Base.TraceID
 	}
 
+	startTime := time.Now()
 	for i := range sqls {
 		var arg []interface{}
 		var key []string
@@ -1566,6 +1567,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 			return err
 		}
 	}
+	dispatchBinlogDurationHistogram.WithLabelValues(ec.latestOp.String(), s.cfg.Name).Observe(time.Since(startTime).Seconds())
 
 	return nil
 }

From 4645ee6a2f51482abdc0ea167d07feb749ce124f Mon Sep 17 00:00:00 2001
From: csuzhangxc
Date: Sun, 12 Apr 2020 09:19:19 +0800
Subject: [PATCH 13/14] *: update Grafana dashboard

---
 dm/dm-ansible/scripts/dm.json | 114 +++++++++++++++++++++++++++++++---
 1 file changed, 104 insertions(+), 10 deletions(-)

diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json
index a8bd513ae0..8642e0822a 100644
--- a/dm/dm-ansible/scripts/dm.json
+++ b/dm/dm-ansible/scripts/dm.json
@@ -3123,9 +3123,9 @@
       "dashLength": 10,
       "dashes": false,
       "datasource": "${DS_TEST-CLUSTER}",
-      "description": "The time it takes binlog replication unit to detect conflicts between DMLs (in seconds)",
+      "description": "The time it takes binlog replication unit to dispatch a binlog event (in seconds)",
       "fill": 1,
-      "id": 54,
+      "id": 59,
      "legend": {
        "alignAsTable": true,
        "avg": false,
        "current": false,
        "max": false,
        "min": false,
        "rightSide": true,
        "show": true,
        "total": false,
        "values": false
      },
      "lines": true,
      "linewidth": 1,
      "links": [],
      "nullPointMode": "null",
      "percentage": false,
      "pointradius": 5,
      "points": false,
      "renderer": "flot",
      "seriesOverrides": [],
      "spaceLength": 10,
      "stack": false,
      "steppedLine": false,
      "targets": [
        {
-          "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
+          "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
           "format": "time_series",
           "intervalFactor": 2,
           "legendFormat": "90",
           "refId": "A"
         },
         {
-          "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
+          "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
           "format": "time_series",
           "intervalFactor": 2,
           "legendFormat": "95",
           "refId": "B"
         },
         {
-          "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
+          "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_dispatch_binlog_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))",
           "format": "time_series",
           "intervalFactor": 2,
           "legendFormat": "99",
           "refId": "C"
         }
       ],
       "thresholds": [],
       "timeFrom": null,
       "timeShift": null,
-
"title": "DML conflict detect duration", + "title": "dispatch binlog event duration", "tooltip": { "shared": true, "sort": 0, @@ -3667,7 +3667,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 3, + "span": 4, "stack": false, "steppedLine": false, "targets": [ @@ -3768,7 +3768,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 3, + "span": 4, "stack": false, "steppedLine": false, "targets": [ @@ -3832,6 +3832,100 @@ } ] }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time it takes binlog replication unit to detect conflicts between DMLs (in seconds)", + "fill": 1, + "id": 54, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "span": 4, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.90, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "90", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "95", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(dm_syncer_conflict_detect_duration_bucket{task=\"$task\", instance=\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "99", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeShift": null, + "title": "DML conflict detect duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "transparent": true, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 2, + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": 0, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ] + }, { "aliasColors": {}, "bars": false, @@ -3860,7 +3954,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 3, + "span": 6, "stack": false, "steppedLine": false, "targets": [ @@ -3938,7 +4032,7 @@ "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "span": 3, + "span": 6, "stack": false, "steppedLine": false, "targets": [ From 05e021e27f699a84c4c068bfae09f92901bb2ad9 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Sun, 12 Apr 2020 10:47:33 +0800 Subject: [PATCH 14/14] *: update Grafana dashboard for QPS --- dm/dm-ansible/scripts/dm.json | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/dm/dm-ansible/scripts/dm.json b/dm/dm-ansible/scripts/dm.json index 8642e0822a..dd6f696c68 100644 --- a/dm/dm-ansible/scripts/dm.json +++ b/dm/dm-ansible/scripts/dm.json @@ -2786,11 +2786,13 @@ "fill": 1, "id": 4, "legend": { + "alignAsTable": true, "avg": false, "current": false, "max": false, "min": false, - "show": false, + "rightSide": true, + "show": true, "total": false, 
"values": false }, @@ -2809,10 +2811,32 @@ "steppedLine": false, "targets": [ { - "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\"}[1m])", + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"write_rows\"}[1m])", "format": "time_series", "intervalFactor": 2, + "legendFormat": "insert", "refId": "A" + }, + { + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"update_rows\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "update", + "refId": "B" + }, + { + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"delete_rows\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "delete", + "refId": "C" + }, + { + "expr": "rate(dm_syncer_binlog_transform_cost_count{task=\"$task\", instance=\"$instance\", type=\"query\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "query", + "refId": "D" } ], "thresholds": [],