Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
76701: ttljob: add metrics to row level TTL r=otan a=otan

This commit adds metrics about the row-level TTL job. Note new metrics
are created on a per-TTL table basis, which forms the basis of the
"relation" label. This allows monitoring of TTL jobs per table.

Release note: None



Co-authored-by: Oliver Tan <[email protected]>
  • Loading branch information
craig[bot] and otan committed Feb 21, 2022
2 parents f577b7b + 01846a9 commit 1a89121
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 5 deletions.
7 changes: 7 additions & 0 deletions pkg/jobs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type Metrics struct {
JobMetrics [jobspb.NumJobTypes]*JobTypeMetrics

RowLevelTTL metric.Struct
Changefeed metric.Struct
StreamIngest metric.Struct

Expand Down Expand Up @@ -179,6 +180,9 @@ func (Metrics) MetricStruct() {}

// init initializes the metrics for job monitoring.
func (m *Metrics) init(histogramWindowInterval time.Duration) {
if MakeRowLevelTTLMetricsHook != nil {
m.RowLevelTTL = MakeRowLevelTTLMetricsHook(histogramWindowInterval)
}
if MakeChangefeedMetricsHook != nil {
m.Changefeed = MakeChangefeedMetricsHook(histogramWindowInterval)
}
Expand Down Expand Up @@ -215,6 +219,9 @@ var MakeChangefeedMetricsHook func(time.Duration) metric.Struct
// ccl code.
var MakeStreamIngestMetricsHook func(duration time.Duration) metric.Struct

// MakeRowLevelTTLMetricsHook allows for registration of row-level TTL metrics.
var MakeRowLevelTTLMetricsHook func(time.Duration) metric.Struct

// JobTelemetryMetrics is a telemetry metrics for individual job types.
type JobTelemetryMetrics struct {
Successful telemetry.Counter
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/ttl/ttljob/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_prometheus_client_model//go",
],
)

Expand Down
185 changes: 180 additions & 5 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package ttljob
import (
"context"
"math"
"regexp"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -31,9 +32,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
io_prometheus_client "github.com/prometheus/client_model/go"
)

var (
Expand Down Expand Up @@ -72,12 +77,133 @@ type rowLevelTTLResumer struct {
st *cluster.Settings
}

// RowLevelTTLAggMetrics are the row-level TTL job agg metrics.
type RowLevelTTLAggMetrics struct {
RangeTotalDuration *aggmetric.AggHistogram
SelectDuration *aggmetric.AggHistogram
DeleteDuration *aggmetric.AggHistogram
RowSelections *aggmetric.AggCounter
RowDeletions *aggmetric.AggCounter
NumActiveRanges *aggmetric.AggGauge

mu struct {
syncutil.Mutex
m map[string]rowLevelTTLMetrics
}
}

type rowLevelTTLMetrics struct {
RangeTotalDuration *aggmetric.Histogram
SelectDuration *aggmetric.Histogram
DeleteDuration *aggmetric.Histogram
RowSelections *aggmetric.Counter
RowDeletions *aggmetric.Counter
NumActiveRanges *aggmetric.Gauge
}

// MetricStruct implements the metric.Struct interface.
func (m *RowLevelTTLAggMetrics) MetricStruct() {}

var invalidPrometheusRe = regexp.MustCompile(`[^a-zA-Z0-9_]`)

func (m *RowLevelTTLAggMetrics) loadMetrics(relation string) rowLevelTTLMetrics {
m.mu.Lock()
defer m.mu.Unlock()

relation = invalidPrometheusRe.ReplaceAllString(relation, "_")
if ret, ok := m.mu.m[relation]; ok {
return ret
}
ret := rowLevelTTLMetrics{
RangeTotalDuration: m.RangeTotalDuration.AddChild(relation),
SelectDuration: m.SelectDuration.AddChild(relation),
DeleteDuration: m.DeleteDuration.AddChild(relation),
RowSelections: m.RowSelections.AddChild(relation),
RowDeletions: m.RowDeletions.AddChild(relation),
NumActiveRanges: m.NumActiveRanges.AddChild(relation),
}
m.mu.m[relation] = ret
return ret
}

func makeRowLevelTTLAggMetrics(histogramWindowInterval time.Duration) metric.Struct {
sigFigs := 2
b := aggmetric.MakeBuilder("relation")
ret := &RowLevelTTLAggMetrics{
RangeTotalDuration: b.Histogram(
metric.Metadata{
Name: "jobs.row_level_ttl.range_total_duration",
Help: "Duration for processing a range during row level TTL.",
Measurement: "nanoseconds",
Unit: metric.Unit_NANOSECONDS,
MetricType: io_prometheus_client.MetricType_HISTOGRAM,
},
histogramWindowInterval,
time.Hour.Nanoseconds(),
sigFigs,
),
SelectDuration: b.Histogram(
metric.Metadata{
Name: "jobs.row_level_ttl.select_duration",
Help: "Duration for select requests during row level TTL.",
Measurement: "nanoseconds",
Unit: metric.Unit_NANOSECONDS,
MetricType: io_prometheus_client.MetricType_HISTOGRAM,
},
histogramWindowInterval,
time.Minute.Nanoseconds(),
sigFigs,
),
DeleteDuration: b.Histogram(
metric.Metadata{
Name: "jobs.row_level_ttl.delete_duration",
Help: "Duration for delete requests during row level TTL.",
Measurement: "nanoseconds",
Unit: metric.Unit_NANOSECONDS,
MetricType: io_prometheus_client.MetricType_HISTOGRAM,
},
histogramWindowInterval,
time.Minute.Nanoseconds(),
sigFigs,
),
RowSelections: b.Counter(
metric.Metadata{
Name: "jobs.row_level_ttl.rows_selected",
Help: "Number of rows selected for deletion by the row level TTL job.",
Measurement: "num_rows",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
},
),
RowDeletions: b.Counter(
metric.Metadata{
Name: "jobs.row_level_ttl.rows_deleted",
Help: "Number of rows deleted by the row level TTL job.",
Measurement: "num_rows",
Unit: metric.Unit_COUNT,
MetricType: io_prometheus_client.MetricType_COUNTER,
},
),
NumActiveRanges: b.Gauge(
metric.Metadata{
Name: "jobs.row_level_ttl.num_active_ranges",
Help: "Number of active workers attempting to delete for row level TTL.",
Measurement: "num_active_workers",
Unit: metric.Unit_COUNT,
},
),
}
ret.mu.m = make(map[string]rowLevelTTLMetrics)
return ret
}

var _ jobs.Resumer = (*rowLevelTTLResumer)(nil)

// Resume implements the jobs.Resumer interface.
func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) error {
p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
descs := p.ExtendedEvalContext().Descs
var knobs sql.TTLTestingKnobs
if ttlKnobs := p.ExecCfg().TTLTestingKnobs; ttlKnobs != nil {
knobs = *ttlKnobs
Expand All @@ -103,8 +229,9 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
var pkTypes []*types.T
var pkDirs []descpb.IndexDescriptor_Direction
var ranges []kv.KeyValue
var name string
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := p.ExtendedEvalContext().Descs.GetImmutableTableByID(
desc, err := descs.GetImmutableTableByID(
ctx,
txn,
details.TableID,
Expand Down Expand Up @@ -136,17 +263,49 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
if ttl == nil {
return errors.Newf("unable to find TTL on table %s", desc.GetName())
}
ttlSettings = *ttl

ranges, err = kvclient.ScanMetaKVs(ctx, txn, desc.TableSpan(p.ExecCfg().Codec))
if err != nil {
return err
}
ttlSettings = *ttl

_, dbDesc, err := descs.GetImmutableDatabaseByID(
ctx,
txn,
desc.GetParentID(),
tree.CommonLookupFlags{
Required: true,
},
)
if err != nil {
return err
}
schemaDesc, err := descs.GetImmutableSchemaByID(
ctx,
txn,
desc.GetParentSchemaID(),
tree.CommonLookupFlags{
Required: true,
},
)
if err != nil {
return err
}

tn := tree.MakeTableNameWithSchema(
tree.Name(dbDesc.GetName()),
tree.Name(schemaDesc.GetName()),
tree.Name(desc.GetName()),
)
name = tn.FQString()
return nil
}); err != nil {
return err
}

metrics := p.ExecCfg().JobRegistry.MetricsStruct().RowLevelTTL.(*RowLevelTTLAggMetrics).loadMetrics(name)

var rangeDesc roachpb.RangeDescriptor
var alloc tree.DatumAlloc
type rangeToProcess struct {
Expand All @@ -169,11 +328,13 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
for i := 0; i < rangeConcurrency; i++ {
g.GoCtx(func(ctx context.Context) error {
for r := range ch {
if err := runTTLOnRange(
start := timeutil.Now()
err := runTTLOnRange(
ctx,
p.ExecCfg(),
details,
p.ExtendedEvalContext().Descs,
metrics,
initialVersion,
r.startPK,
r.endPK,
Expand All @@ -182,7 +343,9 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
deleteBatchSize,
deleteRateLimiter,
*aost,
); err != nil {
)
metrics.RangeTotalDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
// Continue until channel is fully read.
// Otherwise, the keys input will be blocked.
for r = range ch {
Expand Down Expand Up @@ -261,6 +424,7 @@ func runTTLOnRange(
execCfg *sql.ExecutorConfig,
details jobspb.RowLevelTTLDetails,
descriptors *descs.Collection,
metrics rowLevelTTLMetrics,
tableVersion descpb.DescriptorVersion,
startPK tree.Datums,
endPK tree.Datums,
Expand All @@ -269,6 +433,9 @@ func runTTLOnRange(
deleteRateLimiter *quotapool.RateLimiter,
aost tree.DTimestampTZ,
) error {
metrics.NumActiveRanges.Inc(1)
defer metrics.NumActiveRanges.Dec(1)

ie := execCfg.InternalExecutor
db := execCfg.DB

Expand Down Expand Up @@ -297,11 +464,14 @@ func runTTLOnRange(

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
start := timeutil.Now()
expiredRowsPKs, err = selectBuilder.run(ctx, ie, txn)
metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
return err
}); err != nil {
return errors.Wrapf(err, "error selecting rows to delete")
}
metrics.RowSelections.Inc(int64(len(expiredRowsPKs)))

// Step 2. Delete the rows which have expired.

Expand Down Expand Up @@ -341,10 +511,14 @@ func runTTLOnRange(
defer tokens.Consume()

// TODO(#75428): configure admission priority
return deleteBuilder.run(ctx, ie, txn, deleteBatch)
start := timeutil.Now()
err = deleteBuilder.run(ctx, ie, txn, deleteBatch)
metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
return err
}); err != nil {
return errors.Wrapf(err, "error during row deletion")
}
metrics.RowDeletions.Inc(int64(len(deleteBatch)))
}

// Step 3. Early exit if necessary.
Expand Down Expand Up @@ -408,4 +582,5 @@ func init() {
st: settings,
}
})
jobs.MakeRowLevelTTLMetricsHook = makeRowLevelTTLAggMetrics
}
35 changes: 35 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2383,6 +2383,41 @@ var charts = []sectionDescription{
},
},
},
{
Organization: [][]string{{SQLLayer, "SQL", "Row Level TTL"}},
Charts: []chartDescription{
{
Title: "Active Range Deletes",
Metrics: []string{
"jobs.row_level_ttl.num_active_ranges",
},
AxisLabel: "Num Running",
},
{
Title: "Processing Count",
Metrics: []string{
"jobs.row_level_ttl.rows_selected",
"jobs.row_level_ttl.rows_deleted",
},
AxisLabel: "Count",
},
{
Title: "Processing Latency",
Metrics: []string{
"jobs.row_level_ttl.select_duration",
"jobs.row_level_ttl.delete_duration",
},
AxisLabel: "Latency (nanoseconds)",
},
{
Title: "Net Processing Latency",
Metrics: []string{
"jobs.row_level_ttl.range_total_duration",
},
AxisLabel: "Latency (nanoseconds)",
},
},
},
{
Organization: [][]string{{SQLLayer, "SQL", "Feature Flag"}},
Charts: []chartDescription{
Expand Down

0 comments on commit 1a89121

Please sign in to comment.