Skip to content

Commit

Permalink
jobs: add metrics for paused jobs
Browse files Browse the repository at this point in the history
This change adds new metrics to count paused jobs for every job type. For
example, the metric for paused changefeed jobs is
`jobs.changefeed.currently_paused`. These metrics are counted at an
interval defined by the cluster setting `jobs.metrics.interval.poll`.

This is implemented by a job which periodically queries `system.jobs`
to count the number of paused jobs. This job is of the newly added type
`jobspb.TypePollJobsStats`. When a node starts it's job registry, it will
create an adoptable stats polling job if it does not exist already using a
transaction.

This change adds a test which pauses and resumes changefeeds while asserting
the value of the `jobs.changefeed.currently_paused` metric. It also adds a
logictest to ensure one instance of the stats polling job is created in a
cluster.

Resolves: #85467

Release note (general change): This change adds new metrics to count
paused jobs for every job type. For example, the metric for paused
changefeed jobs is `jobs.changefeed.currently_paused`. These metrics
are updated at an interval defined by the cluster setting
`jobs.metrics.interval.poll`, which is defauled to 10 seconds.

Epic: None
  • Loading branch information
jayshrivastava committed Feb 6, 2023
1 parent e75ede3 commit fb89d23
Show file tree
Hide file tree
Showing 27 changed files with 607 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-36 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-38 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-36</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-38</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ ALL_TESTS = [
"//pkg/internal/team:team_test",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
Expand Down Expand Up @@ -1123,6 +1124,7 @@ GO_TARGETS = [
"//pkg/jobs/jobsauth:jobsauth",
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs/jobstest:jobstest",
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 53 descriptors and 52 namespace entries...
ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
Examining 12 jobs...
Examining 14 jobs...
ERROR: validation failed
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,10 @@ const (

V23_1_DeleteDroppedFunctionDescriptors

// V23_1_CreateJobsMetricsPollingJob creates the permanent job
// responsible for polling the jobs table for metrics.
V23_1_CreateJobsMetricsPollingJob

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -711,6 +715,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_DeleteDroppedFunctionDescriptors,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 36},
},
{
Key: V23_1_CreateJobsMetricsPollingJob,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 38},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
15 changes: 15 additions & 0 deletions pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
executionErrorsMaxEntriesKey = "jobs.execution_errors.max_entries"
executionErrorsMaxEntrySizeKey = "jobs.execution_errors.max_entry_size"
debugPausePointsSettingKey = "jobs.debug.pausepoints"
metricsPollingIntervalKey = "jobs.metrics.interval.poll"
)

const (
Expand Down Expand Up @@ -70,6 +71,10 @@ const (
// error. If this size is exceeded, the error will be formatted as a string
// and then truncated to fit the size.
defaultExecutionErrorsMaxEntrySize = 64 << 10 // 64 KiB

// defaultPollForMetricsInterval is the default interval to poll the jobs
// table for metrics.
defaultPollForMetricsInterval = 10 * time.Second
)

var (
Expand Down Expand Up @@ -100,6 +105,16 @@ var (
settings.PositiveDuration,
)

// PollJobsMetricsInterval is the interval at which a tenant in the cluster
// will poll the jobs table for metrics
PollJobsMetricsInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
metricsPollingIntervalKey,
"the interval at which a node in the cluster will poll the jobs table for metrics",
defaultPollForMetricsInterval,
settings.PositiveDuration,
)

gcIntervalSetting = settings.RegisterDurationSetting(
settings.TenantWritable,
gcIntervalSettingKey,
Expand Down
102 changes: 101 additions & 1 deletion pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
ManagerDisableJobCreation: true,
}
args.Knobs.UpgradeManager = &upgradebase.TestingKnobs{
DontUseJobs: true,
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
}
args.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true}

Expand Down Expand Up @@ -3456,3 +3457,102 @@ func TestPausepoints(t *testing.T) {
})
}
}

func TestPausedMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
})
defer s.Stopper().Stop(ctx)

jobs.PollJobsMetricsInterval.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond)

runner := sqlutils.MakeSQLRunner(sqlDB)
reg := s.JobRegistry().(*jobs.Registry)

