Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs/cdc: add metrics for paused jobs #89752

Merged
merged 1 commit into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-36 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-38 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-36</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-38</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ ALL_TESTS = [
"//pkg/internal/team:team_test",
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
Expand Down Expand Up @@ -1124,6 +1125,7 @@ GO_TARGETS = [
"//pkg/jobs/jobsauth:jobsauth",
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs/jobstest:jobstest",
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/testdata/doctor/test_examine_cluster
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ debug doctor examine cluster
debug doctor examine cluster
Examining 53 descriptors and 52 namespace entries...
ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none
Examining 12 jobs...
Examining 14 jobs...
ERROR: validation failed
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,10 @@ const (

V23_1_DeleteDroppedFunctionDescriptors

// V23_1_CreateJobsMetricsPollingJob creates the permanent job
// responsible for polling the jobs table for metrics.
V23_1_CreateJobsMetricsPollingJob

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -707,6 +711,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_DeleteDroppedFunctionDescriptors,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 36},
},
{
Key: V23_1_CreateJobsMetricsPollingJob,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 38},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
15 changes: 15 additions & 0 deletions pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
executionErrorsMaxEntriesKey = "jobs.execution_errors.max_entries"
executionErrorsMaxEntrySizeKey = "jobs.execution_errors.max_entry_size"
debugPausePointsSettingKey = "jobs.debug.pausepoints"
metricsPollingIntervalKey = "jobs.metrics.interval.poll"
)

const (
Expand Down Expand Up @@ -70,6 +71,10 @@ const (
// error. If this size is exceeded, the error will be formatted as a string
// and then truncated to fit the size.
defaultExecutionErrorsMaxEntrySize = 64 << 10 // 64 KiB

// defaultPollForMetricsInterval is the default interval to poll the jobs
// table for metrics.
defaultPollForMetricsInterval = 10 * time.Second
)

var (
Expand Down Expand Up @@ -100,6 +105,16 @@ var (
settings.PositiveDuration,
)

// PollJobsMetricsInterval is the interval at which a tenant in the cluster
// will poll the jobs table for metrics.
PollJobsMetricsInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
metricsPollingIntervalKey,
"the interval at which a node in the cluster will poll the jobs table for metrics",
defaultPollForMetricsInterval,
settings.PositiveDuration,
)

gcIntervalSetting = settings.RegisterDurationSetting(
settings.TenantWritable,
gcIntervalSettingKey,
Expand Down
104 changes: 103 additions & 1 deletion pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -229,7 +230,8 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
ManagerDisableJobCreation: true,
}
args.Knobs.UpgradeManager = &upgradebase.TestingKnobs{
DontUseJobs: true,
DontUseJobs: true,
SkipJobMetricsPollingJobBootstrap: true,
}
args.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true}

Expand Down Expand Up @@ -3456,3 +3458,103 @@ func TestPausepoints(t *testing.T) {
})
}
}

