From 3be1df92477155b12808328c325e05a536340f16 Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Wed, 14 Jul 2021 10:35:32 +0800
Subject: [PATCH 1/7] owner, metrics: add changefeed status in metrics and grafana (#2267)

---
 cdc/model/changefeed.go | 20 ++
 cdc/owner.go | 7 +-
 cdc/owner/metrics.go | 8 +
 cdc/owner/owner.go | 4 +
 metrics/grafana/ticdc.json | 478 ++++++++++++++++++++++---------------
 5 files changed, 318 insertions(+), 199 deletions(-)

diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go
index d8bcb48230b..31b39e7f983 100644
--- a/cdc/model/changefeed.go
+++ b/cdc/model/changefeed.go
@@ -52,6 +52,26 @@ const (
 	StateFinished FeedState = "finished"
 )
 
+// ToInt returns an int for each `FeedState`; use it only for metrics.
+func (s FeedState) ToInt() int {
+	switch s {
+	case StateNormal:
+		return 0
+	case StateError:
+		return 1
+	case StateFailed:
+		return 2
+	case StateStopped:
+		return 3
+	case StateFinished:
+		return 4
+	case StateRemoved:
+		return 5
+	}
+	// -1 for an unknown feed state
+	return -1
+}
+
 const (
 	// errorHistoryGCInterval represents how long we keep error record in changefeed info
 	errorHistoryGCInterval = time.Minute * 10

diff --git a/cdc/owner.go b/cdc/owner.go
index 9df35d17c18..f4e546a3ac5 100644
--- a/cdc/owner.go
+++ b/cdc/owner.go
@@ -52,7 +52,7 @@ type ownership struct {
 	tickTime time.Duration
 }
 
-func newOwnersip(tickTime time.Duration) ownership {
+func newOwnership(tickTime time.Duration) ownership {
 	minTickTime := 5 * time.Second
 	if tickTime > minTickTime {
 		log.Panic("ownership counter must be incearsed every 5 seconds")
@@ -1110,7 +1110,6 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
 		if err != nil {
 			return errors.Trace(err)
 		}
-
 		err = o.dispatchJob(ctx, job)
 		if err != nil {
 			return errors.Trace(err)
@@ -1276,7 +1275,7 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error {
 	defer feedChangeReceiver.Stop()
 	o.watchFeedChange(ctx1)
 
-	ownership := newOwnersip(tickTime)
+	ownership := newOwnership(tickTime)
 loop:
 	for {
 		select {
@@ -1578,7 +1577,7 @@ func (o *Owner) watchCapture(ctx context.Context) error {
 	failpoint.Inject("sleep-before-watch-capture", nil)
 
 	// When an owner just starts, changefeed information is not updated at once.
-	// Supposing a crased capture should be removed now, the owner will miss deleting
+	// Supposing a crashed capture should be removed now, the owner will miss deleting
 	// task status and task position if changefeed information is not loaded.
 	// If the task positions and status decode failed, remove them.
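The ToInt mapping above is what lets a string-typed state drive a numeric Prometheus gauge; the changefeedStatusGauge added later in this patch consumes it. A minimal, self-contained sketch of the same pattern, assuming only the standard prometheus Go client (all names here are illustrative, not TiCDC's):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

type feedState string

const (
	stateNormal feedState = "normal"
	stateError  feedState = "error"
)

// toInt gives every state a stable numeric code; -1 marks an unknown state,
// so a dashboard can tell "no data" apart from "unexpected value".
func (s feedState) toInt() int {
	switch s {
	case stateNormal:
		return 0
	case stateError:
		return 1
	}
	return -1
}

func main() {
	statusGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "demo", Subsystem: "owner", Name: "status",
		Help: "The status of changefeeds",
	}, []string{"changefeed"})
	prometheus.MustRegister(statusGauge)

	// One time series per changefeed; the panel maps the value back to a name.
	statusGauge.WithLabelValues("cf-1").Set(float64(stateError.toInt()))
	fmt.Println(stateError.toInt()) // 1
}

Resetting the vector before republishing, as updateMetrics does below, is what drops series for changefeeds that no longer exist.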
if err := o.checkAndCleanTasksInfo(ctx); err != nil { diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go index 0b1ae01d765..4aeb513649b 100644 --- a/cdc/owner/metrics.go +++ b/cdc/owner/metrics.go @@ -44,6 +44,13 @@ var ( Name: "maintain_table_num", Help: "number of replicated tables maintained in owner", }, []string{"changefeed", "capture", "type"}) + changefeedStatusGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "status", + Help: "The status of changefeeds", + }, []string{"changefeed"}) ) const ( @@ -59,4 +66,5 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(changefeedCheckpointTsLagGauge) registry.MustRegister(ownershipCounter) registry.MustRegister(ownerMaintainTableNumGauge) + registry.MustRegister(changefeedStatusGauge) } diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 18b2f51904a..ead1f8d907b 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -235,6 +235,7 @@ func (o *Owner) updateMetrics(state *model.GlobalReactorState) { o.lastTickTime = now ownerMaintainTableNumGauge.Reset() + changefeedStatusGauge.Reset() for changefeedID, changefeedState := range state.Changefeeds { for captureID, captureInfo := range state.Captures { taskStatus, exist := changefeedState.TaskStatuses[captureID] @@ -243,6 +244,9 @@ func (o *Owner) updateMetrics(state *model.GlobalReactorState) { } ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables))) ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).Set(float64(len(taskStatus.Operation))) + if changefeedState.Info != nil { + changefeedStatusGauge.WithLabelValues(changefeedID).Set(float64(changefeedState.Info.State.ToInt())) + } } } } diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 51e30e23c23..41823f955f2 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -125,7 +125,7 @@ "gnetId": null, "graphTooltip": 1, "id": null, - "iteration": 1625328316840, + "iteration": 1626152035486, "links": [], "panels": [ { @@ -1709,6 +1709,202 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The status of each changefeed.\n\n0: Normal\n\n1: Error\n\n2: Failed\n\n3: Stopped\n\n4: Finished\n\n-1: Unknown", + "fill": 1, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 26 + }, + "id": 163, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 1, + "points": true, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "ticdc_owner_status{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}", + "format": "time_series", + "instant": false, + "intervalFactor": 1, + "legendFormat": "{{changefeed}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "The status of changefeeds", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + 
"format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Percentiles of sink write duration of changefeeds", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 26 + }, + "hiddenSeries": false, + "id": 35, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "paceLength": 10, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.95, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{instance}}-p95", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{instance}}-p99", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "{{instance}}-p999", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Sink write duration percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "cards": { "cardPadding": 0, @@ -1730,7 +1926,7 @@ "h": 7, "w": 12, "x": 0, - "y": 26 + "y": 33 }, "heatmap": {}, "hideZeroBuckets": true, @@ -1794,17 +1990,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Percentiles of sink write duration of changefeeds", + "description": "Percentiles of sink batch size", "fill": 1, "fillGradient": 0, "gridPos": { "h": 7, "w": 12, "x": 12, - "y": 26 + "y": 33 }, "hiddenSeries": false, - "id": 35, + "id": 36, "legend": { "alignAsTable": true, "avg": false, @@ -1834,24 +2030,25 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.95, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.90, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", "format": "time_series", 
"intervalFactor": 1, - "legendFormat": "{{instance}}-p95", + "legendFormat": "{{capture}}-p90", "refId": "A" }, { - "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{instance}}-p99", + "legendFormat": "{{capture}}-p99", "refId": "B" }, { - "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance))", + "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", "format": "time_series", + "hide": true, "intervalFactor": 1, - "legendFormat": "{{instance}}-p999", + "legendFormat": "{{capture}}-p999", "refId": "C" } ], @@ -1859,7 +2056,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Sink write duration percentile", + "title": "Sink write batch size percentile", "tooltip": { "shared": true, "sort": 0, @@ -1875,7 +2072,7 @@ }, "yaxes": [ { - "format": "s", + "format": "none", "label": null, "logBase": 2, "max": null, @@ -1909,7 +2106,7 @@ "h": 7, "w": 12, "x": 0, - "y": 33 + "y": 40 }, "hiddenSeries": false, "id": 34, @@ -2005,17 +2202,17 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Percentiles of sink batch size", + "description": "Percentiles of asynchronous flush sink duration of changefeeds", "fill": 1, "fillGradient": 0, "gridPos": { "h": 7, "w": 12, "x": 12, - "y": 33 + "y": 40 }, "hiddenSeries": false, - "id": 36, + "id": 98, "legend": { "alignAsTable": true, "avg": false, @@ -2045,25 +2242,24 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.90, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", + "expr": "histogram_quantile(0.95, sum(rate(ticdc_processor_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{capture}}-p90", + "legendFormat": "{{instance}}-{{type}}-p95", "refId": "A" }, { - "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", + "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", "format": "time_series", "intervalFactor": 1, - "legendFormat": "{{capture}}-p99", + "legendFormat": "{{instance}}-{{type}}-p99", "refId": "B" }, { - "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,capture))", + "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", "format": "time_series", - "hide": true, "intervalFactor": 1, - "legendFormat": "{{capture}}-p999", + 
"legendFormat": "{{instance}}-{{type}}-p999", "refId": "C" } ], @@ -2071,7 +2267,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Sink write batch size percentile", + "title": "Flush sink duration percentile", "tooltip": { "shared": true, "sort": 0, @@ -2087,7 +2283,7 @@ }, "yaxes": [ { - "format": "none", + "format": "s", "label": null, "logBase": 2, "max": null, @@ -2129,7 +2325,7 @@ "h": 7, "w": 12, "x": 0, - "y": 40 + "y": 47 }, "heatmap": {}, "hideZeroBuckets": true, @@ -2191,25 +2387,27 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Percentiles of asynchronous flush sink duration of changefeeds", + "description": "Distribution of MySQL worker loads", "fill": 1, "fillGradient": 0, "gridPos": { "h": 7, "w": 12, "x": 12, - "y": 40 + "y": 47 }, "hiddenSeries": false, - "id": 98, + "id": 95, "legend": { "alignAsTable": true, - "avg": false, + "avg": true, "current": true, - "max": false, + "max": true, "min": false, - "rightSide": true, + "rightSide": false, "show": true, + "sort": "current", + "sortDesc": true, "total": false, "values": true }, @@ -2220,46 +2418,77 @@ "options": { "dataLinks": [] }, - "paceLength": 10, "percentage": false, "pointradius": 2, "points": false, "renderer": "flot", "seriesOverrides": [], "spaceLength": 10, - "stack": false, + "stack": true, "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.95, sum(rate(ticdc_processor_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", + "expr": "sum(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (capture,bucket)", "format": "time_series", + "hide": true, + "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}-{{type}}-p95", + "legendFormat": "{{capture}}-{{bucket}}", "refId": "A" }, { - "expr": "histogram_quantile(0.99, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) >= 0)", "format": "time_series", + "hide": true, + "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}-{{type}}-p99", + "legendFormat": "total worker", "refId": "B" }, { - "expr": "histogram_quantile(0.999, sum(rate(ticdc_sink_flush_event_duration_seconds_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (le,instance,type))", + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 2)", "format": "time_series", + "interval": "", "intervalFactor": 1, - "legendFormat": "{{instance}}-{{type}}-p999", + "legendFormat": "0-2 row/s worker", "refId": "C" + }, + { + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 2 and rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 10)", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "2-10 row/s worker", + "refId": "D" + }, + { + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 10 and rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", 
changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 100)", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "10-100 row/s worker", + "refId": "E" + }, + { + "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 100)", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": ">100 row/s worker", + "refId": "F" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Flush sink duration percentile", + "title": "MySQL sink worker load", "tooltip": { "shared": true, - "sort": 0, + "sort": 2, "value_type": "individual" }, "type": "graph", @@ -2272,9 +2501,9 @@ }, "yaxes": [ { - "format": "s", + "format": "short", "label": null, - "logBase": 2, + "logBase": 1, "max": null, "min": null, "show": true @@ -2312,9 +2541,9 @@ "description": "The duration of detecting and waiting conflict of MySQL sink", "gridPos": { "h": 7, - "w": 8, + "w": 12, "x": 0, - "y": 47 + "y": 54 }, "heatmap": {}, "hideZeroBuckets": true, @@ -2382,9 +2611,9 @@ "fillGradient": 0, "gridPos": { "h": 7, - "w": 8, - "x": 8, - "y": 47 + "w": 12, + "x": 12, + "y": 54 }, "hiddenSeries": false, "id": 83, @@ -2481,147 +2710,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "description": "Distribution of MySQL worker loads", - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 7, - "w": 8, - "x": 16, - "y": 47 - }, - "hiddenSeries": false, - "id": 95, - "legend": { - "alignAsTable": true, - "avg": true, - "current": true, - "max": true, - "min": false, - "rightSide": false, - "show": true, - "sort": "current", - "sortDesc": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "options": { - "dataLinks": [] - }, - "percentage": false, - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": true, - "steppedLine": false, - "targets": [ - { - "expr": "sum(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (capture,bucket)", - "format": "time_series", - "hide": true, - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{capture}}-{{bucket}}", - "refId": "A" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) >= 0)", - "format": "time_series", - "hide": true, - "interval": "", - "intervalFactor": 1, - "legendFormat": "total worker", - "refId": "B" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 2)", - "format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "0-2 row/s worker", - "refId": "C" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 2 and rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 10)", - "format": "time_series", - "hide": false, - "interval": "", - "intervalFactor": 1, - "legendFormat": "2-10 row/s worker", - "refId": "D" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", 
changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 10 and rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) <= 100)", - "format": "time_series", - "hide": false, - "interval": "", - "intervalFactor": 1, - "legendFormat": "10-100 row/s worker", - "refId": "E" - }, - { - "expr": "count(rate(ticdc_sink_bucket_size{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m]) > 100)", - "format": "time_series", - "hide": false, - "interval": "", - "intervalFactor": 1, - "legendFormat": ">100 row/s worker", - "refId": "F" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "MySQL sink worker load", - "tooltip": { - "shared": true, - "sort": 2, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -2633,7 +2721,7 @@ "h": 8, "w": 12, "x": 0, - "y": 54 + "y": 61 }, "id": 149, "legend": { @@ -2714,7 +2802,7 @@ "h": 8, "w": 12, "x": 12, - "y": 54 + "y": 61 }, "id": 151, "links": [], @@ -7783,5 +7871,5 @@ "timezone": "browser", "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", - "version": 17 -} \ No newline at end of file + "version": 18 +} From e1ff4219fdf78561225c5b8ac1ef8c87af80d25c Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 14 Jul 2021 10:59:32 +0800 Subject: [PATCH 2/7] new_owner: add comments for async sink (#1900) --- cdc/owner/async_sink.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cdc/owner/async_sink.go b/cdc/owner/async_sink.go index b6af446cc94..4953cac66a3 100644 --- a/cdc/owner/async_sink.go +++ b/cdc/owner/async_sink.go @@ -156,9 +156,11 @@ func (s *asyncSinkImpl) EmitCheckpointTs(ctx cdcContext.Context, ts uint64) { func (s *asyncSinkImpl) EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) { ddlFinishedTs := atomic.LoadUint64(&s.ddlFinishedTs) if ddl.CommitTs <= ddlFinishedTs { + // the DDL event is executed successfully, and done is true return true, nil } if ddl.CommitTs <= s.ddlSentTs { + // the DDL event is executing and not finished yes, return false return false, nil } select { From e5c411bd4049c3c5976a3c4a51d6d23f519e3524 Mon Sep 17 00:00:00 2001 From: ben1009 Date: Wed, 14 Jul 2021 15:19:32 +0800 Subject: [PATCH 3/7] pkg: fix retry getBackoffInMs panic when sleep overflowed (#2280) --- pkg/retry/retry_with_opt.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go index b4af380b582..7bf666777b1 100644 --- a/pkg/retry/retry_with_opt.go +++ b/pkg/retry/retry_with_opt.go @@ -88,7 +88,10 @@ func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration if temp <= 0 { temp = 1 } - sleep := temp + rand.Int63n(temp) - backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs) + sleep := (temp + rand.Int63n(temp)) * 3 + if sleep <= 0 { + sleep = math.MaxInt64 + } + backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep))+backoffBaseInMs) return time.Duration(backOff) * time.Millisecond } From 548cf9c925accae7544f4c914875710bb3f1e140 Mon Sep 17 
00:00:00 2001 From: Zixiong Liu Date: Wed, 14 Jul 2021 15:57:32 +0800 Subject: [PATCH 4/7] owner: fix new owner updating checkpoint too early with pending DDL (#2252) --- cdc/owner/changefeed.go | 8 +++++++- cdc/owner/schema.go | 5 ++++- tests/kill_owner_with_ddl/run.sh | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index a5ad371deb2..782c7829a5d 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -212,7 +212,10 @@ LOOP: if err != nil { return errors.Trace(err) } - c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs) + // Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL, + // when there is a recovery, there is no guarantee that the DDL at the checkpoint + // has been executed. So we need to start the DDL puller from (checkpoint-1). + c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1) if err != nil { return errors.Trace(err) } @@ -317,6 +320,9 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) { case ddlJobBarrier: ddlResolvedTs, ddlJob := c.ddlPuller.FrontDDL() if ddlJob == nil || ddlResolvedTs != barrierTs { + if ddlResolvedTs < barrierTs { + return barrierTs, nil + } c.barriers.Update(ddlJobBarrier, ddlResolvedTs) return barrierTs, nil } diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go index feb70898920..0cb2327edab 100644 --- a/cdc/owner/schema.go +++ b/cdc/owner/schema.go @@ -46,6 +46,9 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con return nil, errors.Trace(err) } } + // We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs, + // because the DDL puller might send a DDL at startTs, which would cause schema conflicts if + // the DDL's result is already contained in the snapshot. 
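Looking back at patch 3 for a moment: the new guard matters because rand.Int63n panics on a non-positive argument, and the tripled sleep term can wrap around int64 for large retry counts. A runnable sketch of the patched computation; the exponential term is spelled out here as an assumption, since the hunk begins just after it:

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// getBackoffInMs follows the patched logic: exponential backoff with jitter,
// clamped when the int64 sleep term overflows.
func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration {
	temp := int64(backoffBaseInMs * math.Pow(2, try)) // assumed shape of the exponential term
	if temp <= 0 {
		temp = 1
	}
	sleep := (temp + rand.Int63n(temp)) * 3
	// For a large try the multiplication can wrap negative, and rand.Int63n
	// would panic on it, so clamp to MaxInt64 exactly as the patch does.
	if sleep <= 0 {
		sleep = math.MaxInt64
	}
	backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep))+backoffBaseInMs)
	return time.Duration(backOff) * time.Millisecond
}

func main() {
	fmt.Println(getBackoffInMs(100, 10000, 3))  // small try: jittered backoff under the cap
	fmt.Println(getBackoffInMs(100, 10000, 56)) // huge try: no panic, still capped
}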
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate) if err != nil { return nil, errors.Trace(err) @@ -58,7 +61,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con schemaSnapshot: schemaSnap, filter: f, config: config, - ddlHandledTs: startTs - 1, + ddlHandledTs: startTs, }, nil } diff --git a/tests/kill_owner_with_ddl/run.sh b/tests/kill_owner_with_ddl/run.sh index 6d98e001765..9582e3ef27f 100755 --- a/tests/kill_owner_with_ddl/run.sh +++ b/tests/kill_owner_with_ddl/run.sh @@ -72,7 +72,7 @@ function run() { for i in $(seq 1 3); do kill_cdc_and_restart $pd_addr $WORK_DIR $CDC_BINARY - sleep 2 + sleep 8 done export GO_FAILPOINTS='' From dcfae1971f946f30bbcd722b1ebedcdca1611d38 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 16 Jul 2021 15:01:33 +0800 Subject: [PATCH 5/7] sink: fix concurrent map access in sink manager (#2249) --- cdc/sink/manager.go | 4 ++-- cdc/sink/manager_test.go | 8 +++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 34d059fe5f9..7fa8cc63699 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -58,6 +58,8 @@ func NewManager(ctx context.Context, backendSink Sink, errCh chan error, checkpo // CreateTableSink creates a table sink func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) Sink { + m.tableSinksMu.Lock() + defer m.tableSinksMu.Unlock() if _, exist := m.tableSinks[tableID]; exist { log.Panic("the table sink already exists", zap.Uint64("tableID", uint64(tableID))) } @@ -67,8 +69,6 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) buffer: make([]*model.RowChangedEvent, 0, 128), emittedTs: checkpointTs, } - m.tableSinksMu.Lock() - defer m.tableSinksMu.Unlock() m.tableSinks[tableID] = sink return sink } diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go index 9e2de47ce17..49575793659 100644 --- a/cdc/sink/manager_test.go +++ b/cdc/sink/manager_test.go @@ -92,8 +92,14 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { var wg sync.WaitGroup tableSinks := make([]Sink, goroutineNum) for i := 0; i < goroutineNum; i++ { - tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0) + i := i + wg.Add(1) + go func() { + defer wg.Done() + tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0) + }() } + wg.Wait() for i := 0; i < goroutineNum; i++ { i := i tableSink := tableSinks[i] From 78987aa2b736994b86a31d1c0c5848226720f2b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Fri, 16 Jul 2021 19:13:34 +0800 Subject: [PATCH 6/7] puller,mounter,processor: always pull the old value internally (#2271) --- cdc/entry/codec.go | 19 --- cdc/entry/mounter.go | 211 ++++------------------------ cdc/entry/mounter_test.go | 2 +- cdc/model/schema_storage.go | 2 + cdc/processor.go | 7 +- cdc/processor/pipeline/puller.go | 9 +- cdc/processor/pipeline/sink.go | 66 ++++++++- cdc/processor/pipeline/sink_test.go | 207 +++++++++++++++++++++++++-- pkg/regionspan/span.go | 13 +- pkg/regionspan/span_test.go | 11 +- 10 files changed, 308 insertions(+), 239 deletions(-) diff --git a/cdc/entry/codec.go b/cdc/entry/codec.go index f8803125e3b..0aed19266c4 100644 --- a/cdc/entry/codec.go +++ b/cdc/entry/codec.go @@ -31,7 +31,6 @@ import ( var ( tablePrefix = []byte{'t'} recordPrefix = []byte("_r") - indexPrefix = []byte("_i") metaPrefix = []byte("m") ) @@ -39,11 +38,9 @@ var ( intLen = 8 
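The sink-manager change in patch 5 above is the classic check-then-act fix: the duplicate check and the map insert must run under the same mutex, or two goroutines can both pass the check and then race on the map. A minimal sketch of the corrected shape (the names are illustrative, not the real Manager):

package main

import (
	"fmt"
	"sync"
)

type manager struct {
	mu    sync.Mutex
	sinks map[int64]string
}

// createSink registers a sink exactly once per table. Both the existence
// check and the insert hold mu, so concurrent callers cannot both pass the
// check before either of them inserts.
func (m *manager) createSink(tableID int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, exist := m.sinks[tableID]; exist {
		return fmt.Errorf("sink for table %d already exists", tableID)
	}
	m.sinks[tableID] = fmt.Sprintf("sink-%d", tableID)
	return nil
}

func main() {
	m := &manager{sinks: map[int64]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = m.createSink(42) // only one caller wins; no concurrent map write
		}()
	}
	wg.Wait()
	fmt.Println(len(m.sinks)) // 1
}

The accompanying test change follows the same idea: it creates the table sinks from concurrent goroutines, so the race detector would catch a regression.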
tablePrefixLen = len(tablePrefix) recordPrefixLen = len(recordPrefix) - indexPrefixLen = len(indexPrefix) metaPrefixLen = len(metaPrefix) prefixTableIDLen = tablePrefixLen + intLen /*tableID*/ prefixRecordIDLen = recordPrefixLen + intLen /*recordID*/ - prefixIndexLen = indexPrefixLen + intLen /*indexID*/ ) // MetaType is for data structure meta/data flag. @@ -120,22 +117,6 @@ func decodeRecordID(key []byte) (rest []byte, recordID int64, err error) { return } -func decodeIndexKey(key []byte) (indexID int64, indexValue []types.Datum, err error) { - if len(key) < prefixIndexLen || !bytes.HasPrefix(key, indexPrefix) { - return 0, nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(key) - } - key = key[indexPrefixLen:] - key, indexID, err = codec.DecodeInt(key) - if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrCodecDecode, err) - } - indexValue, err = codec.Decode(key, 2) - if err != nil { - return 0, nil, cerror.WrapError(cerror.ErrCodecDecode, err) - } - return -} - func decodeMetaKey(ek []byte) (meta, error) { if !bytes.HasPrefix(ek, metaPrefix) { return nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(ek) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index af50a6185e6..c65bf6e3fad 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -16,7 +16,6 @@ package entry import ( "bytes" "context" - "encoding/binary" "encoding/json" "fmt" "math" @@ -64,66 +63,6 @@ type rowKVEntry struct { PreRowExist bool } -type indexKVEntry struct { - baseKVEntry - IndexID int64 - IndexValue []types.Datum -} - -func (idx *indexKVEntry) unflatten(tableInfo *model.TableInfo, tz *time.Location) error { - if tableInfo.ID != idx.PhysicalTableID { - isPartition := false - if pi := tableInfo.GetPartitionInfo(); pi != nil { - for _, p := range pi.Definitions { - if p.ID == idx.PhysicalTableID { - isPartition = true - break - } - } - } - if !isPartition { - return cerror.ErrWrongTableInfo.GenWithStackByArgs(tableInfo.ID, idx.PhysicalTableID) - } - } - index, exist := tableInfo.GetIndexInfo(idx.IndexID) - if !exist { - return cerror.ErrIndexKeyTableNotFound.GenWithStackByArgs(idx.IndexID) - } - if !isDistinct(index, idx.IndexValue) { - idx.RecordID = idx.baseKVEntry.RecordID - if idx.baseKVEntry.RecordID.IsInt() { - idx.IndexValue = idx.IndexValue[:len(idx.IndexValue)-1] - } else { - idx.IndexValue = idx.IndexValue[:len(idx.IndexValue)-idx.RecordID.NumCols()] - } - } - for i, v := range idx.IndexValue { - colOffset := index.Columns[i].Offset - fieldType := &tableInfo.Columns[colOffset].FieldType - datum, err := unflatten(v, fieldType, tz) - if err != nil { - return errors.Trace(err) - } - idx.IndexValue[i] = datum - } - return nil -} - -func isDistinct(index *timodel.IndexInfo, indexValue []types.Datum) bool { - if index.Primary { - return true - } - if index.Unique { - for _, value := range indexValue { - if value.IsNull() { - return false - } - } - return true - } - return false -} - // Mounter is used to parse SQL events from KV events type Mounter interface { Run(ctx context.Context) error @@ -258,8 +197,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode } return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID) } - switch { - case bytes.HasPrefix(key, recordPrefix): + if bytes.HasPrefix(key, recordPrefix) { rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo) if err != nil { return nil, errors.Trace(err) @@ -268,15 +206,6 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx 
context.Context, raw *mode return nil, nil } return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateSize()) - case bytes.HasPrefix(key, indexPrefix): - indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, raw.OldValue, baseInfo) - if err != nil { - return nil, errors.Trace(err) - } - if indexKV == nil { - return nil, nil - } - return m.mountIndexKVEntry(tableInfo, indexKV, raw.ApproximateSize()) } return nil, nil }() @@ -331,46 +260,6 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, rawKey []b }, nil } -func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*indexKVEntry, error) { - // Skip set index KV. - // By default we cannot get the old value of a deleted row, then we must get the value of unique key - // or primary key for seeking the deleted row through its index key. - // After the old value was enabled, we can skip the index key. - if !base.Delete || m.enableOldValue { - return nil, nil - } - - indexID, indexValue, err := decodeIndexKey(restKey) - if err != nil { - return nil, errors.Trace(err) - } - var handle kv.Handle - - if len(rawValue) == 8 { - // primary key or unique index - var recordID int64 - buf := bytes.NewBuffer(rawValue) - err = binary.Read(buf, binary.BigEndian, &recordID) - if err != nil { - return nil, errors.Trace(err) - } - handle = kv.IntHandle(recordID) - } else if len(rawValue) > 0 && rawValue[0] == tablecodec.CommonHandleFlag { - handleLen := uint16(rawValue[1])<<8 + uint16(rawValue[2]) - handleEndOff := 3 + handleLen - handle, err = kv.NewCommonHandle(rawValue[3:handleEndOff]) - if err != nil { - return nil, errors.Trace(err) - } - } - base.RecordID = handle - return &indexKVEntry{ - baseKVEntry: base, - IndexID: indexID, - IndexValue: indexValue, - }, nil -} - const ( ddlJobListKey = "DDLJobList" ddlAddIndexJobListKey = "DDLJobAddIdxList" @@ -442,18 +331,14 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill } func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, error) { - // if m.enableOldValue == true, go into this function - // if m.enableNewValue == false and row.Delete == false, go into this function - // if m.enableNewValue == false and row.Delete == true and use explict row id, go into this function - // only if m.enableNewValue == false and row.Delete == true and use implicit row id(_tidb_rowid), skip this function - useImplicitTiDBRowID := !tableInfo.PKIsHandle && !tableInfo.IsCommonHandle - if !m.enableOldValue && row.Delete && useImplicitTiDBRowID { - return nil, nil - } - var err error // Decode previous columns. var preCols []*model.Column + // Since we now always use old value internally, + // we need to control the output(sink will use the PreColumns field to determine whether to output old value). + // Normally old value is output when only enableOldValue is on, + // but for the Delete event, when the old value feature is off, + // the HandleKey column needs to be included as well. So we need to do the following filtering. if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info @@ -461,6 +346,17 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr if err != nil { return nil, errors.Trace(err) } + + // NOTICE: When the old Value feature is off, + // the Delete event only needs to keep the handle key column. 
+ if row.Delete && !m.enableOldValue { + for i := range preCols { + col := preCols[i] + if col != nil && !col.Flag.IsHandleKey() { + preCols[i] = nil + } + } + } } var cols []*model.Column @@ -477,11 +373,20 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr if row.RecordID.IsInt() { intRowID = row.RecordID.IntValue() } + + var tableInfoVersion uint64 + // Align with the old format if old value disabled. + if row.Delete && !m.enableOldValue { + tableInfoVersion = 0 + } else { + tableInfoVersion = tableInfo.TableInfoVersion + } + return &model.RowChangedEvent{ StartTs: row.StartTs, CommitTs: row.CRTs, RowID: intRowID, - TableInfoVersion: tableInfo.TableInfoVersion, + TableInfoVersion: tableInfoVersion, Table: &model.TableName{ Schema: schemaName, Table: tableName, @@ -495,68 +400,6 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr }, nil } -func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry, dataSize int64) (*model.RowChangedEvent, error) { - // skip set index KV - if !idx.Delete || m.enableOldValue { - return nil, nil - } - // skip any index that is not the handle - if idx.IndexID != tableInfo.HandleIndexID { - return nil, nil - } - - indexInfo, exist := tableInfo.GetIndexInfo(idx.IndexID) - if !exist { - log.Warn("index info not found", zap.Int64("indexID", idx.IndexID)) - return nil, nil - } - - if !tableInfo.IsIndexUnique(indexInfo) { - return nil, nil - } - - err := idx.unflatten(tableInfo, m.tz) - if err != nil { - return nil, errors.Trace(err) - } - - preCols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) - for i, idxCol := range indexInfo.Columns { - colInfo := tableInfo.Columns[idxCol.Offset] - value, warn, err := formatColVal(idx.IndexValue[i], colInfo.Tp) - if err != nil { - return nil, errors.Trace(err) - } - if warn != "" { - log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) - } - preCols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{ - Name: colInfo.Name.O, - Type: colInfo.Tp, - Value: value, - Flag: tableInfo.ColumnsFlag[colInfo.ID], - } - } - var intRowID int64 - if idx.RecordID != nil && idx.RecordID.IsInt() { - intRowID = idx.RecordID.IntValue() - } - return &model.RowChangedEvent{ - StartTs: idx.StartTs, - CommitTs: idx.CRTs, - RowID: intRowID, - Table: &model.TableName{ - Schema: tableInfo.TableName.Schema, - Table: tableInfo.TableName.Table, - TableID: idx.PhysicalTableID, - IsPartition: tableInfo.GetPartitionInfo() != nil, - }, - PreColumns: preCols, - IndexColumns: tableInfo.IndexColumnsOffset, - ApproximateSize: dataSize, - }, nil -} - var emptyBytes = make([]byte, 0) func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) { diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 4eb16eb9576..523bf8d1015 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -385,7 +385,7 @@ func walkTableSpanInStore(c *check.C, store tidbkv.Storage, tableID int64, f fun txn, err := store.Begin() c.Assert(err, check.IsNil) defer txn.Rollback() //nolint:errcheck - tableSpan := regionspan.GetTableSpan(tableID, false) + tableSpan := regionspan.GetTableSpan(tableID) kvIter, err := txn.Iter(tableSpan.Start, tableSpan.End) c.Assert(err, check.IsNil) defer kvIter.Close() diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 0bd8aaaecfd..2238dc3b331 100644 --- a/cdc/model/schema_storage.go +++ 
b/cdc/model/schema_storage.go
@@ -143,6 +143,8 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
 	return ti
 }
 
+// TODO(hi-rustin): once we no longer need to subscribe to index updates,
+// findHandleIndex may not be necessary anymore.
 func (ti *TableInfo) findHandleIndex() {
 	if ti.HandleIndexID == HandleIndexPKIsHandle {
 		// pk is handle
diff --git a/cdc/processor.go b/cdc/processor.go
index 06b23636a8f..96a83238b2d 100644
--- a/cdc/processor.go
+++ b/cdc/processor.go
@@ -811,16 +811,17 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
 	startPuller := func(tableID model.TableID, pResolvedTs *uint64, pCheckpointTs *uint64) sink.Sink {
 		// start table puller
-		enableOldValue := p.changefeed.Config.EnableOldValue
-		span := regionspan.GetTableSpan(tableID, enableOldValue)
+		span := regionspan.GetTableSpan(tableID)
 		kvStorage, err := util.KVStorageFromCtx(ctx)
 		if err != nil {
 			p.sendError(err)
 			return nil
 		}
+		// NOTICE: always pull the old value internally
+		// See also: TODO(hi-rustin): add issue link here.
 		plr := puller.NewPuller(ctx, p.pdCli, p.credential, kvStorage,
 			replicaInfo.StartTs, []regionspan.Span{span}, p.limitter,
-			enableOldValue)
+			true)
 		go func() {
 			err := plr.Run(ctx)
 			if errors.Cause(err) != context.Canceled {
diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go
index 7e82cea50b5..a5e535ce603 100644
--- a/cdc/processor/pipeline/puller.go
+++ b/cdc/processor/pipeline/puller.go
@@ -54,10 +54,10 @@ func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
 	// start table puller
 	config := ctx.ChangefeedVars().Info.Config
 	spans := make([]regionspan.Span, 0, 4)
-	spans = append(spans, regionspan.GetTableSpan(n.tableID, config.EnableOldValue))
+	spans = append(spans, regionspan.GetTableSpan(n.tableID))
 
 	if config.Cyclic.IsEnabled() && n.replicaInfo.MarkTableID != 0 {
-		spans = append(spans, regionspan.GetTableSpan(n.replicaInfo.MarkTableID, config.EnableOldValue))
+		spans = append(spans, regionspan.GetTableSpan(n.replicaInfo.MarkTableID))
 	}
 	return spans
 }
@@ -65,12 +65,13 @@ func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
 func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
 	metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName)
 	globalConfig := config.GetGlobalServerConfig()
-	config := ctx.ChangefeedVars().Info.Config
 	ctxC, cancel := context.WithCancel(ctx)
 	ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
 	ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID)
+	// NOTICE: always pull the old value internally
+	// See also: TODO(hi-rustin): add issue link here.
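The NOTICE comments in this patch all point at one design decision: the puller now always fetches old values, and compatibility with the old output formats is restored at the edges instead. For example, the mounter (earlier in this patch) trims a delete event's previous row down to its handle-key columns when old value is disabled. A toy version of that trimming, with a simplified stand-in for model.Column:

package main

import "fmt"

// column is a simplified stand-in for TiCDC's model.Column.
type column struct {
	name      string
	handleKey bool
	value     interface{}
}

// trimDeletePreCols keeps only the handle-key columns of a delete event's
// previous row, which is all the old output format ever carried.
func trimDeletePreCols(preCols []*column) {
	for i, col := range preCols {
		if col != nil && !col.handleKey {
			preCols[i] = nil
		}
	}
}

func main() {
	pre := []*column{
		{name: "val", handleKey: false, value: "v1"},
		{name: "id", handleKey: true, value: int64(7)},
	}
	trimDeletePreCols(pre)
	fmt.Println(pre[0] == nil, pre[1].name) // true id
}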
 	plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage,
-		n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, config.EnableOldValue)
+		n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, true)
 	n.wg.Go(func() error {
 		ctx.Throw(errors.Trace(plr.Run(ctxC)))
 		return nil
diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go
index fa492118e20..911b760e0fc 100644
--- a/cdc/processor/pipeline/sink.go
+++ b/cdc/processor/pipeline/sink.go
@@ -142,7 +142,71 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
 }
 
 func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error {
-	n.eventBuffer = append(n.eventBuffer, event)
+	if event == nil || event.Row == nil {
+		return nil
+	}
+
+	colLen := len(event.Row.Columns)
+	preColLen := len(event.Row.PreColumns)
+	config := ctx.ChangefeedVars().Info.Config
+
+	// This indicates that it is an update event,
+	// and old value is enabled internally by default (but disabled in the configuration).
+	// We need to handle the update event to be compatible with the old format.
+	if !config.EnableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen {
+		handleKeyCount := 0
+		equivalentHandleKeyCount := 0
+		for i := range event.Row.Columns {
+			if event.Row.Columns[i].Flag.IsHandleKey() && event.Row.PreColumns[i].Flag.IsHandleKey() {
+				handleKeyCount++
+				colValueString := model.ColumnValueString(event.Row.Columns[i].Value)
+				preColValueString := model.ColumnValueString(event.Row.PreColumns[i].Value)
+				if colValueString == preColValueString {
+					equivalentHandleKeyCount++
+				}
+			}
+		}
+
+		// If the handle key columns are not updated, PreColumns is directly ignored.
+		if handleKeyCount == equivalentHandleKeyCount {
+			event.Row.PreColumns = nil
+			n.eventBuffer = append(n.eventBuffer, event)
+		} else {
+			// If there is an update to the handle key columns,
+			// we need to split the event into two events to be compatible with the old format.
+			// NOTICE: A full deep copy is not needed here, because the two events use Columns
+			// and PreColumns respectively, so sharing the rest is safe and avoids the cost of another full deep copy.
+			deleteEvent := *event
+			deleteEventRow := *event.Row
+			deleteEventRowKV := *event.RawKV
+			deleteEvent.Row = &deleteEventRow
+			deleteEvent.RawKV = &deleteEventRowKV
+
+			deleteEvent.Row.Columns = nil
+			for i := range deleteEvent.Row.PreColumns {
+				// NOTICE: Only the handle key pre column is retained in the delete event.
+				if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
+					deleteEvent.Row.PreColumns[i] = nil
+				}
+			}
+			// Align with the old format if old value is disabled.
+			deleteEvent.Row.TableInfoVersion = 0
+			n.eventBuffer = append(n.eventBuffer, &deleteEvent)
+
+			insertEvent := *event
+			insertEventRow := *event.Row
+			insertEventRowKV := *event.RawKV
+			insertEvent.Row = &insertEventRow
+			insertEvent.RawKV = &insertEventRowKV
+
+			// NOTICE: clean up pre cols for the insert event.
+ insertEvent.Row.PreColumns = nil + n.eventBuffer = append(n.eventBuffer, &insertEvent) + } + } else { + n.eventBuffer = append(n.eventBuffer, event) + } + if len(n.eventBuffer) >= defaultSyncResolvedBatch { if err := n.flushRow2Sink(ctx); err != nil { return errors.Trace(err) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 910ac91a3c0..35387c499f2 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -16,13 +16,16 @@ package pipeline import ( "context" "testing" + "time" "github.com/pingcap/check" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cdcContext "github.com/pingcap/ticdc/pkg/context" cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/pipeline" "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/tikv/client-go/v2/oracle" ) func TestSuite(t *testing.T) { @@ -106,6 +109,13 @@ var _ = check.Suite(&outputSuite{}) func (s *outputSuite) TestStatus(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-status", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + }, + }) // test stop at targetTs node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) @@ -116,19 +126,19 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusRunning) err := node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)) c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue) c.Assert(node.Status(), check.Equals, TableStatusStopped) c.Assert(node.CheckpointTs(), check.Equals, uint64(10)) @@ -142,7 +152,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, 
TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusRunning) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -150,7 +160,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusRunning) err = node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)) c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue) c.Assert(node.Status(), check.Equals, TableStatusStopped) c.Assert(node.CheckpointTs(), check.Equals, uint64(6)) @@ -164,7 +174,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusRunning) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -172,7 +182,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusRunning) err = node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)) c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue) c.Assert(node.Status(), check.Equals, TableStatusStopped) c.Assert(node.CheckpointTs(), check.Equals, uint64(7)) @@ -181,6 +191,13 @@ func (s *outputSuite) TestStatus(c *check.C) { func (s *outputSuite) TestManyTs(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-many-ts", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + }, + }) sink := &mockSink{} node := newSinkNode(sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) @@ -195,7 +212,7 @@ func (s *outputSuite) TestManyTs(c *check.C) { c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, - pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}}), nil)), check.IsNil) + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: 
&model.RowChangedEvent{}}), nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusRunning) sink.Check(c, nil) @@ -227,3 +244,175 @@ func (s *outputSuite) TestManyTs(c *check.C) { c.Assert(node.ResolvedTs(), check.Equals, uint64(2)) c.Assert(node.CheckpointTs(), check.Equals, uint64(2)) } + +func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-split-update-event", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + }, + }) + sink := &mockSink{} + node := newSinkNode(sink, 0, 10, &mockFlowController{}) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + + // nil row. + c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}), nil)), check.IsNil) + c.Assert(node.eventBuffer, check.HasLen, 0) + + columns := []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + preColumns := []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + c.Assert(node.Receive(pipeline.MockNodeContext4Test( + ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 1, Columns: columns, PreColumns: preColumns}, + }), nil)), + check.IsNil, + ) + c.Assert(node.eventBuffer, check.HasLen, 1) + c.Assert(node.eventBuffer[0].Row.Columns, check.HasLen, 2) + c.Assert(node.eventBuffer[0].Row.PreColumns, check.HasLen, 2) +} + +func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + cfg := config.GetDefaultReplicaConfig() + cfg.EnableOldValue = false + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: "changefeed-id-test-split-update-event", + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: cfg, + }, + }) + sink := &mockSink{} + node := newSinkNode(sink, 0, 10, &mockFlowController{}) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + + // nil row. + c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}}), nil)), check.IsNil) + c.Assert(node.eventBuffer, check.HasLen, 0) + + // No update to the handle key column. 
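The columns built next carry equal handle-key values on both images, so emitEvent keeps a single event and merely drops PreColumns; the second half of this test then changes the handle key and expects a delete plus an insert. That rule from sink.go, sketched standalone with simplified types:

package main

import "fmt"

type column struct {
	name      string
	handleKey bool
	value     string
}

type rowEvent struct {
	columns    []column // new image
	preColumns []column // old image, assumed to be in the same order
}

// splitUpdate mirrors the sink-node compatibility rule: if no handle-key
// value changed, emit one update without the old image; otherwise split it
// into a delete carrying the old image and an insert carrying the new one.
func splitUpdate(ev rowEvent) []rowEvent {
	handleKeyChanged := false
	for i := range ev.columns {
		if ev.columns[i].handleKey && ev.preColumns[i].handleKey &&
			ev.columns[i].value != ev.preColumns[i].value {
			handleKeyChanged = true
		}
	}
	if !handleKeyChanged {
		ev.preColumns = nil
		return []rowEvent{ev}
	}
	deleteEvent := rowEvent{preColumns: ev.preColumns}
	insertEvent := rowEvent{columns: ev.columns}
	return []rowEvent{deleteEvent, insertEvent}
}

func main() {
	ev := rowEvent{
		columns:    []column{{"id", true, "2"}, {"val", false, "b"}},
		preColumns: []column{{"id", true, "1"}, {"val", false, "a"}},
	}
	fmt.Println(len(splitUpdate(ev))) // 2: a delete followed by an insert
}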
+ columns := []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + preColumns := []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + + c.Assert(node.Receive(pipeline.MockNodeContext4Test( + ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 1, Columns: columns, PreColumns: preColumns}, + }), nil)), + check.IsNil, + ) + c.Assert(node.eventBuffer, check.HasLen, 1) + c.Assert(node.eventBuffer[0].Row.Columns, check.HasLen, 2) + c.Assert(node.eventBuffer[0].Row.PreColumns, check.HasLen, 0) + + // Cleanup. + node.eventBuffer = []*model.PolymorphicEvent{} + // Update to the handle key column. + columns = []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value-updated", + }, + } + preColumns = []*model.Column{ + { + Name: "col1", + Flag: model.BinaryFlag, + Value: "col1-value", + }, + { + Name: "col2", + Flag: model.HandleKeyFlag, + Value: "col2-value", + }, + } + + c.Assert(node.Receive(pipeline.MockNodeContext4Test( + ctx, + pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 1, Columns: columns, PreColumns: preColumns}, + }), nil)), + check.IsNil, + ) + // Split an update event into a delete and an insert event. + c.Assert(node.eventBuffer, check.HasLen, 2) + + deleteEventIndex := 0 + c.Assert(node.eventBuffer[deleteEventIndex].Row.Columns, check.HasLen, 0) + c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns, check.HasLen, 2) + nonHandleKeyColIndex := 0 + handleKeyColIndex := 1 + // NOTICE: When old value disabled, we only keep the handle key pre cols. + c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[nonHandleKeyColIndex], check.IsNil) + c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Name, check.Equals, "col2") + c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Flag.IsHandleKey(), check.IsTrue) + + insertEventIndex := 1 + c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2) + c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0) +} diff --git a/pkg/regionspan/span.go b/pkg/regionspan/span.go index 7d06c482117..79fcb5eef7e 100644 --- a/pkg/regionspan/span.go +++ b/pkg/regionspan/span.go @@ -83,19 +83,14 @@ func hackSpan(originStart []byte, originEnd []byte) (start []byte, end []byte) { } // GetTableSpan returns the span to watch for the specified table -func GetTableSpan(tableID int64, exceptIndexSpan bool) Span { +func GetTableSpan(tableID int64) Span { sep := byte('_') recordMarker := byte('r') tablePrefix := tablecodec.GenTablePrefix(tableID) var start, end kv.Key - // ignore index keys if we don't need them - if exceptIndexSpan { - start = append(tablePrefix, sep, recordMarker) - end = append(tablePrefix, sep, recordMarker+1) - } else { - start = append(tablePrefix, sep) - end = append(tablePrefix, sep+1) - } + // ignore index keys. 
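Ignoring index keys narrows each table's watched range to its record keyspace. For intuition about the layout this relies on, a tiny sketch of a record-only span follows; note that real TiDB encodes the table ID with its comparable codec, so the plain big-endian encoding below is a simplification:

package main

import (
	"encoding/binary"
	"fmt"
)

// recordSpan builds the [start, end) range covering only a table's record
// keys: t{tableID}_r .. t{tableID}_s. Index keys (t{tableID}_i...) sort
// outside this range, so they are simply never pulled.
func recordSpan(tableID int64) (start, end []byte) {
	prefix := append([]byte{'t'}, make([]byte, 8)...)
	binary.BigEndian.PutUint64(prefix[1:], uint64(tableID)) // simplified encoding
	start = append(append([]byte{}, prefix...), '_', 'r')
	end = append(append([]byte{}, prefix...), '_', 'r'+1)
	return start, end
}

func main() {
	s, e := recordSpan(123)
	fmt.Printf("start=%q\nend=%q\n", s, e)
}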
+	start = append(tablePrefix, sep, recordMarker)
+	end = append(tablePrefix, sep, recordMarker+1)
 	return Span{
 		Start: start,
 		End:   end,
diff --git a/pkg/regionspan/span_test.go b/pkg/regionspan/span_test.go
index 1266acc89f9..46940795816 100644
--- a/pkg/regionspan/span_test.go
+++ b/pkg/regionspan/span_test.go
@@ -105,16 +105,9 @@ func (s *spanSuite) TestIntersect(c *check.C) {
 func (s *spanSuite) TestGetTableSpan(c *check.C) {
 	defer testleak.AfterTest(c)()
-	span := GetTableSpan(123, false)
+	span := GetTableSpan(123)
 	c.Assert(span.Start, check.Less, span.End)
-	prefix := []byte(tablecodec.GenTablePrefix(123))
-	c.Assert(span.Start, check.Greater, prefix)
-	prefix[len(prefix)-1]++
-	c.Assert(span.End, check.Less, prefix)
-
-	span = GetTableSpan(123, true)
-	c.Assert(span.Start, check.Less, span.End)
-	prefix = []byte(tablecodec.GenTableRecordPrefix(123))
+	prefix := []byte(tablecodec.GenTableRecordPrefix(123))
 	c.Assert(span.Start, check.GreaterEqual, prefix)
 	prefix[len(prefix)-1]++
 	c.Assert(span.End, check.LessEqual, prefix)

From bb23408566e513e122f18df81c71491d49b3e106 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?=
Date: Sat, 17 Jul 2021 12:03:34 +0800
Subject: [PATCH 7/7] filter: refine comments and function visibility (#2308)

---
 pkg/filter/filter.go | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
index 15cbc933f9b..b5b7f5843d4 100644
--- a/pkg/filter/filter.go
+++ b/pkg/filter/filter.go
@@ -22,7 +22,7 @@ import (
 	filterV2 "github.com/pingcap/tidb-tools/pkg/table-filter"
 )
 
-// Filter is a event filter implementation
+// Filter is an event filter implementation.
 type Filter struct {
 	filter           filterV2.Filter
 	ignoreTxnStartTs []uint64
 	isCyclicEnabled  bool
 }
 
-// VerifyRules ...
+// VerifyRules checks the filter rules in the configuration
+// and returns an invalid rule error if the verification fails; otherwise it returns the parsed filter.
 func VerifyRules(cfg *config.ReplicaConfig) (filterV2.Filter, error) {
 	var f filterV2.Filter
 	var err error
@@ -50,7 +51,7 @@ func VerifyRules(cfg *config.ReplicaConfig) (filterV2.Filter, error) {
 	return f, nil
 }
 
-// NewFilter creates a filter
+// NewFilter creates a filter.
 func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
 	f, err := VerifyRules(cfg)
 	if err != nil {
@@ -78,9 +79,9 @@ func (f *Filter) shouldIgnoreStartTs(ts uint64) bool {
 }
 
 // ShouldIgnoreTable returns true if the specified table should be ignored by this change feed.
-// Set `tbl` to an empty string to test against the whole database.
+// NOTICE: Set `tbl` to an empty string to test against the whole database.
 func (f *Filter) ShouldIgnoreTable(db, tbl string) bool {
-	if IsSysSchema(db) {
+	if isSysSchema(db) {
 		return true
 	}
 	if f.isCyclicEnabled && mark.IsMarkTable(db, tbl) {
@@ -110,7 +111,7 @@ func (f *Filter) ShouldIgnoreDDLEvent(ts uint64, ddlType model.ActionType, schem
 	return f.shouldIgnoreStartTs(ts) || shouldIgnoreTableOrSchema
 }
 
-// ShouldDiscardDDL returns true if this DDL should be discarded
+// ShouldDiscardDDL returns true if this DDL should be discarded.
func (f *Filter) ShouldDiscardDDL(ddlType model.ActionType) bool { if !f.shouldDiscardByBuiltInDDLAllowlist(ddlType) { return false @@ -180,7 +181,7 @@ func (f *Filter) shouldDiscardByBuiltInDDLAllowlist(ddlType model.ActionType) bo return true } -// IsSysSchema returns true if the given schema is a system schema -func IsSysSchema(db string) bool { +// isSysSchema returns true if the given schema is a system schema +func isSysSchema(db string) bool { return filterV1.IsSystemSchema(db) }
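A closing note on patch 2: the comments it adds describe why EmitDDLEvent is safe to call repeatedly with the same DDL, namely that two timestamps make the retry idempotent. A compact sketch of that watermark pattern (the two field names follow the patch; everything else is illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

type asyncSink struct {
	// ddlSentTs is only touched by the calling goroutine, so it needs no atomics.
	ddlSentTs uint64
	// ddlFinishedTs is advanced by the async worker, hence the atomic access.
	ddlFinishedTs uint64
}

// emitDDL reports done once the DDL's commit timestamp is covered by
// ddlFinishedTs; until then the DDL is handed to the worker at most once.
func (s *asyncSink) emitDDL(commitTs uint64) (done bool) {
	if commitTs <= atomic.LoadUint64(&s.ddlFinishedTs) {
		return true // already executed successfully on an earlier call
	}
	if commitTs <= s.ddlSentTs {
		return false // still executing, not finished yet; the caller retries later
	}
	s.ddlSentTs = commitTs // hand off to the async worker here
	return false
}

func main() {
	s := &asyncSink{}
	fmt.Println(s.emitDDL(5)) // false: first submission
	fmt.Println(s.emitDDL(5)) // false: deduplicated while still running
	atomic.StoreUint64(&s.ddlFinishedTs, 5) // the worker reports completion
	fmt.Println(s.emitDDL(5)) // true: done
}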