Skip to content

Commit

Permalink
jobs/cdc: add metrics for paused jobs
Browse files Browse the repository at this point in the history
This change adds new metrics to count paused jobs for every job type. For
example, the metric for paused changefeed jobs is
`jobs.changefeed.currently_paused`. These metrics are counted at an
interval defined by the cluster setting `jobs.metrics.interval.poll`.

This is implemented by a job which periodically queries `crdb_internal.jobs`
to count the number of paused jobs. This job is of the newly added type
`jobspb.TypePollJobsStats`. When a node starts it's job registry, it will
create an adoptable stats polling job if it does not exist already using a
transaction.

This change adds a test which pauses and resumes changefeeds while asserting
the value of the `jobs.changefeed.currently_paused` metric. It also adds a
logictest to ensure one instance of the stats polling job is created in a
cluster. Finally, this change updates existing tests to handle the fact that
there is a new job always running in the background (since a lot of tests
assert the state of the jobs table, having a new job can change test results).

Informs: #90453
This change adds a virtual index to the `crdb_internal.jobs` table so that
querying for paused jobs requires less work.

Resolves: #85467

Release note (general change): This change adds new metrics to count
paused jobs for every job type. For example, the metric for paused
changefeed jobs is `jobs.changefeed.currently_paused`. These metrics
are incremented at an interval defined by the cluster setting
`jobs.metrics.interval.poll`.
  • Loading branch information
jayshrivastava committed Oct 31, 2022
1 parent 421aff1 commit 3082ac7
Show file tree
Hide file tree
Showing 26 changed files with 523 additions and 40 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ ALL_TESTS = [
"//pkg/internal/sqlsmith:sqlsmith_test",
"//pkg/internal/team:team_test",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
Expand Down Expand Up @@ -1082,6 +1083,7 @@ GO_TARGETS = [
"//pkg/jobs/joberror:joberror",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs/jobstest:jobstest",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func TestBackupAndRestoreJobDescription(t *testing.T) {
asOf1 := strings.TrimPrefix(matches[1], "/full")

sqlDB.CheckQueryResults(
t, "SELECT description FROM [SHOW JOBS] WHERE status != 'failed'",
t, "SELECT description FROM [SHOW JOBS] WHERE status != 'failed' AND description LIKE 'BACKUP%'",
[][]string{
{fmt.Sprintf("BACKUP TO ('%s', '%s', '%s')", backups[0].(string), backups[1].(string),
backups[2].(string))},
Expand Down Expand Up @@ -5573,7 +5573,7 @@ func TestBackupRestoreShowJob(t *testing.T) {
// TODO (lucy): Update this if/when we decide to change how these jobs queued by
// the startup migration are handled.
sqlDB.CheckQueryResults(
t, "SELECT description FROM [SHOW JOBS] WHERE description != 'updating privileges' ORDER BY description",
t, "SELECT description FROM [SHOW JOBS] WHERE description != 'updating privileges' AND description != 'POLL JOBS STATS' ORDER BY description",
[][]string{
{"BACKUP DATABASE data TO 'nodelocal://0/foo' WITH revision_history = true"},
{"RESTORE TABLE data.bank FROM 'nodelocal://0/foo' WITH into_db = 'data 2', skip_missing_foreign_keys"},
Expand Down
45 changes: 45 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,51 @@ func TestChangefeedIdleness(t *testing.T) {
}, feedTestEnterpriseSinks)
}

func TestChangefeedPausedMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
registry := s.Server.JobRegistry().(*jobs.Registry)
currentlyPaused := registry.MetricsStruct().JobMetrics[jobspb.TypeChangefeed].CurrentlyPaused
waitForPausedCount := func(numIdle int64) {
testutils.SucceedsSoon(t, func() error {
if currentlyPaused.Value() != numIdle {
return fmt.Errorf("expected (%+v) paused changefeeds, found (%+v)", numIdle, currentlyPaused.Value())
}
return nil
})
}

sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`)
cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo")
cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar")
sqlDB.Exec(t, `INSERT INTO foo VALUES (0)`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (0)`)
waitForPausedCount(0)

jobFeed1 := cf1.(cdctest.EnterpriseTestFeed)
jobFeed2 := cf2.(cdctest.EnterpriseTestFeed)

require.NoError(t, jobFeed1.Pause())
waitForPausedCount(1)

require.NoError(t, jobFeed1.Resume())
waitForPausedCount(0)

require.NoError(t, jobFeed1.Pause())
require.NoError(t, jobFeed2.Pause())
waitForPausedCount(2)

closeFeed(t, cf1)
waitForPausedCount(1)
closeFeed(t, cf2)
waitForPausedCount(0)
}, feedTestEnterpriseSinks)
}

// TestChangefeedSendError validates that SendErrors do not fail the changefeed
// as they can occur in normal situations such as a cluster update
func TestChangefeedSendError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 47 descriptors and 46 namespace entries...
ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
Examining 4 jobs...
Examining 5 jobs...
ERROR: validation failed
2 changes: 1 addition & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (r *Registry) runJob(
r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err)
r.maybeRecordExecutionFailure(ctx, err, job)
if r.knobs.AfterJobStateMachine != nil {
r.knobs.AfterJobStateMachine()
r.knobs.AfterJobStateMachine(job)
}
return err
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
executionErrorsMaxEntriesKey = "jobs.execution_errors.max_entries"
executionErrorsMaxEntrySizeKey = "jobs.execution_errors.max_entry_size"
debugPausePointsSettingKey = "jobs.debug.pausepoints"
metricsPollingIntervalKey = "jobs.metrics.interval.poll"
)

