From 01846a9584caeea3b7edcafa9c9cd1650f1d2281 Mon Sep 17 00:00:00 2001
From: Oliver Tan
Date: Fri, 18 Feb 2022 22:18:15 +1100
Subject: [PATCH] ttljob: add metrics to row level TTL

This commit adds metrics for the row-level TTL job. Note that new
metrics are created per TTL table, with the table name forming the
"relation" label. This allows monitoring of TTL jobs per table.

Release note: None
---
 pkg/jobs/metrics.go             |   7 ++
 pkg/sql/ttl/ttljob/BUILD.bazel  |   4 +
 pkg/sql/ttl/ttljob/ttljob.go    | 185 +++++++++++++++++++++++++++++++-
 pkg/ts/catalog/chart_catalog.go |  35 ++++++
 4 files changed, 226 insertions(+), 5 deletions(-)

diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go
index dee2cc430054..eec66c8e10b1 100644
--- a/pkg/jobs/metrics.go
+++ b/pkg/jobs/metrics.go
@@ -25,6 +25,7 @@ import (
 type Metrics struct {
 	JobMetrics [jobspb.NumJobTypes]*JobTypeMetrics
 
+	RowLevelTTL  metric.Struct
 	Changefeed   metric.Struct
 	StreamIngest metric.Struct
 
@@ -179,6 +180,9 @@ func (Metrics) MetricStruct() {}
 
 // init initializes the metrics for job monitoring.
 func (m *Metrics) init(histogramWindowInterval time.Duration) {
+	if MakeRowLevelTTLMetricsHook != nil {
+		m.RowLevelTTL = MakeRowLevelTTLMetricsHook(histogramWindowInterval)
+	}
 	if MakeChangefeedMetricsHook != nil {
 		m.Changefeed = MakeChangefeedMetricsHook(histogramWindowInterval)
 	}
@@ -215,6 +219,9 @@ var MakeChangefeedMetricsHook func(time.Duration) metric.Struct
 // ccl code.
 var MakeStreamIngestMetricsHook func(duration time.Duration) metric.Struct
 
+// MakeRowLevelTTLMetricsHook allows for registration of row-level TTL metrics.
+var MakeRowLevelTTLMetricsHook func(time.Duration) metric.Struct
+
 // JobTelemetryMetrics is a telemetry metrics for individual job types.
 type JobTelemetryMetrics struct {
 	Successful telemetry.Counter
diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel
index 8f2ce2850270..c453b8c044ea 100644
--- a/pkg/sql/ttl/ttljob/BUILD.bazel
+++ b/pkg/sql/ttl/ttljob/BUILD.bazel
@@ -26,9 +26,13 @@ go_library(
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
         "//pkg/util/ctxgroup",
+        "//pkg/util/metric",
+        "//pkg/util/metric/aggmetric",
        "//pkg/util/quotapool",
+        "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
+        "@com_github_prometheus_client_model//go",
     ],
 )
 
diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go
index dd9d1f48b34f..ae045ab2b79d 100644
--- a/pkg/sql/ttl/ttljob/ttljob.go
+++ b/pkg/sql/ttl/ttljob/ttljob.go
@@ -13,6 +13,7 @@ package ttljob
 import (
 	"context"
 	"math"
+	"regexp"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -31,9 +32,13 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+	"github.com/cockroachdb/cockroach/pkg/util/metric"
+	"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
+	io_prometheus_client "github.com/prometheus/client_model/go"
 )
 
 var (
@@ -72,12 +77,133 @@ type rowLevelTTLResumer struct {
 	st *cluster.Settings
 }
 
+// RowLevelTTLAggMetrics are the row-level TTL job agg metrics.
+type RowLevelTTLAggMetrics struct {
+	RangeTotalDuration *aggmetric.AggHistogram
+	SelectDuration     *aggmetric.AggHistogram
+	DeleteDuration     *aggmetric.AggHistogram
+	RowSelections      *aggmetric.AggCounter
+	RowDeletions       *aggmetric.AggCounter
+	NumActiveRanges    *aggmetric.AggGauge
+
+	mu struct {
+		syncutil.Mutex
+		m map[string]rowLevelTTLMetrics
+	}
+}
+
+type rowLevelTTLMetrics struct {
+	RangeTotalDuration *aggmetric.Histogram
+	SelectDuration     *aggmetric.Histogram
+	DeleteDuration     *aggmetric.Histogram
+	RowSelections      *aggmetric.Counter
+	RowDeletions       *aggmetric.Counter
+	NumActiveRanges    *aggmetric.Gauge
+}
+
+// MetricStruct implements the metric.Struct interface.
+func (m *RowLevelTTLAggMetrics) MetricStruct() {}
+
+var invalidPrometheusRe = regexp.MustCompile(`[^a-zA-Z0-9_]`)
+
+func (m *RowLevelTTLAggMetrics) loadMetrics(relation string) rowLevelTTLMetrics {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	relation = invalidPrometheusRe.ReplaceAllString(relation, "_")
+	if ret, ok := m.mu.m[relation]; ok {
+		return ret
+	}
+	ret := rowLevelTTLMetrics{
+		RangeTotalDuration: m.RangeTotalDuration.AddChild(relation),
+		SelectDuration:     m.SelectDuration.AddChild(relation),
+		DeleteDuration:     m.DeleteDuration.AddChild(relation),
+		RowSelections:      m.RowSelections.AddChild(relation),
+		RowDeletions:       m.RowDeletions.AddChild(relation),
+		NumActiveRanges:    m.NumActiveRanges.AddChild(relation),
+	}
+	m.mu.m[relation] = ret
+	return ret
+}
+
+func makeRowLevelTTLAggMetrics(histogramWindowInterval time.Duration) metric.Struct {
+	sigFigs := 2
+	b := aggmetric.MakeBuilder("relation")
+	ret := &RowLevelTTLAggMetrics{
+		RangeTotalDuration: b.Histogram(
+			metric.Metadata{
+				Name:        "jobs.row_level_ttl.range_total_duration",
+				Help:        "Duration for processing a range during row level TTL.",
+				Measurement: "nanoseconds",
+				Unit:        metric.Unit_NANOSECONDS,
+				MetricType:  io_prometheus_client.MetricType_HISTOGRAM,
+			},
+			histogramWindowInterval,
+			time.Hour.Nanoseconds(),
+			sigFigs,
+		),
+		SelectDuration: b.Histogram(
+			metric.Metadata{
+				Name:        "jobs.row_level_ttl.select_duration",
+				Help:        "Duration for select requests during row level TTL.",
+				Measurement: "nanoseconds",
+				Unit:        metric.Unit_NANOSECONDS,
+				MetricType:  io_prometheus_client.MetricType_HISTOGRAM,
+			},
+			histogramWindowInterval,
+			time.Minute.Nanoseconds(),
+			sigFigs,
+		),
+		DeleteDuration: b.Histogram(
+			metric.Metadata{
+				Name:        "jobs.row_level_ttl.delete_duration",
+				Help:        "Duration for delete requests during row level TTL.",
+				Measurement: "nanoseconds",
+				Unit:        metric.Unit_NANOSECONDS,
+				MetricType:  io_prometheus_client.MetricType_HISTOGRAM,
+			},
+			histogramWindowInterval,
+			time.Minute.Nanoseconds(),
+			sigFigs,
+		),
+		RowSelections: b.Counter(
+			metric.Metadata{
+				Name:        "jobs.row_level_ttl.rows_selected",
+				Help:        "Number of rows selected for deletion by the row level TTL job.",
+				Measurement: "num_rows",
+				Unit:        metric.Unit_COUNT,
+				MetricType:  io_prometheus_client.MetricType_COUNTER,
+			},
+		),
+		RowDeletions: b.Counter(
+			metric.Metadata{
+				Name:        "jobs.row_level_ttl.rows_deleted",
+				Help:        "Number of rows deleted by the row level TTL job.",
+				Measurement: "num_rows",
+				Unit:        metric.Unit_COUNT,
+				MetricType:  io_prometheus_client.MetricType_COUNTER,
+			},
+		),
+		NumActiveRanges: b.Gauge(
+			metric.Metadata{
+				Name:        "jobs.row_level_ttl.num_active_ranges",
+				Help:        "Number of active workers attempting to delete for row level TTL.",
+				Measurement: "num_active_workers",
+				Unit:        metric.Unit_COUNT,
+			},
+		),
+	}
+	ret.mu.m = make(map[string]rowLevelTTLMetrics)
+	return ret
+}
+
 var _ jobs.Resumer = (*rowLevelTTLResumer)(nil)
 
 // Resume implements the jobs.Resumer interface.
 func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) error {
 	p := execCtx.(sql.JobExecContext)
 	db := p.ExecCfg().DB
+	descs := p.ExtendedEvalContext().Descs
 	var knobs sql.TTLTestingKnobs
 	if ttlKnobs := p.ExecCfg().TTLTestingKnobs; ttlKnobs != nil {
 		knobs = *ttlKnobs
@@ -103,8 +229,9 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
 	var pkTypes []*types.T
 	var pkDirs []descpb.IndexDescriptor_Direction
 	var ranges []kv.KeyValue
+	var name string
 	if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-		desc, err := p.ExtendedEvalContext().Descs.GetImmutableTableByID(
+		desc, err := descs.GetImmutableTableByID(
 			ctx,
 			txn,
 			details.TableID,
@@ -136,17 +263,49 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
 		if ttl == nil {
 			return errors.Newf("unable to find TTL on table %s", desc.GetName())
 		}
+		ttlSettings = *ttl
 
 		ranges, err = kvclient.ScanMetaKVs(ctx, txn, desc.TableSpan(p.ExecCfg().Codec))
 		if err != nil {
 			return err
 		}
-		ttlSettings = *ttl
+
+		_, dbDesc, err := descs.GetImmutableDatabaseByID(
+			ctx,
+			txn,
+			desc.GetParentID(),
+			tree.CommonLookupFlags{
+				Required: true,
+			},
+		)
+		if err != nil {
+			return err
+		}
+		schemaDesc, err := descs.GetImmutableSchemaByID(
+			ctx,
+			txn,
+			desc.GetParentSchemaID(),
+			tree.CommonLookupFlags{
+				Required: true,
+			},
+		)
+		if err != nil {
+			return err
+		}
+
+		tn := tree.MakeTableNameWithSchema(
+			tree.Name(dbDesc.GetName()),
+			tree.Name(schemaDesc.GetName()),
+			tree.Name(desc.GetName()),
+		)
+		name = tn.FQString()
 		return nil
 	}); err != nil {
 		return err
 	}
 
+	metrics := p.ExecCfg().JobRegistry.MetricsStruct().RowLevelTTL.(*RowLevelTTLAggMetrics).loadMetrics(name)
+
 	var rangeDesc roachpb.RangeDescriptor
 	var alloc tree.DatumAlloc
 	type rangeToProcess struct {
@@ -169,11 +328,13 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
 	for i := 0; i < rangeConcurrency; i++ {
 		g.GoCtx(func(ctx context.Context) error {
 			for r := range ch {
-				if err := runTTLOnRange(
+				start := timeutil.Now()
+				err := runTTLOnRange(
 					ctx,
 					p.ExecCfg(),
 					details,
 					p.ExtendedEvalContext().Descs,
+					metrics,
 					initialVersion,
 					r.startPK,
 					r.endPK,
@@ -182,7 +343,9 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
 					deleteBatchSize,
 					deleteRateLimiter,
 					*aost,
-				); err != nil {
+				)
+				metrics.RangeTotalDuration.RecordValue(int64(timeutil.Since(start)))
+				if err != nil {
 					// Continue until channel is fully read.
 					// Otherwise, the keys input will be blocked.
 					for r = range ch {
@@ -261,6 +424,7 @@ func runTTLOnRange(
 	execCfg *sql.ExecutorConfig,
 	details jobspb.RowLevelTTLDetails,
 	descriptors *descs.Collection,
+	metrics rowLevelTTLMetrics,
 	tableVersion descpb.DescriptorVersion,
 	startPK tree.Datums,
 	endPK tree.Datums,
@@ -269,6 +433,9 @@ func runTTLOnRange(
 	deleteRateLimiter *quotapool.RateLimiter,
 	aost tree.DTimestampTZ,
 ) error {
+	metrics.NumActiveRanges.Inc(1)
+	defer metrics.NumActiveRanges.Dec(1)
+
 	ie := execCfg.InternalExecutor
 	db := execCfg.DB
 
@@ -297,11 +464,14 @@ func runTTLOnRange(
 
 		if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
 			var err error
+			start := timeutil.Now()
 			expiredRowsPKs, err = selectBuilder.run(ctx, ie, txn)
+			metrics.SelectDuration.RecordValue(int64(timeutil.Since(start)))
 			return err
 		}); err != nil {
 			return errors.Wrapf(err, "error selecting rows to delete")
 		}
+		metrics.RowSelections.Inc(int64(len(expiredRowsPKs)))
 
 		// Step 2. Delete the rows which have expired.
 
@@ -341,10 +511,14 @@ func runTTLOnRange(
 				defer tokens.Consume()
 
 				// TODO(#75428): configure admission priority
-				return deleteBuilder.run(ctx, ie, txn, deleteBatch)
+				start := timeutil.Now()
+				err = deleteBuilder.run(ctx, ie, txn, deleteBatch)
+				metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
+				return err
 			}); err != nil {
 				return errors.Wrapf(err, "error during row deletion")
 			}
+			metrics.RowDeletions.Inc(int64(len(deleteBatch)))
 		}
 
 		// Step 3. Early exit if necessary.
@@ -408,4 +582,5 @@ func init() {
 			st:  settings,
 		}
 	})
+	jobs.MakeRowLevelTTLMetricsHook = makeRowLevelTTLAggMetrics
 }
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index bd6b4b9a3038..7905de001fe4 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -2383,6 +2383,41 @@ var charts = []sectionDescription{
 			},
 		},
 	},
+	{
+		Organization: [][]string{{SQLLayer, "SQL", "Row Level TTL"}},
+		Charts: []chartDescription{
+			{
+				Title: "Active Range Deletes",
+				Metrics: []string{
+					"jobs.row_level_ttl.num_active_ranges",
+				},
+				AxisLabel: "Num Running",
+			},
+			{
+				Title: "Processing Count",
+				Metrics: []string{
+					"jobs.row_level_ttl.rows_selected",
+					"jobs.row_level_ttl.rows_deleted",
+				},
+				AxisLabel: "Count",
+			},
+			{
+				Title: "Processing Latency",
+				Metrics: []string{
+					"jobs.row_level_ttl.select_duration",
+					"jobs.row_level_ttl.delete_duration",
+				},
+				AxisLabel: "Latency (nanoseconds)",
+			},
+			{
+				Title: "Net Processing Latency",
+				Metrics: []string{
+					"jobs.row_level_ttl.range_total_duration",
+				},
+				AxisLabel: "Latency (nanoseconds)",
+			},
+		},
+	},
 	{
 		Organization: [][]string{{SQLLayer, "SQL", "Feature Flag"}},
 		Charts: []chartDescription{
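
For readers unfamiliar with the aggmetric pattern used in the ttljob.go hunk above, the
following is a minimal, standalone sketch of the idea behind loadMetrics: one set of
child metrics is cached per table behind a mutex, and the fully qualified table name is
sanitized before it is used as the "relation" label value. The names metricsRegistry,
childMetrics, and counter are made up for this illustration; it does not use
CockroachDB's pkg/util/metric/aggmetric API.

// Standalone sketch (hypothetical names); mirrors the caching and
// sanitization that RowLevelTTLAggMetrics.loadMetrics performs above.
package main

import (
	"fmt"
	"regexp"
	"sync"
	"sync/atomic"
)

// counter stands in for an aggmetric child (e.g. rows_deleted).
type counter struct{ n int64 }

func (c *counter) Inc(delta int64) { atomic.AddInt64(&c.n, delta) }
func (c *counter) Count() int64    { return atomic.LoadInt64(&c.n) }

// childMetrics mirrors rowLevelTTLMetrics: one set of children per table.
type childMetrics struct {
	RowSelections *counter
	RowDeletions  *counter
}

// invalidPrometheusRe matches the regexp in the patch: anything outside
// [a-zA-Z0-9_] is replaced before the name becomes a label value.
var invalidPrometheusRe = regexp.MustCompile(`[^a-zA-Z0-9_]`)

// metricsRegistry mirrors the mutex-guarded child map in RowLevelTTLAggMetrics.
type metricsRegistry struct {
	mu sync.Mutex
	m  map[string]childMetrics
}

// loadMetrics returns the cached children for a relation, creating them on
// first use so repeated TTL job runs on the same table reuse one series.
func (r *metricsRegistry) loadMetrics(relation string) childMetrics {
	r.mu.Lock()
	defer r.mu.Unlock()

	relation = invalidPrometheusRe.ReplaceAllString(relation, "_")
	if ret, ok := r.m[relation]; ok {
		return ret
	}
	ret := childMetrics{RowSelections: &counter{}, RowDeletions: &counter{}}
	r.m[relation] = ret
	return ret
}

func main() {
	reg := &metricsRegistry{m: make(map[string]childMetrics)}

	// The job passes the fully qualified table name; the dots are
	// sanitized into underscores before becoming the label value.
	m := reg.loadMetrics("defaultdb.public.events")
	m.RowDeletions.Inc(42)

	// A second lookup for the same table returns the same cached children.
	fmt.Println(reg.loadMetrics("defaultdb.public.events").RowDeletions.Count()) // 42
}

The cached children mean repeated runs of the TTL job against the same table keep
reporting into one labelled series per metric rather than registering a new child on
every run, while the parent agg metrics continue to report the aggregate across all
tables.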