From bb8b7c14b1992b15fad2c38d05ae956b306044b9 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 20 Dec 2021 18:09:46 +0800 Subject: [PATCH] etcd_worker: batch etcd patch (#3277) (#3393) --- cdc/capture.go | 2 +- cdc/capture/capture.go | 3 +- cdc/metrics.go | 2 + cdc/task_test.go | 4 +- errors.toml | 10 + go.mod | 1 + metrics/grafana/ticdc.json | 1117 ++++++++++++++++- pkg/errors/errors.go | 8 +- pkg/etcd/client.go | 79 +- pkg/etcd/client_test.go | 128 ++ pkg/orchestrator/batch.go | 91 ++ pkg/orchestrator/batch_test.go | 75 ++ pkg/orchestrator/etcd_worker.go | 173 ++- pkg/orchestrator/etcd_worker_bank_test.go | 9 +- pkg/orchestrator/etcd_worker_test.go | 16 +- pkg/orchestrator/interfaces.go | 6 +- pkg/orchestrator/metrics.go | 52 + .../cdc_state_checker/cdc_monitor.go | 2 +- 18 files changed, 1707 insertions(+), 71 deletions(-) create mode 100644 pkg/orchestrator/batch.go create mode 100644 pkg/orchestrator/batch_test.go create mode 100644 pkg/orchestrator/metrics.go diff --git a/cdc/capture.go b/cdc/capture.go index f5e889759df..db16d3abe4d 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -162,7 +162,7 @@ func (c *Capture) Run(ctx context.Context) (err error) { return errors.Trace(err) } log.Info("start to listen processor task...") - if err := etcdWorker.Run(ctx, c.session, 200*time.Millisecond); err != nil { + if err := etcdWorker.Run(ctx, c.session, 200*time.Millisecond, c.info.AdvertiseAddr); err != nil { // We check ttl of lease instead of check `session.Done`, because // `session.Done` is only notified when etcd client establish a // new keepalive request, there could be a time window as long as diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index cc9adebedf0..4d58deed471 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -281,7 +281,8 @@ func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Rea if err != nil { return errors.Trace(err) } - if err := etcdWorker.Run(ctx, c.session, timerInterval); err != nil { + captureAddr := c.info.AdvertiseAddr + if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil { // We check ttl of lease instead of check `session.Done`, because // `session.Done` is only notified when etcd client establish a // new keepalive request, there could be a time window as long as diff --git a/cdc/metrics.go b/cdc/metrics.go index 1d59c443302..37cc38e011d 100644 --- a/cdc/metrics.go +++ b/cdc/metrics.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/cdc/puller/sorter" "github.com/pingcap/tiflow/cdc/sink" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/prometheus/client_golang/prometheus" ) @@ -37,6 +38,7 @@ func init() { sink.InitMetrics(registry) entry.InitMetrics(registry) sorter.InitMetrics(registry) + orchestrator.InitMetrics(registry) if config.NewReplicaImpl { processor.InitMetrics(registry) tablepipeline.InitMetrics(registry) diff --git a/cdc/task_test.go b/cdc/task_test.go index c97f2a5ff2f..4499e9e533f 100644 --- a/cdc/task_test.go +++ b/cdc/task_test.go @@ -156,7 +156,9 @@ func (s *taskSuite) TestWatch(c *check.C) { } // Watch with a normal context - ch := s.w.Watch(context.Background()) + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + ch := s.w.Watch(ctx) // Trigger the ErrCompacted error c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc.restart_task_watch", "50%off"), check.IsNil) diff --git a/errors.toml b/errors.toml index b2c4981e156..689c375357a 100755 --- a/errors.toml +++ b/errors.toml @@ -206,6 +206,16 @@ error = ''' the etcd txn should be aborted and retried immediately ''' +["CDC:ErrEtcdTxnOpsExceed"] +error = ''' +patch ops:%d of a single changefeed exceed etcd txn max ops:%d +''' + +["CDC:ErrEtcdTxnSizeExceed"] +error = ''' +patch size:%d of a single changefeed exceed etcd txn max size:%d +''' + ["CDC:ErrEventFeedAborted"] error = ''' single event feed aborted diff --git a/go.mod b/go.mod index 6cbe143d0a6..a41faa6afb7 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Shopify/sarama v1.27.2 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/apache/pulsar-client-go v0.1.1 + github.com/benbjohnson/clock v1.1.0 github.com/bradleyjkemp/grpc-tools v0.2.5 github.com/cenkalti/backoff v2.2.1+incompatible github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index ed21e8d6100..4fe6eed8ddb 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -929,6 +929,1119 @@ "title": "Server", "type": "row" }, + { + "collapsed": true, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 266, + "panels": [ + { + "cards": { + "cardPadding": 1, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": null, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 2 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 262, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "interval": "1", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "show": true, + "showHistogram": false + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 2 + }, + "hiddenSeries": false, + "id": 264, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2612", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2613", + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": 0, + "cardRound": 0 + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "max": null, + "min": 1, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 10 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 256, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 10 + }, + "hiddenSeries": false, + "id": 258, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "format": "time_series", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{lcapture}-p99}", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:1612", + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:1613", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolatePurples", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 18 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 254, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker txn size ", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "decbytes", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 18 + }, + "hiddenSeries": false, + "id": 260, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-p99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker txn size percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2055", + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2056", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "EtcdWorker", + "type": "row" + }, + { + "collapsed": true, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 1 + }, + "id": 266, + "panels": [ + { + "cards": { + "cardPadding": 1, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "min": null, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 2 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 262, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "interval": "1", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "show": true, + "showHistogram": false + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 2 + }, + "hiddenSeries": false, + "id": 264, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_tick_reactor_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker tick reactor duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2612", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2613", + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": 0, + "cardRound": 0 + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "max": null, + "min": 1, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 10 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 256, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": 1, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 1, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": { + "unit": "s" + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 10 + }, + "hiddenSeries": false, + "id": 258, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "format": "time_series", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}-p99}", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker exec etcd txn duration percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:1612", + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:1613", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolatePurples", + "exponent": 0.5, + "min": 0, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 18 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 254, + "legend": { + "show": true + }, + "pluginVersion": "6.1.6", + "reverseYBuckets": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\"}[1m])) by (le, capture)", + "format": "heatmap", + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{le}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "EtcdWorker txn size ", + "tooltip": { + "show": true, + "showHistogram": true + }, + "tooltipDecimals": null, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": null, + "format": "decbytes", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "auto", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 18 + }, + "hiddenSeries": false, + "id": 260, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "6.1.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.95, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "interval": "", + "legendFormat": "{{capture}}-p95", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(ticdc_etcd_worker_etcd_txn_size_bytes_bucket{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[1m])) by (le,capture))", + "hide": false, + "interval": "", + "legendFormat": "{{capture}}-p99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "EtcdWorker txn size percentile", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:2055", + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:2056", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "title": "EtcdWorker", + "type": "row" + }, { "collapsed": true, "gridPos": { @@ -9679,5 +10792,5 @@ "timezone": "browser", "title": "Test-Cluster-TiCDC", "uid": "YiGL8hBZ1", - "version": 25 -} \ No newline at end of file + "version": 26 +} diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 64d583ac919..46c963ff7ee 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -201,9 +201,11 @@ var ( // ErrEtcdSessionDone is used by etcd worker to signal a session done ErrEtcdSessionDone = errors.Normalize("the etcd session is done", errors.RFCCodeText("CDC:ErrEtcdSessionDone")) // ErrReactorFinished is used by reactor to signal a **normal** exit. - ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) - ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout")) - ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired")) + ErrReactorFinished = errors.Normalize("the reactor has done its job and should no longer be executed", errors.RFCCodeText("CDC:ErrReactorFinished")) + ErrLeaseTimeout = errors.Normalize("owner lease timeout", errors.RFCCodeText("CDC:ErrLeaseTimeout")) + ErrLeaseExpired = errors.Normalize("owner lease expired ", errors.RFCCodeText("CDC:ErrLeaseExpired")) + ErrEtcdTxnSizeExceed = errors.Normalize("patch size:%d of a single changefeed exceed etcd txn max size:%d", errors.RFCCodeText("CDC:ErrEtcdTxnSizeExceed")) + ErrEtcdTxnOpsExceed = errors.Normalize("patch ops:%d of a single changefeed exceed etcd txn max ops:%d", errors.RFCCodeText("CDC:ErrEtcdTxnOpsExceed")) // pipeline errors ErrSendToClosedPipeline = errors.Normalize("pipeline is closed, cannot send message", errors.RFCCodeText("CDC:ErrSendToClosedPipeline")) diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 23a0e219a58..7769f09a3b0 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -15,7 +15,9 @@ package etcd import ( "context" + "time" + "github.com/benbjohnson/clock" "github.com/pingcap/errors" "github.com/pingcap/log" cerrors "github.com/pingcap/tiflow/pkg/errors" @@ -41,6 +43,14 @@ const ( backoffBaseDelayInMs = 500 // in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second backoffMaxDelayInMs = 60 * 1000 + // If no msg comes from a etcd watchCh for etcdWatchChTimeoutDuration long, + // we should cancel the watchCh and request a new watchCh from etcd client + etcdWatchChTimeoutDuration = 10 * time.Second + // If no msg comes from a etcd watchCh for etcdRequestProgressDuration long, + // we should call RequestProgress of etcd client + etcdRequestProgressDuration = 1 * time.Second + // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future + etcdWatchChBufferSize = 16 ) // set to var instead of const for mocking the value to speedup test @@ -50,11 +60,13 @@ var maxTries int64 = 8 type Client struct { cli *clientv3.Client metrics map[string]prometheus.Counter + // clock is for making it easier to mock time-related data structures in unit tests + clock clock.Clock } // Wrap warps a clientv3.Client that provides etcd APIs required by TiCDC. func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client { - return &Client{cli: cli, metrics: metrics} + return &Client{cli: cli, metrics: metrics, clock: clock.New()} } // Unwrap returns a clientv3.Client @@ -165,7 +177,70 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts .. // Watch delegates request to clientv3.Watcher.Watch func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { - return c.cli.Watch(ctx, key, opts...) + watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize) + go c.WatchWithChan(ctx, watchCh, key, opts...) + return watchCh +} + +// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh +func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) { + defer func() { + close(outCh) + log.Info("WatchWithChan exited") + }() + var lastRevision int64 + watchCtx, cancel := context.WithCancel(ctx) + defer cancel() + watchCh := c.cli.Watch(watchCtx, key, opts...) + + ticker := c.clock.Ticker(etcdRequestProgressDuration) + defer ticker.Stop() + lastReceivedResponseTime := c.clock.Now() + + for { + select { + case <-ctx.Done(): + cancel() + return + case response := <-watchCh: + lastReceivedResponseTime = c.clock.Now() + if response.Err() == nil && !response.IsProgressNotify() { + lastRevision = response.Header.Revision + } + + Loop: + // we must loop here until the response is sent to outCh + // or otherwise the response will be lost + for { + select { + case <-ctx.Done(): + cancel() + return + case outCh <- response: // it may block here + break Loop + case <-ticker.C: + if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration { + log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime))) + } + } + } + + ticker.Reset(etcdRequestProgressDuration) + case <-ticker.C: + if err := c.RequestProgress(ctx); err != nil { + log.Warn("failed to request progress for etcd watcher", zap.Error(err)) + } + if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration { + // cancel the last cancel func to reset it + log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack")) + cancel() + watchCtx, cancel = context.WithCancel(ctx) + watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) + // we need to reset lastReceivedResponseTime after reset Watch + lastReceivedResponseTime = c.clock.Now() + } + } + } } // RequestProgress requests a progress notify response be sent in all watch channels. diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index 04a81787d16..56973e1f777 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -17,6 +17,7 @@ import ( "context" "time" + "github.com/benbjohnson/clock" "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/tiflow/pkg/util/testleak" @@ -45,6 +46,23 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3. return nil, errors.New("mock error") } +type mockWatcher struct { + clientv3.Watcher + watchCh chan clientv3.WatchResponse + resetCount *int + requestCount *int +} + +func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { + *m.resetCount++ + return m.watchCh +} + +func (m mockWatcher) RequestProgress(ctx context.Context) error { + *m.requestCount++ + return nil +} + func (s *clientSuite) TestRetry(c *check.C) { defer testleak.AfterTest(c)() originValue := maxTries @@ -91,3 +109,113 @@ func (s *etcdSuite) TestDelegateLease(c *check.C) { c.Assert(err, check.IsNil) c.Assert(ttlResp.TTL, check.Equals, int64(-1)) } + +// test no data lost when WatchCh blocked +func (s *etcdSuite) TestWatchChBlocked(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + cli := clientv3.NewCtxClient(context.TODO()) + resetCount := 0 + requestCount := 0 + watchCh := make(chan clientv3.WatchResponse, 1) + watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount} + cli.Watcher = watcher + + sentRes := []clientv3.WatchResponse{ + {CompactRevision: 1}, + {CompactRevision: 2}, + {CompactRevision: 3}, + {CompactRevision: 4}, + {CompactRevision: 5}, + {CompactRevision: 6}, + } + + go func() { + for _, r := range sentRes { + watchCh <- r + } + }() + + mockClock := clock.NewMock() + watchCli := Wrap(cli, nil) + watchCli.clock = mockClock + + key := "testWatchChBlocked" + outCh := make(chan clientv3.WatchResponse, 6) + revision := int64(1) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + + go func() { + watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision)) + }() + receivedRes := make([]clientv3.WatchResponse, 0) + // wait for WatchWithChan set up + r := <-outCh + receivedRes = append(receivedRes, r) + // move time forward + mockClock.Add(time.Second * 30) + + for r := range outCh { + receivedRes = append(receivedRes, r) + } + + c.Check(sentRes, check.DeepEquals, receivedRes) + // make sure watchCh has been reset since timeout + c.Assert(*watcher.resetCount > 1, check.IsTrue) + // make sure RequestProgress has been call since timeout + c.Assert(*watcher.requestCount > 1, check.IsTrue) + // make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration + c.Assert(etcdRequestProgressDuration, check.Less, etcdWatchChTimeoutDuration) +} + +// test no data lost when OutCh blocked +func (s *etcdSuite) TestOutChBlocked(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + + cli := clientv3.NewCtxClient(context.TODO()) + resetCount := 0 + requestCount := 0 + watchCh := make(chan clientv3.WatchResponse, 1) + watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount} + cli.Watcher = watcher + + mockClock := clock.NewMock() + watchCli := Wrap(cli, nil) + watchCli.clock = mockClock + + sentRes := []clientv3.WatchResponse{ + {CompactRevision: 1}, + {CompactRevision: 2}, + {CompactRevision: 3}, + } + + go func() { + for _, r := range sentRes { + watchCh <- r + } + }() + + key := "testOutChBlocked" + outCh := make(chan clientv3.WatchResponse, 1) + revision := int64(1) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + go func() { + watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision)) + }() + receivedRes := make([]clientv3.WatchResponse, 0) + // wait for WatchWithChan set up + r := <-outCh + receivedRes = append(receivedRes, r) + // move time forward + mockClock.Add(time.Second * 30) + + for r := range outCh { + receivedRes = append(receivedRes, r) + } + + c.Check(sentRes, check.DeepEquals, receivedRes) +} diff --git a/pkg/orchestrator/batch.go b/pkg/orchestrator/batch.go new file mode 100644 index 00000000000..3bc98706167 --- /dev/null +++ b/pkg/orchestrator/batch.go @@ -0,0 +1,91 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +import ( + "github.com/pingcap/errors" + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/orchestrator/util" +) + +const ( + // 1.25 MiB + // Ref: https://etcd.io/docs/v3.3/dev-guide/limit/ + etcdTxnMaxSize = 1024 * (1024 + 256) + // Ref: https://etcd.io/docs/v3.3/op-guide/configuration/#--max-txn-ops + etcdTxnMaxOps = 128 +) + +// getBatchChangedState has 4 return values: +// 1.batchChangedSate +// 2.number of patch apply to batchChangedState +// 3.size of batchChangedState in byte +// 4.error +func getBatchChangedState(state map[util.EtcdKey][]byte, patchGroups [][]DataPatch) (map[util.EtcdKey][]byte, int, int, error) { + num := 0 + totalSize := 0 + // store changedState of multiple changefeed + batchChangedState := make(map[util.EtcdKey][]byte) + for i, patches := range patchGroups { + changedState, changedSize, err := getChangedState(state, patches) + if err != nil { + return nil, 0, 0, err + } + // if a changefeed's changedState size is larger than etcdTxnMaxSize + // or the length of changedState is larger than etcdTxnMaxOps + // we should return an error instantly + if i == 0 { + if changedSize > etcdTxnMaxSize { + return nil, 0, 0, cerrors.ErrEtcdTxnSizeExceed.GenWithStackByArgs(changedSize, etcdTxnMaxSize) + } + if len(changedState) > etcdTxnMaxOps { + return nil, 0, 0, cerrors.ErrEtcdTxnOpsExceed.GenWithStackByArgs(len(changedState), etcdTxnMaxOps) + } + } + + // batchChangedState size should not exceeds the etcdTxnMaxSize limit + // and keys numbers should not exceeds the etcdTxnMaxOps limit + if totalSize+changedSize >= etcdTxnMaxSize || + len(batchChangedState)+len(changedState) >= etcdTxnMaxOps { + break + } + for k, v := range changedState { + batchChangedState[k] = v + } + num++ + totalSize += changedSize + } + return batchChangedState, num, totalSize, nil +} + +func getChangedState(state map[util.EtcdKey][]byte, patches []DataPatch) (map[util.EtcdKey][]byte, int, error) { + changedSet := make(map[util.EtcdKey]struct{}) + changeState := make(map[util.EtcdKey][]byte) + changedSize := 0 + for _, patch := range patches { + err := patch.Patch(state, changedSet) + if err != nil { + if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { + continue + } + return nil, 0, errors.Trace(err) + } + } + for k := range changedSet { + v := state[k] + changedSize += len(k.String())*2 + len(v) + changeState[k] = v + } + return changeState, changedSize, nil +} diff --git a/pkg/orchestrator/batch_test.go b/pkg/orchestrator/batch_test.go new file mode 100644 index 00000000000..95a7721872f --- /dev/null +++ b/pkg/orchestrator/batch_test.go @@ -0,0 +1,75 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +import ( + "fmt" + "testing" + + "github.com/pingcap/tiflow/pkg/orchestrator/util" + "github.com/stretchr/testify/require" +) + +func TestGetBatchChangeState(t *testing.T) { + t.Parallel() + patchGroupSize := 1000 + patchGroup := make([][]DataPatch, patchGroupSize) + for i := 0; i < patchGroupSize; i++ { + i := i + patches := []DataPatch{&SingleDataPatch{ + Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = []byte(fmt.Sprintf("abc%d", i)) + return newValue, true, nil + }, + }} + patchGroup[i] = patches + } + rawState := make(map[util.EtcdKey][]byte) + changedState, n, size, err := getBatchChangedState(rawState, patchGroup) + require.Nil(t, err) + require.LessOrEqual(t, n, len(patchGroup)) + require.LessOrEqual(t, size, etcdTxnMaxSize) + require.LessOrEqual(t, len(changedState), etcdTxnMaxOps) + require.Equal(t, []byte(fmt.Sprintf("abc%d", 0)), changedState[util.NewEtcdKey("/key0")]) + + // test single patch exceed txn max size + largeSizePatches := []DataPatch{&SingleDataPatch{ + Key: util.NewEtcdKey("largePatch"), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = make([]byte, etcdTxnMaxSize) + return newValue, true, nil + }, + }} + patchGroup = [][]DataPatch{largeSizePatches} + _, _, _, err = getBatchChangedState(rawState, patchGroup) + require.NotNil(t, err) + require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max size") + + // test single patch exceed txn max ops + manyOpsPatches := make([]DataPatch, 0) + for i := 0; i <= etcdTxnMaxOps*2; i++ { + manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{ + Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)), + Func: func(old []byte) (newValue []byte, changed bool, err error) { + newValue = []byte(fmt.Sprintf("abc%d", i)) + return newValue, true, nil + }, + }) + } + patchGroup = [][]DataPatch{manyOpsPatches} + _, _, _, err = getBatchChangedState(rawState, patchGroup) + require.NotNil(t, err) + require.Contains(t, err.Error(), "a single changefeed exceed etcd txn max ops") +} diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 077f9ceb27e..cb402edac0c 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -20,18 +20,31 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator/util" + "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/time/rate" ) +const ( + // etcdTxnTimeoutDuration represents the timeout duration for committing a + // transaction to Etcd + etcdTxnTimeoutDuration = 30 * time.Second + // etcdWorkerLogsWarnDuration when EtcdWorker commits a txn to etcd or ticks + // it reactor takes more than etcdWorkerLogsWarnDuration, it will print a log + etcdWorkerLogsWarnDuration = 1 * time.Second + deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" +) + // EtcdWorker handles all interactions with Etcd type EtcdWorker struct { client *etcd.Client @@ -58,6 +71,15 @@ type EtcdWorker struct { // a `compare-and-swap` semantics, which is essential for implementing // snapshot isolation for Reactor ticks. deleteCounter int64 + + metrics *etcdWorkerMetrics +} + +type etcdWorkerMetrics struct { + // kv events related metrics + metricEtcdTxnSize prometheus.Observer + metricEtcdTxnDuration prometheus.Observer + metricEtcdWorkerTickDuration prometheus.Observer } type etcdUpdate struct { @@ -85,30 +107,35 @@ func NewEtcdWorker(client *etcd.Client, prefix string, reactor Reactor, initStat }, nil } -const ( - etcdRequestProgressDuration = 2 * time.Second - deletionCounterKey = "/meta/ticdc-delete-etcd-key-count" -) +func (worker *EtcdWorker) initMetrics(captureAddr string) { + metrics := &etcdWorkerMetrics{} + metrics.metricEtcdTxnSize = etcdTxnSize.WithLabelValues(captureAddr) + metrics.metricEtcdTxnDuration = etcdTxnExecDuration.WithLabelValues(captureAddr) + metrics.metricEtcdWorkerTickDuration = etcdWorkerTickDuration.WithLabelValues(captureAddr) + worker.metrics = metrics +} // Run starts the EtcdWorker event loop. // A tick is generated either on a timer whose interval is timerInterval, or on an Etcd event. // If the specified etcd session is Done, this Run function will exit with cerrors.ErrEtcdSessionDone. // And the specified etcd session is nil-safety. -func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration) error { +func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, timerInterval time.Duration, captureAddr string) error { defer worker.cleanUp() + worker.initMetrics(captureAddr) + err := worker.syncRawState(ctx) if err != nil { return errors.Trace(err) } - ctx1, cancel := context.WithCancel(ctx) - defer cancel() - ticker := time.NewTicker(timerInterval) defer ticker.Stop() - watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1)) + watchCtx, cancel := context.WithCancel(ctx) + defer cancel() + watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1)) + var ( pendingPatches [][]DataPatch exiting bool @@ -120,14 +147,12 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, // should never be closed sessionDone = make(chan struct{}) } - lastReceivedEventTime := time.Now() // tickRate represents the number of times EtcdWorker can tick // the reactor per second tickRate := time.Second / timerInterval rl := rate.NewLimiter(rate.Limit(tickRate), 1) for { - var response clientv3.WatchResponse select { case <-ctx.Done(): return ctx.Err() @@ -135,33 +160,46 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, return cerrors.ErrEtcdSessionDone.GenWithStackByArgs() case <-ticker.C: // There is no new event to handle on timer ticks, so we have nothing here. - if time.Since(lastReceivedEventTime) > etcdRequestProgressDuration { - if err := worker.client.RequestProgress(ctx); err != nil { - log.Warn("failed to request progress for etcd watcher", zap.Error(err)) - } - } - case response = <-watchCh: + case response := <-watchCh: // In this select case, we receive new events from Etcd, and call handleEvent if appropriate. if err := response.Err(); err != nil { return errors.Trace(err) } - lastReceivedEventTime = time.Now() - // Check whether the response is stale. - if worker.revision >= response.Header.GetRevision() { + // ProgressNotify implies no new events. + if response.IsProgressNotify() { + log.Debug("Etcd progress notification", + zap.Int64("revision", response.Header.GetRevision())) + // Note that we don't need to update the revision here, and we + // should not do so, because the revision of the progress notification + // may not satisfy the strict monotonicity we have expected. + // + // Updating `worker.revision` can cause a useful event with the + // same revision to be dropped erroneously. + // + // Refer to https://etcd.io/docs/v3.3/dev-guide/interacting_v3/#watch-progress + // "Note: The revision number in the progress notify response is the revision + // from the local etcd server node that the watch stream is connected to. [...]" + // This implies that the progress notification will NOT go through the raft + // consensus, thereby NOT affecting the revision (index). continue } - worker.revision = response.Header.GetRevision() - // ProgressNotify implies no new events. - if response.IsProgressNotify() { + // Check whether the response is stale. + if worker.revision >= response.Header.GetRevision() { + log.Info("Stale Etcd event dropped", + zap.Int64("event-revision", response.Header.GetRevision()), + zap.Int64("previous-revision", worker.revision), + zap.Any("events", response.Events)) continue } + worker.revision = response.Header.GetRevision() for _, event := range response.Events { // handleEvent will apply the event to our internal `rawState`. worker.handleEvent(ctx, event) } + } if len(pendingPatches) > 0 { @@ -196,8 +234,14 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if !rl.Allow() { continue } + startTime := time.Now() // it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick nextState, err := worker.reactor.Tick(ctx, worker.state) + costTime := time.Since(startTime) + if costTime > etcdWorkerLogsWarnDuration { + log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime)) + } + worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds()) if err != nil { if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) { return errors.Trace(err) @@ -284,33 +328,31 @@ func (worker *EtcdWorker) cloneRawState() map[util.EtcdKey][]byte { } func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]DataPatch) ([][]DataPatch, error) { + state := worker.cloneRawState() for len(patchGroups) > 0 { - patches := patchGroups[0] - err := worker.applyPatches(ctx, patches) + changeSate, n, size, err := getBatchChangedState(state, patchGroups) if err != nil { return patchGroups, err } - patchGroups = patchGroups[1:] + err = worker.commitChangedState(ctx, changeSate, size) + if err != nil { + return patchGroups, err + } + patchGroups = patchGroups[n:] } return patchGroups, nil } -func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) error { - state := worker.cloneRawState() - changedSet := make(map[util.EtcdKey]struct{}) - for _, patch := range patches { - err := patch.Patch(state, changedSet) - if err != nil { - if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { - continue - } - return errors.Trace(err) - } +func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error { + if len(changedState) == 0 { + return nil } - cmps := make([]clientv3.Cmp, 0, len(changedSet)) - ops := make([]clientv3.Op, 0, len(changedSet)) + + cmps := make([]clientv3.Cmp, 0, len(changedState)) + ops := make([]clientv3.Op, 0, len(changedState)) hasDelete := false - for key := range changedSet { + + for key, value := range changedState { // make sure someone else has not updated the key after the last snapshot var cmp clientv3.Cmp if entry, ok := worker.rawState[key]; ok { @@ -322,7 +364,6 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) } cmps = append(cmps, cmp) - value := state[key] var op clientv3.Op if value != nil { op = clientv3.OpPut(key.String(), string(value)) @@ -344,7 +385,26 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) panic("unreachable") } - resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit() + worker.metrics.metricEtcdTxnSize.Observe(float64(size)) + startTime := time.Now() + + txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) + resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit() + cancel() + + // For testing the situation where we have a progress notification that + // has the same revision as the committed Etcd transaction. + failpoint.Inject("InjectProgressRequestAfterCommit", func() { + if err := worker.client.RequestProgress(ctx); err != nil { + failpoint.Return(errors.Trace(err)) + } + }) + + costTime := time.Since(startTime) + if costTime > etcdWorkerLogsWarnDuration { + log.Warn("Etcd transaction took too long", zap.Duration("duration", costTime)) + } + worker.metrics.metricEtcdTxnDuration.Observe(costTime.Seconds()) if err != nil { return errors.Trace(err) } @@ -355,6 +415,8 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) return nil } + // Logs the conditions for the failed Etcd transaction. + worker.logEtcdCmps(cmps) return cerrors.ErrEtcdTryAgain.GenWithStackByArgs() } @@ -370,19 +432,34 @@ func (worker *EtcdWorker) applyUpdates() error { return nil } -func logEtcdOps(ops []clientv3.Op, commited bool) { - if log.GetLevel() != zapcore.DebugLevel || len(ops) == 0 { +func logEtcdOps(ops []clientv3.Op, committed bool) { + if committed && (log.GetLevel() != zapcore.DebugLevel || len(ops) == 0) { return } - log.Debug("[etcd worker] ==========Update State to ETCD==========") + logFn := log.Debug + if !committed { + logFn = log.Info + } + + logFn("[etcd worker] ==========Update State to ETCD==========") for _, op := range ops { if op.IsDelete() { - log.Debug("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes())) + logFn("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes())) } else { - log.Debug("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes())) + logFn("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes())) } } - log.Debug("[etcd worker] ============State Commit=============", zap.Bool("committed", commited)) + logFn("[etcd worker] ============State Commit=============", zap.Bool("committed", committed)) +} + +func (worker *EtcdWorker) logEtcdCmps(cmps []clientv3.Cmp) { + log.Info("[etcd worker] ==========Failed Etcd Txn Cmps==========") + for _, cmp := range cmps { + cmp := etcdserverpb.Compare(cmp) + log.Info("[etcd worker] compare", + zap.String("cmp", cmp.String())) + } + log.Info("[etcd worker] ============End Failed Etcd Txn Cmps=============") } func (worker *EtcdWorker) cleanUp() { diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go index 49a7561a50e..0fa2243815e 100644 --- a/pkg/orchestrator/etcd_worker_bank_test.go +++ b/pkg/orchestrator/etcd_worker_bank_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/log" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/orchestrator/util" @@ -121,6 +122,12 @@ func (b *bankReactor) Tick(ctx context.Context, state ReactorState) (nextState R func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) { defer testleak.AfterTest(c)() + + _ = failpoint.Enable("github.com/pingcap/tiflow/pkg/orchestrator/InjectProgressRequestAfterCommit", "10%return(true)") + defer func() { + _ = failpoint.Disable("github.com/pingcap/tiflow/pkg/orchestrator/InjectProgressRequestAfterCommit") + }() + totalAccountNumber := 25 workerNumber := 10 var wg sync.WaitGroup @@ -150,7 +157,7 @@ func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) { accountNumber: totalAccountNumber, }, &bankReactorState{c: c, index: i, account: make([]int, totalAccountNumber)}) c.Assert(err, check.IsNil) - err = worker.Run(ctx, nil, 100*time.Millisecond) + err = worker.Run(ctx, nil, 100*time.Millisecond, "127.0.0.1") if err == nil || err.Error() == "etcdserver: request timed out" { continue } diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 001c3a401c2..9b21a02200a 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -268,7 +268,7 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { return errors.Trace(err) } - return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond)) + return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1")) }) } @@ -353,7 +353,7 @@ func (s *etcdWorkerSuite) TestLinearizability(c *check.C) { c.Assert(err, check.IsNil) errg := &errgroup.Group{} errg.Go(func() error { - return reactor.Run(ctx, nil, 10*time.Millisecond) + return reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") }) time.Sleep(500 * time.Millisecond) @@ -438,7 +438,7 @@ func (s *etcdWorkerSuite) TestFinished(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -507,7 +507,7 @@ func (s *etcdWorkerSuite) TestCover(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -586,7 +586,7 @@ func (s *etcdWorkerSuite) TestEmptyTxn(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -653,7 +653,7 @@ func (s *etcdWorkerSuite) TestEmptyOrNil(c *check.C) { state: make(map[string]string), }) c.Assert(err, check.IsNil) - err = reactor.Run(ctx, nil, 10*time.Millisecond) + err = reactor.Run(ctx, nil, 10*time.Millisecond, "127.0.0.1") c.Assert(err, check.IsNil) resp, err := cli.Get(ctx, prefix+"/key1") c.Assert(err, check.IsNil) @@ -734,7 +734,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { wg.Add(1) go func() { defer wg.Done() - err := worker1.Run(ctx, nil, time.Millisecond*100) + err := worker1.Run(ctx, nil, time.Millisecond*100, "127.0.0.1") c.Assert(err, check.IsNil) }() @@ -749,7 +749,7 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { }) c.Assert(err, check.IsNil) - err = worker2.Run(ctx, nil, time.Millisecond*100) + err = worker2.Run(ctx, nil, time.Millisecond*100, "127.0.0.1") c.Assert(err, check.IsNil) modifyReactor.waitOnCh <- struct{}{} diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go index 4ab21ba6420..5e74fab592b 100644 --- a/pkg/orchestrator/interfaces.go +++ b/pkg/orchestrator/interfaces.go @@ -68,10 +68,10 @@ func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map return nil } -// MultiDatePatch represents an update to many keys -type MultiDatePatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error +// MultiDataPatch represents an update to many keys +type MultiDataPatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error // Patch implements the DataPatch interface -func (m MultiDatePatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error { +func (m MultiDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error { return m(valueMap, changedSet) } diff --git a/pkg/orchestrator/metrics.go b/pkg/orchestrator/metrics.go new file mode 100644 index 00000000000..efbb242871a --- /dev/null +++ b/pkg/orchestrator/metrics.go @@ -0,0 +1,52 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +import "github.com/prometheus/client_golang/prometheus" + +var ( + etcdTxnSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "etcd_txn_size_bytes", + Help: "Bucketed histogram of a etcd txn size.", + Buckets: prometheus.ExponentialBuckets(1, 2, 18), + }, []string{"capture"}) + + etcdTxnExecDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "etcd_txn_exec_duration", + Help: "Bucketed histogram of processing time (s) of a etcd txn.", + Buckets: prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18), + }, []string{"capture"}) + + etcdWorkerTickDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "etcd_worker", + Name: "tick_reactor_duration", + Help: "Bucketed histogram of etcdWorker tick reactor time (s).", + Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18), + }, []string{"capture"}) +) + +// InitMetrics registers all metrics in this file +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(etcdTxnSize) + registry.MustRegister(etcdTxnExecDuration) + registry.MustRegister(etcdWorkerTickDuration) +} diff --git a/testing_utils/cdc_state_checker/cdc_monitor.go b/testing_utils/cdc_state_checker/cdc_monitor.go index 655770b96e6..c628c4e67a0 100644 --- a/testing_utils/cdc_state_checker/cdc_monitor.go +++ b/testing_utils/cdc_state_checker/cdc_monitor.go @@ -92,7 +92,7 @@ func newCDCMonitor(ctx context.Context, pd string, credential *security.Credenti func (m *cdcMonitor) run(ctx context.Context) error { log.Debug("start running cdcMonitor") - err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond) + err := m.etcdWorker.Run(ctx, nil, 200*time.Millisecond, "127.0.0.1") log.Error("etcdWorker exited: test-case-failed", zap.Error(err)) log.Info("CDC state", zap.Reflect("state", m.reactor.state)) return err