waitForPausedCount := func(typ jobspb.Type, numPaused int64) {
testutils.SucceedsSoon(t, func() error {
currentlyPaused := reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value()
if reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value() != numPaused {
return fmt.Errorf(
"expected (%+v) paused jobs of type (%+v), found (%+v)",
numPaused,
typ,
currentlyPaused,
)
}
return nil
})
}

typeToRecord := map[jobspb.Type]jobs.Record{
jobspb.TypeChangefeed: {
Details: jobspb.ChangefeedDetails{},
Progress: jobspb.ChangefeedProgress{},
Username: username.TestUserName(),
},
jobspb.TypeImport: {
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.TestUserName(),
},
jobspb.TypeSchemaChange: {
Details: jobspb.SchemaChangeDetails{},
Progress: jobspb.SchemaChangeProgress{},
Username: username.TestUserName(),
},
}
for typ := range typeToRecord {
jobs.RegisterConstructor(typ, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
},
}
}, jobs.UsesTenantCostControl)
}

makeJob := func(ctx context.Context,
typ jobspb.Type,
) *jobs.StartableJob {
j, err := jobs.TestingCreateAndStartJob(ctx, reg, s.InternalDB().(isql.DB), typeToRecord[typ])
if err != nil {
t.Fatal(err)
}
return j
}

cfJob := makeJob(context.Background(), jobspb.TypeChangefeed)
cfJob2 := makeJob(context.Background(), jobspb.TypeChangefeed)
importJob := makeJob(context.Background(), jobspb.TypeImport)
scJob := makeJob(context.Background(), jobspb.TypeSchemaChange)

// Pause all job types.
runner.Exec(t, "PAUSE JOB $1", cfJob.ID())
waitForPausedCount(jobspb.TypeChangefeed, 1)
runner.Exec(t, "PAUSE JOB $1", cfJob2.ID())
waitForPausedCount(jobspb.TypeChangefeed, 2)
runner.Exec(t, "PAUSE JOB $1", importJob.ID())
waitForPausedCount(jobspb.TypeImport, 1)
runner.Exec(t, "PAUSE JOB $1", scJob.ID())
waitForPausedCount(jobspb.TypeSchemaChange, 1)

// Resume / cancel jobs.
runner.Exec(t, "RESUME JOB $1", cfJob.ID())
waitForPausedCount(jobspb.TypeChangefeed, 1)
runner.Exec(t, "CANCEL JOB $1", cfJob2.ID())
waitForPausedCount(jobspb.TypeChangefeed, 0)
runner.Exec(t, "RESUME JOB $1", importJob.ID())
waitForPausedCount(jobspb.TypeImport, 0)
runner.Exec(t, "CANCEL JOB $1", scJob.ID())
waitForPausedCount(jobspb.TypeSchemaChange, 0)

runner.Exec(t, "CANCEL JOB $1", cfJob.ID())
runner.Exec(t, "CANCEL JOB $1", importJob.ID())
}
12 changes: 11 additions & 1 deletion pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "jobspb",
Expand Down Expand Up @@ -64,4 +64,14 @@ go_proto_library(
],
)

go_test(
name = "jobspb_test",
srcs = ["wrap_test.go"],
args = ["-test.timeout=295s"],
deps = [
":jobspb",
"@com_github_stretchr_testify//assert",
],
)

