From cb435fe56b9b6eab5b55602e6cf21673e42deb4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Tue, 13 Dec 2022 15:56:52 +0800 Subject: [PATCH] ttl: Add metrics for TTL (#39849) close pingcap/tidb#39848 --- metrics/BUILD.bazel | 1 + metrics/grafana/tidb.json | 830 +++++++++++++++++++++++++++++++++++ metrics/metrics.go | 5 + metrics/ttl.go | 53 +++ ttl/metrics/BUILD.bazel | 19 + ttl/metrics/metrics.go | 147 +++++++ ttl/metrics/metrics_test.go | 70 +++ ttl/session/BUILD.bazel | 1 + ttl/session/session.go | 8 + ttl/ttlworker/BUILD.bazel | 1 + ttl/ttlworker/del.go | 14 +- ttl/ttlworker/job_manager.go | 16 + ttl/ttlworker/scan.go | 33 +- ttl/ttlworker/session.go | 8 + 14 files changed, 1201 insertions(+), 5 deletions(-) create mode 100644 metrics/ttl.go create mode 100644 ttl/metrics/BUILD.bazel create mode 100644 ttl/metrics/metrics.go create mode 100644 ttl/metrics/metrics_test.go diff --git a/metrics/BUILD.bazel b/metrics/BUILD.bazel index ff37513801ede..4b6b6cf9f2ee1 100644 --- a/metrics/BUILD.bazel +++ b/metrics/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "stats.go", "telemetry.go", "topsql.go", + "ttl.go", ], importpath = "github.com/pingcap/tidb/metrics", visibility = ["//visibility:public"], diff --git a/metrics/grafana/tidb.json b/metrics/grafana/tidb.json index bd686ffd05c92..2f52a768e0ee7 100644 --- a/metrics/grafana/tidb.json +++ b/metrics/grafana/tidb.json @@ -16985,6 +16985,836 @@ ], "title": "SourceSQL", "type": "row" + }, + { + "collapsed": false, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 75 + }, + "id": 274, + "panels": [], + "title": "TTL", + "type": "row" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The query count per second for each type of query in TTL jobs", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 
76 + }, + "hiddenSeries": false, + "id": 279, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "delete ok", + "color": "#73BF69" + }, + { + "alias": "select ok", + "color": "#5794F2" + }, + { + "alias": "delete error", + "color": "#F2495C" + }, + { + "alias": "select error", + "color": "#FF7383" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tidb_server_ttl_query_duration_count{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (sql_type, result)", + "interval": "", + "legendFormat": "{{sql_type}} {{result}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TTL QPS By Type", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The processed rows per second by TTL jobs", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + 
"y": 76 + }, + "hiddenSeries": false, + "id": 287, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "delete error", + "color": "#F2495C" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tidb_server_ttl_processed_expired_rows{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (sql_type, result)", + "interval": "", + "legendFormat": "{{sql_type}} {{result}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TTL Processed Rows Per Second", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The duration of the TTL scan queries", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 84 + }, + "hiddenSeries": false, + "id": 284, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, 
+ "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.50, sum(rate(tidb_server_ttl_query_duration_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", sql_type=\"select\", result=\"ok\"}[1m])) by (le))", + "interval": "", + "legendFormat": "50", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.80, sum(rate(tidb_server_ttl_query_duration_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", sql_type=\"select\", result=\"ok\"}[1m])) by (le))", + "hide": false, + "interval": "", + "legendFormat": "80", + "refId": "B" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.90, sum(rate(tidb_server_ttl_query_duration_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", sql_type=\"select\", result=\"ok\"}[1m])) by (le))\n", + "hide": false, + "interval": "", + "legendFormat": "90", + "refId": "C" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(tidb_server_ttl_query_duration_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", sql_type=\"select\", result=\"ok\"}[1m])) by (le))", + "hide": false, + "interval": "", + "legendFormat": "99", + "refId": "D" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TTL Scan Query Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, 
+ "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The duration of the TTL delete queries", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 84 + }, + "hiddenSeries": false, + "id": 285, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "histogram_quantile(0.50, sum(rate(tidb_server_ttl_query_duration_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", sql_type=\"delete\", result=\"ok\"}[1m])) by (le))", + "interval": "", + "legendFormat": "50", + "queryType": "randomWalk", + "refId": "A" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.80, sum(rate(tidb_server_ttl_query_duration_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", sql_type=\"delete\", result=\"ok\"}[1m])) by (le))", + "hide": false, + "interval": "", + "legendFormat": "80", + "refId": "B" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.90, sum(rate(tidb_server_ttl_query_duration_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", 
instance=~\"$instance\", sql_type=\"delete\", result=\"ok\"}[1m])) by (le))", + "hide": false, + "interval": "", + "legendFormat": "90", + "refId": "C" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(tidb_server_ttl_query_duration_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", sql_type=\"delete\", result=\"ok\"}[1m])) by (le))", + "hide": false, + "interval": "", + "legendFormat": "99", + "refId": "D" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TTL Delete Query Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": true, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time spent on each phase for scan workers", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 92 + }, + "hiddenSeries": false, + "id": 276, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": false, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "idle", + "color": "#73BF69" + }, + { + "alias": "query", + "color": "#FADE2A" + }, + { + "alias": 
"begin_txn", + "color": "#FFA6B0" + }, + { + "alias": "commit_txn", + "color": "#FF7383" + }, + { + "alias": "wait_retry", + "color": "#FF9830" + }, + { + "alias": "check_ttl", + "color": "#C4162A" + }, + { + "alias": "dispatch", + "color": "#8F3BB8" + } + ], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tidb_server_ttl_phase_time{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"scan_worker\"}[1m])) by (phase)", + "interval": "", + "legendFormat": "{{phase}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Scan Worker Time By Phase", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": true, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time spent on each phase for delete workers", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 92 + }, + "hiddenSeries": false, + "id": 282, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": false, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + 
"renderer": "flot", + "seriesOverrides": [ + { + "alias": "idle", + "color": "#73BF69" + }, + { + "alias": "query", + "color": "#FADE2A" + }, + { + "alias": "begin_txn", + "color": "#FFA6B0" + }, + { + "alias": "commit_txn", + "color": "#FF7383" + }, + { + "alias": "wait_retry", + "color": "#FF9830" + }, + { + "alias": "check_ttl", + "color": "#C4162A" + }, + { + "alias": "dispatch", + "color": "#8F3BB8" + } + ], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(rate(tidb_server_ttl_phase_time{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"delete_worker\"}[1m])) by (phase)\n", + "interval": "", + "legendFormat": "{{phase}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Delete Worker Time By Phase", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The TTL job statuses in each worker", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 100 + }, + "hiddenSeries": false, + "id": 281, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": 
"null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "running", + "color": "#5794F2" + }, + { + "alias": "cancelling", + "color": "#F2495C" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(tidb_server_ttl_job_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type)", + "interval": "", + "legendFormat": "{{ type }}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TTL Job Count By Status", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "refresh": "30s", diff --git a/metrics/metrics.go b/metrics/metrics.go index 93b278bf87d32..2984b66ddb27c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -210,6 +210,11 @@ func RegisterMetrics() { prometheus.MustRegister(RegionCheckpointSubscriptionEvent) prometheus.MustRegister(RCCheckTSWriteConfilictCounter) + prometheus.MustRegister(TTLQueryDuration) + prometheus.MustRegister(TTLProcessedExpiredRowsCounter) + prometheus.MustRegister(TTLJobStatus) + prometheus.MustRegister(TTLPhaseTime) + tikvmetrics.InitMetrics(TiDB, TiKVClient) tikvmetrics.RegisterMetrics() tikvmetrics.TiKVPanicCounter = PanicCounter // reset tidb metrics for tikv metrics diff --git a/metrics/ttl.go b/metrics/ttl.go new file mode 100644 index 
0000000000000..ab7e47e615e28 --- /dev/null +++ b/metrics/ttl.go @@ -0,0 +1,53 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +// TTL metrics +var ( + TTLQueryDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "ttl_query_duration", + Help: "Bucketed histogram of processing time (s) of handled TTL queries.", + Buckets: prometheus.ExponentialBuckets(0.01, 2, 20), // 10ms ~ 1.45hour + }, []string{LblSQLType, LblResult}) + + TTLProcessedExpiredRowsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "ttl_processed_expired_rows", + Help: "The count of expired rows processed in TTL jobs", + }, []string{LblSQLType, LblResult}) + + TTLJobStatus = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "ttl_job_status", + Help: "The jobs count in the specified status", + }, []string{LblType}) + + TTLPhaseTime = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "ttl_phase_time", + Help: "The time spent in each phase", + }, []string{LblType, LblPhase}) +) diff --git a/ttl/metrics/BUILD.bazel b/ttl/metrics/BUILD.bazel new file mode 100644 index 0000000000000..f0666b5c59530 --- /dev/null +++ b/ttl/metrics/BUILD.bazel @@ -0,0 +1,19 @@ 
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "metrics", + srcs = ["metrics.go"], + importpath = "github.com/pingcap/tidb/ttl/metrics", + visibility = ["//visibility:public"], + deps = [ + "//metrics", + "@com_github_prometheus_client_golang//prometheus", + ], +) + +go_test( + name = "metrics_test", + srcs = ["metrics_test.go"], + embed = [":metrics"], + deps = ["@com_github_stretchr_testify//require"], +) diff --git a/ttl/metrics/metrics.go b/ttl/metrics/metrics.go new file mode 100644 index 0000000000000..6f845932383db --- /dev/null +++ b/ttl/metrics/metrics.go @@ -0,0 +1,147 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metrics + +import ( + "context" + "time" + + "github.com/pingcap/tidb/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +// Phases to trace +var ( + PhaseIdle = "idle" + PhaseBeginTxn = "begin_txn" + PhaseCommitTxn = "commit_txn" + PhaseQuery = "query" + PhaseCheckTTL = "check_ttl" + PhaseWaitRetry = "wait_retry" + PhaseDispatch = "dispatch" + PhaseOther = "other" +) + +// TTL metrics +var ( + SelectSuccessDuration = metrics.TTLQueryDuration.With(prometheus.Labels{metrics.LblSQLType: "select", metrics.LblResult: metrics.LblOK}) + SelectErrorDuration = metrics.TTLQueryDuration.With(prometheus.Labels{metrics.LblSQLType: "select", metrics.LblResult: metrics.LblError}) + DeleteSuccessDuration = metrics.TTLQueryDuration.With(prometheus.Labels{metrics.LblSQLType: "delete", metrics.LblResult: metrics.LblOK}) + DeleteErrorDuration = metrics.TTLQueryDuration.With(prometheus.Labels{metrics.LblSQLType: "delete", metrics.LblResult: metrics.LblError}) + + ScannedExpiredRows = metrics.TTLProcessedExpiredRowsCounter.With(prometheus.Labels{metrics.LblSQLType: "select", metrics.LblResult: metrics.LblOK}) + DeleteSuccessExpiredRows = metrics.TTLProcessedExpiredRowsCounter.With(prometheus.Labels{metrics.LblSQLType: "delete", metrics.LblResult: metrics.LblOK}) + DeleteErrorExpiredRows = metrics.TTLProcessedExpiredRowsCounter.With(prometheus.Labels{metrics.LblSQLType: "delete", metrics.LblResult: metrics.LblError}) + + RunningJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "running"}) + CancellingJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "cancelling"}) +) + +func initWorkerPhases(workerType string) map[string]prometheus.Counter { + return map[string]prometheus.Counter{ + PhaseIdle: metrics.TTLPhaseTime.WithLabelValues(workerType, PhaseIdle), + PhaseBeginTxn: metrics.TTLPhaseTime.WithLabelValues(workerType, PhaseBeginTxn), + PhaseCommitTxn: metrics.TTLPhaseTime.WithLabelValues(workerType, PhaseCommitTxn), + 
PhaseQuery: metrics.TTLPhaseTime.WithLabelValues(workerType, PhaseQuery), + PhaseWaitRetry: metrics.TTLPhaseTime.WithLabelValues(workerType, PhaseWaitRetry), + PhaseDispatch: metrics.TTLPhaseTime.WithLabelValues(workerType, PhaseDispatch), + PhaseCheckTTL: metrics.TTLPhaseTime.WithLabelValues(workerType, PhaseCheckTTL), + PhaseOther: metrics.TTLPhaseTime.WithLabelValues(workerType, PhaseOther), + } +} + +var scanWorkerPhases = initWorkerPhases("scan_worker") +var deleteWorkerPhases = initWorkerPhases("delete_worker") + +// PhaseTracer is used to trace the duration of each phase +type PhaseTracer struct { + getTime func() time.Time + recordDuration func(phase string, duration time.Duration) + + phase string + phaseTime time.Time +} + +// NewScanWorkerPhaseTracer returns a tracer for scan worker +func NewScanWorkerPhaseTracer() *PhaseTracer { + return newPhaseTracer(time.Now, func(status string, duration time.Duration) { + if counter, ok := scanWorkerPhases[status]; ok { + counter.Add(duration.Seconds()) + } + }) +} + +// NewDeleteWorkerPhaseTracer returns a tracer for delete worker +func NewDeleteWorkerPhaseTracer() *PhaseTracer { + return newPhaseTracer(time.Now, func(status string, duration time.Duration) { + if counter, ok := deleteWorkerPhases[status]; ok { + counter.Add(duration.Seconds()) + } + }) +} + +func newPhaseTracer(getTime func() time.Time, recordDuration func(status string, duration time.Duration)) *PhaseTracer { + return &PhaseTracer{ + getTime: getTime, + recordDuration: recordDuration, + phaseTime: getTime(), + } +} + +// Phase returns the current phase +func (t *PhaseTracer) Phase() string { + if t == nil { + return "" + } + return t.phase +} + +// EnterPhase enters into a new phase +func (t *PhaseTracer) EnterPhase(phase string) { + if t == nil { + return + } + + now := t.getTime() + if t.phase != "" { + t.recordDuration(t.phase, now.Sub(t.phaseTime)) + } + + t.phase = phase + t.phaseTime = now +} + +// EndPhase ends the current phase +func (t 
*PhaseTracer) EndPhase() { + if t == nil { + return + } + t.EnterPhase("") +} + +type ttlPhaseTraceKey struct{} // unexported context key type, so the key cannot collide with string keys used by other packages + +// CtxWithPhaseTracer creates a new context that carries the given tracer +func CtxWithPhaseTracer(ctx context.Context, tracer *PhaseTracer) context.Context { + return context.WithValue(ctx, ttlPhaseTraceKey{}, tracer) +} + +// PhaseTracerFromCtx returns the tracer stored in the given context, or nil if there is none +func PhaseTracerFromCtx(ctx context.Context) *PhaseTracer { + if tracer, ok := ctx.Value(ttlPhaseTraceKey{}).(*PhaseTracer); ok { + return tracer + } + return nil +} diff --git a/ttl/metrics/metrics_test.go b/ttl/metrics/metrics_test.go new file mode 100644 index 0000000000000..68ca303756ce0 --- /dev/null +++ b/ttl/metrics/metrics_test.go @@ -0,0 +1,70 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPhaseTracer(t *testing.T) { + tm := time.Now() + getTime := func() time.Time { + return tm + } + + lastReportStatus := "" + lastReportDuration := time.Duration(0) + resetReport := func() { + lastReportStatus = "" + lastReportDuration = time.Duration(0) + } + + tracer := newPhaseTracer(getTime, func(status string, duration time.Duration) { + require.Equal(t, "", lastReportStatus) + require.Equal(t, int64(0), lastReportDuration.Nanoseconds()) + lastReportStatus = status + lastReportDuration = duration + }) + + resetReport() + tm = tm.Add(time.Second * 2) + tracer.EnterPhase("p1") + require.Equal(t, "", lastReportStatus) + require.Equal(t, int64(0), lastReportDuration.Nanoseconds()) + require.Equal(t, "p1", tracer.Phase()) + + tm = tm.Add(time.Second * 5) + tracer.EnterPhase("p2") + require.Equal(t, "p1", lastReportStatus) + require.Equal(t, time.Second*5, lastReportDuration) + require.Equal(t, "p2", tracer.Phase()) + + resetReport() + tm = tm.Add(time.Second * 10) + tracer.EnterPhase("p2") + require.Equal(t, "p2", lastReportStatus) + require.Equal(t, time.Second*10, lastReportDuration) + require.Equal(t, "p2", tracer.Phase()) + + resetReport() + tm = tm.Add(time.Second * 20) + tracer.EndPhase() + require.Equal(t, "p2", lastReportStatus) + require.Equal(t, time.Second*20, lastReportDuration) + require.Equal(t, "", tracer.Phase()) +} diff --git a/ttl/session/BUILD.bazel b/ttl/session/BUILD.bazel index 6d28bff0730dc..2c9dae3fc426f 100644 --- a/ttl/session/BUILD.bazel +++ b/ttl/session/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//sessionctx", "//sessionctx/variable", "//sessiontxn", + "//ttl/metrics", "//util/chunk", "//util/sqlexec", "@com_github_pingcap_errors//:errors", diff --git a/ttl/session/session.go b/ttl/session/session.go index 927b5f570bc92..9a7b115eb4ea9 100644 --- a/ttl/session/session.go +++ b/ttl/session/session.go @@ -25,6 +25,7 @@ import ( 
"github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" ) @@ -94,9 +95,14 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...interface{ // RunInTxn executes the specified function in a txn func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) { + tracer := metrics.PhaseTracerFromCtx(ctx) + defer tracer.EnterPhase(tracer.Phase()) + + tracer.EnterPhase(metrics.PhaseBeginTxn) if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil { return err } + tracer.EnterPhase(metrics.PhaseOther) success := false defer func() { @@ -110,9 +116,11 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) { return err } + tracer.EnterPhase(metrics.PhaseCommitTxn) if _, err = s.ExecuteSQL(ctx, "COMMIT"); err != nil { return err } + tracer.EnterPhase(metrics.PhaseOther) success = true return err diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index c5e95bdaa10a1..8b7c39270807e 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//sessionctx", "//sessionctx/variable", "//ttl/cache", + "//ttl/metrics", "//ttl/session", "//ttl/sqlbuilder", "//types", diff --git a/ttl/ttlworker/del.go b/ttl/ttlworker/del.go index eb86b1f3c6cf0..17769fcc378c9 100644 --- a/ttl/ttlworker/del.go +++ b/ttl/ttlworker/del.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/ttl/sqlbuilder" "github.com/pingcap/tidb/types" @@ -113,8 +114,11 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re return } + sqlStart := time.Now() _, needRetry, err := se.ExecuteSQLWithCheck(ctx, sql) + sqlInterval := time.Since(sqlStart) if 
err != nil { + metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds()) needRetry = needRetry && ctx.Err() == nil logutil.BgLogger().Warn( "delete SQL in TTL failed", @@ -134,6 +138,7 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re continue } + metrics.DeleteSuccessDuration.Observe(sqlInterval.Seconds()) t.statistics.IncSuccessRows(len(delBatch)) } return retryRows @@ -243,12 +248,16 @@ func newDeleteWorker(delCh <-chan *ttlDeleteTask, sessPool sessionPool) *ttlDele } func (w *ttlDeleteWorker) loop() error { + tracer := metrics.NewDeleteWorkerPhaseTracer() + defer tracer.EndPhase() + + tracer.EnterPhase(metrics.PhaseOther) se, err := getSession(w.sessionPool) if err != nil { return err } - ctx := w.baseWorker.ctx + ctx := metrics.CtxWithPhaseTracer(w.baseWorker.ctx, tracer) doRetry := func(task *ttlDeleteTask) [][]types.Datum { return task.doDelete(ctx, se) @@ -258,13 +267,16 @@ func (w *ttlDeleteWorker) loop() error { defer timer.Stop() for w.Status() == workerStatusRunning { + tracer.EnterPhase(metrics.PhaseIdle) select { case <-ctx.Done(): return nil case <-timer.C: + tracer.EnterPhase(metrics.PhaseOther) nextInterval := w.retryBuffer.DoRetry(doRetry) timer.Reset(nextInterval) case task, ok := <-w.delCh: + tracer.EnterPhase(metrics.PhaseOther) if !ok { return nil } diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index be29a4ac61b6f..7b80fa3165c50 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/timeutil" @@ -118,6 +119,7 @@ func (m *JobManager) jobLoop() error { tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval()) resizeWorkersTicker := time.Tick(resizeWorkersInterval) 
for { + m.reportMetrics() now := se.Now() select { @@ -162,6 +164,20 @@ func (m *JobManager) jobLoop() error { } } +func (m *JobManager) reportMetrics() { + var runningJobs, cancellingJobs float64 + for _, job := range m.runningJobs { + switch job.status { + case cache.JobStatusRunning: + runningJobs++ + case cache.JobStatusCancelling: + cancellingJobs++ + } + } + metrics.RunningJobsCnt.Set(runningJobs) + metrics.CancellingJobsCnt.Set(cancellingJobs) +} + func (m *JobManager) resizeScanWorkers(count int) error { var err error m.scanWorkers, err = m.resizeWorkers(m.scanWorkers, count, func() worker { diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index af4d3ed22c078..8a7487d973e35 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/sqlbuilder" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -47,14 +48,17 @@ type ttlStatistics struct { } func (s *ttlStatistics) IncTotalRows(cnt int) { + metrics.ScannedExpiredRows.Add(float64(cnt)) s.TotalRows.Add(uint64(cnt)) } func (s *ttlStatistics) IncSuccessRows(cnt int) { + metrics.DeleteSuccessExpiredRows.Add(float64(cnt)) s.SuccessRows.Add(uint64(cnt)) } func (s *ttlStatistics) IncErrorRows(cnt int) { + metrics.DeleteErrorExpiredRows.Add(float64(cnt)) s.ErrorRows.Add(uint64(cnt)) } @@ -111,9 +115,11 @@ func (t *ttlScanTask) getDatumRows(rows []chunk.Row) [][]types.Datum { func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, sessPool sessionPool) *ttlScanTaskExecResult { // TODO: merge the ctx and the taskCtx in ttl scan task, to allow both "cancel" and gracefully stop workers // now, the taskCtx is only check at the beginning of every loop - taskCtx := t.ctx + tracer := metrics.PhaseTracerFromCtx(ctx) + defer 
tracer.EnterPhase(tracer.Phase()) + tracer.EnterPhase(metrics.PhaseOther) rawSess, err := getSession(sessPool) if err != nil { return t.result(err) @@ -165,8 +171,11 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s return t.result(nil) } + sqlStart := time.Now() rows, retryable, sqlErr := sess.ExecuteSQLWithCheck(ctx, sql) + selectInterval := time.Since(sqlStart) if sqlErr != nil { + metrics.SelectErrorDuration.Observe(selectInterval.Seconds()) needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil logutil.BgLogger().Error("execute query for ttl scan task failed", zap.String("SQL", sql), @@ -180,14 +189,18 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s } retrySQL = sql retryTimes++ + + tracer.EnterPhase(metrics.PhaseWaitRetry) select { case <-ctx.Done(): return t.result(ctx.Err()) case <-time.After(scanTaskExecuteSQLRetryInterval): } + tracer.EnterPhase(metrics.PhaseOther) continue } + metrics.SelectSuccessDuration.Observe(selectInterval.Seconds()) retrySQL = "" retryTimes = 0 lastResult = t.getDatumRows(rows) @@ -201,12 +214,15 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s rows: lastResult, statistics: t.statistics, } + + tracer.EnterPhase(metrics.PhaseDispatch) select { case <-ctx.Done(): return t.result(ctx.Err()) case delCh <- delTask: t.statistics.IncTotalRows(len(lastResult)) } + tracer.EnterPhase(metrics.PhaseOther) } } @@ -282,17 +298,25 @@ func (w *ttlScanWorker) PollTaskResult() *ttlScanTaskExecResult { func (w *ttlScanWorker) loop() error { ctx := w.baseWorker.ctx + tracer := metrics.NewScanWorkerPhaseTracer() + defer tracer.EndPhase() + + ticker := time.Tick(time.Second * 5) for w.Status() == workerStatusRunning { + tracer.EnterPhase(metrics.PhaseIdle) select { case <-ctx.Done(): return nil + case <-ticker: + // ticker is used to update metrics on time case msg, ok := <-w.baseWorker.ch: + 
tracer.EnterPhase(metrics.PhaseOther) if !ok { return nil } switch task := msg.(type) { case *ttlScanTask: - w.handleScanTask(task) + w.handleScanTask(tracer, task) default: logutil.BgLogger().Warn("unrecognized message for ttlScanWorker", zap.Any("msg", msg)) } @@ -301,8 +325,9 @@ func (w *ttlScanWorker) loop() error { return nil } -func (w *ttlScanWorker) handleScanTask(task *ttlScanTask) { - result := task.doScan(w.ctx, w.delCh, w.sessionPool) +func (w *ttlScanWorker) handleScanTask(tracer *metrics.PhaseTracer, task *ttlScanTask) { + ctx := metrics.CtxWithPhaseTracer(w.ctx, tracer) + result := task.doScan(ctx, w.delCh, w.sessionPool) if result == nil { result = task.result(nil) } diff --git a/ttl/ttlworker/session.go b/ttl/ttlworker/session.go index a8109e4c2e863..b20f436a61859 100644 --- a/ttl/ttlworker/session.go +++ b/ttl/ttlworker/session.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" @@ -84,6 +85,10 @@ type ttlTableSession struct { } func (s *ttlTableSession) ExecuteSQLWithCheck(ctx context.Context, sql string) (rows []chunk.Row, shouldRetry bool, err error) { + tracer := metrics.PhaseTracerFromCtx(ctx) + defer tracer.EnterPhase(tracer.Phase()) + + tracer.EnterPhase(metrics.PhaseOther) if !variable.EnableTTLJob.Load() { return nil, false, errors.New("global TTL job is disabled") } @@ -93,7 +98,10 @@ func (s *ttlTableSession) ExecuteSQLWithCheck(ctx context.Context, sql string) ( } err = s.RunInTxn(ctx, func() error { + tracer.EnterPhase(metrics.PhaseQuery) + defer tracer.EnterPhase(tracer.Phase()) rows, err = s.ExecuteSQL(ctx, sql) + tracer.EnterPhase(metrics.PhaseCheckTTL) // We must check the configuration after ExecuteSQL because of MDL and the meta the current transaction used // can only be determined 
after executed one query. if validateErr := validateTTLWork(ctx, s.Session, s.tbl, s.expire); validateErr != nil {