// TestPausedMetrics verifies that the per-job-type CurrentlyPaused metric
// tracks jobs as they are paused, resumed, and canceled. It starts a test
// server, shortens the jobs metrics polling interval to 10ms, and runs fake
// changefeed, import, and schema change jobs whose resumers block until their
// context is done.
func TestPausedMetrics(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderShort(t)

	ctx := context.Background()
	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{
			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
		},
	})
	defer s.Stopper().Stop(ctx)

	// Poll aggressively so that the paused counts converge quickly.
	jobs.PollJobsMetricsInterval.Override(ctx, &s.ClusterSettings().SV, 10*time.Millisecond)
	runner := sqlutils.MakeSQLRunner(sqlDB)
	reg := s.JobRegistry().(*jobs.Registry)

	// waitForPausedCount retries until the CurrentlyPaused metric for typ
	// equals numPaused.
	waitForPausedCount := func(typ jobspb.Type, numPaused int64) {
		testutils.SucceedsSoon(t, func() error {
			// Read the metric exactly once so that the value reported in the
			// error is the same value that was compared.
			currentlyPaused := reg.MetricsStruct().JobMetrics[typ].CurrentlyPaused.Value()
			if currentlyPaused != numPaused {
				return fmt.Errorf(
					"expected (%+v) paused jobs of type (%+v), found (%+v)",
					numPaused,
					typ,
					currentlyPaused,
				)
			}
			return nil
		})
	}

	typeToRecord := map[jobspb.Type]jobs.Record{
		jobspb.TypeChangefeed: {
			Details:  jobspb.ChangefeedDetails{},
			Progress: jobspb.ChangefeedProgress{},
			Username: username.TestUserName(),
		},
		jobspb.TypeImport: {
			Details:  jobspb.ImportDetails{},
			Progress: jobspb.ImportProgress{},
			Username: username.TestUserName(),
		},
		jobspb.TypeSchemaChange: {
			Details:  jobspb.SchemaChangeDetails{},
			Progress: jobspb.SchemaChangeProgress{},
			Username: username.TestUserName(),
		},
	}
	// Register a resumer for every tested type that simply blocks until its
	// context is done.
	for typ := range typeToRecord {
		jobs.RegisterConstructor(typ, func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
			return jobs.FakeResumer{
				OnResume: func(ctx context.Context) error {
					<-ctx.Done()
					return ctx.Err()
				},
			}
		}, jobs.UsesTenantCostControl)
	}

	// makeJob creates and starts a job of the given type, failing the test on
	// error.
	makeJob := func(ctx context.Context,
		typ jobspb.Type,
	) *jobs.StartableJob {
		j, err := jobs.TestingCreateAndStartJob(ctx, reg, s.InternalDB().(isql.DB), typeToRecord[typ])
		if err != nil {
			t.Fatal(err)
		}
		return j
	}

	cfJob := makeJob(ctx, jobspb.TypeChangefeed)
	cfJob2 := makeJob(ctx, jobspb.TypeChangefeed)
	importJob := makeJob(ctx, jobspb.TypeImport)
	scJob := makeJob(ctx, jobspb.TypeSchemaChange)

	// Pause all job types.
	runner.Exec(t, "PAUSE JOB $1", cfJob.ID())
	waitForPausedCount(jobspb.TypeChangefeed, 1)
	runner.Exec(t, "PAUSE JOB $1", cfJob2.ID())
	waitForPausedCount(jobspb.TypeChangefeed, 2)
	runner.Exec(t, "PAUSE JOB $1", importJob.ID())
	waitForPausedCount(jobspb.TypeImport, 1)
	runner.Exec(t, "PAUSE JOB $1", scJob.ID())
	waitForPausedCount(jobspb.TypeSchemaChange, 1)

	// Resume / cancel jobs.
	runner.Exec(t, "RESUME JOB $1", cfJob.ID())
	waitForPausedCount(jobspb.TypeChangefeed, 1)
	runner.Exec(t, "CANCEL JOB $1", cfJob2.ID())
	waitForPausedCount(jobspb.TypeChangefeed, 0)
	runner.Exec(t, "RESUME JOB $1", importJob.ID())
	waitForPausedCount(jobspb.TypeImport, 0)
	runner.Exec(t, "CANCEL JOB $1", scJob.ID())
	waitForPausedCount(jobspb.TypeSchemaChange, 0)

	// Cancel the jobs that were resumed above so none are left running.
	runner.Exec(t, "CANCEL JOB $1", cfJob.ID())
	runner.Exec(t, "CANCEL JOB $1", importJob.ID())
}
12 changes: 11 additions & 1 deletion pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "jobspb",
Expand Down Expand Up @@ -64,4 +64,14 @@ go_proto_library(
],
)

go_test(
name = "jobspb_test",
srcs = ["wrap_test.go"],
args = ["-test.timeout=295s"],
deps = [
":jobspb",
"@com_github_stretchr_testify//assert",
],
)