const (
Expand Down Expand Up @@ -70,6 +71,9 @@ const (
// error. If this size is exceeded, the error will be formatted as a string
// and then truncated to fit the size.
defaultExecutionErrorsMaxEntrySize = 64 << 10 // 64 KiB

// defaultPollForMetricsInterval is the default interval to poll the jobs table for metrics.
defaultPollForMetricsInterval = 10 * time.Second
)

var (
Expand Down Expand Up @@ -100,6 +104,14 @@ var (
settings.PositiveDuration,
)

pollJobsMetricsInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
metricsPollingIntervalKey,
"the interval at which a node in the cluster will poll the jobs table for metrics",
defaultPollForMetricsInterval,
settings.PositiveDuration,
)

gcIntervalSetting = settings.RegisterDurationSetting(
settings.TenantWritable,
gcIntervalSettingKey,
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) {
argsFn := func(args *base.TestServerArgs) {
// Prevent registry from changing job state while running this test.
interval := 24 * time.Hour
args.Knobs.JobsTestingKnobs = NewTestingKnobsWithIntervals(interval, interval, interval, interval)
args.Knobs.JobsTestingKnobs = NewTestingKnobsWithIntervals(interval, interval, interval, interval, interval)
}
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, argsFn)
defer cleanup()
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestJobControlByType(t *testing.T) {
argsFn := func(args *base.TestServerArgs) {
// Prevent registry from changing job state while running this test.
interval := 24 * time.Hour
args.Knobs.JobsTestingKnobs = NewTestingKnobsWithIntervals(interval, interval, interval, interval)
args.Knobs.JobsTestingKnobs = NewTestingKnobsWithIntervals(interval, interval, interval, interval, interval)
}
th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, argsFn)
defer cleanup()
Expand Down
42 changes: 30 additions & 12 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ type registryTestSuite struct {

// afterJobStateMachine is invoked in the AfterJobStateMachine testing knob if
// non-nil.
afterJobStateMachine func()
afterJobStateMachine func(job *jobs.Job)

// Instead of a ch for success, use a variable because it can retry since it
// is in a transaction.
Expand Down Expand Up @@ -215,9 +215,10 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
}
return nil
}
knobs.AfterJobStateMachine = func() {
//knobs.DisableJobsMetricsPolling = true
knobs.AfterJobStateMachine = func(job *jobs.Job) {
if rts.afterJobStateMachine != nil {
rts.afterJobStateMachine()
rts.afterJobStateMachine(job)
}
}
args.Knobs.JobsTestingKnobs = knobs
Expand All @@ -237,7 +238,10 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
rts.sqlDB = sqlutils.MakeSQLRunner(rts.outerDB)
rts.registry = rts.s.JobRegistry().(*jobs.Registry)
rts.done = make(chan struct{})
rts.mockJob = jobs.Record{Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}}
rts.mockJob = jobs.Record{
Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{},
Username: username.MakeSQLUsernameFromPreNormalizedString("test"),
}

