From fb89d23c60a971caa4b9c773c66c277f8b603646 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 6 Feb 2023 11:16:16 -0500 Subject: [PATCH] jobs: add metrics for paused jobs This change adds new metrics to count paused jobs for every job type. For example, the metric for paused changefeed jobs is `jobs.changefeed.currently_paused`. These metrics are counted at an interval defined by the cluster setting `jobs.metrics.interval.poll`. This is implemented by a job which periodically queries `system.jobs` to count the number of paused jobs. This job is of the newly added type `jobspb.TypePollJobsStats`. When a node starts it's job registry, it will create an adoptable stats polling job if it does not exist already using a transaction. This change adds a test which pauses and resumes changefeeds while asserting the value of the `jobs.changefeed.currently_paused` metric. It also adds a logictest to ensure one instance of the stats polling job is created in a cluster. Resolves: https://github.com/cockroachdb/cockroach/issues/85467 Release note (general change): This change adds new metrics to count paused jobs for every job type. For example, the metric for paused changefeed jobs is `jobs.changefeed.currently_paused`. These metrics are updated at an interval defined by the cluster setting `jobs.metrics.interval.poll`, which is defauled to 10 seconds. Epic: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/BUILD.bazel | 2 + pkg/cli/testdata/doctor/test_examine_cluster | 2 +- pkg/clusterversion/cockroach_versions.go | 8 ++ pkg/jobs/config.go | 15 +++ pkg/jobs/jobs_test.go | 102 +++++++++++++++++- pkg/jobs/jobspb/BUILD.bazel | 12 ++- pkg/jobs/jobspb/jobs.proto | 13 +++ pkg/jobs/jobspb/wrap.go | 25 ++++- pkg/jobs/jobspb/wrap_test.go | 30 ++++++ pkg/jobs/metrics.go | 13 +++ pkg/jobs/registry.go | 82 ++++++++++++++ pkg/jobs/registry_test.go | 22 ++-- pkg/jobs/testing_knobs.go | 10 +- pkg/jobs/update.go | 4 +- pkg/jobs/utils.go | 49 +++++++++ pkg/sql/BUILD.bazel | 1 + pkg/sql/job_statistics.go | 50 +++++++++ pkg/sql/logictest/testdata/logic_test/jobs | 6 ++ pkg/ts/catalog/chart_catalog.go | 37 +++++++ pkg/upgrade/upgradebase/testing_knobs.go | 5 + pkg/upgrade/upgrademanager/manager.go | 1 + pkg/upgrade/upgrades/BUILD.bazel | 2 + .../create_jobs_metrics_polling_job.go | 61 +++++++++++ .../create_jobs_metrics_polling_job_test.go | 69 ++++++++++++ pkg/upgrade/upgrades/upgrades.go | 5 + 27 files changed, 607 insertions(+), 23 deletions(-) create mode 100644 pkg/jobs/jobspb/wrap_test.go create mode 100644 pkg/sql/job_statistics.go create mode 100644 pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go create mode 100644 pkg/upgrade/upgrades/create_jobs_metrics_polling_job_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index aa505b41e955..c479e777acb0 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -298,4 +298,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 1000022.2-36 set the active cluster version in the format '.' +version version 1000022.2-38 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 62825f595d8a..c87cd09422a8 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -237,6 +237,6 @@
trace.opentelemetry.collector
stringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled
booleantrueif set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector
stringthe address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. -
version
version1000022.2-36set the active cluster version in the format '<major>.<minor>' +
version
version1000022.2-38set the active cluster version in the format '<major>.<minor>' diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 861d35784cd7..fed1bf490628 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -169,6 +169,7 @@ ALL_TESTS = [ "//pkg/internal/team:team_test", "//pkg/jobs/joberror:joberror_test", "//pkg/jobs/jobsauth:jobsauth_test", + "//pkg/jobs/jobspb:jobspb_test", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", "//pkg/jobs:jobs_test", "//pkg/keys:keys_test", @@ -1123,6 +1124,7 @@ GO_TARGETS = [ "//pkg/jobs/jobsauth:jobsauth", "//pkg/jobs/jobsauth:jobsauth_test", "//pkg/jobs/jobspb:jobspb", + "//pkg/jobs/jobspb:jobspb_test", "//pkg/jobs/jobsprotectedts:jobsprotectedts", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", "//pkg/jobs/jobstest:jobstest", diff --git a/pkg/cli/testdata/doctor/test_examine_cluster b/pkg/cli/testdata/doctor/test_examine_cluster index acf9b22e551f..6da9dc1183d4 100644 --- a/pkg/cli/testdata/doctor/test_examine_cluster +++ b/pkg/cli/testdata/doctor/test_examine_cluster @@ -3,5 +3,5 @@ debug doctor examine cluster debug doctor examine cluster Examining 53 descriptors and 52 namespace entries... ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none -Examining 12 jobs... +Examining 14 jobs... ERROR: validation failed diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 69249a29b130..2681bf116029 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -410,6 +410,10 @@ const ( V23_1_DeleteDroppedFunctionDescriptors + // V23_1_CreateJobsMetricsPollingJob creates the permanent job + // responsible for polling the jobs table for metrics. + V23_1_CreateJobsMetricsPollingJob + // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. @@ -711,6 +715,10 @@ var rawVersionsSingleton = keyedVersions{ Key: V23_1_DeleteDroppedFunctionDescriptors, Version: roachpb.Version{Major: 22, Minor: 2, Internal: 36}, }, + { + Key: V23_1_CreateJobsMetricsPollingJob, + Version: roachpb.Version{Major: 22, Minor: 2, Internal: 38}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/jobs/config.go b/pkg/jobs/config.go index 3bfbc7c439d2..d178f0abcbec 100644 --- a/pkg/jobs/config.go +++ b/pkg/jobs/config.go @@ -32,6 +32,7 @@ const ( executionErrorsMaxEntriesKey = "jobs.execution_errors.max_entries" executionErrorsMaxEntrySizeKey = "jobs.execution_errors.max_entry_size" debugPausePointsSettingKey = "jobs.debug.pausepoints" + metricsPollingIntervalKey = "jobs.metrics.interval.poll" ) const ( @@ -70,6 +71,10 @@ const ( // error. If this size is exceeded, the error will be formatted as a string // and then truncated to fit the size. defaultExecutionErrorsMaxEntrySize = 64 << 10 // 64 KiB + + // defaultPollForMetricsInterval is the default interval to poll the jobs + // table for metrics. + defaultPollForMetricsInterval = 10 * time.Second ) var ( @@ -100,6 +105,16 @@ var ( settings.PositiveDuration, ) + // PollJobsMetricsInterval is the interval at which a tenant in the cluster + // will poll the jobs table for metrics + PollJobsMetricsInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + metricsPollingIntervalKey, + "the interval at which a node in the cluster will poll the jobs table for metrics", + defaultPollForMetricsInterval, + settings.PositiveDuration, + ) + gcIntervalSetting = settings.RegisterDurationSetting( settings.TenantWritable, gcIntervalSettingKey, diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 940d5f254e60..55e1425221cf 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -229,7 +229,8 @@ func (rts *registryTestSuite) setUp(t *testing.T) { ManagerDisableJobCreation: true, } args.Knobs.UpgradeManager = &upgradebase.TestingKnobs{ - DontUseJobs: true, + DontUseJobs: true, + SkipJobMetricsPollingJobBootstrap: true, } args.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true} @@ -3456,3 +3457,102 @@ func TestPausepoints(t *testing.T) { }) } } + +func TestPausedMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer s.Stopper().Stop(ctx) + + jobs.PollJobsMetricsInterval.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond) + + runner := sqlutils.MakeSQLRunner(sqlDB) + reg := s.JobRegistry().(*jobs.Registry) + + waitForPausedCount := func(typ jobspb.Type, numPaused int64) { + testutils.SucceedsSoon(t, func() error { + currentlyPaused := reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value() + if reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value() != numPaused { + return fmt.Errorf( + "expected (%+v) paused jobs of type (%+v), found (%+v)", + numPaused, + typ, + currentlyPaused, + ) + } + return nil + }) + } + + typeToRecord := map[jobspb.Type]jobs.Record{ + jobspb.TypeChangefeed: { + Details: jobspb.ChangefeedDetails{}, + Progress: jobspb.ChangefeedProgress{}, + Username: username.TestUserName(), + }, + jobspb.TypeImport: { + Details: jobspb.ImportDetails{}, + Progress: jobspb.ImportProgress{}, + Username: username.TestUserName(), + }, + jobspb.TypeSchemaChange: { + Details: jobspb.SchemaChangeDetails{}, + Progress: jobspb.SchemaChangeProgress{}, + Username: username.TestUserName(), + }, + } + for typ := range typeToRecord { + jobs.RegisterConstructor(typ, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return jobs.FakeResumer{ + OnResume: func(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() + }, + } + }, jobs.UsesTenantCostControl) + } + + makeJob := func(ctx context.Context, + typ jobspb.Type, + ) *jobs.StartableJob { + j, err := jobs.TestingCreateAndStartJob(ctx, reg, s.InternalDB().(isql.DB), typeToRecord[typ]) + if err != nil { + t.Fatal(err) + } + return j + } + + cfJob := makeJob(context.Background(), jobspb.TypeChangefeed) + cfJob2 := makeJob(context.Background(), jobspb.TypeChangefeed) + importJob := makeJob(context.Background(), jobspb.TypeImport) + scJob := makeJob(context.Background(), jobspb.TypeSchemaChange) + + // Pause all job types. + runner.Exec(t, "PAUSE JOB $1", cfJob.ID()) + waitForPausedCount(jobspb.TypeChangefeed, 1) + runner.Exec(t, "PAUSE JOB $1", cfJob2.ID()) + waitForPausedCount(jobspb.TypeChangefeed, 2) + runner.Exec(t, "PAUSE JOB $1", importJob.ID()) + waitForPausedCount(jobspb.TypeImport, 1) + runner.Exec(t, "PAUSE JOB $1", scJob.ID()) + waitForPausedCount(jobspb.TypeSchemaChange, 1) + + // Resume / cancel jobs. + runner.Exec(t, "RESUME JOB $1", cfJob.ID()) + waitForPausedCount(jobspb.TypeChangefeed, 1) + runner.Exec(t, "CANCEL JOB $1", cfJob2.ID()) + waitForPausedCount(jobspb.TypeChangefeed, 0) + runner.Exec(t, "RESUME JOB $1", importJob.ID()) + waitForPausedCount(jobspb.TypeImport, 0) + runner.Exec(t, "CANCEL JOB $1", scJob.ID()) + waitForPausedCount(jobspb.TypeSchemaChange, 0) + + runner.Exec(t, "CANCEL JOB $1", cfJob.ID()) + runner.Exec(t, "CANCEL JOB $1", importJob.ID()) +} diff --git a/pkg/jobs/jobspb/BUILD.bazel b/pkg/jobs/jobspb/BUILD.bazel index 4c27551604a4..52677f1acc61 100644 --- a/pkg/jobs/jobspb/BUILD.bazel +++ b/pkg/jobs/jobspb/BUILD.bazel @@ -1,7 +1,7 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") load("@rules_proto//proto:defs.bzl", "proto_library") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "jobspb", @@ -64,4 +64,14 @@ go_proto_library( ], ) +go_test( + name = "jobspb_test", + srcs = ["wrap_test.go"], + args = ["-test.timeout=295s"], + deps = [ + ":jobspb", + "@com_github_stretchr_testify//assert", + ], +) + get_x_data(name = "get_x_data") diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 2b2503812488..7982a35d4eb1 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -1149,6 +1149,12 @@ message SchemaTelemetryDetails { message SchemaTelemetryProgress { } +message PollJobsStatsDetails { +} + +message PollJobsStatsProgress { +} + message Payload { string description = 1; // If empty, the description is assumed to be the statement. @@ -1200,7 +1206,12 @@ message Payload { // and publish it to the telemetry event log. These jobs are typically // created by a built-in schedule named "sql-schema-telemetry". SchemaTelemetryDetails schema_telemetry = 37; + KeyVisualizerDetails keyVisualizerDetails = 38; + + // PollJobsStats jobs poll the jobs table for statistics metrics as the number of + // paused jobs. + PollJobsStatsDetails poll_jobs_stats = 39; } reserved 26; // PauseReason is used to describe the reason that the job is currently paused @@ -1263,6 +1274,7 @@ message Progress { RowLevelTTLProgress row_level_ttl = 25 [(gogoproto.customname)="RowLevelTTL"]; SchemaTelemetryProgress schema_telemetry = 26; KeyVisualizerProgress keyVisualizerProgress = 27; + PollJobsStatsProgress pollJobsStats = 28; } uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"]; @@ -1293,6 +1305,7 @@ enum Type { ROW_LEVEL_TTL = 16 [(gogoproto.enumvalue_customname) = "TypeRowLevelTTL"]; AUTO_SCHEMA_TELEMETRY = 17 [(gogoproto.enumvalue_customname) = "TypeAutoSchemaTelemetry"]; KEY_VISUALIZER = 18 [(gogoproto.enumvalue_customname) = "TypeKeyVisualizer"]; + POLL_JOBS_STATS = 19 [(gogoproto.enumvalue_customname) = "TypePollJobsStats"]; } message Job { diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go index 8bfb07805a95..94a101b146f5 100644 --- a/pkg/jobs/jobspb/wrap.go +++ b/pkg/jobs/jobspb/wrap.go @@ -145,6 +145,7 @@ var AutomaticJobTypes = [...]Type{ TypeAutoSpanConfigReconciliation, TypeAutoSQLStatsCompaction, TypeAutoSchemaTelemetry, + TypePollJobsStats, } // DetailsType returns the type for a payload detail. @@ -188,6 +189,8 @@ func DetailsType(d isPayload_Details) (Type, error) { return TypeAutoSchemaTelemetry, nil case *Payload_KeyVisualizerDetails: return TypeKeyVisualizer, nil + case *Payload_PollJobsStats: + return TypePollJobsStats, nil default: return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d) } @@ -227,6 +230,7 @@ var JobDetailsForEveryJobType = map[Type]Details{ TypeRowLevelTTL: RowLevelTTLDetails{}, TypeAutoSchemaTelemetry: SchemaTelemetryDetails{}, TypeKeyVisualizer: KeyVisualizerDetails{}, + TypePollJobsStats: PollJobsStatsDetails{}, } // WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper @@ -272,6 +276,8 @@ func WrapProgressDetails(details ProgressDetails) interface { return &Progress_SchemaTelemetry{SchemaTelemetry: &d} case KeyVisualizerProgress: return &Progress_KeyVisualizerProgress{KeyVisualizerProgress: &d} + case PollJobsStatsProgress: + return &Progress_PollJobsStats{PollJobsStats: &d} default: panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d)) } @@ -315,6 +321,8 @@ func (p *Payload) UnwrapDetails() Details { return *d.SchemaTelemetry case *Payload_KeyVisualizerDetails: return *d.KeyVisualizerDetails + case *Payload_PollJobsStats: + return *d.PollJobsStats default: return nil } @@ -358,6 +366,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails { return *d.SchemaTelemetry case *Progress_KeyVisualizerProgress: return *d.KeyVisualizerProgress + case *Progress_PollJobsStats: + return *d.PollJobsStats default: return nil } @@ -371,6 +381,17 @@ func (t Type) String() string { return strings.Replace(Type_name[int32(t)], "_", " ", -1) } +// TypeFromString is used to get the type corresponding to the string s +// where s := Type.String(). +func TypeFromString(s string) (Type, error) { + s = strings.Replace(s, " ", "_", -1) + t, ok := Type_value[s] + if !ok { + return TypeUnspecified, errors.New("invalid type string") + } + return Type(t), nil +} + // WrapPayloadDetails wraps a Details object in the protobuf wrapper struct // necessary to make it usable as the Details field of a Payload. // @@ -414,6 +435,8 @@ func WrapPayloadDetails(details Details) interface { return &Payload_SchemaTelemetry{SchemaTelemetry: &d} case KeyVisualizerDetails: return &Payload_KeyVisualizerDetails{KeyVisualizerDetails: &d} + case PollJobsStatsDetails: + return &Payload_PollJobsStats{PollJobsStats: &d} default: panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d)) } @@ -449,7 +472,7 @@ const ( func (Type) SafeValue() {} // NumJobTypes is the number of jobs types. -const NumJobTypes = 19 +const NumJobTypes = 20 // ChangefeedDetailsMarshaler allows for dependency injection of // cloud.SanitizeExternalStorageURI to avoid the dependency from this diff --git a/pkg/jobs/jobspb/wrap_test.go b/pkg/jobs/jobspb/wrap_test.go new file mode 100644 index 000000000000..7dc4e8259a8d --- /dev/null +++ b/pkg/jobs/jobspb/wrap_test.go @@ -0,0 +1,30 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package jobspb_test + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/stretchr/testify/assert" +) + +func TestTypeString(t *testing.T) { + for i := 0; i < jobspb.NumJobTypes; i++ { + typ := jobspb.Type(i) + typStr := typ.String() + convertedType, err := jobspb.TypeFromString(typStr) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, convertedType, typ) + } +} diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go index 4ff9399b6e87..4c480f833ac5 100644 --- a/pkg/jobs/metrics.go +++ b/pkg/jobs/metrics.go @@ -47,6 +47,7 @@ type Metrics struct { type JobTypeMetrics struct { CurrentlyRunning *metric.Gauge CurrentlyIdle *metric.Gauge + CurrentlyPaused *metric.Gauge ResumeCompleted *metric.Counter ResumeRetryError *metric.Counter ResumeFailed *metric.Counter @@ -82,6 +83,17 @@ func makeMetaCurrentlyIdle(typeStr string) metric.Metadata { } } +func makeMetaCurrentlyPaused(typeStr string) metric.Metadata { + return metric.Metadata{ + Name: fmt.Sprintf("jobs.%s.currently_paused", typeStr), + Help: fmt.Sprintf("Number of %s jobs currently considered Paused", + typeStr), + Measurement: "jobs", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_GAUGE, + } +} + func makeMetaResumeCompeted(typeStr string) metric.Metadata { return metric.Metadata{ Name: fmt.Sprintf("jobs.%s.resume_completed", typeStr), @@ -214,6 +226,7 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) { m.JobMetrics[jt] = &JobTypeMetrics{ CurrentlyRunning: metric.NewGauge(makeMetaCurrentlyRunning(typeStr)), CurrentlyIdle: metric.NewGauge(makeMetaCurrentlyIdle(typeStr)), + CurrentlyPaused: metric.NewGauge(makeMetaCurrentlyPaused(typeStr)), ResumeCompleted: metric.NewCounter(makeMetaResumeCompeted(typeStr)), ResumeRetryError: metric.NewCounter(makeMetaResumeRetryError(typeStr)), ResumeFailed: metric.NewCounter(makeMetaResumeFailed(typeStr)), diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index a8f47562e251..65f8cd15ad6f 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -289,6 +289,9 @@ const ( // KeyVisualizerJobID A static job ID is used to easily check if the // Key Visualizer job already exists. KeyVisualizerJobID = jobspb.JobID(100) + + // JobMetricsPollerJobID A static job ID is used for the job metrics polling job. + JobMetricsPollerJobID = jobspb.JobID(101) ) // MakeJobID generates a new job ID. @@ -1133,6 +1136,85 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { }) } +// PollMetricsTask polls the jobs table for certain metrics at an interval. +func (r *Registry) PollMetricsTask(ctx context.Context) error { + var err error + updateMetrics := func(ctx context.Context, s sqlliveness.Session) { + lc, cleanup := makeLoopController(r.settings, PollJobsMetricsInterval, nil) + defer cleanup() + for { + select { + case <-ctx.Done(): + err = ctx.Err() + return + case <-lc.updated: + lc.onUpdate() + case <-lc.timer.C: + lc.timer.Read = true + if err = r.updatePausedMetrics(ctx, s); err != nil { + log.Errorf(ctx, "failed to update paused metrics: %v", err) + return + } + lc.onExecute() + } + } + } + r.withSession(ctx, updateMetrics) + return err +} + +const pausedJobsCountQuery = string(` + SELECT job_type, count(*) + FROM system.jobs + WHERE status = '` + StatusPaused + `' + GROUP BY job_type`) + +func (r *Registry) updatePausedMetrics(ctx context.Context, s sqlliveness.Session) error { + var metricUpdates map[jobspb.Type]int + err := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + // In case of transaction retries, reset this map here. + metricUpdates = make(map[jobspb.Type]int) + + // Run the claim transaction at low priority to ensure that it does not + // contend with foreground reads. + if err := txn.KV().SetUserPriority(roachpb.MinUserPriority); err != nil { + return err + } + rows, err := txn.QueryBufferedEx( + ctx, "poll-jobs-metrics-job", txn.KV(), sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + pausedJobsCountQuery, s.ID().UnsafeBytes(), r.ID(), + ) + if err != nil { + return errors.Wrap(err, "could not query jobs table") + } + + for _, row := range rows { + typeString := *row[0].(*tree.DString) + count := *row[1].(*tree.DInt) + typ, err := jobspb.TypeFromString(string(typeString)) + if err != nil { + return err + } + metricUpdates[typ] = int(count) + } + + return nil + }) + if err == nil { + for _, v := range jobspb.Type_value { + if r.metrics.JobMetrics[v] != nil { + if _, ok := metricUpdates[jobspb.Type(v)]; ok { + r.metrics.JobMetrics[v].CurrentlyPaused.Update(int64(metricUpdates[jobspb.Type(v)])) + } else { + r.metrics.JobMetrics[v].CurrentlyPaused.Update(0) + } + } + } + } + + return err +} + func (r *Registry) maybeCancelJobs(ctx context.Context, s sqlliveness.Session) { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index d21525aeb070..52a606f84bb3 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -123,11 +123,13 @@ func TestRegistryGC(t *testing.T) { }, UpgradeManager: &upgradebase.TestingKnobs{ // This test wants to look at job records. - DontUseJobs: true, + DontUseJobs: true, + SkipJobMetricsPollingJobBootstrap: true, }, KeyVisualizer: &keyvisualizer.TestingKnobs{ SkipJobBootstrap: true, }, + JobsTestingKnobs: NewTestingKnobsWithShortIntervals(), }, }) defer s.Stopper().Stop(ctx) @@ -271,11 +273,13 @@ func TestRegistryGCPagination(t *testing.T) { }, UpgradeManager: &upgradebase.TestingKnobs{ // This test wants to count job records. - DontUseJobs: true, + DontUseJobs: true, + SkipJobMetricsPollingJobBootstrap: true, }, KeyVisualizer: &keyvisualizer.TestingKnobs{ SkipJobBootstrap: true, }, + JobsTestingKnobs: NewTestingKnobsWithShortIntervals(), }, }) db := sqlutils.MakeSQLRunner(sqlDB) @@ -477,18 +481,13 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { bti.clock = timeutil.NewManualTime(timeutil.Now()) timeSource := hlc.NewClock(bti.clock, base.DefaultMaxClockOffset) // Set up the test cluster. - knobs := &TestingKnobs{ - TimeSource: timeSource, - } + // Set a small adopt and cancel intervals to reduce test time. + knobs := NewTestingKnobsWithIntervals(unitTime, unitTime, initialDelay, maxDelay) + knobs.TimeSource = timeSource if bti.afterJobStateMachineKnob != nil { knobs.AfterJobStateMachine = bti.afterJobStateMachineKnob } cs := cluster.MakeTestingClusterSettings() - // Set a small adopt and cancel intervals to reduce test time. - adoptIntervalSetting.Override(ctx, &cs.SV, unitTime) - cancelIntervalSetting.Override(ctx, &cs.SV, unitTime) - retryInitialDelaySetting.Override(ctx, &cs.SV, initialDelay) - retryMaxDelaySetting.Override(ctx, &cs.SV, maxDelay) args := base.TestServerArgs{ Settings: cs, Knobs: base.TestingKnobs{ @@ -501,7 +500,8 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { ManagerDisableJobCreation: true, }, UpgradeManager: &upgradebase.TestingKnobs{ - DontUseJobs: true, + DontUseJobs: true, + SkipJobMetricsPollingJobBootstrap: true, }, KeyVisualizer: &keyvisualizer.TestingKnobs{ SkipJobBootstrap: true, diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index 1c85dbc55afa..81382bf5d4fe 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -116,15 +116,17 @@ type TestingIntervalOverrides struct { WaitForJobsMaxDelay *time.Duration } +const defaultShortInterval = 10 * time.Millisecond + // NewTestingKnobsWithShortIntervals return a TestingKnobs structure with -// overrides for short adopt and cancel intervals. +// overrides for short adopt, cancel, and retry intervals. func NewTestingKnobsWithShortIntervals() *TestingKnobs { - defaultShortInterval := 10 * time.Millisecond + interval := defaultShortInterval if util.RaceEnabled { - defaultShortInterval *= 5 + interval *= 5 } return NewTestingKnobsWithIntervals( - defaultShortInterval, defaultShortInterval, defaultShortInterval, defaultShortInterval, + interval, interval, interval, interval, ) } diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index 6a9cf39ad3b3..da44c4066c31 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -331,9 +331,9 @@ func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobU // if md.Status != StatusRunning { // return errors.New("job no longer running") // } -// md.UpdateStatus(StatusPaused) +// ju.UpdateStatus(StatusPaused) // // -// md.UpdatePayload(md.Payload) +// ju.UpdatePayload(md.Payload) // } // // Note that there are various convenience wrappers (like FractionProgressed) diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go index ad9935ead054..5beeac3573f0 100644 --- a/pkg/jobs/utils.go +++ b/pkg/jobs/utils.go @@ -71,3 +71,52 @@ ORDER BY created` } return false /* exists */, err } + +// JobExists TODO +func JobExists( + ctx context.Context, + jobID jobspb.JobID, + txn isql.Txn, + payloadPredicate func(payload *jobspb.Payload) bool, +) (exists bool, retErr error) { + const stmt = ` +SELECT + id, payload +FROM + system.jobs +WHERE + status = '` + StatusPaused + `' +ORDER BY created` + + it, err := txn.QueryIterator( + ctx, + "get-jobs", + txn.KV(), + string(stmt), + ) + if err != nil { + return false /* exists */, err + } + // We have to make sure to close the iterator since we might return from the + // for loop early (before Next() returns false). + defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }() + + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() + payload, err := UnmarshalPayload(row[1]) + if err != nil { + return false /* exists */, err + } + + if payloadPredicate(payload) { + id := jobspb.JobID(*row[0].(*tree.DInt)) + if id == jobID { + break + } + + return true /* exists */, nil /* retErr */ + } + } + return false /* exists */, err +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index f56f0f75dc15..d2bbb1b81d59 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -136,6 +136,7 @@ go_library( "inverted_join.go", "job_exec_context.go", "job_exec_context_test_util.go", + "job_statistics.go", "jobs_collection.go", "join.go", "join_predicate.go", diff --git a/pkg/sql/job_statistics.go b/pkg/sql/job_statistics.go new file mode 100644 index 000000000000..7f7cbaef7f9f --- /dev/null +++ b/pkg/sql/job_statistics.go @@ -0,0 +1,50 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" +) + +type metricsPoller struct { + job *jobs.Job +} + +var _ jobs.Resumer = &metricsPoller{} + +// OnFailOrCancel is a part of the Resumer interface. +func (mp *metricsPoller) OnFailOrCancel( + ctx context.Context, execCtx interface{}, jobErr error, +) error { + return nil +} + +// Resume is part of the Resumer interface. +func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error { + // The metrics polling job is a forever running background job. It's always + // safe to wind the SQL pod down whenever it's running, something we + // indicate through the job's idle status. + mp.job.MarkIdle(true) + + exec := execCtx.(JobExecContext) + return exec.ExecCfg().JobRegistry.PollMetricsTask(ctx) +} + +func init() { + createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + return &metricsPoller{job: job} + } + jobs.RegisterConstructor(jobspb.TypePollJobsStats, createResumerFn, jobs.UsesTenantCostControl) +} diff --git a/pkg/sql/logictest/testdata/logic_test/jobs b/pkg/sql/logictest/testdata/logic_test/jobs index d580755f8358..643ef09c5464 100644 --- a/pkg/sql/logictest/testdata/logic_test/jobs +++ b/pkg/sql/logictest/testdata/logic_test/jobs @@ -194,3 +194,9 @@ usage_count > 0 ORDER BY feature_name DESC ---- job.schema_change.successful + +# Ensure one POLL JOBS STATS job is running +query I +SELECT count(*) FROM [SHOW AUTOMATIC JOBS] WHERE job_type = 'POLL JOBS STATS' AND status = 'running' +---- +1 diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 265ca532f0f9..71880696e422 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3387,6 +3387,7 @@ var charts = []sectionDescription{ "jobs.auto_sql_stats_compaction.currently_running", "jobs.stream_replication.currently_running", "jobs.key_visualizer.currently_running", + "jobs.poll_jobs_stats.currently_running", }, }, { @@ -3408,6 +3409,31 @@ var charts = []sectionDescription{ "jobs.stream_replication.currently_idle", "jobs.typedesc_schema_change.currently_idle", "jobs.key_visualizer.currently_idle", + "jobs.poll_jobs_stats.currently_idle", + }, + }, + { + Title: "Currently Paused", + Metrics: []string{ + "jobs.auto_create_stats.currently_paused", + "jobs.auto_span_config_reconciliation.currently_paused", + "jobs.auto_sql_stats_compaction.currently_paused", + "jobs.backup.currently_paused", + "jobs.changefeed.currently_paused", + "jobs.create_stats.currently_paused", + "jobs.import.currently_paused", + "jobs.migration.currently_paused", + "jobs.new_schema_change.currently_paused", + "jobs.restore.currently_paused", + "jobs.schema_change.currently_paused", + "jobs.schema_change_gc.currently_paused", + "jobs.stream_ingestion.currently_paused", + "jobs.stream_replication.currently_paused", + "jobs.typedesc_schema_change.currently_paused", + "jobs.auto_schema_telemetry.currently_paused", + "jobs.row_level_ttl.currently_paused", + "jobs.poll_jobs_stats.currently_paused", + "jobs.key_visualizer.currently_paused", }, }, { @@ -3595,6 +3621,17 @@ var charts = []sectionDescription{ "jobs.key_visualizer.resume_retry_error", }, }, + { + Title: "Jobs Stats Polling Job", + Metrics: []string{ + "jobs.poll_jobs_stats.fail_or_cancel_completed", + "jobs.poll_jobs_stats.fail_or_cancel_failed", + "jobs.poll_jobs_stats.fail_or_cancel_retry_error", + "jobs.poll_jobs_stats.resume_completed", + "jobs.poll_jobs_stats.resume_failed", + "jobs.poll_jobs_stats.resume_retry_error", + }, + }, }, }, { diff --git a/pkg/upgrade/upgradebase/testing_knobs.go b/pkg/upgrade/upgradebase/testing_knobs.go index f266aedfb6c6..6019fa51a38c 100644 --- a/pkg/upgrade/upgradebase/testing_knobs.go +++ b/pkg/upgrade/upgradebase/testing_knobs.go @@ -33,6 +33,11 @@ type TestingKnobs struct { // production. DontUseJobs bool + // SkipJobMetricsPollingJobBootstrap, if set, disables the + // clusterversion.V23_1_CreateJobsMetricsPollingJob upgrade, which prevents a + // job from being created. + SkipJobMetricsPollingJobBootstrap bool + // AfterRunPermanentUpgrades is called after each call to // RunPermanentUpgrades. AfterRunPermanentUpgrades func() diff --git a/pkg/upgrade/upgrademanager/manager.go b/pkg/upgrade/upgrademanager/manager.go index 844debdb6f83..2a6d601b3ce1 100644 --- a/pkg/upgrade/upgrademanager/manager.go +++ b/pkg/upgrade/upgrademanager/manager.go @@ -556,6 +556,7 @@ func (m *Manager) runMigration( LeaseManager: m.lm, InternalExecutor: m.ie, JobRegistry: m.jr, + TestingKnobs: &m.knobs, }); err != nil { return err } diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index 5fd374a39dcf..ce79cad06b8f 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "alter_statement_statistics_index_recommendations.go", "alter_table_statistics_partial_predicate_and_id.go", "create_index_usage_statement_statistics.go", + "create_jobs_metrics_polling_job.go", "delete_descriptors_of_dropped_functions.go", "desc_id_sequence_for_system_tenant.go", "descriptor_utils.go", @@ -90,6 +91,7 @@ go_test( "alter_table_statistics_partial_predicate_and_id_test.go", "builtins_test.go", "create_index_usage_statement_statistics_test.go", + "create_jobs_metrics_polling_job_test.go", "delete_descriptors_of_dropped_functions_test.go", "desc_id_sequence_for_system_tenant_test.go", "descriptor_utils_test.go", diff --git a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go new file mode 100644 index 000000000000..c4f06dd83495 --- /dev/null +++ b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job.go @@ -0,0 +1,61 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/upgrade" +) + +func createJobsMetricsPollingJob( + ctx context.Context, _ clusterversion.ClusterVersion, d upgrade.TenantDeps, +) error { + if d.TestingKnobs != nil && d.TestingKnobs.SkipJobMetricsPollingJobBootstrap { + return nil + } + return d.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + row, err := d.DB.Executor().QueryRowEx( + ctx, + "check for existing key visualizer job", + nil, + sessiondata.InternalExecutorOverride{User: username.RootUserName()}, + "SELECT * FROM system.jobs WHERE id = $1", + jobs.JobMetricsPollerJobID, + ) + if err != nil { + return err + } + + // If there isn't a row for the key visualizer job, create the job. + if row == nil { + jr := jobs.Record{ + JobID: jobs.JobMetricsPollerJobID, + Description: jobspb.TypePollJobsStats.String(), + Details: jobspb.PollJobsStatsDetails{}, + Progress: jobspb.PollJobsStatsProgress{}, + CreatedBy: &jobs.CreatedByInfo{Name: username.RootUser, ID: username.RootUserID}, + Username: username.RootUserName(), + NonCancelable: true, + } + if _, err := d.JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobs.JobMetricsPollerJobID, txn); err != nil { + return err + } + } + return nil + }) +} diff --git a/pkg/upgrade/upgrades/create_jobs_metrics_polling_job_test.go b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job_test.go new file mode 100644 index 000000000000..d729c454301c --- /dev/null +++ b/pkg/upgrade/upgrades/create_jobs_metrics_polling_job_test.go @@ -0,0 +1,69 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/upgrade/upgrades" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCreateJobsMetricsPollingJob(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.V23_1_CreateJobsMetricsPollingJob - 1), + }, + }, + }, + } + + var ( + ctx = context.Background() + tc = testcluster.StartTestCluster(t, 1, clusterArgs) + sqlDB = tc.ServerConn(0) + ) + defer tc.Stopper().Stop(ctx) + + var count int + row := sqlDB.QueryRow("SELECT count(*) FROM crdb_internal.jobs WHERE job_type = 'POLL STATS JOB'") + err := row.Scan(&count) + require.NoError(t, err) + assert.Equal(t, count, 0) + + upgrades.Upgrade( + t, + sqlDB, + clusterversion.V23_1_CreateJobsMetricsPollingJob, + nil, /* done */ + false, /* expectError */ + ) + + row = sqlDB.QueryRow("SELECT count(*) FROM crdb_internal.jobs WHERE job_type = 'POLL JOBS STATS'") + err = row.Scan(&count) + require.NoError(t, err) + assert.Equal(t, count, 1) +} diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go index 4d5e8a273e5e..1c294e454cf7 100644 --- a/pkg/upgrade/upgrades/upgrades.go +++ b/pkg/upgrade/upgrades/upgrades.go @@ -263,6 +263,11 @@ var upgrades = []upgradebase.Upgrade{ upgrade.NoPrecondition, deleteDescriptorsOfDroppedFunctions, ), + upgrade.NewPermanentTenantUpgrade("create jobs metrics polling job", + toCV(clusterversion.V23_1_CreateJobsMetricsPollingJob), + createJobsMetricsPollingJob, + "create jobs metrics polling job", + ), } func init() {