get_x_data(name = "get_x_data")
13 changes: 13 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,12 @@ message SchemaTelemetryDetails {
message SchemaTelemetryProgress {
}

// PollJobsStatsDetails is the details message for POLL_JOBS_STATS jobs. It
// currently declares no fields.
message PollJobsStatsDetails {
}

// PollJobsStatsProgress is the progress message for POLL_JOBS_STATS jobs. It
// currently declares no fields.
message PollJobsStatsProgress {
}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -1200,7 +1206,12 @@ message Payload {
// and publish it to the telemetry event log. These jobs are typically
// created by a built-in schedule named "sql-schema-telemetry".
SchemaTelemetryDetails schema_telemetry = 37;

KeyVisualizerDetails keyVisualizerDetails = 38;

// PollJobsStats jobs poll the jobs table for statistics metrics, such as the
// number of paused jobs.
PollJobsStatsDetails poll_jobs_stats = 39;
}
reserved 26;
// PauseReason is used to describe the reason that the job is currently paused
Expand Down Expand Up @@ -1263,6 +1274,7 @@ message Progress {
RowLevelTTLProgress row_level_ttl = 25 [(gogoproto.customname)="RowLevelTTL"];
SchemaTelemetryProgress schema_telemetry = 26;
KeyVisualizerProgress keyVisualizerProgress = 27;
PollJobsStatsProgress pollJobsStats = 28;
}

uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
Expand Down Expand Up @@ -1293,6 +1305,7 @@ enum Type {
ROW_LEVEL_TTL = 16 [(gogoproto.enumvalue_customname) = "TypeRowLevelTTL"];
AUTO_SCHEMA_TELEMETRY = 17 [(gogoproto.enumvalue_customname) = "TypeAutoSchemaTelemetry"];
KEY_VISUALIZER = 18 [(gogoproto.enumvalue_customname) = "TypeKeyVisualizer"];
POLL_JOBS_STATS = 19 [(gogoproto.enumvalue_customname) = "TypePollJobsStats"];
}

message Job {
Expand Down
25 changes: 24 additions & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ var AutomaticJobTypes = [...]Type{
TypeAutoSpanConfigReconciliation,
TypeAutoSQLStatsCompaction,
TypeAutoSchemaTelemetry,
TypePollJobsStats,
}

// DetailsType returns the type for a payload detail.
Expand Down Expand Up @@ -188,6 +189,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
return TypeAutoSchemaTelemetry, nil
case *Payload_KeyVisualizerDetails:
return TypeKeyVisualizer, nil
case *Payload_PollJobsStats:
return TypePollJobsStats, nil
default:
return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
}
Expand Down Expand Up @@ -227,6 +230,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
TypeRowLevelTTL: RowLevelTTLDetails{},
TypeAutoSchemaTelemetry: SchemaTelemetryDetails{},
TypeKeyVisualizer: KeyVisualizerDetails{},
TypePollJobsStats: PollJobsStatsDetails{},
}

// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
Expand Down Expand Up @@ -272,6 +276,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_SchemaTelemetry{SchemaTelemetry: &d}
case KeyVisualizerProgress:
return &Progress_KeyVisualizerProgress{KeyVisualizerProgress: &d}
case PollJobsStatsProgress:
return &Progress_PollJobsStats{PollJobsStats: &d}
default:
panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
}
Expand Down Expand Up @@ -315,6 +321,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.SchemaTelemetry
case *Payload_KeyVisualizerDetails:
return *d.KeyVisualizerDetails
case *Payload_PollJobsStats:
return *d.PollJobsStats
default:
return nil
}
Expand Down Expand Up @@ -358,6 +366,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.SchemaTelemetry
case *Progress_KeyVisualizerProgress:
return *d.KeyVisualizerProgress
case *Progress_PollJobsStats:
return *d.PollJobsStats
default:
return nil
}
Expand All @@ -371,6 +381,17 @@ func (t Type) String() string {
return strings.Replace(Type_name[int32(t)], "_", " ", -1)
}

// TypeFromString is the inverse of Type.String: given s, it returns the Type
// whose enum identifier equals s with every space swapped for an underscore.
// When no such identifier exists it returns TypeUnspecified and an error.
func TypeFromString(s string) (Type, error) {
	enumName := strings.ReplaceAll(s, " ", "_")
	if v, found := Type_value[enumName]; found {
		return Type(v), nil
	}
	return TypeUnspecified, errors.New("invalid type string")
}

// WrapPayloadDetails wraps a Details object in the protobuf wrapper struct
// necessary to make it usable as the Details field of a Payload.
//
Expand Down Expand Up @@ -414,6 +435,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_SchemaTelemetry{SchemaTelemetry: &d}
case KeyVisualizerDetails:
return &Payload_KeyVisualizerDetails{KeyVisualizerDetails: &d}
case PollJobsStatsDetails:
return &Payload_PollJobsStats{PollJobsStats: &d}
default:
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -449,7 +472,7 @@ const (
func (Type) SafeValue() {}

// NumJobTypes is the number of jobs types.
const NumJobTypes = 19
const NumJobTypes = 20

// ChangefeedDetailsMarshaler allows for dependency injection of
// cloud.SanitizeExternalStorageURI to avoid the dependency from this
Expand Down
Loading