Skip to content

Commit

Permalink
jobs: Extend job metric poller to collect PTS stats
Browse files Browse the repository at this point in the history
Prior PR #89752 added
a metrics poller job which produces per job type stats on the
number of paused jobs.

This PR extends metrics poller to also collect stats related
to protected timestamps created by jobs.
Namely, two new metrics, per job type are added:

* `jobs.<job type>.protected_record_count` -- keeps track of the number
  of protected timestamp records help by the jobs.
* `jobs.<job type>.protected_age_sec` -- keeps track of the age
  of the oldest protected timestamp held by those jobs.

The metrics improve observability into protected timestamp system,
and allow operators to alert when protected timestamp records are
too old since that prevents garbage collection from occuring
(and if GC is not performed for too long, the cluster performance
would degrade).

Follow on work will also make this functionality available for
schedules.

Epic: CRDB-21953
Fixes #78354

Release note (enterprise change): Jobs that utilize protected timestamp
system (BACKUP, CHANGEFEED, IMPORT, etc) now produce metrics that
can be monitored to detect cases when job leaves stale protected
timestamp, preventing garbage collection from occuring.
  • Loading branch information
Yevgeniy Miretskiy committed Feb 22, 2023
1 parent 9a20948 commit a0d6c19
Show file tree
Hide file tree
Showing 15 changed files with 413 additions and 205 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ GO_TARGETS = [
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs/jobstest:jobstest",
"//pkg/jobs/metricspoller:metricspoller",
"//pkg/jobs:jobs",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys",
Expand Down Expand Up @@ -2597,6 +2598,7 @@ GET_X_DATA_TARGETS = [
"//pkg/jobs/jobspb:get_x_data",
"//pkg/jobs/jobsprotectedts:get_x_data",
"//pkg/jobs/jobstest:get_x_data",
"//pkg/jobs/metricspoller:get_x_data",
"//pkg/keys:get_x_data",
"//pkg/keysbase:get_x_data",
"//pkg/keyvisualizer:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ go_test(
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/keyvisualizer",
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security/securityassets",
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const (

// defaultPollForMetricsInterval is the default interval to poll the jobs
// table for metrics.
defaultPollForMetricsInterval = 10 * time.Second
defaultPollForMetricsInterval = 30 * time.Second
)

var (
Expand Down
65 changes: 60 additions & 5 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keyvisualizer"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand All @@ -61,6 +63,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -3495,28 +3498,33 @@ func TestPausepoints(t *testing.T) {
}
}

func TestPausedMetrics(t *testing.T) {
func TestJobTypeMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
// Make sure we set polling interval before we start the server. Otherwise, we
// might pick up the default value (30 second), which would make this test
// slow.
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
})
Settings: cluster.MakeTestingClusterSettings(),
}
jobs.PollJobsMetricsInterval.Override(ctx, &args.Settings.SV, 10*time.Millisecond)
s, sqlDB, _ := serverutils.StartServer(t, args)
defer s.Stopper().Stop(ctx)

jobs.PollJobsMetricsInterval.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond)
runner := sqlutils.MakeSQLRunner(sqlDB)
reg := s.JobRegistry().(*jobs.Registry)

