diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index 972889f84109..c05384bd4ec3 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -1145,6 +1145,7 @@ GO_TARGETS = [
         "//pkg/jobs/jobsprotectedts:jobsprotectedts",
         "//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
         "//pkg/jobs/jobstest:jobstest",
+        "//pkg/jobs/metricspoller:metricspoller",
         "//pkg/jobs:jobs",
         "//pkg/jobs:jobs_test",
         "//pkg/keys:keys",
@@ -2597,6 +2598,7 @@ GET_X_DATA_TARGETS = [
         "//pkg/jobs/jobspb:get_x_data",
         "//pkg/jobs/jobsprotectedts:get_x_data",
         "//pkg/jobs/jobstest:get_x_data",
+        "//pkg/jobs/metricspoller:get_x_data",
         "//pkg/keys:get_x_data",
         "//pkg/keysbase:get_x_data",
         "//pkg/keyvisualizer:get_x_data",
diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel
index 266f30446c9e..745af871c14f 100644
--- a/pkg/jobs/BUILD.bazel
+++ b/pkg/jobs/BUILD.bazel
@@ -104,12 +104,14 @@ go_test(
        "//pkg/base",
        "//pkg/clusterversion",
        "//pkg/jobs/jobspb",
+       "//pkg/jobs/jobsprotectedts",
        "//pkg/jobs/jobstest",
        "//pkg/keys",
        "//pkg/keyvisualizer",
        "//pkg/kv",
        "//pkg/kv/kvpb",
        "//pkg/kv/kvserver",
+       "//pkg/kv/kvserver/protectedts/ptpb",
        "//pkg/roachpb",
        "//pkg/scheduledjobs",
        "//pkg/security/securityassets",
diff --git a/pkg/jobs/config.go b/pkg/jobs/config.go
index d178f0abcbec..a4a5bbb42636 100644
--- a/pkg/jobs/config.go
+++ b/pkg/jobs/config.go
@@ -74,7 +74,7 @@ const (
 
     // defaultPollForMetricsInterval is the default interval to poll the jobs
     // table for metrics.
-    defaultPollForMetricsInterval = 10 * time.Second
+    defaultPollForMetricsInterval = 30 * time.Second
 )
 
 var (
diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go
index 0672b68f4bdc..f645c5d3eed7 100644
--- a/pkg/jobs/jobs_test.go
+++ b/pkg/jobs/jobs_test.go
@@ -32,9 +32,11 @@ import (
     "github.com/cockroachdb/cockroach/pkg/clusterversion"
     "github.com/cockroachdb/cockroach/pkg/jobs"
     "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
     "github.com/cockroachdb/cockroach/pkg/keyvisualizer"
     "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/security/username"
     "github.com/cockroachdb/cockroach/pkg/server"
@@ -61,6 +63,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
     "github.com/cockroachdb/cockroach/pkg/util/tracing"
+    "github.com/cockroachdb/cockroach/pkg/util/uuid"
     "github.com/cockroachdb/errors"
     "github.com/cockroachdb/redact"
     "github.com/gogo/protobuf/types"
@@ -3495,28 +3498,33 @@ func TestPausepoints(t *testing.T) {
     }
 }
 
-func TestPausedMetrics(t *testing.T) {
+func TestJobTypeMetrics(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
     skip.UnderShort(t)
 
     ctx := context.Background()
-    s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+    // Make sure we set the polling interval before we start the server.
+    // Otherwise, we might pick up the default value (30 seconds), which would
+    // make this test slow.
+    args := base.TestServerArgs{
         Knobs: base.TestingKnobs{
             JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
         },
-    })
+        Settings: cluster.MakeTestingClusterSettings(),
+    }
+    jobs.PollJobsMetricsInterval.Override(ctx, &args.Settings.SV, 10*time.Millisecond)
+    s, sqlDB, _ := serverutils.StartServer(t, args)
     defer s.Stopper().Stop(ctx)
 
-    jobs.PollJobsMetricsInterval.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond)
     runner := sqlutils.MakeSQLRunner(sqlDB)
     reg := s.JobRegistry().(*jobs.Registry)
 
     waitForPausedCount := func(typ jobspb.Type, numPaused int64) {
         testutils.SucceedsSoon(t, func() error {
             currentlyPaused := reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value()
-            if reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value() != numPaused {
+            if currentlyPaused != numPaused {
                 return fmt.Errorf(
                     "expected (%+v) paused jobs of type (%+v), found (%+v)",
                     numPaused,
@@ -3545,6 +3553,36 @@ func TestPausedMetrics(t *testing.T) {
             Username: username.TestUserName(),
         },
     }
+
+    execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
+    writePTSRecord := func(jobID jobspb.JobID) (uuid.UUID, error) {
+        id := uuid.MakeV4()
+        record := jobsprotectedts.MakeRecord(
+            id, int64(jobID), s.Clock().Now(), nil,
+            jobsprotectedts.Jobs, ptpb.MakeClusterTarget(),
+        )
+        return id,
+            execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error {
+                return execCfg.ProtectedTimestampProvider.WithTxn(txn).Protect(context.Background(), record)
+            })
+    }
+    releasePTSRecord := func(id uuid.UUID) error {
+        return execCfg.InternalDB.Txn(context.Background(), func(ctx context.Context, txn isql.Txn) error {
+            return execCfg.ProtectedTimestampProvider.WithTxn(txn).Release(context.Background(), id)
+        })
+    }
+
+    checkPTSCounts := func(typ jobspb.Type, count int64) {
+        testutils.SucceedsSoon(t, func() error {
+            m := reg.MetricsStruct().JobMetrics[typ]
+            if m.NumJobsWithPTS.Value() == count && (count == 0 || m.ProtectedAge.Value() > 0) {
+                return nil
+            }
+            return errors.Newf("still waiting for PTS count to reach %d: c=%d age=%d",
+                count, m.NumJobsWithPTS.Value(), m.ProtectedAge.Value())
+        })
+    }
+
     for typ := range typeToRecord {
         jobs.RegisterConstructor(typ, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
             return jobs.FakeResumer{
@@ -3571,6 +3609,17 @@ func TestPausedMetrics(t *testing.T) {
     importJob := makeJob(context.Background(), jobspb.TypeImport)
     scJob := makeJob(context.Background(), jobspb.TypeSchemaChange)
 
+    // Write a few PTS records.
+    cfJobPTSID, err := writePTSRecord(cfJob.ID())
+    require.NoError(t, err)
+    _, err = writePTSRecord(cfJob.ID())
+    require.NoError(t, err)
+    importJobPTSID, err := writePTSRecord(importJob.ID())
+    require.NoError(t, err)
+
+    checkPTSCounts(jobspb.TypeChangefeed, 2)
+    checkPTSCounts(jobspb.TypeImport, 1)
+
     // Pause all job types.
     runner.Exec(t, "PAUSE JOB $1", cfJob.ID())
     waitForPausedCount(jobspb.TypeChangefeed, 1)
@@ -3581,6 +3630,12 @@ func TestPausedMetrics(t *testing.T) {
     runner.Exec(t, "PAUSE JOB $1", scJob.ID())
     waitForPausedCount(jobspb.TypeSchemaChange, 1)
 
+    // Release some of the PTS records.
+    require.NoError(t, releasePTSRecord(cfJobPTSID))
+    require.NoError(t, releasePTSRecord(importJobPTSID))
+    checkPTSCounts(jobspb.TypeChangefeed, 1)
+    checkPTSCounts(jobspb.TypeImport, 0)
+
     // Resume / cancel jobs.
runner.Exec(t, "RESUME JOB $1", cfJob.ID()) waitForPausedCount(jobspb.TypeChangefeed, 1) diff --git a/pkg/jobs/jobsprotectedts/jobs_protected_ts.go b/pkg/jobs/jobsprotectedts/jobs_protected_ts.go index 5192b18a35f5..dd84cabe9046 100644 --- a/pkg/jobs/jobsprotectedts/jobs_protected_ts.go +++ b/pkg/jobs/jobsprotectedts/jobs_protected_ts.go @@ -55,7 +55,7 @@ func MakeStatusFunc(jr *jobs.Registry, metaType MetaType) ptreconcile.StatusFunc switch metaType { case Jobs: return func(ctx context.Context, txn isql.Txn, meta []byte) (shouldRemove bool, _ error) { - jobID, err := decodeID(meta) + jobID, err := DecodeID(meta) if err != nil { return false, err } @@ -71,7 +71,7 @@ func MakeStatusFunc(jr *jobs.Registry, metaType MetaType) ptreconcile.StatusFunc } case Schedules: return func(ctx context.Context, txn isql.Txn, meta []byte) (shouldRemove bool, _ error) { - scheduleID, err := decodeID(meta) + scheduleID, err := DecodeID(meta) if err != nil { return false, err } @@ -114,7 +114,8 @@ func encodeID(id int64) []byte { return []byte(strconv.FormatInt(id, 10)) } -func decodeID(meta []byte) (id int64, err error) { +// DecodeID decodes ID stored in the PTS record. +func DecodeID(meta []byte) (id int64, err error) { id, err = strconv.ParseInt(string(meta), 10, 64) if err != nil { return 0, errors.Wrapf(err, "failed to interpret meta %q as bytes", meta) diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go index 3a6aa1c0e217..dcb0e1fa2c74 100644 --- a/pkg/jobs/metrics.go +++ b/pkg/jobs/metrics.go @@ -67,6 +67,9 @@ type JobTypeMetrics struct { // TODO (sajjad): FailOrCancelFailed metric is not updated after the modification // of retrying all reverting jobs. Remove this metric in v22.1. FailOrCancelFailed *metric.Counter + + NumJobsWithPTS *metric.Gauge + ProtectedAge *metric.Gauge } // MetricStruct implements the metric.Struct interface. 
@@ -174,6 +177,26 @@ func makeMetaFailOrCancelFailed(typeStr string) metric.Metadata {
     }
 }
 
+func makeMetaProtectedCount(typeStr string) metric.Metadata {
+    return metric.Metadata{
+        Name:        fmt.Sprintf("jobs.%s.protected_record_count", typeStr),
+        Help:        fmt.Sprintf("Number of protected timestamp records held by %s jobs", typeStr),
+        Measurement: "records",
+        Unit:        metric.Unit_COUNT,
+        MetricType:  io_prometheus_client.MetricType_GAUGE,
+    }
+}
+
+func makeMetaProtectedAge(typeStr string) metric.Metadata {
+    return metric.Metadata{
+        Name:        fmt.Sprintf("jobs.%s.protected_age_sec", typeStr),
+        Help:        fmt.Sprintf("The age of the oldest PTS record protected by %s jobs", typeStr),
+        Measurement: "seconds",
+        Unit:        metric.Unit_SECONDS,
+        MetricType:  io_prometheus_client.MetricType_GAUGE,
+    }
+}
+
 var (
     metaAdoptIterations = metric.Metadata{
         Name: "jobs.adopt_iterations",
@@ -244,6 +267,8 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) {
             FailOrCancelCompleted:  metric.NewCounter(makeMetaFailOrCancelCompeted(typeStr)),
             FailOrCancelRetryError: metric.NewCounter(makeMetaFailOrCancelRetryError(typeStr)),
             FailOrCancelFailed:     metric.NewCounter(makeMetaFailOrCancelFailed(typeStr)),
+            NumJobsWithPTS:         metric.NewGauge(makeMetaProtectedCount(typeStr)),
+            ProtectedAge:           metric.NewGauge(makeMetaProtectedAge(typeStr)),
         }
         if opts, ok := options[jt]; ok && opts.metrics != nil {
             m.JobSpecificMetrics[jt] = opts.metrics
diff --git a/pkg/jobs/metricspoller/BUILD.bazel b/pkg/jobs/metricspoller/BUILD.bazel
new file mode 100644
index 000000000000..f8f42658ae96
--- /dev/null
+++ b/pkg/jobs/metricspoller/BUILD.bazel
@@ -0,0 +1,33 @@
+load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "metricspoller",
+    srcs = [
+        "job_statistics.go",
+        "poller.go",
+    ],
+    importpath = "github.com/cockroachdb/cockroach/pkg/jobs/metricspoller",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/jobs",
+        "//pkg/jobs/jobspb",
+        "//pkg/jobs/jobsprotectedts",
+        "//pkg/roachpb",
+        "//pkg/security/username",
+        "//pkg/settings/cluster",
+        "//pkg/sql",
+        "//pkg/sql/isql",
+        "//pkg/sql/sem/tree",
+        "//pkg/sql/sessiondata",
+        "//pkg/util/hlc",
+        "//pkg/util/log",
+        "//pkg/util/metric",
+        "//pkg/util/timeutil",
+        "@com_github_cockroachdb_errors//:errors",
+        "@com_github_cockroachdb_logtags//:logtags",
+        "@com_github_prometheus_client_model//go",
+    ],
+)
+
+get_x_data(name = "get_x_data")
diff --git a/pkg/jobs/metricspoller/job_statistics.go b/pkg/jobs/metricspoller/job_statistics.go
new file mode 100644
index 000000000000..f912606a14ad
--- /dev/null
+++ b/pkg/jobs/metricspoller/job_statistics.go
@@ -0,0 +1,146 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
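+
+// job_statistics.go contains the tasks run by the metrics poller job:
+// counting paused jobs per job type, and aggregating protected timestamp
+// (PTS) records held by jobs of each job type.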
+
+package metricspoller
+
+import (
+    "context"
+
+    "github.com/cockroachdb/cockroach/pkg/jobs"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
+    "github.com/cockroachdb/cockroach/pkg/roachpb"
+    "github.com/cockroachdb/cockroach/pkg/security/username"
+    "github.com/cockroachdb/cockroach/pkg/sql"
+    "github.com/cockroachdb/cockroach/pkg/sql/isql"
+    "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+    "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+    "github.com/cockroachdb/cockroach/pkg/util/hlc"
+    "github.com/cockroachdb/errors"
+)
+
+const pausedJobsCountQuery = string(`
+    SELECT job_type, count(*)
+    FROM system.jobs
+    WHERE status = '` + jobs.StatusPaused + `'
+    GROUP BY job_type`)
+
+// updatePausedMetrics counts the number of paused jobs per job type.
+func updatePausedMetrics(ctx context.Context, execCtx sql.JobExecContext) error {
+    var metricUpdates map[jobspb.Type]int
+    err := execCtx.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+        // In case of transaction retries, reset this map here.
+        metricUpdates = make(map[jobspb.Type]int)
+
+        // Run the transaction at low priority to ensure that it does not
+        // contend with foreground reads.
+        if err := txn.KV().SetUserPriority(roachpb.MinUserPriority); err != nil {
+            return err
+        }
+        rows, err := txn.QueryBufferedEx(
+            ctx, "poll-jobs-metrics-job", txn.KV(), sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
+            pausedJobsCountQuery,
+        )
+        if err != nil {
+            return errors.Wrap(err, "could not query jobs table")
+        }
+
+        for _, row := range rows {
+            typeString := *row[0].(*tree.DString)
+            count := *row[1].(*tree.DInt)
+            typ, err := jobspb.TypeFromString(string(typeString))
+            if err != nil {
+                return err
+            }
+            metricUpdates[typ] = int(count)
+        }
+
+        return nil
+    })
+    if err != nil {
+        return err
+    }
+
+    metrics := execCtx.ExecCfg().JobRegistry.MetricsStruct()
+    for _, v := range jobspb.Type_value {
+        if metrics.JobMetrics[v] != nil {
+            metrics.JobMetrics[v].CurrentlyPaused.Update(int64(metricUpdates[jobspb.Type(v)]))
+        }
+    }
+    return nil
+}
+
+// updatePTSStats updates protected timestamp statistics per job type.
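+// It scans the protected timestamp (PTS) state, attributes each record with a
+// Jobs meta type to the type of the job that holds it, and publishes the
+// per-type record count and the age of the oldest record.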
+func updatePTSStats(ctx context.Context, execCtx sql.JobExecContext) error {
+    type ptsStat struct {
+        numRecords int64
+        oldest     hlc.Timestamp
+    }
+    var ptsStats map[jobspb.Type]*ptsStat
+
+    execCfg := execCtx.ExecCfg()
+    if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+        ptsStats = make(map[jobspb.Type]*ptsStat)
+        ptsState, err := execCfg.ProtectedTimestampProvider.WithTxn(txn).GetState(ctx)
+        if err != nil {
+            return err
+        }
+        for _, rec := range ptsState.Records {
+            if rec.MetaType != jobsprotectedts.GetMetaType(jobsprotectedts.Jobs) {
+                continue
+            }
+            id, err := jobsprotectedts.DecodeID(rec.Meta)
+            if err != nil {
+                return err
+            }
+            j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(id), txn)
+            if err != nil {
+                continue
+            }
+            p := j.Payload()
+            stats := ptsStats[p.Type()]
+            if stats == nil {
+                stats = &ptsStat{}
+                ptsStats[p.Type()] = stats
+            }
+            stats.numRecords++
+            if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
+                stats.oldest = rec.Timestamp
+            }
+        }
+
+        return nil
+    }); err != nil {
+        return err
+    }
+
+    jobMetrics := execCtx.ExecCfg().JobRegistry.MetricsStruct()
+    for typ := 0; typ < jobspb.NumJobTypes; typ++ {
+        if jobspb.Type(typ) == jobspb.TypeUnspecified { // do not track TypeUnspecified
+            continue
+        }
+        m := jobMetrics.JobMetrics[typ]
+        stats, found := ptsStats[jobspb.Type(typ)]
+        if found {
+            m.NumJobsWithPTS.Update(stats.numRecords)
+            if stats.oldest.WallTime > 0 {
+                m.ProtectedAge.Update((execCfg.Clock.Now().WallTime - stats.oldest.WallTime) / 1e9)
+            } else {
+                m.ProtectedAge.Update(0)
+            }
+        } else {
+            // If we haven't found PTS records for a job type, then reset stats.
+            m.NumJobsWithPTS.Update(0)
+            m.ProtectedAge.Update(0)
+        }
+    }
+
+    return nil
+}
diff --git a/pkg/jobs/metricspoller/poller.go b/pkg/jobs/metricspoller/poller.go
new file mode 100644
index 000000000000..186687f58607
--- /dev/null
+++ b/pkg/jobs/metricspoller/poller.go
@@ -0,0 +1,110 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package metricspoller
+
+import (
+    "context"
+
+    "github.com/cockroachdb/cockroach/pkg/jobs"
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+    "github.com/cockroachdb/cockroach/pkg/sql"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/cockroach/pkg/util/metric"
+    "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+    "github.com/cockroachdb/logtags"
+    io_prometheus_client "github.com/prometheus/client_model/go"
+)
+
+// metricsPoller is a singleton job whose purpose is to poll various metrics
+// periodically. These metrics are meant to be cluster-wide metrics -- for
+// example, the number of jobs currently paused in the cluster. While such
+// metrics could be computed locally by each node, doing so would inflate the
+// metric by the number of nodes. Instead, this job provides a single place to
+// query various aspects of cluster state and make that state available via
+// correctly counted metrics.
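+//
+// The polling job is created by a cluster upgrade migration (see
+// pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go); like any other
+// job, it is adopted and run by one node at a time.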
+
+type metricsPoller struct {
+    job *jobs.Job
+}
+
+var _ jobs.Resumer = &metricsPoller{}
+
+// OnFailOrCancel is part of the Resumer interface.
+func (mp *metricsPoller) OnFailOrCancel(
+    ctx context.Context, execCtx interface{}, jobErr error,
+) error {
+    return nil
+}
+
+// Resume is part of the Resumer interface.
+func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error {
+    // The metrics polling job is a forever running background job. It's always
+    // safe to wind the SQL pod down whenever it's running, something we
+    // indicate through the job's idle status.
+    mp.job.MarkIdle(true)
+
+    exec := execCtx.(sql.JobExecContext)
+    metrics := exec.ExecCfg().JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypePollJobsStats].(pollerMetrics)
+
+    t := timeutil.NewTimer()
+    defer t.Stop()
+
+    runTask := func(name string, task func(ctx context.Context, execCtx sql.JobExecContext) error) error {
+        ctx := logtags.AddTag(ctx, "task", name)
+        return task(ctx, exec)
+    }
+
+    for {
+        t.Reset(jobs.PollJobsMetricsInterval.Get(&exec.ExecCfg().Settings.SV))
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case <-t.C:
+            t.Read = true
+            if err := runTask("paused-jobs", updatePausedMetrics); err != nil {
+                log.Errorf(ctx, "periodic stats collector task paused-jobs failed with error %s", err)
+                metrics.numErrors.Inc(1)
+            }
+            if err := runTask("pts-stats", updatePTSStats); err != nil {
+                log.Errorf(ctx, "periodic stats collector task pts-stats failed with error %s", err)
+                metrics.numErrors.Inc(1)
+            }
+        }
+    }
+}
+
+type pollerMetrics struct {
+    numErrors *metric.Counter
+}
+
+func (m pollerMetrics) MetricStruct() {}
+
+func newPollerMetrics() metric.Struct {
+    return pollerMetrics{
+        numErrors: metric.NewCounter(metric.Metadata{
+            Name:        "jobs.metrics.task_failed",
+            Help:        "Number of metrics poller tasks that failed",
+            Measurement: "errors",
+            Unit:        metric.Unit_COUNT,
+            MetricType:  io_prometheus_client.MetricType_COUNTER,
+        }),
+    }
+}
+
+func init() {
+    createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
+        return &metricsPoller{job: job}
+    }
+    jobs.RegisterConstructor(jobspb.TypePollJobsStats, createResumerFn,
+        jobs.DisablesTenantCostControl, jobs.WithJobMetrics(newPollerMetrics()))
+}
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 52eea1881eed..3062077873ac 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -1148,83 +1148,6 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
     })
 }
 
-// PollMetricsTask polls the jobs table for certain metrics at an interval.
-func (r *Registry) PollMetricsTask(ctx context.Context) error {
-    var err error
-    t := timeutil.NewTimer()
-    defer t.Stop()
-    updateMetrics := func(ctx context.Context, s sqlliveness.Session) {
-        for {
-            t.Reset(PollJobsMetricsInterval.Get(&r.settings.SV))
-            select {
-            case <-ctx.Done():
-                err = ctx.Err()
-                return
-            case <-t.C:
-                t.Read = true
-                if err = r.updatePausedMetrics(ctx, s); err != nil {
-                    log.Errorf(ctx, "failed to update paused metrics: %v", err)
-                    return
-                }
-            }
-        }
-    }
-    r.withSession(ctx, updateMetrics)
-    return err
-}
-
-const pausedJobsCountQuery = string(`
-    SELECT job_type, count(*)
-    FROM system.jobs
-    WHERE status = '` + StatusPaused + `'
-    GROUP BY job_type`)
-
-func (r *Registry) updatePausedMetrics(ctx context.Context, s sqlliveness.Session) error {
-    var metricUpdates map[jobspb.Type]int
-    err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-        // In case of transaction retries, reset this map here.
-        metricUpdates = make(map[jobspb.Type]int)
-
-        // Run the claim transaction at low priority to ensure that it does not
-        // contend with foreground reads.
-        if err := txn.KV().SetUserPriority(roachpb.MinUserPriority); err != nil {
-            return err
-        }
-        rows, err := txn.QueryBufferedEx(
-            ctx, "poll-jobs-metrics-job", txn.KV(), sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
-            pausedJobsCountQuery,
-        )
-        if err != nil {
-            return errors.Wrap(err, "could not query jobs table")
-        }
-
-        for _, row := range rows {
-            typeString := *row[0].(*tree.DString)
-            count := *row[1].(*tree.DInt)
-            typ, err := jobspb.TypeFromString(string(typeString))
-            if err != nil {
-                return err
-            }
-            metricUpdates[typ] = int(count)
-        }
-
-        return nil
-    })
-    if err == nil {
-        for _, v := range jobspb.Type_value {
-            if r.metrics.JobMetrics[v] != nil {
-                if _, ok := metricUpdates[jobspb.Type(v)]; ok {
-                    r.metrics.JobMetrics[v].CurrentlyPaused.Update(int64(metricUpdates[jobspb.Type(v)]))
-                } else {
-                    r.metrics.JobMetrics[v].CurrentlyPaused.Update(0)
-                }
-            }
-        }
-    }
-
-    return err
-}
-
 func (r *Registry) maybeCancelJobs(ctx context.Context, s sqlliveness.Session) {
     r.mu.Lock()
     defer r.mu.Unlock()
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 489a4dfb6c95..e09527891b7a 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -138,7 +138,6 @@ go_library(
         "inverted_join.go",
         "job_exec_context.go",
         "job_exec_context_test_util.go",
-        "job_statistics.go",
         "jobs_collection.go",
        "join.go",
        "join_predicate.go",
diff --git a/pkg/sql/job_statistics.go b/pkg/sql/job_statistics.go
deleted file mode 100644
index 436e92c4a9bd..000000000000
--- a/pkg/sql/job_statistics.go
+++ /dev/null
@@ -1,50 +0,0 @@
-// Copyright 2023 The Cockroach Authors.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
-
-package sql
-
-import (
-    "context"
-
-    "github.com/cockroachdb/cockroach/pkg/jobs"
-    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
-    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
-)
-
-type metricsPoller struct {
-    job *jobs.Job
-}
-
-var _ jobs.Resumer = &metricsPoller{}
-
-// OnFailOrCancel is a part of the Resumer interface.
-func (mp *metricsPoller) OnFailOrCancel(
-    ctx context.Context, execCtx interface{}, jobErr error,
-) error {
-    return nil
-}
-
-// Resume is part of the Resumer interface.
-func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error {
-    // The metrics polling job is a forever running background job. It's always
-    // safe to wind the SQL pod down whenever it's running, something we
-    // indicate through the job's idle status.
-    mp.job.MarkIdle(true)
-
-    exec := execCtx.(JobExecContext)
-    return exec.ExecCfg().JobRegistry.PollMetricsTask(ctx)
-}
-
-func init() {
-    createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
-        return &metricsPoller{job: job}
-    }
-    jobs.RegisterConstructor(jobspb.TypePollJobsStats, createResumerFn, jobs.UsesTenantCostControl)
-}
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 67dfa14e08d9..ceab4da6dcfe 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -10,6 +10,13 @@
 
 package catalog
 
+import (
+    "fmt"
+    "strings"
+
+    "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+)
+
 // chart_catalog.go represents a catalog of pre-defined DB Console charts
 // to aid users in debugging CockroachDB clusters. This file represents
 // a simplified structure of the catalog, meant to make it easier for
@@ -3383,74 +3390,11 @@ var charts = []sectionDescription{
                 "jobs.running_non_idle",
             },
         },
-        {
-            Title: "Currently Running",
-            Metrics: []string{
-                "jobs.auto_create_stats.currently_running",
-                "jobs.backup.currently_running",
-                "jobs.changefeed.currently_running",
-                "jobs.create_stats.currently_running",
-                "jobs.import.currently_running",
-                "jobs.restore.currently_running",
-                "jobs.schema_change.currently_running",
-                "jobs.new_schema_change.currently_running",
-                "jobs.schema_change_gc.currently_running",
-                "jobs.typedesc_schema_change.currently_running",
-                "jobs.stream_ingestion.currently_running",
-                "jobs.migration.currently_running",
-                "jobs.auto_span_config_reconciliation.currently_running",
-                "jobs.auto_sql_stats_compaction.currently_running",
-                "jobs.stream_replication.currently_running",
-                "jobs.key_visualizer.currently_running",
-                "jobs.poll_jobs_stats.currently_running",
-            },
-        },
-        {
-            Title: "Currently Idle",
-            Metrics: []string{
-                "jobs.auto_create_stats.currently_idle",
-                "jobs.auto_span_config_reconciliation.currently_idle",
-                "jobs.auto_sql_stats_compaction.currently_idle",
-                "jobs.backup.currently_idle",
-                "jobs.changefeed.currently_idle",
-                "jobs.create_stats.currently_idle",
-                "jobs.import.currently_idle",
-                "jobs.migration.currently_idle",
-                "jobs.new_schema_change.currently_idle",
-                "jobs.restore.currently_idle",
-                "jobs.schema_change.currently_idle",
-                "jobs.schema_change_gc.currently_idle",
-                "jobs.stream_ingestion.currently_idle",
-                "jobs.stream_replication.currently_idle",
-                "jobs.typedesc_schema_change.currently_idle",
-                "jobs.key_visualizer.currently_idle",
-                "jobs.poll_jobs_stats.currently_idle",
-            },
-        },
-        {
-            Title: "Currently Paused",
-            Metrics: []string{
-                "jobs.auto_create_stats.currently_paused",
-                "jobs.auto_span_config_reconciliation.currently_paused",
-                "jobs.auto_sql_stats_compaction.currently_paused",
-                "jobs.backup.currently_paused",
-                "jobs.changefeed.currently_paused",
-                "jobs.create_stats.currently_paused",
-                "jobs.import.currently_paused",
-                "jobs.migration.currently_paused",
-                "jobs.new_schema_change.currently_paused",
-                "jobs.restore.currently_paused",
-                "jobs.schema_change.currently_paused",
"jobs.schema_change_gc.currently_paused", - "jobs.stream_ingestion.currently_paused", - "jobs.stream_replication.currently_paused", - "jobs.typedesc_schema_change.currently_paused", - "jobs.auto_schema_telemetry.currently_paused", - "jobs.row_level_ttl.currently_paused", - "jobs.poll_jobs_stats.currently_paused", - "jobs.key_visualizer.currently_paused", - }, - }, + jobTypeCharts("Currently Running", "currently_running"), + jobTypeCharts("Currently Idle", "currently_idle"), + jobTypeCharts("Currently Paused", "currently_paused"), + jobTypeCharts("PTS Age", "protected_age_sec"), + jobTypeCharts("PTS Record Count", "protected_record_count"), { Title: "Auto Create Stats", Metrics: []string{ @@ -3941,3 +3885,19 @@ var charts = []sectionDescription{ }, }, } + +func jobTypeCharts(title string, varName string) chartDescription { + var metrics []string + for i := 0; i < jobspb.NumJobTypes; i++ { + jt := jobspb.Type(i) + if jt == jobspb.TypeUnspecified { + continue + } + metrics = append(metrics, + fmt.Sprintf("jobs.%s.%s", strings.ToLower(jobspb.Type_name[int32(i)]), varName)) + } + return chartDescription{ + Title: title, + Metrics: metrics, + } +} diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index f39274e13dc2..442805783691 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/jobs/metricspoller", "//pkg/keys", "//pkg/keyvisualizer/keyvisjob", "//pkg/kv", diff --git a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go index 332b795fd602..83d5cc2ecca8 100644 --- a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go +++ b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + _ "github.com/cockroachdb/cockroach/pkg/jobs/metricspoller" // Ensure job implementation is linked. "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"