get_x_data(name = "get_x_data")
13 changes: 13 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,12 @@ message SchemaTelemetryDetails {
message SchemaTelemetryProgress {
}

message PollJobsStatsDetails {
}

message PollJobsStatsProgress {
}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -1200,7 +1206,12 @@ message Payload {
// and publish it to the telemetry event log. These jobs are typically
// created by a built-in schedule named "sql-schema-telemetry".
SchemaTelemetryDetails schema_telemetry = 37;

KeyVisualizerDetails keyVisualizerDetails = 38;

// PollJobsStats jobs poll the jobs table for statistics metrics as the number of
// paused jobs.
PollJobsStatsDetails poll_jobs_stats = 39;
}
reserved 26;
// PauseReason is used to describe the reason that the job is currently paused
Expand Down Expand Up @@ -1263,6 +1274,7 @@ message Progress {
RowLevelTTLProgress row_level_ttl = 25 [(gogoproto.customname)="RowLevelTTL"];
SchemaTelemetryProgress schema_telemetry = 26;
KeyVisualizerProgress keyVisualizerProgress = 27;
PollJobsStatsProgress pollJobsStats = 28;
}

uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
Expand Down Expand Up @@ -1293,6 +1305,7 @@ enum Type {
ROW_LEVEL_TTL = 16 [(gogoproto.enumvalue_customname) = "TypeRowLevelTTL"];
AUTO_SCHEMA_TELEMETRY = 17 [(gogoproto.enumvalue_customname) = "TypeAutoSchemaTelemetry"];
KEY_VISUALIZER = 18 [(gogoproto.enumvalue_customname) = "TypeKeyVisualizer"];
POLL_JOBS_STATS = 19 [(gogoproto.enumvalue_customname) = "TypePollJobsStats"];
}

message Job {
Expand Down
25 changes: 24 additions & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ var AutomaticJobTypes = [...]Type{
TypeAutoSpanConfigReconciliation,
TypeAutoSQLStatsCompaction,
TypeAutoSchemaTelemetry,
TypePollJobsStats,
}

// DetailsType returns the type for a payload detail.
Expand Down Expand Up @@ -188,6 +189,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
return TypeAutoSchemaTelemetry, nil
case *Payload_KeyVisualizerDetails:
return TypeKeyVisualizer, nil
case *Payload_PollJobsStats:
return TypePollJobsStats, nil
default:
return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
}
Expand Down Expand Up @@ -227,6 +230,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
TypeRowLevelTTL: RowLevelTTLDetails{},
TypeAutoSchemaTelemetry: SchemaTelemetryDetails{},
TypeKeyVisualizer: KeyVisualizerDetails{},
TypePollJobsStats: PollJobsStatsDetails{},
}

// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
Expand Down Expand Up @@ -272,6 +276,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_SchemaTelemetry{SchemaTelemetry: &d}
case KeyVisualizerProgress:
return &Progress_KeyVisualizerProgress{KeyVisualizerProgress: &d}
case PollJobsStatsProgress:
return &Progress_PollJobsStats{PollJobsStats: &d}
default:
panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
}
Expand Down Expand Up @@ -315,6 +321,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.SchemaTelemetry
case *Payload_KeyVisualizerDetails:
return *d.KeyVisualizerDetails
case *Payload_PollJobsStats:
return *d.PollJobsStats
default:
return nil
}
Expand Down Expand Up @@ -358,6 +366,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.SchemaTelemetry
case *Progress_KeyVisualizerProgress:
return *d.KeyVisualizerProgress
case *Progress_PollJobsStats:
return *d.PollJobsStats
default:
return nil
}
Expand All @@ -371,6 +381,17 @@ func (t Type) String() string {
return strings.Replace(Type_name[int32(t)], "_", " ", -1)
}

// TypeFromString is used to get the type corresponding to the string s
// where s := Type.String().
func TypeFromString(s string) (Type, error) {
s = strings.Replace(s, " ", "_", -1)
t, ok := Type_value[s]
if !ok {
return TypeUnspecified, errors.New("invalid type string")
}
return Type(t), nil
}

// WrapPayloadDetails wraps a Details object in the protobuf wrapper struct
// necessary to make it usable as the Details field of a Payload.
//
Expand Down Expand Up @@ -414,6 +435,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_SchemaTelemetry{SchemaTelemetry: &d}
case KeyVisualizerDetails:
return &Payload_KeyVisualizerDetails{KeyVisualizerDetails: &d}
case PollJobsStatsDetails:
return &Payload_PollJobsStats{PollJobsStats: &d}
default:
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -449,7 +472,7 @@ const (
func (Type) SafeValue() {}

// NumJobTypes is the number of jobs types.
const NumJobTypes = 19
const NumJobTypes = 20

// ChangefeedDetailsMarshaler allows for dependency injection of
// cloud.SanitizeExternalStorageURI to avoid the dependency from this
Expand Down
30 changes: 30 additions & 0 deletions pkg/jobs/jobspb/wrap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobspb_test

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/stretchr/testify/assert"
)

func TestTypeString(t *testing.T) {
for i := 0; i < jobspb.NumJobTypes; i++ {
typ := jobspb.Type(i)
typStr := typ.String()
convertedType, err := jobspb.TypeFromString(typStr)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, convertedType, typ)
}
}
Loading

0 comments on commit fb89d23

Please sign in to comment.