diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index b78738de0ec7..b0af58595e4a 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -166,6 +166,7 @@ ALL_TESTS = [ "//pkg/internal/sqlsmith:sqlsmith_test", "//pkg/internal/team:team_test", "//pkg/jobs/joberror:joberror_test", + "//pkg/jobs/jobspb:jobspb_test", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", "//pkg/jobs:jobs_test", "//pkg/keys:keys_test", @@ -1082,6 +1083,7 @@ GO_TARGETS = [ "//pkg/jobs/joberror:joberror", "//pkg/jobs/joberror:joberror_test", "//pkg/jobs/jobspb:jobspb", + "//pkg/jobs/jobspb:jobspb_test", "//pkg/jobs/jobsprotectedts:jobsprotectedts", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", "//pkg/jobs/jobstest:jobstest", diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index dfc5010fa26f..030e85945cf9 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -669,7 +669,7 @@ func TestBackupAndRestoreJobDescription(t *testing.T) { asOf1 := strings.TrimPrefix(matches[1], "/full") sqlDB.CheckQueryResults( - t, "SELECT description FROM [SHOW JOBS] WHERE status != 'failed'", + t, "SELECT description FROM [SHOW JOBS] WHERE status != 'failed' AND description LIKE 'BACKUP%'", [][]string{ {fmt.Sprintf("BACKUP TO ('%s', '%s', '%s')", backups[0].(string), backups[1].(string), backups[2].(string))}, @@ -5573,7 +5573,7 @@ func TestBackupRestoreShowJob(t *testing.T) { // TODO (lucy): Update this if/when we decide to change how these jobs queued by // the startup migration are handled. sqlDB.CheckQueryResults( - t, "SELECT description FROM [SHOW JOBS] WHERE description != 'updating privileges' ORDER BY description", + t, "SELECT description FROM [SHOW JOBS] WHERE description != 'updating privileges' AND description != 'POLL JOBS STATS' ORDER BY description", [][]string{ {"BACKUP DATABASE data TO 'nodelocal://0/foo' WITH revision_history = true"}, {"RESTORE TABLE data.bank FROM 'nodelocal://0/foo' WITH into_db = 'data 2', skip_missing_foreign_keys"}, diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index bd493160dca4..ddd362f5f534 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -339,6 +339,51 @@ func TestChangefeedIdleness(t *testing.T) { }, feedTestEnterpriseSinks) } +func TestChangefeedPausedMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + registry := s.Server.JobRegistry().(*jobs.Registry) + currentlyPaused := registry.MetricsStruct().JobMetrics[jobspb.TypeChangefeed].CurrentlyPaused + waitForPausedCount := func(numIdle int64) { + testutils.SucceedsSoon(t, func() error { + if currentlyPaused.Value() != numIdle { + return fmt.Errorf("expected (%+v) paused changefeeds, found (%+v)", numIdle, currentlyPaused.Value()) + } + return nil + }) + } + + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`) + cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo") + cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar") + sqlDB.Exec(t, `INSERT INTO foo VALUES (0)`) + sqlDB.Exec(t, `INSERT INTO bar VALUES (0)`) + waitForPausedCount(0) + + jobFeed1 := cf1.(cdctest.EnterpriseTestFeed) + jobFeed2 := cf2.(cdctest.EnterpriseTestFeed) + + require.NoError(t, jobFeed1.Pause()) + waitForPausedCount(1) + + require.NoError(t, jobFeed1.Resume()) + waitForPausedCount(0) + + 
require.NoError(t, jobFeed1.Pause()) + require.NoError(t, jobFeed2.Pause()) + waitForPausedCount(2) + + closeFeed(t, cf1) + waitForPausedCount(1) + closeFeed(t, cf2) + waitForPausedCount(0) + }, feedTestEnterpriseSinks) +} + // TestChangefeedSendError validates that SendErrors do not fail the changefeed // as they can occur in normal situations such as a cluster update func TestChangefeedSendError(t *testing.T) { diff --git a/pkg/cli/testdata/doctor/test_examine_cluster b/pkg/cli/testdata/doctor/test_examine_cluster index 1f148121d88e..c1c2d9f53583 100644 --- a/pkg/cli/testdata/doctor/test_examine_cluster +++ b/pkg/cli/testdata/doctor/test_examine_cluster @@ -3,5 +3,5 @@ debug doctor examine cluster debug doctor examine cluster Examining 47 descriptors and 46 namespace entries... ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none -Examining 4 jobs... +Examining 5 jobs... ERROR: validation failed diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index da5da78e548a..8b503d1f5417 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -418,7 +418,7 @@ func (r *Registry) runJob( r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) r.maybeRecordExecutionFailure(ctx, err, job) if r.knobs.AfterJobStateMachine != nil { - r.knobs.AfterJobStateMachine() + r.knobs.AfterJobStateMachine(job) } return err } diff --git a/pkg/jobs/config.go b/pkg/jobs/config.go index 3bfbc7c439d2..767e4d413280 100644 --- a/pkg/jobs/config.go +++ b/pkg/jobs/config.go @@ -32,6 +32,7 @@ const ( executionErrorsMaxEntriesKey = "jobs.execution_errors.max_entries" executionErrorsMaxEntrySizeKey = "jobs.execution_errors.max_entry_size" debugPausePointsSettingKey = "jobs.debug.pausepoints" + metricsPollingIntervalKey = "jobs.metrics.interval.poll" ) const ( @@ -70,6 +71,9 @@ const ( // error. If this size is exceeded, the error will be formatted as a string // and then truncated to fit the size. defaultExecutionErrorsMaxEntrySize = 64 << 10 // 64 KiB + + // defaultPollForMetricsInterval is the default interval to poll the jobs table for metrics. + defaultPollForMetricsInterval = 10 * time.Second ) var ( @@ -100,6 +104,14 @@ var ( settings.PositiveDuration, ) + pollJobsMetricsInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + metricsPollingIntervalKey, + "the interval at which a node in the cluster will poll the jobs table for metrics", + defaultPollForMetricsInterval, + settings.PositiveDuration, + ) + gcIntervalSetting = settings.RegisterDurationSetting( settings.TenantWritable, gcIntervalSettingKey, diff --git a/pkg/jobs/delegate_control_test.go b/pkg/jobs/delegate_control_test.go index 0272f38acec2..33079d159a2b 100644 --- a/pkg/jobs/delegate_control_test.go +++ b/pkg/jobs/delegate_control_test.go @@ -252,7 +252,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) { argsFn := func(args *base.TestServerArgs) { // Prevent registry from changing job state while running this test. interval := 24 * time.Hour - args.Knobs.JobsTestingKnobs = NewTestingKnobsWithIntervals(interval, interval, interval, interval) + args.Knobs.JobsTestingKnobs = NewTestingKnobsWithIntervals(interval, interval, interval, interval, interval) } th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, argsFn) defer cleanup() @@ -332,7 +332,7 @@ func TestJobControlByType(t *testing.T) { argsFn := func(args *base.TestServerArgs) { // Prevent registry from changing job state while running this test. 
interval := 24 * time.Hour - args.Knobs.JobsTestingKnobs = NewTestingKnobsWithIntervals(interval, interval, interval, interval) + args.Knobs.JobsTestingKnobs = NewTestingKnobsWithIntervals(interval, interval, interval, interval, interval) } th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, argsFn) defer cleanup() diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 245ec1303aa8..8e1062f7eea9 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -185,7 +185,7 @@ type registryTestSuite struct { // afterJobStateMachine is invoked in the AfterJobStateMachine testing knob if // non-nil. - afterJobStateMachine func() + afterJobStateMachine func(job *jobs.Job) // Instead of a ch for success, use a variable because it can retry since it // is in a transaction. @@ -215,9 +215,10 @@ func (rts *registryTestSuite) setUp(t *testing.T) { } return nil } - knobs.AfterJobStateMachine = func() { + // The jobs metrics polling job also runs in this suite; callbacks should filter on job type. + knobs.AfterJobStateMachine = func(job *jobs.Job) { if rts.afterJobStateMachine != nil { - rts.afterJobStateMachine() + rts.afterJobStateMachine(job) } } args.Knobs.JobsTestingKnobs = knobs @@ -237,7 +238,10 @@ func (rts *registryTestSuite) setUp(t *testing.T) { rts.sqlDB = sqlutils.MakeSQLRunner(rts.outerDB) rts.registry = rts.s.JobRegistry().(*jobs.Registry) rts.done = make(chan struct{}) - rts.mockJob = jobs.Record{Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}} + rts.mockJob = jobs.Record{ + Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}, + Username: username.MakeSQLUsernameFromPreNormalizedString("test"), + } rts.resumeCh = make(chan error) rts.progressCh = make(chan struct{}) @@ -1013,8 +1017,11 @@ func TestRegistryLifecycle(t *testing.T) { t.Run("dump traces on pause-unpause-success", func(t *testing.T) { completeCh := make(chan struct{}) - rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() { - completeCh <- struct{}{} + rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func(job *jobs.Job) { + pl := job.Payload() + if (&pl).Type() == jobspb.TypeImport { + completeCh <- struct{}{} + } }} rts.setUp(t) defer rts.tearDown() @@ -1062,8 +1069,11 @@ func TestRegistryLifecycle(t *testing.T) { t.Run("dump traces on fail", func(t *testing.T) { completeCh := make(chan struct{}) - rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() { - completeCh <- struct{}{} + rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func(job *jobs.Job) { + pl := job.Payload() + if (&pl).Type() == jobspb.TypeImport { + completeCh <- struct{}{} + } }} rts.setUp(t) defer rts.tearDown() @@ -1105,8 +1115,11 @@ func TestRegistryLifecycle(t *testing.T) { t.Run("dump traces on cancel", func(t *testing.T) { completeCh := make(chan struct{}) - rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() { - completeCh <- struct{}{} + rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func(job *jobs.Job) { + pl := job.Payload() + if (&pl).Type() == jobspb.TypeImport { + completeCh <- struct{}{} + } }} rts.setUp(t) defer rts.tearDown() @@ -2326,7 +2339,7 @@ func TestJobInTxn(t *testing.T) { // Set the adoption interval to be very long to test the adoption channel.
args := base.TestServerArgs{Knobs: base.TestingKnobs{ - JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(time.Hour, time.Hour, time.Hour, time.Hour)}, + JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour)}, } ctx := context.Background() s, sqlDB, _ := serverutils.StartServer(t, args) @@ -2886,6 +2899,7 @@ func TestMetrics(t *testing.T) { DescriptorIDs: []descpb.ID{1}, Details: jobspb.BackupDetails{}, Progress: jobspb.BackupProgress{}, + Username: username.TestUserName(), } _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, registry.MakeJobID(), nil /* txn */) require.NoError(t, err) @@ -2902,6 +2916,7 @@ func TestMetrics(t *testing.T) { DescriptorIDs: []descpb.ID{1}, Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}, + Username: username.TestUserName(), } importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport] @@ -2958,6 +2973,7 @@ func TestMetrics(t *testing.T) { DescriptorIDs: []descpb.ID{1}, Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}, + Username: username.TestUserName(), } importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport] @@ -2993,6 +3009,7 @@ func TestMetrics(t *testing.T) { DescriptorIDs: []descpb.ID{1}, Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}, + Username: username.TestUserName(), } importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport] @@ -3047,7 +3064,7 @@ func TestLoseLeaseDuringExecution(t *testing.T) { defer jobs.ResetConstructors()() // Disable the loops from messing with the job execution. - knobs := base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(time.Hour, time.Hour, time.Hour, time.Hour)} + knobs := base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour)} ctx := context.Background() @@ -3142,6 +3159,7 @@ func TestPauseReason(t *testing.T) { DescriptorIDs: []descpb.ID{1}, Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}, + Username: username.TestUserName(), } tdb := sqlutils.MakeSQLRunner(db) diff --git a/pkg/jobs/jobspb/BUILD.bazel b/pkg/jobs/jobspb/BUILD.bazel index 166d60bc7bd9..ba340720fc6b 100644 --- a/pkg/jobs/jobspb/BUILD.bazel +++ b/pkg/jobs/jobspb/BUILD.bazel @@ -1,7 +1,7 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") load("@rules_proto//proto:defs.bzl", "proto_library") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "jobspb", @@ -60,4 +60,14 @@ go_proto_library( ], ) +go_test( + name = "jobspb_test", + srcs = ["wrap_test.go"], + args = ["-test.timeout=295s"], + deps = [ + ":jobspb", + "@com_github_stretchr_testify//assert", + ], +) + get_x_data(name = "get_x_data") diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index e363f0fda428..9355329cf14e 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -1061,6 +1061,12 @@ message SchemaTelemetryDetails { message SchemaTelemetryProgress { } +message PollJobsStatsDetails { +} + +message PollJobsStatsProgress { +} + message Payload { string description = 1; // If empty, the description is assumed to be the statement. @@ -1112,6 +1118,9 @@ message Payload { // and publish it to the telemetry event log. These jobs are typically // created by a built-in schedule named "sql-schema-telemetry". 
SchemaTelemetryDetails schema_telemetry = 37; + // PollJobsStats jobs poll the jobs table for statistics, such as the number of + // paused jobs. + PollJobsStatsDetails poll_jobs_stats = 38; } reserved 26; // PauseReason is used to describe the reason that the job is currently paused @@ -1173,6 +1182,7 @@ message Progress { StreamReplicationProgress streamReplication = 24; RowLevelTTLProgress row_level_ttl = 25 [(gogoproto.customname)="RowLevelTTL"]; SchemaTelemetryProgress schema_telemetry = 26; + PollJobsStatsProgress pollJobsStats = 27; } uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"]; @@ -1202,6 +1212,7 @@ enum Type { STREAM_REPLICATION = 15 [(gogoproto.enumvalue_customname) = "TypeStreamReplication"]; ROW_LEVEL_TTL = 16 [(gogoproto.enumvalue_customname) = "TypeRowLevelTTL"]; AUTO_SCHEMA_TELEMETRY = 17 [(gogoproto.enumvalue_customname) = "TypeAutoSchemaTelemetry"]; + POLL_JOBS_STATS = 18 [(gogoproto.enumvalue_customname) = "TypePollJobsStats"]; } message Job { diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go index 3ab524453240..8f4f390fbe9a 100644 --- a/pkg/jobs/jobspb/wrap.go +++ b/pkg/jobs/jobspb/wrap.go @@ -136,6 +136,8 @@ func DetailsType(d isPayload_Details) Type { return TypeRowLevelTTL case *Payload_SchemaTelemetry: return TypeAutoSchemaTelemetry + case *Payload_PollJobsStats: + return TypePollJobsStats default: panic(errors.AssertionFailedf("Payload.Type called on a payload with an unknown details type: %T", d)) } @@ -182,6 +184,8 @@ func WrapProgressDetails(details ProgressDetails) interface { return &Progress_RowLevelTTL{RowLevelTTL: &d} case SchemaTelemetryProgress: return &Progress_SchemaTelemetry{SchemaTelemetry: &d} + case PollJobsStatsProgress: + return &Progress_PollJobsStats{PollJobsStats: &d} default: panic(errors.AssertionFailedf("WrapProgressDetails: unknown details type %T", d)) } @@ -223,6 +227,8 @@ func (p *Payload) UnwrapDetails() Details { return *d.RowLevelTTL case *Payload_SchemaTelemetry: return *d.SchemaTelemetry + case *Payload_PollJobsStats: + return *d.PollJobsStats default: return nil } @@ -264,6 +270,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails { return *d.RowLevelTTL case *Progress_SchemaTelemetry: return *d.SchemaTelemetry + case *Progress_PollJobsStats: + return *d.PollJobsStats default: return nil } @@ -277,6 +285,17 @@ func (t Type) String() string { return strings.Replace(Type_name[int32(t)], "_", " ", -1) } +// TypeFromString returns the Type corresponding to the string s, +// where s := Type.String(). +func TypeFromString(s string) (Type, error) { + s = strings.Replace(s, " ", "_", -1) + t, ok := Type_value[s] + if !ok { + return TypeUnspecified, errors.New("invalid type string") + } + return Type(t), nil +} + // WrapPayloadDetails wraps a Details object in the protobuf wrapper struct // necessary to make it usable as the Details field of a Payload. // @@ -318,6 +337,8 @@ func WrapPayloadDetails(details Details) interface { return &Payload_RowLevelTTL{RowLevelTTL: &d} case SchemaTelemetryDetails: return &Payload_SchemaTelemetry{SchemaTelemetry: &d} + case PollJobsStatsDetails: + return &Payload_PollJobsStats{PollJobsStats: &d} default: panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d)) } @@ -353,7 +374,7 @@ const ( func (Type) SafeValue() {} // NumJobTypes is the number of jobs types.
-const NumJobTypes = 18 +const NumJobTypes = 19 // ChangefeedDetailsMarshaler allows for dependency injection of // cloud.SanitizeExternalStorageURI to avoid the dependency from this diff --git a/pkg/jobs/jobspb/wrap_test.go b/pkg/jobs/jobspb/wrap_test.go new file mode 100644 index 000000000000..7dc4e8259a8d --- /dev/null +++ b/pkg/jobs/jobspb/wrap_test.go @@ -0,0 +1,30 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package jobspb_test + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/stretchr/testify/assert" +) + +func TestTypeString(t *testing.T) { + for i := 0; i < jobspb.NumJobTypes; i++ { + typ := jobspb.Type(i) + typStr := typ.String() + convertedType, err := jobspb.TypeFromString(typStr) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, convertedType, typ) + } +} diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go index 4ff9399b6e87..4c480f833ac5 100644 --- a/pkg/jobs/metrics.go +++ b/pkg/jobs/metrics.go @@ -47,6 +47,7 @@ type Metrics struct { type JobTypeMetrics struct { CurrentlyRunning *metric.Gauge CurrentlyIdle *metric.Gauge + CurrentlyPaused *metric.Gauge ResumeCompleted *metric.Counter ResumeRetryError *metric.Counter ResumeFailed *metric.Counter @@ -82,6 +83,17 @@ func makeMetaCurrentlyIdle(typeStr string) metric.Metadata { } } +func makeMetaCurrentlyPaused(typeStr string) metric.Metadata { + return metric.Metadata{ + Name: fmt.Sprintf("jobs.%s.currently_paused", typeStr), + Help: fmt.Sprintf("Number of %s jobs currently considered Paused", + typeStr), + Measurement: "jobs", + Unit: metric.Unit_COUNT, + MetricType: io_prometheus_client.MetricType_GAUGE, + } +} + func makeMetaResumeCompeted(typeStr string) metric.Metadata { return metric.Metadata{ Name: fmt.Sprintf("jobs.%s.resume_completed", typeStr), @@ -214,6 +226,7 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) { m.JobMetrics[jt] = &JobTypeMetrics{ CurrentlyRunning: metric.NewGauge(makeMetaCurrentlyRunning(typeStr)), CurrentlyIdle: metric.NewGauge(makeMetaCurrentlyIdle(typeStr)), + CurrentlyPaused: metric.NewGauge(makeMetaCurrentlyPaused(typeStr)), ResumeCompleted: metric.NewCounter(makeMetaResumeCompeted(typeStr)), ResumeRetryError: metric.NewCounter(makeMetaResumeRetryError(typeStr)), ResumeFailed: metric.NewCounter(makeMetaResumeFailed(typeStr)), diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 5b881a62031e..eaf7482b1029 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -811,6 +811,10 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { } }) + if err := r.createJobsStatsPollerIfNotExists(ctx); err != nil { + return err + } + if err := stopper.RunAsyncTask(ctx, "jobs/cancel", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() @@ -895,6 +899,141 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { }) } +func (r *Registry) createJobsStatsPollerIfNotExists(ctx context.Context) error { + if r.knobs.DisableJobsMetricsPolling { + return nil + } + + for { + const TxnName = "Jobs Stats Poller Txn" + txn := r.db.NewTxn(ctx, TxnName) + + exists, err := JobExists(ctx, r.ex, txn, func(payload 
*jobspb.Payload) bool { + return payload.Type() == jobspb.TypePollJobsStats + }) + if err != nil { + if errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) { + continue + } + return err + } + + if exists { + return nil + } + + metricsJobID := r.MakeJobID() + jr := createJobsStatsPollerRecord() + if _, err := r.CreateAdoptableJobWithTxn(ctx, jr, metricsJobID, txn); err != nil { + if errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) { + continue + } + return err + } + if err := txn.Commit(ctx); err != nil { + if errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) { + continue + } + return err + } + return nil + } +} + +func createJobsStatsPollerRecord() Record { + return Record{ + Description: jobspb.TypePollJobsStats.String(), + Details: jobspb.PollJobsStatsDetails{}, + Progress: jobspb.PollJobsStatsProgress{}, + CreatedBy: &CreatedByInfo{Name: username.RootUser, ID: username.RootUserID}, + Username: username.RootUserName(), + } +} + +// PollMetricsTask polls the jobs table for certain metrics at an interval. +func (r *Registry) PollMetricsTask(ctx context.Context) error { + + var err error + updateMetrics := func(ctx2 context.Context, s sqlliveness.Session) { + lc, cleanup := makeLoopController(r.settings, pollJobsMetricsInterval, r.knobs.IntervalOverrides.PollMetrics) + defer cleanup() + for { + select { + case <-ctx2.Done(): + err = ctx2.Err() + return + case <-lc.updated: + lc.onUpdate() + case <-lc.timer.C: + lc.timer.Read = true + if err = r.updatePausedMetrics(ctx2, s); err != nil { + log.Errorf(ctx2, "failed to update paused metrics: %v", err) + return + } + lc.onExecute() + } + } + } + r.withSession(ctx, updateMetrics) + return err +} + +const pausedJobsCountQuery = string(` + SELECT job_type, count(*) + FROM crdb_internal.jobs + WHERE status = '` + StatusPaused + `' + GROUP BY job_type`) + +func (r *Registry) updatePausedMetrics(ctx context.Context, s sqlliveness.Session) error { + metricUpdates := make(map[jobspb.Type]int) + err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // In case of transaction retries, reset this map here. + metricUpdates = make(map[jobspb.Type]int) + + // Run the query at low priority to ensure that it does not + // contend with foreground reads. + if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil { + return errors.WithAssertionFailure(err) + } + // Note that we have to buffer all rows first - before updating the + // metrics - because we have to make sure that the query executes + // without an error. Otherwise, we could update the gauges with + // partial results.
+ rows, err := r.ex.QueryBufferedEx( + ctx, "poll-paused-jobs", txn, sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + pausedJobsCountQuery, + ) + if err != nil { + return errors.Wrap(err, "could not query jobs table") + } + + for _, row := range rows { + typeString := *row[0].(*tree.DString) + count := *row[1].(*tree.DInt) + typ, err := jobspb.TypeFromString(string(typeString)) + if err != nil { + return err + } + metricUpdates[typ] = int(count) + } + + return nil + }) + if err == nil { + for _, v := range jobspb.Type_value { + if r.metrics.JobMetrics[v] != nil { + if _, ok := metricUpdates[jobspb.Type(v)]; ok { + r.metrics.JobMetrics[v].CurrentlyPaused.Update(int64(metricUpdates[jobspb.Type(v)])) + } else { + r.metrics.JobMetrics[v].CurrentlyPaused.Update(0) + } + } + } + } + + return err +} + func (r *Registry) maybeCancelJobs(ctx context.Context, s sqlliveness.Session) { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index 7cafb27eee10..0be5af77078b 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -88,7 +88,7 @@ func TestExpiringSessionsAndClaimJobsDoesNotTouchTerminalJobs(t *testing.T) { adopt := 10 * time.Hour cancel := 10 * time.Millisecond args := base.TestServerArgs{Knobs: base.TestingKnobs{ - JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(adopt, cancel, adopt, adopt), + JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(adopt, cancel, adopt, adopt, adopt), }} ctx := context.Background() diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index acfab16efd1e..59d42fa9342e 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -121,6 +121,10 @@ func TestRegistryGC(t *testing.T) { // test itself. ManagerDisableJobCreation: true, }, + JobsTestingKnobs: &TestingKnobs{ + // Disable the jobs metrics polling job to avoid edge cases in this test. + DisableJobsMetricsPolling: true, + }, }, }) defer s.Stopper().Stop(ctx) @@ -262,6 +266,10 @@ func TestRegistryGCPagination(t *testing.T) { // test itself. ManagerDisableJobCreation: true, }, + JobsTestingKnobs: &TestingKnobs{ + // Disable the jobs metrics polling job to avoid edge cases in this test. + DisableJobsMetricsPolling: true, + }, }, }) db := sqlutils.MakeSQLRunner(sqlDB) @@ -332,6 +340,7 @@ func TestBatchJobsCreation(t *testing.T) { JobID: r.MakeJobID(), Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}, + Username: username.TestUserName(), }) } // Create jobs in a batch. @@ -342,7 +351,7 @@ func TestBatchJobsCreation(t *testing.T) { return err })) require.Equal(t, len(jobIDs), test.batchSize) - tdb.CheckQueryResults(t, "SELECT count(*) FROM [SHOW JOBS]", + tdb.CheckQueryResults(t, "SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'IMPORT'", [][]string{{fmt.Sprintf("%d", test.batchSize)}}) } }) @@ -440,7 +449,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { jobMetrics *JobTypeMetrics adopted *metric.Counter resumed *metric.Counter - afterJobStateMachineKnob func() + afterJobStateMachineKnob func(job *Job) // expectImmediateRetry is true if the test should expect immediate // resumption on retry, such as after pausing and resuming job. expectImmediateRetry bool @@ -454,6 +463,11 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { // Set up the test cluster. knobs := &TestingKnobs{ TimeSource: timeSource, + // Disable the background job metrics polling job.
This knob is enabled because + // this test uses bti.resumed = bti.registry.metrics.ResumedJobs, which counts + // the number of resumed jobs of all types. At the moment, there is no way to + // count resumed jobs of a specific type. + DisableJobsMetricsPolling: true, } if bti.afterJobStateMachineKnob != nil { knobs.AfterJobStateMachine = bti.afterJobStateMachineKnob @@ -584,7 +598,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { t.Run("running", func(t *testing.T) { ctx := context.Background() bti := BackoffTestInfra{} - bti.afterJobStateMachineKnob = func() { + bti.afterJobStateMachineKnob = func(job *Job) { if bti.done.Load().(bool) { return } @@ -607,7 +621,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { ctx := context.Background() bti := BackoffTestInfra{expectImmediateRetry: true} skip.WithIssue(t, 74399) - bti.afterJobStateMachineKnob = func() { + bti.afterJobStateMachineKnob = func(job *Job) { if bti.done.Load().(bool) { return } @@ -632,7 +646,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { t.Run("revert on fail", func(t *testing.T) { ctx := context.Background() bti := BackoffTestInfra{} - bti.afterJobStateMachineKnob = func() { + bti.afterJobStateMachineKnob = func(job *Job) { if bti.done.Load().(bool) { return } @@ -684,7 +698,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { bti := BackoffTestInfra{expectImmediateRetry: true} skip.WithIssue(t, 74399) - bti.afterJobStateMachineKnob = func() { + bti.afterJobStateMachineKnob = func(job *Job) { if bti.done.Load().(bool) { return } @@ -1015,6 +1029,7 @@ func TestJobIdleness(t *testing.T) { _, err := r.CreateJobWithTxn(ctx, Record{ Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}, + Username: username.MakeSQLUsernameFromPreNormalizedString("test"), }, jobID, txn) return err })) @@ -1031,31 +1046,37 @@ func TestJobIdleness(t *testing.T) { [][]string{{string(status)}}) } + getRunningNonIdleJobs := func() int64 { + return r.metrics.JobMetrics[jobspb.TypeImport].CurrentlyRunning.Value() - r.metrics.JobMetrics[jobspb.TypeImport].CurrentlyIdle.Value() + } + t.Run("MarkIdle", func(t *testing.T) { job1 := createJob() job2 := createJob() require.False(t, r.TestingIsJobIdle(job1.ID())) - require.EqualValues(t, 2, r.metrics.RunningNonIdleJobs.Value()) + require.EqualValues(t, 2, + getRunningNonIdleJobs(), + ) r.MarkIdle(job1, true) r.MarkIdle(job2, true) require.True(t, r.TestingIsJobIdle(job1.ID())) require.Equal(t, int64(2), currentlyIdle.Value()) - require.EqualValues(t, 0, r.metrics.RunningNonIdleJobs.Value()) + require.EqualValues(t, 0, getRunningNonIdleJobs()) // Repeated calls should not increase metric r.MarkIdle(job1, true) r.MarkIdle(job1, true) require.Equal(t, int64(2), currentlyIdle.Value()) - require.EqualValues(t, 0, r.metrics.RunningNonIdleJobs.Value()) + require.EqualValues(t, 0, getRunningNonIdleJobs()) r.MarkIdle(job1, false) require.Equal(t, int64(1), currentlyIdle.Value()) require.False(t, r.TestingIsJobIdle(job1.ID())) - require.EqualValues(t, 1, r.metrics.RunningNonIdleJobs.Value()) + require.EqualValues(t, 1, getRunningNonIdleJobs()) r.MarkIdle(job2, false) require.Equal(t, int64(0), currentlyIdle.Value()) - require.EqualValues(t, 2, r.metrics.RunningNonIdleJobs.Value()) + require.EqualValues(t, 2, getRunningNonIdleJobs()) // Let the jobs complete resumeErrChan <- nil diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index 7fe6d8e54a12..afbf0af1f6fb 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -64,7 +64,7 @@ type 
TestingKnobs struct { // AfterJobStateMachine is called once the running instance of the job has // returned from the state machine that transitions it from one state to // another. - AfterJobStateMachine func() + AfterJobStateMachine func(job *Job) // TimeSource replaces registry's clock. TimeSource *hlc.Clock @@ -75,6 +75,9 @@ type TestingKnobs struct { // BeforeWaitForJobsQuery is called once per invocation of the // poll-show-jobs query in WaitForJobs. BeforeWaitForJobsQuery func() + + // DisableJobsMetricsPolling disables the creation of the singleton jobs metrics polling job. + DisableJobsMetricsPolling bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. @@ -89,6 +92,9 @@ type TestingIntervalOverrides struct { // Cancel overrides the cancelIntervalSetting cluster setting. Cancel *time.Duration + // PollMetrics overrides the pollJobsMetricsInterval cluster setting. + PollMetrics *time.Duration + // Gc overrides the gcIntervalSetting cluster setting. Gc *time.Duration @@ -117,14 +123,14 @@ func NewTestingKnobsWithShortIntervals() *TestingKnobs { defaultShortInterval *= 5 } return NewTestingKnobsWithIntervals( - defaultShortInterval, defaultShortInterval, defaultShortInterval, defaultShortInterval, + defaultShortInterval, defaultShortInterval, defaultShortInterval, defaultShortInterval, defaultShortInterval, ) } // NewTestingKnobsWithIntervals return a TestingKnobs structure with overrides // for adopt and cancel intervals. func NewTestingKnobsWithIntervals( - adopt, cancel, initialDelay, maxDelay time.Duration, + adopt, cancel, initialDelay, maxDelay, collectMetrics time.Duration, ) *TestingKnobs { return &TestingKnobs{ IntervalOverrides: TestingIntervalOverrides{ @@ -132,6 +138,7 @@ func NewTestingKnobsWithIntervals( Cancel: &cancel, RetryInitialDelay: &initialDelay, RetryMaxDelay: &maxDelay, + PollMetrics: &collectMetrics, }, } } diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index 575e5cae4e7b..397cb2dba0d9 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -128,9 +128,9 @@ func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobU // if md.Status != StatusRunning { // return errors.New("job no longer running") // } -// md.UpdateStatus(StatusPaused) +// ju.UpdateStatus(StatusPaused) // // -// md.UpdatePayload(md.Payload) +// ju.UpdatePayload(md.Payload) // } // // Note that there are various convenience wrappers (like FractionProgressed) diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go index 932ca7567252..ab4e4bc11a41 100644 --- a/pkg/jobs/utils.go +++ b/pkg/jobs/utils.go @@ -20,6 +20,8 @@ import ( "github.com/cockroachdb/errors" ) +const jobExistsOp = "get-jobs" + // RunningJobExists checks that whether there are any other jobs (matched by // payloadPredicate callback) in the pending, running, or paused status that // started earlier than the job with provided jobID. @@ -43,7 +45,7 @@ ORDER BY created` it, err := ie.QueryIterator( ctx, - "get-jobs", + jobExistsOp, txn, stmt, ) @@ -73,3 +75,46 @@ ORDER BY created` } return false /* exists */, err } + +// JobExists returns whether there is at least one job record that matches + // the payloadPredicate callback.
+func JobExists( + ctx context.Context, + ie sqlutil.InternalExecutor, + txn *kv.Txn, + payloadPredicate func(payload *jobspb.Payload) bool, +) (exists bool, retErr error) { + const stmt = ` +SELECT + id, payload +FROM + system.jobs +ORDER BY created` + + it, err := ie.QueryIterator( + ctx, + jobExistsOp, + txn, + stmt, + ) + if err != nil { + return false /* exists */, err + } + // We have to make sure to close the iterator since we might return from the + // for loop early (before Next() returns false). + defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }() + + var ok bool + for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { + row := it.Cur() + payload, err := UnmarshalPayload(row[1]) + if err != nil { + return false /* exists */, err + } + + if payloadPredicate(payload) { + return true /* exists */, nil /* retErr */ + } + } + return false /* exists */, err +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index dca9a2186d7b..b60b776fe81a 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -128,6 +128,7 @@ go_library( "inverted_join.go", "job_exec_context.go", "job_exec_context_test_util.go", + "job_statistics.go", "join.go", "join_predicate.go", "join_token.go", diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index bf0a85386136..06917f8eeb9a 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -6162,6 +6162,11 @@ func TestImportPgDumpSchemas(t *testing.T) { return base.TestServerArgs{ Settings: s, ExternalIODir: baseDir, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: &jobs.TestingKnobs{ + DisableJobsMetricsPolling: true, + }, + }, } } diff --git a/pkg/sql/job_statistics.go b/pkg/sql/job_statistics.go new file mode 100644 index 000000000000..87e434c787ee --- /dev/null +++ b/pkg/sql/job_statistics.go @@ -0,0 +1,62 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" +) + +type metricsPoller struct { + job *jobs.Job +} + +var _ jobs.Resumer = &metricsPoller{} + +var metricsPollerRetryOptions = retry.Options{ + InitialBackoff: 5 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 10 * time.Second, +} + +// OnFailOrCancel is a part of the Resumer interface. +func (mp *metricsPoller) OnFailOrCancel( + ctx context.Context, execCtx interface{}, jobErr error, +) error { + if jobErr != nil { + log.Errorf(ctx, "failed polled stats job: %v", jobErr) + } + + return nil +} + +// Resume is part of the Resumer interface. 
+func (mp *metricsPoller) Resume(ctx context.Context, execCtx interface{}) error { + exec := execCtx.(JobExecContext) + var e error + for r := retry.StartWithCtx(ctx, metricsPollerRetryOptions); r.Next(); { + e = exec.ExecCfg().JobRegistry.PollMetricsTask(ctx) + } + return e +} + +func init() { + createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + return &metricsPoller{job: job} + } + jobs.RegisterConstructor(jobspb.TypePollJobsStats, createResumerFn, jobs.UsesTenantCostControl) +} diff --git a/pkg/sql/logictest/testdata/logic_test/drop_database b/pkg/sql/logictest/testdata/logic_test/drop_database index 60a4f145322c..ad3a84d1bfc7 100644 --- a/pkg/sql/logictest/testdata/logic_test/drop_database +++ b/pkg/sql/logictest/testdata/logic_test/drop_database @@ -42,7 +42,7 @@ test root NULL NULL {} NULL skipif config local-legacy-schema-changer query TT -SELECT job_type, status FROM [SHOW JOBS] WHERE user_name = 'root' +SELECT job_type, status FROM [SHOW JOBS] WHERE user_name = 'root' AND (job_type = 'SCHEMA CHANGE' OR job_type = 'NEW SCHEMA CHANGE') ---- SCHEMA CHANGE succeeded SCHEMA CHANGE succeeded diff --git a/pkg/sql/logictest/testdata/logic_test/jobs b/pkg/sql/logictest/testdata/logic_test/jobs index b88a951e1892..8f12b8c5765b 100644 --- a/pkg/sql/logictest/testdata/logic_test/jobs +++ b/pkg/sql/logictest/testdata/logic_test/jobs @@ -188,3 +188,9 @@ usage_count > 0 ORDER BY feature_name DESC ---- job.schema_change.successful + +# Ensure one POLL JOBS STATS job is running +query I +SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'POLL JOBS STATS' AND status = 'running' +---- +1 diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 7f30ce857d39..4c869d9d003d 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -449,7 +449,6 @@ func (tc *TestCluster) Start(t testing.TB) { // Create a closer that will stop the individual server stoppers when the // cluster stopper is stopped.
tc.stopper.AddCloser(stop.CloserFn(func() { tc.stopServers(context.TODO()) })) - if tc.clusterArgs.ReplicationMode == base.ReplicationAuto { if err := tc.WaitForFullReplication(); err != nil { t.Fatal(err) diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index dcd1005589b4..1f799b04e05b 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3285,6 +3285,7 @@ var charts = []sectionDescription{ "jobs.auto_span_config_reconciliation.currently_running", "jobs.auto_sql_stats_compaction.currently_running", "jobs.stream_replication.currently_running", + "jobs.poll_jobs_stats.currently_running", }, }, { @@ -3305,6 +3306,30 @@ var charts = []sectionDescription{ "jobs.stream_ingestion.currently_idle", "jobs.stream_replication.currently_idle", "jobs.typedesc_schema_change.currently_idle", + "jobs.poll_jobs_stats.currently_idle", + }, + }, + { + Title: "Currently Paused", + Metrics: []string{ + "jobs.auto_create_stats.currently_paused", + "jobs.auto_span_config_reconciliation.currently_paused", + "jobs.auto_sql_stats_compaction.currently_paused", + "jobs.backup.currently_paused", + "jobs.changefeed.currently_paused", + "jobs.create_stats.currently_paused", + "jobs.import.currently_paused", + "jobs.migration.currently_paused", + "jobs.new_schema_change.currently_paused", + "jobs.restore.currently_paused", + "jobs.schema_change.currently_paused", + "jobs.schema_change_gc.currently_paused", + "jobs.stream_ingestion.currently_paused", + "jobs.stream_replication.currently_paused", + "jobs.typedesc_schema_change.currently_paused", + "jobs.auto_schema_telemetry.currently_paused", + "jobs.row_level_ttl.currently_paused", + "jobs.poll_jobs_stats.currently_paused", }, }, { @@ -3481,6 +3506,17 @@ var charts = []sectionDescription{ "jobs.auto_sql_stats_compaction.resume_retry_error", }, }, + { + Title: "Jobs Stats Polling Job", + Metrics: []string{ + "jobs.poll_jobs_stats.fail_or_cancel_completed", + "jobs.poll_jobs_stats.fail_or_cancel_failed", + "jobs.poll_jobs_stats.fail_or_cancel_retry_error", + "jobs.poll_jobs_stats.resume_completed", + "jobs.poll_jobs_stats.resume_failed", + "jobs.poll_jobs_stats.resume_retry_error", + }, + }, }, }, {
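Reviewer note: NewTestingKnobsWithIntervals now takes a fifth interval that feeds TestingIntervalOverrides.PollMetrics, so test helpers built on it need one extra argument. A minimal sketch of an updated call site, assuming the usual base/jobs/time imports and arbitrary one-hour values:

    // Hypothetical call site: the fifth interval controls how often the jobs
    // metrics poller scans the jobs table (TestingIntervalOverrides.PollMetrics).
    knobs := base.TestingKnobs{
        JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(
            time.Hour, // adopt
            time.Hour, // cancel
            time.Hour, // retry initial delay
            time.Hour, // retry max delay
            time.Hour, // poll jobs metrics
        ),
    }
    args := base.TestServerArgs{Knobs: knobs}

The argument order matches the TestingIntervalOverrides fields populated in testing_knobs.go (Adopt, Cancel, RetryInitialDelay, RetryMaxDelay, PollMetrics).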