waitForPausedCount := func(typ jobspb.Type, numPaused int64) {
testutils.SucceedsSoon(t, func() error {
currentlyPaused := reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value()
if reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value() != numPaused {
if currentlyPaused != numPaused {
return fmt.Errorf(
"expected (%+v) paused jobs of type (%+v), found (%+v)",
numPaused,
Expand Down Expand Up @@ -3545,6 +3553,36 @@ func TestPausedMetrics(t *testing.T) {
Username: username.TestUserName(),
},
}

execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
writePTSRecord := func(jobID jobspb.JobID) (uuid.UUID, error) {
id := uuid.MakeV4()
record := jobsprotectedts.MakeRecord(
id, int64(jobID), s.Clock().Now(), nil,
jobsprotectedts.Jobs, ptpb.MakeClusterTarget(),
)
return id,
execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error {
return execCfg.ProtectedTimestampProvider.WithTxn(txn).Protect(context.Background(), record)
})
}
relesePTSRecord := func(id uuid.UUID) error {
return execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error {
return execCfg.ProtectedTimestampProvider.WithTxn(txn).Release(context.Background(), id)
})
}

checkPTSCounts := func(typ jobspb.Type, count int64) {
testutils.SucceedsSoon(t, func() error {
m := reg.MetricsStruct().JobMetrics[typ]
if m.NumJobsWithPTS.Value() == count && (count == 0 || m.ProtectedAge.Value() > 0) {
return nil
}
return errors.Newf("still waiting for PTS count to reach %d: c=%d age=%d",
count, m.NumJobsWithPTS.Value(), m.ProtectedAge.Value())
})
}

for typ := range typeToRecord {
jobs.RegisterConstructor(typ, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
Expand All @@ -3571,6 +3609,17 @@ func TestPausedMetrics(t *testing.T) {
importJob := makeJob(context.Background(), jobspb.TypeImport)
scJob := makeJob(context.Background(), jobspb.TypeSchemaChange)

// Write few PTS records
cfJobPTSID, err := writePTSRecord(cfJob.ID())
require.NoError(t, err)
_, err = writePTSRecord(cfJob.ID())
require.NoError(t, err)
importJobPTSID, err := writePTSRecord(importJob.ID())
require.NoError(t, err)

checkPTSCounts(jobspb.TypeChangefeed, 2)
checkPTSCounts(jobspb.TypeImport, 1)

// Pause all job types.
runner.Exec(t, "PAUSE JOB $1", cfJob.ID())
waitForPausedCount(jobspb.TypeChangefeed, 1)
Expand All @@ -3581,6 +3630,12 @@ func TestPausedMetrics(t *testing.T) {
runner.Exec(t, "PAUSE JOB $1", scJob.ID())
waitForPausedCount(jobspb.TypeSchemaChange, 1)

// Release some of the pts records.
require.NoError(t, relesePTSRecord(cfJobPTSID))
require.NoError(t, relesePTSRecord(importJobPTSID))
checkPTSCounts(jobspb.TypeChangefeed, 1)
checkPTSCounts(jobspb.TypeImport, 0)

// Resume / cancel jobs.
runner.Exec(t, "RESUME JOB $1", cfJob.ID())
waitForPausedCount(jobspb.TypeChangefeed, 1)
Expand Down
7 changes: 4 additions & 3 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func MakeStatusFunc(jr *jobs.Registry, metaType MetaType) ptreconcile.StatusFunc
switch metaType {
case Jobs:
return func(ctx context.Context, txn isql.Txn, meta []byte) (shouldRemove bool, _ error) {
jobID, err := decodeID(meta)
jobID, err := DecodeID(meta)
if err != nil {
return false, err
}
Expand All @@ -71,7 +71,7 @@ func MakeStatusFunc(jr *jobs.Registry, metaType MetaType) ptreconcile.StatusFunc
}
case Schedules:
return func(ctx context.Context, txn isql.Txn, meta []byte) (shouldRemove bool, _ error) {
scheduleID, err := decodeID(meta)
scheduleID, err := DecodeID(meta)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -114,7 +114,8 @@ func encodeID(id int64) []byte {
return []byte(strconv.FormatInt(id, 10))
}

func decodeID(meta []byte) (id int64, err error) {
// DecodeID decodes ID stored in the PTS record.
func DecodeID(meta []byte) (id int64, err error) {
id, err = strconv.ParseInt(string(meta), 10, 64)
if err != nil {
return 0, errors.Wrapf(err, "failed to interpret meta %q as bytes", meta)
Expand Down
25 changes: 25 additions & 0 deletions pkg/jobs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type JobTypeMetrics struct {
// TODO (sajjad): FailOrCancelFailed metric is not updated after the modification
// of retrying all reverting jobs. Remove this metric in v22.1.
FailOrCancelFailed *metric.Counter

NumJobsWithPTS *metric.Gauge
ProtectedAge *metric.Gauge
}

// MetricStruct implements the metric.Struct interface.
Expand Down Expand Up @@ -174,6 +177,26 @@ func makeMetaFailOrCancelFailed(typeStr string) metric.Metadata {
}
}

func makeMetaProtectedCount(typeStr string) metric.Metadata {
return metric.Metadata{
Name: fmt.Sprintf("jobs.%s.protected_record_count", typeStr),
Help: fmt.Sprintf("Number of protected timestamp records held by %s jobs", typeStr),
Measurement: "bytes",
Unit: metric.Unit_BYTES,
MetricType: io_prometheus_client.MetricType_GAUGE,
}
}

func makeMetaProtectedAge(typeStr string) metric.Metadata {
return metric.Metadata{
Name: fmt.Sprintf("jobs.%s.protected_age_sec", typeStr),
Help: fmt.Sprintf("The age of the oldest PTS record protected by %s jobs", typeStr),
Measurement: "seconds",
Unit: metric.Unit_SECONDS,
MetricType: io_prometheus_client.MetricType_GAUGE,
}
}

var (
metaAdoptIterations = metric.Metadata{
Name: "jobs.adopt_iterations",
Expand Down Expand Up @@ -244,6 +267,8 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) {
FailOrCancelCompleted: metric.NewCounter(makeMetaFailOrCancelCompeted(typeStr)),
FailOrCancelRetryError: metric.NewCounter(makeMetaFailOrCancelRetryError(typeStr)),
FailOrCancelFailed: metric.NewCounter(makeMetaFailOrCancelFailed(typeStr)),
NumJobsWithPTS: metric.NewGauge(makeMetaProtectedCount(typeStr)),
ProtectedAge: metric.NewGauge(makeMetaProtectedAge(typeStr)),
}
if opts, ok := options[jt]; ok && opts.metrics != nil {
m.JobSpecificMetrics[jt] = opts.metrics
Expand Down
33 changes: 33 additions & 0 deletions pkg/jobs/metricspoller/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "metricspoller",
srcs = [
"job_statistics.go",
"poller.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/metricspoller",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/isql",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_prometheus_client_model//go",
],
)

get_x_data(name = "get_x_data")
Loading

0 comments on commit a0d6c19

Please sign in to comment.