diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index d95c4def0d0..3d09c741779 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -270,7 +270,8 @@ func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Rea if err != nil { return errors.Trace(err) } - if err := etcdWorker.Run(ctx, c.session, timerInterval); err != nil { + captureAddr := c.info.AdvertiseAddr + if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil { // We check ttl of lease instead of check `session.Done`, because // `session.Done` is only notified when etcd client establish a // new keepalive request, there could be a time window as long as diff --git a/cdc/metrics.go b/cdc/metrics.go index ae7cfac34e7..bba157b796b 100644 --- a/cdc/metrics.go +++ b/cdc/metrics.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/ticdc/cdc/sorter/unified" "github.com/pingcap/ticdc/pkg/actor" "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/orchestrator" "github.com/prometheus/client_golang/prometheus" ) @@ -44,6 +45,7 @@ func init() { etcd.InitMetrics(registry) initServerMetrics(registry) actor.InitMetrics(registry) + orchestrator.InitMetrics(registry) // Sorter metrics memory.InitMetrics(registry) unified.InitMetrics(registry) diff --git a/errors.toml b/errors.toml index d35160a82b3..bc4c493cb2f 100755 --- a/errors.toml +++ b/errors.toml @@ -241,6 +241,11 @@ error = ''' the etcd txn should be aborted and retried immediately ''' +["CDC:ErrEtcdTxnSizeExceed"] +error = ''' +patch size of a single changefeed exceed etcd txn max size +''' + ["CDC:ErrEventFeedAborted"] error = ''' single event feed aborted diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 73840c7a08d..f1e6bda773c 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -929,6 +929,1119 @@ "title": "Server", "type": "row" }, + { + "collapsed": true, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 266, + "panels": [ + { + "cards": { + "cardPadding": 1, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": null, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 2 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 262, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "interval": "1", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "show": true, + "showHistogram": false + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 2 + }, + "hiddenSeries": false, + "id": 264, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2612", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2613", + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": 0, + "cardRound": 0 + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "max": null, + "min": 1, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 10 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 256, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 10 + }, + "hiddenSeries": false, + "id": 258, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "format": "time_series", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{lcapture}-p99}", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:1612", + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:1613", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolatePurples", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 18 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 254, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker txn size ", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "decbytes", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 18 + }, + "hiddenSeries": false, + "id": 260, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-p99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker txn size percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2055", + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2056", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "EtcdWorker", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 266, + "panels": [ + { + "cards": { + "cardPadding": 1, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": null, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 2 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 262, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "interval": "1", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "show": true, + "showHistogram": false + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 2 + }, + "hiddenSeries": false, + "id": 264, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2612", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2613", + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": 0, + "cardRound": 0 + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "max": null, + "min": 1, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 10 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 256, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 10 + }, + "hiddenSeries": false, + "id": 258, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "format": "time_series", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}-p99}", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:1612", + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:1613", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolatePurples", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 18 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 254, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker txn size ", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "decbytes", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 18 + }, + "hiddenSeries": false, + "id": 260, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-p99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker txn size percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2055", + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2056", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "EtcdWorker", + "type": "row" + }, { "collapsed": true, "gridPos": { @@ -9681,5 +10794,5 @@ "timezone": "browser", "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", - "version": 25 -} \ No newline at end of file + "version": 26 +} diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index ef42086d356..b8d0fdbd886 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -211,9 +211,10 @@ var ( // ErrEtcdSessionDone is used by etcd worker to signal a session done ErrEtcdSessionDone = errors.Normalize("the etcd session is done", errors.RFCCodeText("CDC:ErrEtcdSessionDone")) // ErrReactorFinished is used by reactor to signal a **normal** exit. - ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) - ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout")) - ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired")) + ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) + ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout")) + ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired")) + ErrEtcdTxnSizeExceed = errors.Normalize("patch size of a single changefeed exceed etcd txn max size", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed")) // pipeline errors ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline")) diff --git a/pkg/orchestrator/batch.go b/pkg/orchestrator/batch.go new file mode 100644 index 00000000000..4d28ab35f5c --- /dev/null +++ b/pkg/orchestrator/batch.go @@ -0,0 +1,79 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +import ( + "github.com/pingcap/errors" + cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/orchestrator/util" +) + +const ( + // 1.25 MiB + // Ref: https://etcd.io/docs/v3.3/dev-guide/limit/ + etcdTxnMaxSize = 1024 * (1024 + 256) +) + +// getBatchChangedState has 4 return values: +// 1.batchChangedSate +// 2.number of patch apply to batchChangedState +// 3.size of batchChangedState in byte +// 4.error +func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPatch) (map[util.EtcdKey][]byte, int, int, error) { + num := 0 + totalSize := 0 + // store changedState of multiple changefeed + batchChangedState := make(map[util.EtcdKey][]byte) + for i, patches := range patchGroups { + changedState, changedSize, err := getChangedState(state, patches) + if err != nil { + return nil, 0, 0, err + } + // if a changefeed's changedState size is large than etcdTxnMaxSize + // we should return an error instantly + if i == 0 && changedSize >= etcdTxnMaxSize { + return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs() + } + if totalSize+changedSize >= etcdTxnMaxSize { + break + } + for k, v := range changedState { + batchChangedState[k] = v + } + num++ + totalSize += changedSize + } + return batchChangedState, num, totalSize, nil +} + +func getChangedState(state map[util.EtcdKey][]byte, patches []DataPatch) (map[util.EtcdKey][]byte, int, error) { + changedSet := make(map[util.EtcdKey]struct{}) + changeState := make(map[util.EtcdKey][]byte) + changedSize := 0 + for _, patch := range patches { + err := patch.Patch(state, changedSet) + if err != nil { + if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { + continue + } + return nil, 0, errors.Trace(err) + } + } + for k := range changedSet { + v := state[k] + changedSize += len(k.String())*2 + len(v) + changeState[k] = v + } + return changeState, changedSize, nil +} diff --git a/pkg/orchestrator/batch_test.go b/pkg/orchestrator/batch_test.go new file mode 100644 index 00000000000..47185e607d2 --- /dev/null +++ b/pkg/orchestrator/batch_test.go @@ -0,0 +1,58 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +import ( + "fmt" + "testing" + + "github.com/pingcap/ticdc/pkg/orchestrator/util" + "github.com/stretchr/testify/require" +) + +func TestGetBatchChangeState(t *testing.T) { + t.Parallel() + patchGroupSize := 1000 + patchGroup := make([][]DataPatch, patchGroupSize) + for i := 0; i < patchGroupSize; i++ { + i := i + patches := []DataPatch{&SingleDataPatch{ + Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = []byte(fmt.Sprintf("abc%d", i)) + return newValue, true, nil + }, + }} + patchGroup[i] = patches + } + rawState := make(map[util.EtcdKey][]byte) + changedState, n, size, err := getBatchChangedState(rawState, patchGroup) + require.Nil(t, err) + require.LessOrEqual(t, n, len(patchGroup)) + require.LessOrEqual(t, size, etcdTxnMaxSize) + require.Equal(t, []byte(fmt.Sprintf("abc%d", 0)), changedState[util.NewEtcdKey("/key0")]) + + // test single patch exceed txn size + largePatches := []DataPatch{&SingleDataPatch{ + Key: util.NewEtcdKey("largePatch"), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = make([]byte, etcdTxnMaxSize) + return newValue, true, nil + }, + }} + patchGroup = [][]DataPatch{largePatches} + _, _, _, err = getBatchChangedState(rawState, patchGroup) + require.NotNil(t, err) + require.Contains(t, err.Error(), "patch size of a single changefeed exceed etcd txn max size") +} diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 102fe84db11..acccfbdb21e 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -24,6 +24,7 @@ import ( cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/orchestrator/util" + "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" "go.etcd.io/etcd/mvcc/mvccpb" @@ -32,6 +33,11 @@ import ( "golang.org/x/time/rate" ) +const ( + etcdRequestProgressDuration = 2 * time.Second + deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" +) + // EtcdWorker handles all interactions with Etcd type EtcdWorker struct { client *etcd.Client @@ -58,6 +64,14 @@ type EtcdWorker struct { // a `compare-and-swap` semantics, which is essential for implementing // snapshot isolation for Reactor ticks. deleteCounter int64 + metrics *etcdWorkerMetrics +} + +type etcdWorkerMetrics struct { + // kv events related metrics + metricEtcdTxnSize prometheus.Observer + metricEtcdTxnDuration prometheus.Observer + metricEtcdWorkerTickDuration prometheus.Observer } type etcdUpdate struct { @@ -85,18 +99,23 @@ func NewEtcdWorker(client *etcd.Client, prefix string, reactor Reactor, initStat }, nil } -const ( - etcdRequestProgressDuration = 2 * time.Second - deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" -) +func (worker *EtcdWorker) initMetrics(captureAddr string) { + metrics := &etcdWorkerMetrics{} + metrics.metricEtcdTxnSize = etcdTxnSize.WithLabelValues(captureAddr) + metrics.metricEtcdTxnDuration = etcdTxnExecDuration.WithLabelValues(captureAddr) + metrics.metricEtcdWorkerTickDuration = etcdWorkerTickDuration.WithLabelValues(captureAddr) + worker.metrics = metrics +} // Run starts the EtcdWorker event loop. // A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event. // If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone. // And the specified etcd session is nil-safety. -func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration) error { +func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error { defer worker.cleanUp() + worker.initMetrics(captureAddr) + err := worker.syncRawState(ctx) if err != nil { return errors.Trace(err) @@ -127,7 +146,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, tickRate := time.Second / timerInterval rl := rate.NewLimiter(rate.Limit(tickRate), 1) for { - var response clientv3.WatchResponse select { case <-ctx.Done(): return ctx.Err() @@ -140,19 +158,17 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, log.Warn("failed to request progress for etcd watcher", zap.Error(err)) } } - case response = <-watchCh: + case response := <-watchCh: // In this select case, we receive new events from Etcd, and call handleEvent if appropriate. if err := response.Err(); err != nil { return errors.Trace(err) } lastReceivedEventTime = time.Now() - // Check whether the response is stale. if worker.revision >= response.Header.GetRevision() { continue } worker.revision = response.Header.GetRevision() - // ProgressNotify implies no new events. if response.IsProgressNotify() { continue @@ -162,6 +178,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, // handleEvent will apply the event to our internal `rawState`. worker.handleEvent(ctx, event) } + } if len(pendingPatches) > 0 { @@ -196,8 +213,14 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if !rl.Allow() { continue } + startTime := time.Now() // it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick nextState, err := worker.reactor.Tick(ctx, worker.state) + costTime := time.Since(startTime).Seconds() + if costTime > time.Second.Seconds()*1 { + log.Warn("etcdWorker ticks reactor cost time more than 1 second") + } + worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime) if err != nil { if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) { return errors.Trace(err) @@ -284,33 +307,27 @@ func (worker *EtcdWorker) cloneRawState() map[util.EtcdKey][]byte { } func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]DataPatch) ([][]DataPatch, error) { + state := worker.cloneRawState() for len(patchGroups) > 0 { - patches := patchGroups[0] - err := worker.applyPatches(ctx, patches) + changeSate, n, size, err := getBatchChangedState(state, patchGroups) + if err != nil { + return patchGroups, err + } + err = worker.commitChangedState(ctx, changeSate, size) if err != nil { return patchGroups, err } - patchGroups = patchGroups[1:] + patchGroups = patchGroups[n:] } return patchGroups, nil } -func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) error { - state := worker.cloneRawState() - changedSet := make(map[util.EtcdKey]struct{}) - for _, patch := range patches { - err := patch.Patch(state, changedSet) - if err != nil { - if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { - continue - } - return errors.Trace(err) - } - } - cmps := make([]clientv3.Cmp, 0, len(changedSet)) - ops := make([]clientv3.Op, 0, len(changedSet)) +func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error { + cmps := make([]clientv3.Cmp, 0, len(changedState)) + ops := make([]clientv3.Op, 0, len(changedState)) hasDelete := false - for key := range changedSet { + + for key, value := range changedState { // make sure someone else has not updated the key after the last snapshot var cmp clientv3.Cmp if entry, ok := worker.rawState[key]; ok { @@ -322,7 +339,6 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) } cmps = append(cmps, cmp) - value := state[key] var op clientv3.Op if value != nil { op = clientv3.OpPut(key.String(), string(value)) @@ -344,7 +360,14 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) panic("unreachable") } + worker.metrics.metricEtcdTxnSize.Observe(float64(size)) + startTime := time.Now() resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit() + costTime := time.Since(startTime).Seconds() + if costTime > time.Second.Seconds()*1 { + log.Warn("etcdWorker commit etcd txn cost time more than 1 second") + } + worker.metrics.metricEtcdTxnDuration.Observe(costTime) if err != nil { return errors.Trace(err) } diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go index 6a7f3daec32..8785115ec31 100644 --- a/pkg/orchestrator/etcd_worker_bank_test.go +++ b/pkg/orchestrator/etcd_worker_bank_test.go @@ -150,7 +150,7 @@ func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) { accountNumber: totalAccountNumber, }, &bankReactorState{c: c, index: i, account: make([]int, totalAccountNumber)}) c.Assert(err, check.IsNil) - err = worker.Run(ctx, nil, 100*time.Millisecond) + err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1") if err == nil || err.Error() == "etcdserver: request timed out" { continue } diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 72cced6004f..4b1075cdc79 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -267,7 +267,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { return errors.Trace(err) } - return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond)) + return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")) }) } @@ -350,7 +350,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) { c.Assert(err, check.IsNil) errg := &errgroup.Group{} errg.Go(func() error { - return reactor.Run(ctx, nil, 10*time.Millisecond) + return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") }) time.Sleep(500 * time.Millisecond) @@ -435,7 +435,7 @@ func (s *etcdWorkerSuite) TestFinished(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -504,7 +504,7 @@ func (s *etcdWorkerSuite) TestCover(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -583,7 +583,7 @@ func (s *etcdWorkerSuite) TestEmptyTxn(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -650,7 +650,7 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -731,7 +731,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { wg.Add(1) go func() { defer wg.Done() - err := worker1.Run(ctx, nil, time.Millisecond*100) + err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1") c.Assert(err, check.IsNil) }() @@ -746,7 +746,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { }) c.Assert(err, check.IsNil) - err = worker2.Run(ctx, nil, time.Millisecond*100) + err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1") c.Assert(err, check.IsNil) modifyReactor.waitOnCh <- struct{}{} diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go index 6bf0eaf7b6e..cf29860a471 100644 --- a/pkg/orchestrator/interfaces.go +++ b/pkg/orchestrator/interfaces.go @@ -68,10 +68,10 @@ func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map return nil } -// MultiDatePatch represents an update to many keys -type MultiDatePatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error +// MultiDataPatch represents an update to many keys +type MultiDataPatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error // Patch implements the DataPatch interface -func (m MultiDatePatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error { +func (m MultiDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error { return m(valueMap, changedSet) } diff --git a/pkg/orchestrator/metrics.go b/pkg/orchestrator/metrics.go new file mode 100644 index 00000000000..efbb242871a --- /dev/null +++ b/pkg/orchestrator/metrics.go @@ -0,0 +1,52 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +import "github.com/prometheus/client_golang/prometheus" + +var ( + etcdTxnSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "etcd_txn_size_bytes", + Help: "Bucketed histogram of a etcd txn size.", + Buckets: prometheus.ExponentialBuckets(1, 2, 18), + }, []string{"capture"}) + + etcdTxnExecDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "etcd_txn_exec_duration", + Help: "Bucketed histogram of processing time (s) of a etcd txn.", + Buckets: prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18), + }, []string{"capture"}) + + etcdWorkerTickDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "tick_reactor_duration", + Help: "Bucketed histogram of etcdWorker tick reactor time (s).", + Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18), + }, []string{"capture"}) +) + +// InitMetrics registers all metrics in this file +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(etcdTxnSize) + registry.MustRegister(etcdTxnExecDuration) + registry.MustRegister(etcdWorkerTickDuration) +} diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index 9dc513d541a..def5d833e7c 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -100,6 +100,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro } // GetPatches implements the ReactorState interface +// Every []DataPatch slice in [][]DataPatch slice is the patches of a ChangefeedReactorState func (s *GlobalReactorState) GetPatches() [][]DataPatch { pendingPatches := s.pendingPatches for _, changefeedState := range s.Changefeeds { diff --git a/testing_utils/cdc_state_checker/cdc_monitor.go b/testing_utils/cdc_state_checker/cdc_monitor.go index 5eac68220a3..4d72348725f 100644 --- a/testing_utils/cdc_state_checker/cdc_monitor.go +++ b/testing_utils/cdc_state_checker/cdc_monitor.go @@ -89,7 +89,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti func (m *cdcMonitor) run(ctx context.Context) error { log.Debug("start running cdcMonitor") - err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond) + err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1") log.Error("etcdWorker exited: test-case-failed", zap.Error(err)) log.Info("CDC state", zap.Reflect("state", m.reactor.state)) return err