rts.resumeCh = make(chan error)
rts.progressCh = make(chan struct{})
Expand Down Expand Up @@ -1013,8 +1017,11 @@ func TestRegistryLifecycle(t *testing.T) {

t.Run("dump traces on pause-unpause-success", func(t *testing.T) {
completeCh := make(chan struct{})
rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() {
completeCh <- struct{}{}
rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func(job *jobs.Job) {
pl := job.Payload()
if (&pl).Type() == jobspb.TypeImport {
completeCh <- struct{}{}
}
}}
rts.setUp(t)
defer rts.tearDown()
Expand Down Expand Up @@ -1062,8 +1069,11 @@ func TestRegistryLifecycle(t *testing.T) {

t.Run("dump traces on fail", func(t *testing.T) {
completeCh := make(chan struct{})
rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() {
completeCh <- struct{}{}
rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func(job *jobs.Job) {
pl := job.Payload()
if (&pl).Type() == jobspb.TypeImport {
completeCh <- struct{}{}
}
}}
rts.setUp(t)
defer rts.tearDown()
Expand Down Expand Up @@ -1105,8 +1115,11 @@ func TestRegistryLifecycle(t *testing.T) {

t.Run("dump traces on cancel", func(t *testing.T) {
completeCh := make(chan struct{})
rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() {
completeCh <- struct{}{}
rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func(job *jobs.Job) {
pl := job.Payload()
if (&pl).Type() == jobspb.TypeImport {
completeCh <- struct{}{}
}
}}
rts.setUp(t)
defer rts.tearDown()
Expand Down Expand Up @@ -2326,7 +2339,7 @@ func TestJobInTxn(t *testing.T) {

// Set the adoption interval to be very long to test the adoption channel.
args := base.TestServerArgs{Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(time.Hour, time.Hour, time.Hour, time.Hour)},
JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour)},
}
ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, args)
Expand Down Expand Up @@ -2886,6 +2899,7 @@ func TestMetrics(t *testing.T) {
DescriptorIDs: []descpb.ID{1},
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
Username: username.TestUserName(),
}
_, err := registry.CreateAdoptableJobWithTxn(ctx, rec, registry.MakeJobID(), nil /* txn */)
require.NoError(t, err)
Expand All @@ -2902,6 +2916,7 @@ func TestMetrics(t *testing.T) {
DescriptorIDs: []descpb.ID{1},
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.TestUserName(),
}
importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport]

Expand Down Expand Up @@ -2958,6 +2973,7 @@ func TestMetrics(t *testing.T) {
DescriptorIDs: []descpb.ID{1},
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.TestUserName(),
}
importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport]

Expand Down Expand Up @@ -2993,6 +3009,7 @@ func TestMetrics(t *testing.T) {
DescriptorIDs: []descpb.ID{1},
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.TestUserName(),
}
importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport]

Expand Down Expand Up @@ -3047,7 +3064,7 @@ func TestLoseLeaseDuringExecution(t *testing.T) {
defer jobs.ResetConstructors()()

// Disable the loops from messing with the job execution.
knobs := base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(time.Hour, time.Hour, time.Hour, time.Hour)}
knobs := base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithIntervals(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour)}

ctx := context.Background()

Expand Down Expand Up @@ -3142,6 +3159,7 @@ func TestPauseReason(t *testing.T) {
DescriptorIDs: []descpb.ID{1},
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.TestUserName(),
}
tdb := sqlutils.MakeSQLRunner(db)

Expand Down
12 changes: 11 additions & 1 deletion pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "jobspb",
Expand Down Expand Up @@ -60,4 +60,14 @@ go_proto_library(
],
)

go_test(
name = "jobspb_test",
srcs = ["wrap_test.go"],
args = ["-test.timeout=295s"],
deps = [
":jobspb",
"@com_github_stretchr_testify//assert",
],
)

get_x_data(name = "get_x_data")
11 changes: 11 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,12 @@ message SchemaTelemetryDetails {
message SchemaTelemetryProgress {
}

message PollJobsStatsDetails {
}

message PollJobsStatsProgress {
}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -1112,6 +1118,9 @@ message Payload {
// and publish it to the telemetry event log. These jobs are typically
// created by a built-in schedule named "sql-schema-telemetry".
SchemaTelemetryDetails schema_telemetry = 37;
// PollJobsStats jobs poll the jobs table for statistics metrics as the number of
// paused jobs.
PollJobsStatsDetails poll_jobs_stats = 38;
}
reserved 26;
// PauseReason is used to describe the reason that the job is currently paused
Expand Down Expand Up @@ -1173,6 +1182,7 @@ message Progress {
StreamReplicationProgress streamReplication = 24;
RowLevelTTLProgress row_level_ttl = 25 [(gogoproto.customname)="RowLevelTTL"];
SchemaTelemetryProgress schema_telemetry = 26;
PollJobsStatsProgress pollJobsStats = 27;
}

uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
Expand Down Expand Up @@ -1202,6 +1212,7 @@ enum Type {
STREAM_REPLICATION = 15 [(gogoproto.enumvalue_customname) = "TypeStreamReplication"];
ROW_LEVEL_TTL = 16 [(gogoproto.enumvalue_customname) = "TypeRowLevelTTL"];
AUTO_SCHEMA_TELEMETRY = 17 [(gogoproto.enumvalue_customname) = "TypeAutoSchemaTelemetry"];
POLL_JOBS_STATS = 18 [(gogoproto.enumvalue_customname) = "TypePollJobsStats"];
}

message Job {
Expand Down
Loading

0 comments on commit 3082ac7

Please sign in to comment.